**Notas para contenedor de docker:**

Comando de docker para ejecución de la nota de forma local:

nota: cambiar `<ruta a mi directorio>` por la ruta de directorio que se desea mapear a `/datos` dentro del contenedor de docker.

```
docker run --rm -v <ruta a mi directorio>:/datos --name jupyterlab_numerical -p 8888:8888 -p 8786:8786 -p 8787:8787 -d palmoreck/jupyterlab_numerical:1.1.0
```

password para jupyterlab: `qwerty`

Detener el contenedor de docker:

```
docker stop jupyterlab_numerical
```


Documentación de la imagen de docker `palmoreck/jupyterlab_numerical:1.1.0` en [liga](https://github.com/palmoreck/dockerfiles/tree/master/jupyterlab/numerical).

---

Esta nota utiliza métodos vistos en [1.5.Integracion_numerica](https://github.com/ITAM-DS/analisis-numerico-computo-cientifico/blob/master/temas/I.computo_cientifico/1.5.Integracion_numerica.ipynb)

# El módulo de `multiprocessing`

Documentación en: [multiprocessing](https://docs.python.org/3.1/library/multiprocessing.html).

La implementación más utilizada de Python, [CPython](https://en.wikipedia.org/wiki/CPython) no utiliza múltiples cores por default. De tarea queda leer la discusión de la liga anterior en el apartado *Design* sobre el [Global INterpreter Lock: GIL](https://en.wikipedia.org/wiki/Global_interpreter_lock) y el por qué CPython no soporta ejecución *multithreaded* o *multiprocesses*.

El módulo `multiprocessing` nos permite realizar procesamientos basados en procesos o threads para compartir trabajo y datos. Se recomienda usar este módulo para el *shared memory programming* (ver [2.2.Sistemas_de_memoria_compartida](https://github.com/ITAM-DS/analisis-numerico-computo-cientifico/blob/master/temas/II.computo_paralelo/2.2.Sistemas_de_memoria_compartida.ipynb)) y para trabajos que son demandantes de CPU. Para paralelizar trabajos demandantes en I/O no se recomienda su uso.

**Otro módulo en Python para procesamiento utilizando los cores de tu máquina es [concurrent.features](https://docs.python.org/3/library/concurrent.futures.html) que provee el comportamiento principal de `multiprocessing`**. Ver [liga](https://stackoverflow.com/questions/38311431/concurrent-futures-processpoolexecutor-vs-multiprocessing-pool-pool?noredirect=1&lq=1) y [liga](https://stackoverflow.com/questions/20776189/concurrent-futures-vs-multiprocessing-in-python-3) para más sobre `concurrent.futures` y `concurrent.futures` vs `multiprocessing`.

## Nota sobre el GIL y `multiprocessing`

Aunque en Python los threads son nativos del sistema operativo (esto es, no se simulan, son realmente threads del sistema operativo creados en el hardware), están limitados por el *global interpreter lock, GIL*, de modo que un sólo thread interactúe con un objeto Python en un único tiempo.

Al usar el módulo `multiprocessing` ejecutamos en paralelo un número de **interpretadores Python** (CPython), cada uno con su propio espacio de memoria privada y su propio GIL que se ejecutan en un instante (y con un thread). 

**Comentario:** en `multiprocessing` se utilizan subprocesos en lugar de threads.

# Ejemplos

## Hello world!

En `multiprocessing` los procesos son generados al utilizar la clase `Process` para crear objetos y llamar al método `start()`. Ver [Process](https://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.Process) para documentación de ésta clase.

In [1]:
from multiprocessing import Process #importamos clase Process

In [2]:
def f():
    print('hello world! de subproceso')
    
if __name__=='__main__':
    p1 = Process(target=f)
    p2 = Process(target=f)
    p1.start() #start sólo puede ser llamada una vez por objeto Process
    p2.start() 
    p1.join() #el proceso principal espera a que termine p1
    p2.join() #el proceso principal espera a que termine p2
    print('hello world! de proceso')

hello world! de subproceso
hello world! de subproceso
hello world! de proceso


**Comentario:** es una buena práctica explícitamente hacer `join`'s para cada objeto process que realizó `start`.

La clase `Process` recibe la función a ejecutar para cada proceso con el argumento `target` y también tiene `args` para los argumentos de la función:

In [3]:
def f(s):
    print(s)
    
if __name__=='__main__':
    p1 = Process(target=f, args=('hola',))
    p2 = Process(target=f, args=('mundo',)) 
    p1.start()
    p2.start()
    p1.join()
    p2.join()

hola
mundo


**Comentarios:** 

* Obsérvese que se usa `if __name__=='__main__':` que ayuda a que los subprocesos importen el módulo `__main__` (por lo que no se ejecuta la sección que está dentro de `if __name__=='__main__':` pues no son programas principales) y continúa la ejecución de las líneas de `start` (cada subproceso ejecuta `f`) y `join`. Si se quita este statement por ejemplo:

```
def f(s):
    print(s)
p1 = Process(target=f, args=('hola',))
p2 = Process(target=f, args=('mundo',)) 
p1.start()
p2.start()
p1.join()
p2.join()
```

el notebook quedará bloqueado pues una celda con el código anterior creará subprocesos que a su vez crearán otros subprocesos, que a su vez crearán otros subprocesos... y así de forma recursiva.

* Los argumentos tienen que ser objetos *pickable* o serializados. Ver [what can be pickled and unpickled](https://docs.python.org/3/library/pickle.html#what-can-be-pickled-and-unpickled) para una lista de objetos *pickable*


* En *multiprocessing* tenemos la función `cpu_count` para determinar el número de cores que el sistema operativo puede usar. Este número es la cantidad física o simulada (hyperthreading) de cores.

In [4]:
import multiprocessing

In [5]:
multiprocessing.cpu_count()

2

## Pool of workers, ver [Using a pool of workers](https://docs.python.org/3/library/multiprocessing.html#using-a-pool-of-workers)

La clase `Pool` crea un conjunto (*pool*) de procesos tipo *worker* que procesarán las tareas a realizar vía funciones tipo `map` o `apply`. Se hace `map` del *input* hacia los procesadores y se colecta el *output* de éstos. Mientras el `map` se realiza, el proceso que lanzó el map se bloquea hasta que finalicen las tareas (aunque hay [map_async](https://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.pool.multiprocessing.Pool.map_async)). El *output* es una lista.

**Obs:** el procesamiento de las tareas podríamos hacerlo con la clase `Process` de arriba pero tendríamos que utilizar un ciclo y colectar los resultados.

**Comentario:** para un gran número de tareas a realizar utilicen `Pool`, para pocas tareas a realizar (pocas=menos de $10$) utilicen `Process`.

In [6]:
from multiprocessing import Pool #importamos clase Pool

### 1) Hello world!

In [7]:
def f(dummy):
    return 'hello world!'
    
if __name__ == '__main__':
    pool = Pool(multiprocessing.cpu_count())
    results = pool.map(f,range(multiprocessing.cpu_count()))
    print(results)
    pool.close()    
    pool.join()

['hello world!', 'hello world!']


In [8]:
def f(dummy):
    return 'hello world!'
    
if __name__ == '__main__':
    num_processes=2
    pool = Pool(num_processes)
    results = pool.map(f,range(num_processes))
    print(results)
    pool.close()    
    pool.join()

['hello world!', 'hello world!']


**Con apply:**

In [9]:
def f():
    return 'hello world!'
    
if __name__ == '__main__':
    num_processes=2
    pool = Pool(num_processes)
    results = [pool.apply(f) for x in range(num_processes)]
    print(results)
    pool.close()    
    pool.join()

['hello world!', 'hello world!']


**Podemos usar un [context manager](https://book.pythontips.com/en/latest/context_managers.html#context-managers) para evitar tener líneas `pool.close()`, `pool.join()`**

In [10]:
def f(dummy):
    return 'hello world!'
    
if __name__ == '__main__':
    num_processes=2
    with Pool(processes=num_processes) as pool:
        results = pool.map(f,range(num_processes))
        print(results)

['hello world!', 'hello world!']


### 2) Pasar múltiples argumentos vía [starmap](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.starmap)

In [11]:
def f(s):
    return s
    
if __name__ == '__main__':
    num_processes=2
    with Pool(processes=num_processes) as pool:
        results = pool.starmap(f,[('hola',),('mundo',)])
        print(results)

['hola', 'mundo']


### 3) Regla compuesta del rectángulo

In [12]:
import math
import time
from scipy.integrate import quad

In [13]:
f=lambda x: math.exp(-x**2)
a=0
b=1

**Forma secuencial**

In [14]:
def Rcf(f, a, b, n): #Rcf: rectángulo compuesto para f
    """
    Compute numerical approximation using rectangle or mid-point method in 
    an interval.
    Nodes are generated via formula: x_i = a+(i+1/2)h_hat for i=0,1,...,n-1 and h_hat=(b-a)/n
    Args:
        f (lambda expression): lambda expression of integrand
        a (int): left point of interval
        b (int): right point of interval
        n (int): number of subintervals
    Returns:
        Rcf (float) 
    """
    h_hat=(b-a)/n
    sum_res=0
    for i in range(0,n):
        x=a+(i+1/2.0)*h_hat
        sum_res+=f(x)
    return h_hat*sum_res

In [15]:
n=10**6
start_time = time.time()
aprox=Rcf(f,a,b,n)
end_time = time.time()

In [16]:
secs = end_time-start_time
print("Rcf tomó",secs,"segundos" )

Rcf tomó 0.34135937690734863 segundos


In [17]:
obj, err = quad(f, a, b)

In [18]:
def err_relativo(aprox, obj):
    return math.fabs(aprox-obj)/math.fabs(obj) #obsérvese el uso de la librería math

In [19]:
err_relativo(aprox,obj)

6.71939731300312e-14

In [20]:
%timeit -n 5 -r 10 Rcf(f,a,b,n)

304 ms ± 7 ms per loop (mean ± std. dev. of 10 runs, 5 loops each)


**Forma en paralelo**

In [21]:
p=multiprocessing.cpu_count() #número de procesos
ns_p=int(n/p) #número de subintervalos por proceso
              #se asume que n es divisible por p
              #si no se cumple esto, se puede definir 
              #ns_p=int(n/p)
              #y para n: 
              #n=p*ns_p

In [22]:
print("número de subintervalos:",n)

número de subintervalos: 1000000


In [23]:
print("número de subintervalos por proceso:",ns_p)

número de subintervalos por proceso: 500000


In [24]:
def Rcf_parallel(mi_id):
    begin=mi_id*ns_p
    end=begin+ns_p
    h_hat=(b-a)/n
    sum_res=0
    for i in range(begin,end):
        x=a+(i+1/2.0)*h_hat
        sum_res+=f(x)
    return h_hat*sum_res
if __name__ == '__main__':
    start_time=time.time()
    with Pool(processes=p) as pool:
        results = pool.map(Rcf_parallel,range(p))
        aprox=sum(results)
    end_time=time.time()

In [25]:
secs = end_time-start_time
print("Rcf_parallel tomó",secs,"segundos" )

Rcf_parallel tomó 0.43401670455932617 segundos


In [26]:
err_relativo(aprox,obj)

5.842307840730588e-14

In [27]:
%%timeit -n 5 -r 10
if __name__ == '__main__':
    with Pool(processes=p) as pool:
        results = pool.map(Rcf_parallel,range(p))
        aprox=sum(results)

232 ms ± 20.4 ms per loop (mean ± std. dev. of 10 runs, 5 loops each)


In [28]:
err_relativo(aprox,obj)

5.842307840730588e-14

## 4) Ejemplo para pasar múltiples parámetros a una función vía un [generator](https://wiki.python.org/moin/Generators).

In [36]:
def f(x):
    return math.exp(-x**2)

In [50]:
def Rcf_parallel2(t):
    fun,a,b,mi_id = t
    begin=mi_id*ns_p
    end=begin+ns_p
    h_hat=(b-a)/n
    sum_res=0
    for i in range(begin,end):
        x=a+(i+1/2.0)*h_hat
        sum_res+=fun(x)
    return h_hat*sum_res
if __name__ == '__main__':
    it=((f,a,b,k) for k in range(p))
    start_time=time.time()
    with Pool(processes=p) as pool:
        results = pool.map(Rcf_parallel2,it)
        aprox=sum(results)
    end_time=time.time()

In [51]:
secs = end_time-start_time
print("Rcf_parallel2 tomó",secs,"segundos" )

Rcf_parallel2 tomó 0.3298032283782959 segundos


In [52]:
err_relativo(aprox,obj)

5.842307840730588e-14

In [53]:
%%timeit -n 5 -r 10
if __name__ == '__main__':
    it=((f,a,b,k) for k in range(p))
    with Pool(processes=p) as pool:
        results = pool.map(Rcf_parallel2,it)
        aprox=sum(results)


256 ms ± 17.5 ms per loop (mean ± std. dev. of 10 runs, 5 loops each)


In [58]:
def Rcf_parallel3(t):
    fun,a,b,mi_id = t
    begin=mi_id*ns_p
    end=begin+ns_p
    h_hat=(b-a)/n
    sum_res=0
    f_nodes=(f(a+(i+1/2.0)*h_hat) for i in range(begin,end))
    sum_res=sum(f_nodes)
    return h_hat*sum_res
if __name__ == '__main__':
    start_time=time.time()
    it=((f,a,b,k) for k in range(p))
    with Pool(processes=p) as pool:
        results = pool.map(Rcf_parallel3,it)
        aprox=sum(results)
    end_time=time.time()

In [59]:
secs = end_time-start_time
print("Rcf_parallel2 tomó",secs,"segundos" )

Rcf_parallel2 tomó 0.32567548751831055 segundos


In [60]:
err_relativo(aprox,obj)

5.842307840730588e-14

In [57]:
%%timeit -n 5 -r 10
if __name__ == '__main__':
    it=((f,a,b,k) for k in range(p))
    with Pool(processes=p) as pool:
        results = pool.map(Rcf_parallel3,it)
        aprox=sum(results)


237 ms ± 25.1 ms per loop (mean ± std. dev. of 10 runs, 5 loops each)


In [65]:
def Rcf_parallel4(t):
    i,a,b,h_hat = t
    f_nodes=f((a+(i+1/2)*h_hat))
    return f_nodes
if __name__ == '__main__':
    start_time=time.time()
    h_hat=(b-a)/n
    it=((i,0,1,h_hat) for i in range(n))
    with Pool(processes=p) as pool:
        results = pool.map(Rcf_parallel4,it)
    suma_res=sum(results)
    aprox=h_hat*suma_res
    end_time=time.time()

In [66]:
secs = end_time-start_time
print("Rcf_parallel4 tomó",secs,"segundos" )

Rcf_parallel4 tomó 0.925666332244873 segundos


In [67]:
err_relativo(aprox,obj)

6.71939731300312e-14

In [64]:
%%timeit -n 5 -r 10 
if __name__ == '__main__':
    h_hat=(b-a)/n
    it=((i,a,b,h_hat) for i in range(n))
    with Pool(processes=p) as pool:
        results = pool.map(Rcf_parallel4,it)
    suma_res=sum(results)
    aprox=h_hat*suma_res


804 ms ± 24.3 ms per loop (mean ± std. dev. of 10 runs, 5 loops each)


**Referencias**

1. M. Gorelick, I. Ozsvald, High Performance Python, O'Reilly Media, 2014.
