# HERRAMIENTAS PARA PARALELIZAR

### ¿Cuántos núcleos tengo?

In [1]:
import multiprocessing as mp
print("Number of processors: ", mp.cpu_count())

Number of processors:  4


### HERRAMIENTAS

**map (función,iterable)** La función map () devuelve un elemento iterable (lista, tupla, etc) de los resultados después de aplicar una función dada a cada posición de otro elemento iterable.

In [2]:
# Python program to demonstrate working of map. 
  
# Return double of n 
def addition(n): # FUNCIÓN
    return n + n 
  
# We double all numbers using map() 
numbers = (1, 2, 3, 4) # ITERABLE
result = map(addition, numbers) 
print(list(result)) 

[2, 4, 6, 8]


**PAQUETE MULTIPROCESSING** 

Dento de este paquete tenemos dos opciones https://docs.python.org/2/library/multiprocessing.html :

* Pool
* Process (dentro de process hay funciones para extraer e intercambiar información entre procesos)

Pool y Process ejecutan la tarea en paralelo, pero su forma de ejecutar la tarea en paralelo es diferente.

**Pool** distribuye las tareas a los procesadores disponibles mediante una programación FIFO (Primero en entrar, primero en salir). Funciona como la arquitectura 'map reduce'. Asigna la entrada a los diferentes procesadores y recopila la salida de todos los procesadores. Después de la ejecución del código, devuelve el resultado en forma de una lista o matriz. Es decir, espera a que finalicen todas las tareas y luego devuelve la salida. Los procesos en ejecución se almacenan en la memoria y otros procesos que no se ejecutan se almacenan fuera de la memoria.

**Process** coloca todos los procesos en la memoria y programa su ejecución utilizando la programación FIFO.

Elegir el enfoque apropiado depende de la tarea en cuestión. **Pool** le permite realizar múltiples tareas por proceso, lo que puede facilitar la paralelización de su programa. Si tiene un millón de tareas para ejecutar en paralelo, puede crear un Pool con un número de procesos tantos como núcleos de CPU y luego pasar la lista de millones de tareas a pool.map. El pool distribuirá esas tareas y recopilará los valores de retorno en forma de lista. 
Por otro lado, si tiene una pequeña cantidad de tareas para ejecutar en paralelo, y solo necesita que cada tarea se realice una vez, es razonable usar **process**, ya que procesa por separado para cada tarea, en lugar de configurar un Pool.

*Conclusión* : para tareas grandes Pool y para las pequeñas Process (hay que paralelizar con cuidado para no dañar el sistema operativo). **Yo diría de usar Pool que por lo que veo se usa más** 

POOL

In [3]:
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(processes=4) 
    print(p.map(f, [1, 2, 3]))

[1, 4, 9]


Dentro de Pool tenemos tres opciones:
* **.map()**: map solo puede tomar un iterable (lista,tupla,etc) como argumento. Es más adecuado para operaciones iterables más simples pero hace el trabajo más rápido.
* **.apply()**: toma un argumento que acepta los parámetros pasados a la función que queremos paralelizar como argumento
* **.starmap()**: es como una versión de Pool.map () que acepta argumentos.

**PAQUETE NUMBA**

Si el código está orientado numéricamente (hace muchos cálculos), usa mucho NumPy y/o tiene muchos bucles, entonces Numba suele ser una buena opción. JIT 'just in time' (@jit) es el compilador de Numba para funciones y código de python.

Cosas a tener en cuenta al usar Numba:
* A numba le gustan los bucles
* A numba le gustan las funciones de NumPy
* A numba le gusta la transmisión entre funciones de Numpy

* **Numba no entiende Pandas**

Tenga en cuenta que Pandas no es entendido por Numba y, como resultado, Numba simplemente ejecutaría este código a través del intérprete (mayor gasto de los recursos de Numba por lo que la función no se va a beneficiar de las ventajas de Numba). **Asi que creo que no se puede usar con Pandas y si se pudiera no merece la pena**

Una de las formas de hacer más rápido el código es vectorizando o intentando quitar bucles.


**CYTHON** (NO ES PARALELIZAR, ES OPTIMIZAR)

Hay que instalarse la extension

La forma más fácil de aumentar la velocidad de una función escrita en Python es volver a escribir esta en C y compilarla. Pero C no es tan fácil como el de Python. Para esto es lo que existe Cython, una librería que, con pequeñas modificaciones, permite traducir código Python a C y compilarlo. Cython sirve para optimizar un código de python que usa **Pandas**

A continuación, un ejemplo de funciones de python aplicadas sobre un dataframe y aplicadas utilizando cython para optimizarlas.

Sin la extensión

In [30]:
import pandas as pd
import numpy as np
df = pd.DataFrame({'a': np.random.randn(1000),
                    'b': np.random.randn(1000),
                    'N': np.random.randint(100, 1000, (1000)),
                    'x': 'x'})

In [31]:
df

Unnamed: 0,a,b,N,x
0,-1.803415,-0.263468,697,x
1,-1.154079,-1.075057,823,x
2,-0.276002,1.844990,283,x
3,0.532011,-0.342453,974,x
4,2.013967,-1.346883,648,x
...,...,...,...,...
995,-0.824793,-1.079939,174,x
996,0.659160,-2.409429,358,x
997,0.678011,-0.536773,698,x
998,0.796962,-0.626921,856,x


In [32]:
def f(x):
    return x * (x - 1)

def integrate_f(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f(a + i * dx)
    return s * dx

Tiempo que tarda:

In [33]:
%timeit df.apply(lambda x: integrate_f(x['a'], x['b'], x['N']), axis=1)

231 ms ± 17.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Con la extensión **Cython**

In [35]:
%load_ext Cython

The Cython extension is already loaded. To reload it, use:
  %reload_ext Cython


In [36]:
 %%cython
def f_plain(x):
    return x * (x - 1)
def integrate_f_plain(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_plain(a + i * dx)
    return s * dx

In [37]:
%timeit df.apply(lambda x: integrate_f_plain(x['a'], x['b'], x['N']), axis=1)

141 ms ± 38.6 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


# EJEMPLO NUESTRO CÓDIGO 

Con Pool.map() no se como hacerlo porque es para listas iterables y operaciones sencillas y no es nuestro caso. 

Con Pool.apply() creo que **¡¡si funciona!!**

In [19]:
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
import statistics as stats
from sklearn.model_selection import GridSearchCV
from random import sample
from sklearn.metrics import classification_report
from multiprocessing import Pool
import time

In [20]:
pacientes_sueño=[]
for file in (os.listdir("labels")):
    pacientes_sueño.append(os.path.join("labels",file))
    pacientes_sueño=sorted(pacientes_sueño)
    
pacientes_frec_card=[]
for file in os.listdir("heart_rate"):
    pacientes_frec_card.append(os.path.join("heart_rate",file))

pacientes_pasos=[]
for file in os.listdir("steps"):
    pacientes_pasos.append(os.path.join("steps",file))

pacientes_movimiento=[]
for file in os.listdir("motion"):
    pacientes_movimiento.append(os.path.join("motion",file))

In [21]:
n=6
#SUEÑO
sueño=pd.read_csv(pacientes_sueño[n], delimiter=' ')
sueño.columns=["Tiempo","Etiqueta"]
n_sueño=len(sueño) # muestras en etiquetas del sueño

#FRECUENCIA CARDIACA
frec_card=pd.read_csv(pacientes_frec_card[n])
frec_card.columns=["Tiempo","Frec card"]
frec_card=frec_card[frec_card["Tiempo"]>0] # frecuencia cardiaca una noche

In [22]:
def get_window(w_size,signal):
    time = signal["Tiempo"]
    f_list=[]
    
    for i in range(n_sueño):
        a = w_size*(1+i)
        b = w_size*(2+i)

        idx=[]
        for t in time:
            if t>a and t<b:
                idx.append(True)
            else:
                idx.append(False)
        
        f = signal[idx]
        f_list.append(f)
        
    return f_list

**SIN PARALELIZAR**

In [28]:
w_size=30
signal=frec_card

start_time = time.time()

f_list= get_window(w_size,signal)

mean_list=[]
min_list=[]
max_list=[]

for f in f_list:
    mean_list.append(round(f['Frec card'].mean(),2))
    min_list.append(round(f['Frec card'].min(),2))
    max_list.append(round(f['Frec card'].max(),2))
    
feat1=min_list
feat2=max_list
feat3=mean_list

datos = {'Min_hr': feat1, "Max_hr": feat2,"Mean_hr":feat3}
datos_paciente = pd.DataFrame(data=datos)

print ("My program took", time.time() - start_time, "to run")

My program took 4.648175001144409 to run


**PARALELIZANDO**

In [42]:
w_size=30
signal=frec_card

start_time = time.time()

if __name__ == '__main__':
    p = Pool(processes=4) 
    f_list=p.apply(get_window,(w_size,signal))
    #print(f_list)

mean_list=[]
min_list=[]
max_list=[]

for f in f_list:
    mean_list.append(round(f['Frec card'].mean(),2))
    min_list.append(round(f['Frec card'].min(),2))
    max_list.append(round(f['Frec card'].max(),2))

feat1=min_list
feat2=max_list
feat3=mean_list

datos = {'Min_hr': feat1, "Max_hr": feat2,"Mean_hr":feat3}
datos_paciente = pd.DataFrame(data=datos)

print ("My program took", time.time() - start_time, "to run")

My program took 5.398766994476318 to run


**Cosas que mejorar**

* Revisar para optimizar dataframe https://towardsdatascience.com/make-your-own-super-pandas-using-multiproc-1c04f41944a1
* Probar en una función mas grande que tarde más para ver si realmente paraleliza porque en el tiempo no podemos verlo tal y como esta ahora (parece que tarda mas pero no deberia)
* Buscar otra forma de medir el tiempo
* Probar con Cython 