### Actividad: Procesamiento paralelo
Las clases Process y Pool.multiprocessing.Process.

In [None]:
import multiprocessing
import time
# Codigo de Quan Nguyen


class Process(multiprocessing.Process):
    def __init__(self, id):
        super(Process, self).__init__()
        self.id = id

    def run(self):
        time.sleep(1)
        print("Soy el proceso con este id: {}".format(self.id))

if __name__=="__main__":
    processes = Process(1), Process(2), Process(3), Process(4)
    [p.start()for p in processes]

Soy el proceso con este id: 1
Soy el proceso con este id: 2Soy el proceso con este id: 3Soy el proceso con este id: 4




In [None]:
import multiprocessing
import time
# Codigo de Quan Nguyen


class Process(multiprocessing.Process):
    def __init__(self, id):
        super(Process, self).__init__()
        self.id = id

    def run(self):
        time.sleep(1)
        print("Soy el proceso con este id: {}".format(self.id))

def square(x):
    return x * x


def map_test():
    pool = multiprocessing.Pool()

    inputs = [0, 1, 2, 3, 4]
    outputs = pool.map(square, inputs)
    print(outputs)
    outputs_async = pool.map_async(square, inputs)
    outputs = outputs_async.get()
    print(outputs)

if __name__ ="__main__":
    processes = Process(1), Process(2), Process(3), Process(4)
    [p.start()for p in processes]   
  
map_test()

In [None]:
from concurrent.futures import ProcessPoolExecutor
executor = ProcessPoolExecutor(max_workers=4)
fut = executor.submit(square, 2)

result = executor.map(square, [0, 1, 2, 3, 4])
list(result)

In [None]:
import concurrent.futures

# Definimos la función que queremos ejecutar de forma asíncrona.
def square(x):
    return x * x

# Creando el executor para manejar la ejecución concurrente
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # Enviando tareas al executor
    fut1 = executor.submit(square, 2)
    fut2 = executor.submit(square, 3)
    
    # Usando 'wait' para esperar a que ambos futures se completen
    concurrent.futures.wait([fut1, fut2])
    
    # Extraemos los resultados una vez que sabemos que están completos y los almacenamos en una lista
    results_with_wait = [fut1.result(), fut2.result()]
    print("Resultados obtenidos usando wait:", results_with_wait)
    
    # Usando 'as_completed' para iterar sobre los futures a medida que se completan
    # Reenviamos las tareas porque fut1 y fut2 ya están completados y consumidos.
    fut1 = executor.submit(square, 2)
    fut2 = executor.submit(square, 3)
    
    results = concurrent.futures.as_completed([fut1, fut2])
    results_list_as_completed = [future.result() for future in results]
    print("Resultados obtenidos usando as_completed:", results_list_as_completed)

# El contexto 'with' asegura que el executor se cierre correctamente después de usarlo.

  


**Ejercicio:**  Genera futures usando la función `asyncio.run_in_executor` y manipular los resultados usando todas las herramientas y sintaxis proporcionadas por la biblioteca `asyncio` para que pueda lograr concurrencia y paralelismo al mismo tiempo. 

In [None]:
## Tu respuesta

### El método de Monte Carlo para estimar el valor de Pi.

El primer paso al escribir un programa paralelo es desarrollar una versión en serie y verificar que funcione correctamente. En un escenario del mundo real, es recomendable dejar la paralelización como el último paso del proceso de optimización; en primer lugar, porque es necesario identificar las partes más lentas y, en segundo lugar, porque la paralelización requiere una inversión significativa de tiempo y, en el mejor de los casos, ofrece una aceleración que no supera el número de procesadores disponibles. A continuación, se presenta la implementación del programa en serie:

In [None]:
import random
samples = 1000000
hits = 0
for i in range(samples):
    x = random.uniform(-1.0, 1.0)
    y = random.uniform(-1.0, 1.0)
    if x**2 + y**2 <= 1:
        hits += 1
pi = 4.0 * hits/samples

**Ejercicio:** modifica el codigo para realizar una  funcional y eficiente la aproximación de Pi de manera serial en Python. 

In [None]:
## Tu respuesta

Para paralelizar este código, podemos escribir una función, llamada `sample`, que corresponda a una única verificación de acierto y error. Si la muestra acierta al círculo, la función devolverá 1; de lo contrario, devolverá 0. Al ejecutar `sample` varias veces y sumar los resultados, obtendremos el número total de aciertos. 

Podemos ejecutar la muestra en varios procesadores con `apply_async` y obtener los resultados de la siguiente manera: 

In [None]:
def sample():
    x = random.uniform(-1.0, 1.0)
    y = random.uniform(-1.0, 1.0)
    if x**2 + y**2 <= 1:
        return 1
    else:
        return 0
pool = multiprocessing.Pool()
results_async = [pool.apply_async(sample) for i in range(samples)]
hits = sum(r.get() for r in results_async)

**Ejercicio:** Empaqueta las dos versiones llamadas como `pi_serial` y `pi_apply_async`  y comparar la velocidad de ejecución, de la siguiente manera: 

- `time python -c 'import pi; pi.pi_serial()'`
- `time python -c 'import pi; pi.pi_apply_async()'`

In [None]:
## Tu codigo

In [None]:
def sample_multiple(samples_partial): 
  return sum(sample() for i in range(samples_partial)) 

n_tasks = 10 

chunk_size = samples/n_tasks 

pool = multiprocessing.Pool() 

**Ejercicio:** Incluye este código en una función llamada `pi_apply_async_chunked` y ejecuta el código de la siguiente manera: 

* ` time python -c 'import pi; pi.pi_apply_async_chunked()'`

¿Qué observas?

In [None]:
# Tu respuesta

### Código completo de respuesta

In [None]:
import multiprocessing
import random

samples = 10000000


def pi_serial():
    hits = 0

    for i in range(samples):
        x = random.uniform(-1.0, 1.0)
        y = random.uniform(-1.0, 1.0)

        if x ** 2 + y ** 2 <= 1:
            hits += 1

    pi = 4.0 * hits / samples
    return pi


def sample():
    x = random.uniform(-1.0, 1.0)
    y = random.uniform(-1.0, 1.0)

    if x ** 2 + y ** 2 <= 1:
        return 1
    else:
        return 0


def pi_apply_async():
    pool = multiprocessing.Pool()
    results_async = [pool.apply_async(sample) for i in range(samples)]
    hits = sum(r.get() for r in results_async)

    pi = 4.0 * hits / samples
    return pi


def sample_multiple(samples_partial):
    return sum(sample() for i in range(samples_partial))


def pi_apply_async_chunked():
    ntasks = 10
    chunk_size = samples / ntasks
    pool = multiprocessing.Pool()
    results_async = [
        pool.apply_async(sample_multiple, [chunk_size]) for i in range(ntasks)
    ]
    hits = sum(r.get() for r in results_async)

    pi = 4.0 * hits / samples
    return pi

### Sincronización y bloqueos 

In [None]:
import multiprocessing
# Codigo de Quan Nguyen

class Process(multiprocessing.Process):
    def __init__(self, counter):
        super(Process, self).__init__()
        self.counter = counter

    def run(self):
        for i in range(1000):
                self.counter.value += 1


def main():
    counter = multiprocessing.Value("i", lock=True)
    counter.value = 0

    processes = [Process(counter) for i in range(4)]
    [p.start() for p in processes]
    [p.join() for p in processes]
    print(counter.value)


if __name__ == "__main__":
    main()

**Pregunta:** ¿Encuentras algún problema en el código anterior?. Proporciona una solución para resolver algún inconveniente.

In [None]:
## Tu respuesta

### OpenMP 



In [None]:
pip install cython numpy

In [None]:
%load_ext cython

In [None]:
%%cython
import numpy as np
cimport numpy as cnp  # esto es necesario para las declaraciones de tipo de datos

def square_serial(cnp.ndarray[cnp.float64_t, ndim=1] inp):
    cdef int i, size = inp.shape[0]
    cdef cnp.ndarray[cnp.float64_t, ndim=1] out = np.empty(size, dtype=np.float64)
    
    for i in range(size):
        out[i] = inp[i] * inp[i]

    return np.array(out)


In [None]:
inp_array = np.array([1.0, 2.0, 3.0, 4.0], dtype=np.float64)
result = square_serial(inp_array)
print(result)

Implementar una versión paralela del bucle sobre los elementos de la matriz implica sustituir la llamada de `range` por `prange`.  

In [None]:
%%cython --compile-args=-fopenmp --link-args=-fopenmp --force
from cython.parallel import prange
import numpy as np
cimport cython
cimport numpy as cnp

@cython.boundscheck(False)  # Desactivar la comprobación de límites de los arrays
@cython.wraparound(False)   # Desactivar el envoltorio negativo
def square_parallel(cnp.ndarray[cnp.float64_t, ndim=1] inp):
    cdef int i, size = inp.shape[0]
    cdef cnp.ndarray[cnp.float64_t, ndim=1] out = np.empty(size, dtype=np.float64)
    
    # Usar prange aquí para la ejecución en paralelo
    for i in prange(size, nogil=True):
        out[i] = inp[i] * inp[i]

    return np.array(out)


In [None]:
inp_array = np.array([1.0, 2.0, 3.0, 4.0], dtype=np.float64)
result = square_parallel(inp_array)
print(result)

El uso de `nogil` en Cython es una estrategia clave para habilitar el verdadero paralelismo de múltiples hilos en código que no necesitan interactuar con objetos de Python o cualquier cosa que requiera el Global Interpreter Lock (GIL). `nogil` literalmente significa `no GIL` y cuando se usa en un contexto (como con `prange`), permite que ese bloque de código se ejecute sin mantener el GIL, que es necesario para realizar operaciones verdaderamente paralelas en múltiples núcleos de la CPU.

Para utilizar nogil de manera efectiva en Cython, sigue estos pasos:

1. Asegurar que el código sea seguro sin GIL: solo código que no acceda a objetos de Python o realice operaciones que requieran el GIL puede ejecutarse de manera segura sin este. Por ejemplo, operaciones numéricas en arrays de NumPy (que no involucren la creación de nuevos objetos de Python) pueden hacerse sin el GIL.

2. Usar `with nogil:` o `nogil=True`: puedes usar `with nogil:` para designar un bloque de código que se ejecutará sin el GIL. En `prange`, puedes pasar `nogil=True` como un argumento para indicar que el cuerpo del bucle se ejecutará sin el GIL.

3. Compilar con soporte para multihilos: asegúrate de que tu código se compile con soporte para multihilos (usualmente OpenMP) para aprovechar el paralelismo.

In [None]:
%%cython --compile-args=-fopenmp --link-args=-fopenmp
from cython.parallel import prange
import numpy as np
cimport numpy as cnp
cimport cython

@cython.boundscheck(False)  # Desactiva la verificación de límites para mejorar el rendimiento
@cython.wraparound(False)   # Desactiva el envoltorio negativo
def square_parallel_nogil(cnp.ndarray[cnp.float64_t, ndim=1] inp):
    cdef int i, size = inp.shape[0]
    cdef cnp.ndarray[cnp.float64_t, ndim=1] out = np.empty(size, dtype=np.float64)
    
    # Ejecuta el bucle en paralelo y sin el GIL
    for i in prange(size, nogil=True):
        out[i] = inp[i] * inp[i]

    return np.array(out)



In [None]:
inp_array = np.array([1.0, 2.0, 3.0, 4.0], dtype=np.float64)
result = square_parallel_nogil(inp_array)
print(result)


### Paralelismo automático 

Ejemplos de paquetes que implementan paralelismo automático son los (ahora) familiares compiler just-in-time (JIT) [numexpr](https://github.com/pydata/numexpr) y [Numba](https://numba.pydata.org/). Se han desarrollado otros paquetes para optimizar y paralelizar automáticamente matrices y expresiones intensivas en matrices, que son cruciales en aplicaciones numéricas y de aprendizaje automático (ML) específicas. 

Theano es un proyecto que te permite definir una expresión matemática en matrices (más generalmente, tensores) y compilarlas en un lenguaje rápido, como C o C++. Muchas de las operaciones que implementa Theano son paralelizables y pueden ejecutarse tanto en la CPU como en la GPU. 

TensorFlow es otra biblioteca similar a Theano,  dirigida al uso de expresiones matemáticas con uso intensivo de matrices pero, en lugar de traducir las expresiones a código C especializado, ejecuta las operaciones en un motor C++ eficiente. 

Tanto Theano como TensorFlow son ideales cuando el problema en cuestión se puede expresar en una cadena de operaciones matriciales y de elementos (como redes neuronales). 

#### Theano

In [None]:
pip install Theano

Ejemplo del cálculo de pi, usando Teano.

In [None]:
import numpy as np
import theano.tensor as T
import theano as th
# Codigo de Quan Nguyen

th.config.openmp_elemwise_minsize = 1000
th.config.openmp = True

x = T.vector("x")
y = T.vector("y")

hit_test = x ** 2 + y ** 2 < 1
hits = hit_test.sum()
misses = x.shape[0]
pi_est = 4 * hits / misses

calculate_pi = th.function([x, y], pi_est)

x_val = np.random.uniform(-1, 1, 30000)
y_val = np.random.uniform(-1, 1, 30000)

import timeit

res = timeit.timeit(
    "calculate_pi(x_val, y_val)",
    "from __main__ import x_val, y_val, calculate_pi",
    number=100000,
)
print(res)

**Ejercicio** Modifica el código anterior para verificar que Theano generará un código ligeramente diferente que se beneficia menos de múltiples subprocesos, cuando se cambia el código:

```
# hits = hit_test.sum()
hits = hit_test.astype('int32').sum()
```
Comprueba que si vuelves a ejecutar el bechmarking, la cantidad de subprocesos no afecta significativamente el tiempo de ejecución:

```
$ OMP_NUM_THREADS=1 python theano.py
$ OMP_NUM_THREADS=2 python theano.py
$ OMP_NUM_THREADS=3 python theano.py
$ OMP_NUM_THREADS=4 python theano.py
```

In [None]:
## Tus respuestas

#### Profiling Theano

Para generar datos de creación profiling, la única modificación necesaria es agregar la opción `profile=True` a la función `th`, como se ilustra en el siguiente fragmento de código:

```
calculate_pi = th.function([x, y], pi_est,profile=True)
```

El resumen del profile se puede imprimir en la salida emitiendo el comando `summary`, de la siguiente manera:

```
calculate_pi.profile.summary()
```

**Pregunta:** ¿Cuál es el resultado impreso de `calculate_pi.profile.summary()`?, ¿ qué secciones aparecen en el resultado?, ¿es consistente con el primer benchmark?

In [None]:
## Tu respuesta