En lineas generales hay dos caras para este tema:

- Threading : que se utiliza para procesos ligados a io : Interfaces para descargar y abrir archivos por ejemplo
- Paralelization : que hace referencia a procesos hechos por la cpu.

In [14]:
# =============================================================================
# IMPORTS
# =============================================================================

import time
import multiprocessing
import concurrent.futures as cf

# Paralelizacion utilizando la libreria nativa de python concurrent y multiprocessing

- Fuente: <a href="https://www.youtube.com/watch?v=fKl2JW_qrso">Video </a>
- Github asociado al video: <a href="https://github.com/CoreyMSchafer/code_snippets/blob/master/Python/MultiProcessing/multiprocessing-demo.py"> github</a>

## Medicion de tiempo utilizando time.perf_counter

- Antes de empezar el tutorial tengamos en cuenta que las mediciones del tiempo las haremos utilizando time.perf_counter debido a que esta funcion mide el tiempo del cpu (distinto de time.time que mide el tiempo real).

- Puede ver mas acerca de esto en el siguiente <a href="https://stackoverflow.com/questions/66036844/time-time-or-time-perf-counter-which-is-faster">post</a> de stack overflow.

## Paralelizacion utilizando multiprocessing

Suponga que lo que hace es llamar varias veces a una funcion como en el siguiente ejemplo:

```start = time.perf_counter()```


```def do_something(seconds):``` <br>
        <p>```print(f'Sleeping {seconds} second(s)...')```<br>
        <p>```time.sleep(seconds)```<br>
        <p>```return f'Done Sleeping...{seconds}'```<br>

```finish = time.perf_counter()```

En este sencillo ejemplo  tenemos una funcion que se llama do_sommething, la cual tiene un unico input llamados seconds que dice cuanto tiempo esperar hasta retornar un string que dice que durmio cierta cantidad de segundos.

Piense ahora que quisiera ejecutar varias veces este proceso de forma sincronica con el valor ```seconds=1``` , si lo hiciera 10 veces entonces tardaria 10 segundos.

### Creando un proceso sencillo

Vamos ahora a utilizarla libreria multiprocessing para paralelizar estas corridas. Para ello tenemos que crear primero un proceso:

- p = multiprocessing.Process(target=< funcion>, args=< args>, kwargs=dict)
    - Se aclara que args son los argumentos de la funcion. En el video se indica que pasar argumentos puede ser complicado porque los argumentos se deberian pasar de forma serializada, usando pickle por ejemplo.
    - Observese que se tiene la opcion de kwargs para argumentos nombrados.

Para que el proceso inicie debemos utilizar el metodo start del mismo:

- p.start()

Asi como lo iniciamos lo 'terminamos', .join() indica que se debe esperar hasta que el proceso se complete.


# Process pool executor - Libreria concurrent

- Introducido en python 3.2
- Permite intercambiar entre uso de thread y de proceso, lo cual no se puede con lo anterior.
- Se encuentra en la libreria concurrent
- Puede utilizarse con context manager

- .submit() : programa la funcion a paralelizar y retorna un objeto futuro.
- El objeto futuro encapsula la ejecucion de la funcion y permite verificarla despues de que se ha programado. Podemos verificar si esta corriendo o si ha terminado, asi como tambien verificar el resultado.
- Para accerder al resultado utilizamos .result()

- Observar en el ejemplo: ```f = executor.submit(do_something, seconds=1)``` que si la funcion tiene argumentos nombrados, basta con escribirlos como tal en la segunda entrada.

- Para obtener los resultados terminados de los futures utilizamos el metodo .as_compleated() . Puede verse una discusion en stack overflow <a href="https://stackoverflow.com/questions/51239251/how-does-concurrent-futures-as-completed-work">aqui</a>

- si combinamos con map, podemos obtener los resultados en el orden en que empezaron, aunque esto no significa que este sea el orden con el que han terminado los procesos.

## Diferencia .submit() vs .map()

.submit() vs .map(). They both accept the jobs immediately (submitted|mapped - start). They take the same time to complete, 11 seconds (last result time - start). However, .submit() gives results as soon as any thread in the ThreadPoolExecutor maxThreads=2 completes (unordered!). While .map() gives results in the order they are submitted.

- Fuente : https://stackoverflow.com/questions/20838162/how-does-threadpoolexecutor-map-differ-from-threadpoolexecutor-submit

In [9]:
# Aca definimos la funcion

def do_something(*,seconds):
    print(f'Sleeping {seconds} second(s)...')
    time.sleep(seconds)
    return f'Done Sleeping...{seconds}'


# Vamos a pedir ejecutar 10 veces la funcion con seconds=1

start = time.perf_counter()
for _ in range(10):
    do_something(seconds=1)
finish = time.perf_counter()

print(finish-start)

Sleeping 1 second(s)...
Sleeping 1 second(s)...
Sleeping 1 second(s)...
Sleeping 1 second(s)...
Sleeping 1 second(s)...
Sleeping 1 second(s)...
Sleeping 1 second(s)...
Sleeping 1 second(s)...
Sleeping 1 second(s)...
Sleeping 1 second(s)...
10.00232383608818


In [None]:
# Ejemplo encadenando 10 procesos con multiprocessing.



# Forma Erronea
start = time.perf_counter()
for _ in range(10):
    p = multiprocessing.Process(target=do_something,args=[1.])
    p.start()
    p.join()
finish = time.perf_counter()
print(finish-start) # Este proceso tardara 10 segundos

# El problema aqui es que p.join() se ejecuta antes de crear el siguiente proceso, asi que basicamente estamos haciendo una corrida serial.
## Observacion p , no puede ser definido fuera del loop for, nos dara error.

Sleeping 1.0 second(s)...
Sleeping 1.0 second(s)...
Sleeping 1.0 second(s)...
Sleeping 1.0 second(s)...
Sleeping 1.0 second(s)...
Sleeping 1.0 second(s)...
Sleeping 1.0 second(s)...
Sleeping 1.0 second(s)...
Sleeping 1.0 second(s)...
Sleeping 1.0 second(s)...
10.155907776905224


In [13]:
# Forma correcta

processes = []
start = time.perf_counter()
for _ in range(10):
    p = multiprocessing.Process(target=do_something,kwargs={'seconds':1.})
    p.start()
    processes.append(p)

for process in processes:
    process.join()
finish = time.perf_counter()
print(finish-start)

# Como puede verse todo el procedimiento ahora si tardo 1 segundo. Se ha indicado que los procesos se terminen en el otro for loop.

Sleeping 1.0 second(s)...
Sleeping 1.0 second(s)...
Sleeping 1.0 second(s)...
Sleeping 1.0 second(s)...
Sleeping 1.0 second(s)...
Sleeping 1.0 second(s)...
Sleeping 1.0 second(s)...
Sleeping 1.0 second(s)...
Sleeping 1.0 second(s)...
Sleeping 1.0 second(s)...


1.0630228258669376


In [58]:
# Ahora vamos con la libreria concurrent
# with cf.ProcessPoolExecutor() as executor:
#     f = executor.submit(do_something, seconds=1)
#     print(f.result())

# Ahora utilizemos un loop
with cf.ProcessPoolExecutor() as executor:
    results = [executor.submit(do_something, seconds=sec) for sec in range(5)]

# for f in cf.as_completed(results):
#     print(f.result())

r = [f.result() for f in cf.as_completed(results)]
r
# Observe como las completadas terminan en desorden.

Sleeping 2 second(s)...Sleeping 0 second(s)...Sleeping 1 second(s)...Sleeping 3 second(s)...Sleeping 4 second(s)...






['Done Sleeping...2',
 'Done Sleeping...3',
 'Done Sleeping...4',
 'Done Sleeping...1',
 'Done Sleeping...0']

In [None]:
# Ultimo ejemplo con map

secs = list(range(5))
with cf.ProcessPoolExecutor() as executor:
    results = list(map(lambda x: executor.submit(do_something, seconds=x), secs))
    # results = list(executor.map(lambda x:do_something(seconds=x), secs))

for f in cf.as_completed(results):
    print(f.result())

Sleeping 3 second(s)...Sleeping 0 second(s)...Sleeping 1 second(s)...Sleeping 2 second(s)...Sleeping 4 second(s)...






Done Sleeping...0
Done Sleeping...3
Done Sleeping...2
Done Sleeping...1
Done Sleeping...4


In [46]:
# Deal with named parameters
def wrap_do_something(seconds):
    return do_something(seconds=seconds)

with cf.ProcessPoolExecutor() as executor:
    results = executor.map(wrap_do_something, secs)

list(results)

Sleeping 2 second(s)...Sleeping 0 second(s)...Sleeping 3 second(s)...Sleeping 1 second(s)...Sleeping 4 second(s)...






['Done Sleeping...0',
 'Done Sleeping...1',
 'Done Sleeping...2',
 'Done Sleeping...3',
 'Done Sleeping...4']

In [54]:
def calculator(a,b):
    return a+b

# def calculator(a):
#     return a+a

a1 = list(range(50))
a2 = list(range(50))

aa = list(zip(a1,a2))

with cf.ProcessPoolExecutor() as executor:
    res = executor.map(calculator,a1,a2)

print(list(res))

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98]


# Wrap Up

Para paralelizar debo:

- with cf.ProcessPoolExecutor() as executor:
    - results = executor.map(< funcion>, < valores>)

- list(results)

- Si los argumentos son nombrados haga un wrapper (Ver ejemplo arriba), queda feo pero es barato:

def wrap_do_something(seconds):
    return do_something(seconds=seconds) 

# Paralelizando con Joblib

Joblib builds on the Loky library (itself an improvement over Python’s concurrent.futures) and uses cloudpickle to enable the pickling of functions defined in the interactive scope. This solves a couple of common issues that are encountered with the built-in multiprocessing library.

## Utilizacion basica:

- from joblib import Parallel, delayed

- output = Parallel(n_jobs=< cores to use>)(delayed(< function>)(args) for args in arguments)


## joblib.Parallel:
<a href="https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html"> Documentacion </a> oficial.

Elementos mas relevantes

- n_jobs : Nucleos a utilizar , -1 para utilizar todos, siempre y cuando el backend sea loky.
- backend : loky default otros" [multiprocessing, threading]
- return_as : como quieres que se te retorne el output [list, generator, generator_unordered]
    - list : se retorna en forma de lista
    - generator : se retorna un generador en el orden en el que se enviaron las tareas
    - generator_unordered : retorna los resultados en cuanto esten disponibles sin importar el orden en el que se iniciaron.
    - verbose : Numero entre [0, 50] nivel de verbosidad en la salida de los procesos. *if non zero, progress messages are printed. Above 50, the output is sent to stdout.*
    - timeout : Tiempo maximo asignado para que se complete una tarea. Si una tarea no se completa en tiempo se levantara: TimeOutError, solamente aplicable cuando n_jobs!=1
    - pre_dispatch : The number of batches (of tasks) to be pre-dispatched. (Puede ser importante pero todavia no lo entiendo)
    - batch_size : The number of atomic tasks to dispatch at once to each worker.
    - Otras opciones avanzadas, ver en la documentacion de referencia.

## Delayed

Es un decorador, utilizado para capturar argumentos de una funcion (Documentacion Oficial)