# Procesamiento paralelo
Una de las librerías que nos permite ejecutar tareas en paralelo es `multiprocessing`.

Para un mayor entendimiento de la librería, por favor remitirse a:
* https://docs.python.org/3/library/multiprocessing.html
* https://machinelearningmastery.com/multiprocessing-in-python/
* https://towardsdatascience.com/parallelism-with-python-part-1-196f0458ca14

In [1]:
import multiprocessing as mp
import time

Para poder usar el procesamiento paralelo dentro de un `notebook` es indispensable crear un archivo `.py` con la función o funciones que se van a paralelizar.

Para el ejercicio se creó un archivo llamado `workers_mp.py` con las funciones `dummy_worker`, `dummy_worker_error` y `dummy_worker_int_str`.

Estas funciones realizan operaciones aleatorias; `dummy_worker_error` adicionalmente falla de forma aleatoria, para poder simular errores.

In [2]:
from workers_mp import dummy_worker, dummy_worker_error, dummy_worker_int_str

Como recomendación, no usar todos los núcleos al procesar, ya que se puede bloquear la máquina. A continuación se deja libre el 20% de los núcleos; si por alguna razón el 20% es menor a 1, se deja un núcleo disponible.

In [3]:
# Leave roughly 20% of the cores free so the machine stays responsive,
# reserving at least one core whenever more than one is available.
numbers_of_cores = mp.cpu_count()
reserved_cores = max(1, int(round(numbers_of_cores * 0.2, 0)))
# Never drop below one worker: on a single-core machine the plain
# subtraction yields 0, which mp.Pool rejects with a ValueError.
numbers_2_use = max(1, numbers_of_cores - reserved_cores)
print(f"#cores={numbers_of_cores}, #cores to use={numbers_2_use}")

#cores=4, #cores to use=3


argumentos de prueba:

In [4]:
# Test arguments: the integers 0..19, one per task.
args = list(range(20))
args

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

# Ejecuciones de prueba

## Serial

In [5]:
# Serial baseline: call the worker once per argument in this process and
# report the elapsed wall-clock time.
start = time.time()
results = [dummy_worker(value) for value in args]
print("T(serial)=", time.time()-start)

T(serial)= 30.117814779281616


In [6]:
results

[{'a': 0, 'i': 8, 'i2': 8},
 {'a': 1, 'i': 4, 'i2': 5},
 {'a': 2, 'i': 9, 'i2': 11},
 {'a': 3, 'i': 3, 'i2': 6},
 {'a': 4, 'i': 2, 'i2': 6},
 {'a': 5, 'i': 2, 'i2': 7},
 {'a': 6, 'i': 6, 'i2': 12},
 {'a': 7, 'i': 4, 'i2': 11},
 {'a': 8, 'i': 8, 'i2': 16},
 {'a': 9, 'i': 1, 'i2': 10},
 {'a': 10, 'i': 5, 'i2': 15},
 {'a': 11, 'i': 4, 'i2': 15},
 {'a': 12, 'i': 8, 'i2': 20},
 {'a': 13, 'i': 5, 'i2': 18},
 {'a': 14, 'i': 6, 'i2': 20},
 {'a': 15, 'i': 3, 'i2': 18},
 {'a': 16, 'i': 2, 'i2': 18},
 {'a': 17, 'i': 6, 'i2': 23},
 {'a': 18, 'i': 8, 'i2': 26},
 {'a': 19, 'i': 10, 'i2': 29}]

## starmap

In [7]:
# Parallel run with Pool.starmap.
results = []
if __name__ == "__main__":
    start = time.time()
    with mp.Pool(processes=numbers_2_use) as pool:
        # starmap unpacks every item as positional arguments, so each value
        # is wrapped in a one-element tuple.
        star_args = [(value,) for value in args]
        results = pool.starmap(dummy_worker, star_args)
    t_starmap = time.time() - start
    print("T(starmap)=", t_starmap)
results

T(starmap)= 12.467171669006348


[{'a': 0, 'i': 3, 'i2': 3},
 {'a': 1, 'i': 8, 'i2': 9},
 {'a': 2, 'i': 1, 'i2': 3},
 {'a': 3, 'i': 5, 'i2': 8},
 {'a': 4, 'i': 6, 'i2': 10},
 {'a': 5, 'i': 7, 'i2': 12},
 {'a': 6, 'i': 5, 'i2': 11},
 {'a': 7, 'i': 10, 'i2': 17},
 {'a': 8, 'i': 5, 'i2': 13},
 {'a': 9, 'i': 8, 'i2': 17},
 {'a': 10, 'i': 6, 'i2': 16},
 {'a': 11, 'i': 9, 'i2': 20},
 {'a': 12, 'i': 1, 'i2': 13},
 {'a': 13, 'i': 10, 'i2': 23},
 {'a': 14, 'i': 9, 'i2': 23},
 {'a': 15, 'i': 3, 'i2': 18},
 {'a': 16, 'i': 5, 'i2': 21},
 {'a': 17, 'i': 7, 'i2': 24},
 {'a': 18, 'i': 1, 'i2': 19},
 {'a': 19, 'i': 3, 'i2': 22}]

## apply_async

In [8]:
# Parallel run with Pool.apply_async.

results = []
if __name__ == "__main__":
    start = time.time()
    with mp.Pool(processes=numbers_2_use) as pool:
        # Submit every task first, then collect the results in submission order.
        pending = [pool.apply_async(dummy_worker, (value,)) for value in args]
        results.extend(job.get() for job in pending)
    t_apply_async = time.time() - start
    print("T(apply_async)=", t_apply_async)
results

T(apply_async)= 10.871059894561768


[{'a': 0, 'i': 3, 'i2': 3},
 {'a': 1, 'i': 10, 'i2': 11},
 {'a': 2, 'i': 1, 'i2': 3},
 {'a': 3, 'i': 8, 'i2': 11},
 {'a': 4, 'i': 1, 'i2': 5},
 {'a': 5, 'i': 2, 'i2': 7},
 {'a': 6, 'i': 6, 'i2': 12},
 {'a': 7, 'i': 4, 'i2': 11},
 {'a': 8, 'i': 2, 'i2': 10},
 {'a': 9, 'i': 8, 'i2': 17},
 {'a': 10, 'i': 10, 'i2': 20},
 {'a': 11, 'i': 7, 'i2': 18},
 {'a': 12, 'i': 5, 'i2': 17},
 {'a': 13, 'i': 4, 'i2': 17},
 {'a': 14, 'i': 5, 'i2': 19},
 {'a': 15, 'i': 10, 'i2': 25},
 {'a': 16, 'i': 4, 'i2': 20},
 {'a': 17, 'i': 9, 'i2': 26},
 {'a': 18, 'i': 7, 'i2': 25},
 {'a': 19, 'i': 9, 'i2': 28}]

In [9]:
# apply_async with a worker that may raise: keep going and record each error.

results = []
if __name__ == "__main__":
    start = time.time()
    with mp.Pool(processes=numbers_2_use) as pool:
        pending = [(value, pool.apply_async(dummy_worker_error, (value,))) for value in args]
        for value, job in pending:
            try:
                outcome = {"response": job.get(), "error": None}
            except Exception as exc:
                # A failed task must not abort the remaining ones.
                outcome = {"response": None, "error": str(exc)}
                print(value, exc)
            results.append(outcome)
    t_apply_async_error = time.time() - start
    print("T(apply_async+error)=", t_apply_async_error)

    # Re-key by position and attach the argument that produced each outcome.
    results = {
        idx: {"params": value, **outcome}
        for idx, (value, outcome) in enumerate(zip(args, results))
    }

results

2 Dummy Error, value is 5
5 Dummy Error, value is 0
6 Dummy Error, value is 3
9 Dummy Error, value is 5
13 Dummy Error, value is 0
14 Dummy Error, value is 3
18 Dummy Error, value is 5
T(apply_async+error)= 7.848949670791626


{0: {'params': 0, 'response': {'a': 0, 'i': 9, 'i2': 9}, 'error': None},
 1: {'params': 1, 'response': {'a': 1, 'i': 2, 'i2': 3}, 'error': None},
 2: {'params': 2, 'response': None, 'error': 'Dummy Error, value is 5'},
 3: {'params': 3, 'response': {'a': 3, 'i': 4, 'i2': 7}, 'error': None},
 4: {'params': 4, 'response': {'a': 4, 'i': 7, 'i2': 11}, 'error': None},
 5: {'params': 5, 'response': None, 'error': 'Dummy Error, value is 0'},
 6: {'params': 6, 'response': None, 'error': 'Dummy Error, value is 3'},
 7: {'params': 7, 'response': {'a': 7, 'i': 1, 'i2': 8}, 'error': None},
 8: {'params': 8, 'response': {'a': 8, 'i': 2, 'i2': 10}, 'error': None},
 9: {'params': 9, 'response': None, 'error': 'Dummy Error, value is 5'},
 10: {'params': 10, 'response': {'a': 10, 'i': 7, 'i2': 17}, 'error': None},
 11: {'params': 11, 'response': {'a': 11, 'i': 2, 'i2': 13}, 'error': None},
 12: {'params': 12, 'response': {'a': 12, 'i': 10, 'i2': 22}, 'error': None},
 13: {'params': 13, 'response': None

# Pasando una funcion a MP

## with `*args`

In [10]:
import string

# The lowercase ASCII letters as a list of one-character strings.
letters = list(string.ascii_lowercase)
len(letters)

26

In [11]:
# Pair every letter with its position: (0, 'a'), (1, 'b'), ...
args = list(enumerate(letters))
args

[(0, 'a'),
 (1, 'b'),
 (2, 'c'),
 (3, 'd'),
 (4, 'e'),
 (5, 'f'),
 (6, 'g'),
 (7, 'h'),
 (8, 'i'),
 (9, 'j'),
 (10, 'k'),
 (11, 'l'),
 (12, 'm'),
 (13, 'n'),
 (14, 'o'),
 (15, 'p'),
 (16, 'q'),
 (17, 'r'),
 (18, 's'),
 (19, 't'),
 (20, 'u'),
 (21, 'v'),
 (22, 'w'),
 (23, 'x'),
 (24, 'y'),
 (25, 'z')]

La función recibe una lista de tuplas; cada tupla se pasa como `*args` (argumentos posicionales) a la función que se paraleliza.

In [12]:
def function_to_mp__args(numbers_of_cores:int, myfunc, args):
    """
        Run ``myfunc`` once per item of ``args`` in a pool of worker processes.

        A failing call does not stop the others: its exception message is
        stored (and printed) instead of a response.

        Params:
            numbers_of_cores: number of worker processes given to ``mp.Pool``
            myfunc: function to parallelize; it is sent to the worker
                processes, so it has to be importable/picklable
            args: iterable of tuples, each one unpacked as the positional
                arguments of a single call
        Return:
            dict keyed by the position of each item in ``args``; every value
            is ``{"params": item, "response": result, "error": None}`` on
            success or ``{"params": item, "response": None, "error": message}``
            on failure
    """
    # No ``__name__ == '__main__'`` check here: that guard belongs to the
    # calling script. Inside the function it made every call from an imported
    # module skip the work and return an empty list.
    results = dict()
    start = time.time()
    with mp.Pool(processes=numbers_of_cores) as p:
        # Submit everything first so the tasks run concurrently, then collect
        # in submission order. Keeping ``params`` next to its job means
        # ``args`` is walked only once (one-shot iterables work too).
        jobs = [(params, p.apply_async(myfunc, params)) for params in args]
        for i, (params, j) in enumerate(jobs):
            try:
                results[i] = {"params": params, "response": j.get(), "error": None}
            except Exception as e:
                # Best effort: record the failure and keep collecting.
                results[i] = {"params": params, "response": None, "error": str(e)}
                print(params, e)
    t_apply_async_error_2 = time.time() - start
    print("T(apply_async+error)=", t_apply_async_error_2)
    return results

In [13]:
# Run dummy_worker_int_str over the (index, letter) tuples with numbers_2_use worker processes.
results = function_to_mp__args(numbers_2_use, dummy_worker_int_str, args)

(5, 'f') Dummy Error, value is 0
(6, 'g') Dummy Error, value is 0
(9, 'j') Dummy Error, value is 0
(23, 'x') Dummy Error, value is 0
T(apply_async+error)= 44.51657199859619


In [14]:
results

{0: {'params': (0, 'a'),
  'response': {'i': 9, 'arg1': 0, 'arg2': 'a'},
  'error': None},
 1: {'params': (1, 'b'),
  'response': {'i': 1, 'arg1': 1, 'arg2': 'b'},
  'error': None},
 2: {'params': (2, 'c'),
  'response': {'i': 10, 'arg1': 2, 'arg2': 'c'},
  'error': None},
 3: {'params': (3, 'd'),
  'response': {'i': 1, 'arg1': 3, 'arg2': 'd'},
  'error': None},
 4: {'params': (4, 'e'),
  'response': {'i': 9, 'arg1': 4, 'arg2': 'e'},
  'error': None},
 5: {'params': (5, 'f'), 'response': None, 'error': 'Dummy Error, value is 0'},
 6: {'params': (6, 'g'), 'response': None, 'error': 'Dummy Error, value is 0'},
 7: {'params': (7, 'h'),
  'response': {'i': 6, 'arg1': 7, 'arg2': 'h'},
  'error': None},
 8: {'params': (8, 'i'),
  'response': {'i': 9, 'arg1': 8, 'arg2': 'i'},
  'error': None},
 9: {'params': (9, 'j'), 'response': None, 'error': 'Dummy Error, value is 0'},
 10: {'params': (10, 'k'),
  'response': {'i': 2, 'arg1': 10, 'arg2': 'k'},
  'error': None},
 11: {'params': (11, 'l'),
 

## with `**kwargs`

La función recibe una lista de diccionarios; cada diccionario se pasa como `**kwargs` (argumentos con nombre) a la función que se paraleliza.

In [15]:
# One keyword-argument dict per letter: arg1 is the position, arg2 the letter.
kwargs = [dict(arg1=position, arg2=letter) for position, letter in enumerate(letters)]
kwargs

[{'arg1': 0, 'arg2': 'a'},
 {'arg1': 1, 'arg2': 'b'},
 {'arg1': 2, 'arg2': 'c'},
 {'arg1': 3, 'arg2': 'd'},
 {'arg1': 4, 'arg2': 'e'},
 {'arg1': 5, 'arg2': 'f'},
 {'arg1': 6, 'arg2': 'g'},
 {'arg1': 7, 'arg2': 'h'},
 {'arg1': 8, 'arg2': 'i'},
 {'arg1': 9, 'arg2': 'j'},
 {'arg1': 10, 'arg2': 'k'},
 {'arg1': 11, 'arg2': 'l'},
 {'arg1': 12, 'arg2': 'm'},
 {'arg1': 13, 'arg2': 'n'},
 {'arg1': 14, 'arg2': 'o'},
 {'arg1': 15, 'arg2': 'p'},
 {'arg1': 16, 'arg2': 'q'},
 {'arg1': 17, 'arg2': 'r'},
 {'arg1': 18, 'arg2': 's'},
 {'arg1': 19, 'arg2': 't'},
 {'arg1': 20, 'arg2': 'u'},
 {'arg1': 21, 'arg2': 'v'},
 {'arg1': 22, 'arg2': 'w'},
 {'arg1': 23, 'arg2': 'x'},
 {'arg1': 24, 'arg2': 'y'},
 {'arg1': 25, 'arg2': 'z'}]

In [16]:
def function_to_mp__kwargs(function, kwargs:list=None, numbers_of_cores:int=None, debug:bool=False):
    """
        Run ``function`` once per dict of ``kwargs`` in a pool of worker processes.

        A failing call does not stop the others: its exception message is
        stored (and printed) instead of a response.

        Params:
            function: function to parallelize; it is sent to the worker
                processes, so it has to be importable/picklable
            kwargs: list of dicts, each one passed as the keyword arguments
                of a single call; ``None`` is treated as an empty list
            numbers_of_cores: number of worker processes,
                default=mp.cpu_count()
            debug: when true, print the number of processes used
        Return:
            List of dicts in submission order. Each one has "params" plus
            "response" (call succeeded) or "error" (exception message);
            entries whose value is ``None`` are left out.
    """
    if numbers_of_cores is None:
        numbers_of_cores = mp.cpu_count()
    if debug:
        print("number of cores for parallelizing", numbers_of_cores)
    calls = [] if kwargs is None else list(kwargs)
    results = list()
    # No ``__name__ == '__main__'`` check here: that guard belongs to the
    # calling script. Inside the function it made every call from an imported
    # module skip the work and return an empty list.
    start = time.time()
    with mp.Pool(processes=numbers_of_cores) as p:
        # Submit everything first so the tasks run concurrently, then collect
        # in submission order.
        jobs = [(params, p.apply_async(function, (), params)) for params in calls]
        for params, j in jobs:
            try:
                response = j.get()
                results.append({"params": params, "response": response, "error": None})
            except Exception as e:
                # Best effort: record the failure and keep collecting.
                results.append({"params": params, "response": None, "error": str(e)})
                print(params, e)
    # Drop the unused slot of each record. An identity test is used because
    # ``!= None`` delegates to the value's own ``__ne__``, which can misbehave
    # for responses that overload comparison (e.g. array-like objects).
    results = [{k: v for k, v in x.items() if v is not None} for x in results]
    t_mp__kwargs = time.time() - start
    print("T(mp__kwargs)=", t_mp__kwargs)
    return results

In [17]:
# Run dummy_worker_int_str once per keyword-argument dict with numbers_2_use worker processes.
results = function_to_mp__kwargs(numbers_of_cores=numbers_2_use, function=dummy_worker_int_str, kwargs=kwargs)

{'arg1': 8, 'arg2': 'i'} Dummy Error, value is 0
{'arg1': 15, 'arg2': 'p'} Dummy Error, value is 5
{'arg1': 17, 'arg2': 'r'} Dummy Error, value is 0
{'arg1': 19, 'arg2': 't'} Dummy Error, value is 0
{'arg1': 20, 'arg2': 'u'} Dummy Error, value is 0
{'arg1': 25, 'arg2': 'z'} Dummy Error, value is 3
T(mp__kwargs)= 51.54672288894653


In [18]:
import pandas as pd

In [19]:
pd.DataFrame(results)

Unnamed: 0,params,response,error
0,"{'arg1': 0, 'arg2': 'a'}","{'i': 4, 'arg1': 0, 'arg2': 'a'}",
1,"{'arg1': 1, 'arg2': 'b'}","{'i': 9, 'arg1': 1, 'arg2': 'b'}",
2,"{'arg1': 2, 'arg2': 'c'}","{'i': 8, 'arg1': 2, 'arg2': 'c'}",
3,"{'arg1': 3, 'arg2': 'd'}","{'i': 1, 'arg1': 3, 'arg2': 'd'}",
4,"{'arg1': 4, 'arg2': 'e'}","{'i': 9, 'arg1': 4, 'arg2': 'e'}",
5,"{'arg1': 5, 'arg2': 'f'}","{'i': 7, 'arg1': 5, 'arg2': 'f'}",
6,"{'arg1': 6, 'arg2': 'g'}","{'i': 7, 'arg1': 6, 'arg2': 'g'}",
7,"{'arg1': 7, 'arg2': 'h'}","{'i': 6, 'arg1': 7, 'arg2': 'h'}",
8,"{'arg1': 8, 'arg2': 'i'}",,"Dummy Error, value is 0"
9,"{'arg1': 9, 'arg2': 'j'}","{'i': 8, 'arg1': 9, 'arg2': 'j'}",
