<a href="https://colab.research.google.com/github/AlvaroMartinez87/creating-a-pipeline-in-blue-ocean/blob/master/Paralelizacion_Python_AlvaroMartinez.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PARALELIZACIÓN EN PYTHON

## Algunas diferencias entre Python y C/C++

**Python** | **C/C++**
----------------- | ---------------------------
**Interpretado** | **Compilado**
**Multiplataforma** | La compilación crea **código específico** para cada plataforma
**Flexiblidad**: listas que pueden contener listas, <br> diccionarios, funciones (y cualquier tipo de dato) | **Velocidad**: la compatibilidad en las operaciones <br> se resuelve en tiempo de compilación
**Tipado dinámico** | **Tipado estático**: es necesario declarar el tipo de las variables
Orientado a un **desarrollo rápido** | Orientado a una **ejecución rápida**

## Algunas opciones para acelerar programas Python

* Paralelizando con hilos: threading 
* Paralelizando con procesos: multiprocessing
* Cython
* pyCUDA
* Numba

## PARALALIZANDO CON HILOS: `threading`

El módulo `threading` permite crear hilos:

https://docs.python.org/3/library/threading.html


Veamos con un ejemplo cómo podemos podemos descomponer un programa en hilos.

Importamos la clase `time` para medir los tiempos de ejecución y comprobar el rendimiento del programa.

In [1]:
import time

Ejecutemos primero un programa que cuenta un número de veces muy grande, por ejemplo 100.000.000 de veces, y veamos el tiempo que tarda.

In [2]:
import time
VALOR = 500000000

def cuentra_atras(n):
    while n > 0:
        n -= 1

inicio = time.time()
cuentra_atras(VALOR)
fin = time.time()

print('Tiempo (en segundos)', fin - inicio)

Tiempo (en segundos) 30.002902507781982


Intentemos acelerar el programa creando tres hilos y repartiendo la cuenta entre los tres hilos. Cada hilo contará la tercera parte del número total de veces.

In [3]:
# import time
from threading import Thread

VALOR = 100000000

def cuentra_atras(n):
    while n > 0:
        n -= 1

t1 = Thread(target=cuentra_atras, args=(VALOR//3,))
t2 = Thread(target=cuentra_atras, args=(VALOR//3,))
t3 = Thread(target=cuentra_atras, args=(VALOR//3,))

inicio = time.time()
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
fin = time.time()

print('Tiempo (en segundos)', fin - inicio)

Tiempo (en segundos) 6.105731725692749


Podemos comprobar que ambas versiones tardan un tiempo parecido, incluso puede que la versión con hilos tarde un poco más. Esto se debe al **GIL: Global Interpreter Lock**.

## ¿Qué es el GIL (Global Interpreter Lock) de Python?

El [GIL](https://wiki.python.org/moin/GlobalInterpreterLock) es un mutex que permite que sólo un hilo tome el control del intérprete de Python, es decir, que solo un hilo puede estar en ejecución a la vez. el GIL impide que los hilos se ejecuten en paralelo, y reparte el tiempo de CPU entre los hilos. **Se ejecutan concurrentemente pero no paralelamente**.

![GIL]( https://drive.google.com/uc?id=1UFwYUV1ZRgzlqJxKtXH3O8AlQ8ipWo4r)

### El conteo de referencias

Python usa el **conteo de referencias** para la gestión de memoria. Cada objeto creado en Python tienen un contador de referencias que registra el número de referencias activas que apuntan a ese objeto. Cuando este contador llega a 0, el recolector de basura libera la memoria asignada a ese objeto porque entiende que ya no se está utilizando. **GIL impide que los hilos modifiquen este contador de referencias mientras otros hilos estén haciendo uso del objeto**. Si se tiene varios hilos ejecutando  concurrentemente, podría ocurrir que un hilo le indique al intérprete que un objeto se ha dejado de utilizar, cuando realmente otro hilo aun sigue trabajando con él. 

Para evitar este problema se implementó GIL, limitando que sólo un hilo tome el control del intérprete de Python.

Veamos con un ejemplo el conteo de referencias. En este caso obtenemos como resultado 3, ya que la variable A cuenta con tres referencias: en las dos asignaciones y en la llamada a la función.

In [12]:
# import time
import sys

A = []
B = A
sys.getrefcount(A)

inicio = time.time()
fin = time.time()

print('Tiempo (en segundos)', fin - inicio)

Tiempo (en segundos) 2.002716064453125e-05



GIL favorece a los programas de un único hilo, pero perjudica a los programas multihilo con mucha carga de CPU. **El uso de hilos con `threading` es más adecuado para ejecutar simultáneamente varias tareas con carga de I/O.**

Python tiene múltiples implementaciones de intérpretes. Por ejemplo, CPython, Jython, IronPython o PyPy. **GIL existe únicamente en la implementación original de Python que es CPython**.

## PARALALIZANDO CON PROCESOS: `multiprocessing`

El módulo `multiprocessing` permite crear procesos:

https://docs.python.org/3/library/multiprocessing.html



## Cómo evitar el GIL

Si queremos que las tareas se ejecuten de forma paralela debemos usar **multiples procesos en lugar de múltiples hilos**. De esta forma cada proceso ejecutará su propio intérprete Python y tendrá su propio espacio de memoria, logrando así evitar el cuello de botella del GIL.

### El módulo multiprocessing
https://docs.python.org/3/library/multiprocessing.html

Hay 2 clases principales en el módulo multiprocessing para implementar la ejecución paralela de una función: la clase Process y la clase Pool.

1. Clase Process

2. Clase Pool 
   1. Ejecución síncrona
      * Pool.map() y Pool.starmap()
      * Pool.apply()
   2. Ejecución asíncrona
      * Pool.map_async() y Pool.starmap_async()
      * Pool.apply_async())

Para ilustrar cómo se puede paralelizar en Python vamos a crear un función que cuente cuántas veces aparece en cada fila de una matriz bidimensional un número entero dado.

Definimos primero un matriz de enteros aleatorios. 

In [22]:
import numpy as np
from time import time

# Preparamos los datos. Matriz de 1000x10
np.random.RandomState(80)
array = np.random.randint(0, 10, size=[1000, 10])
datos = array.tolist()

# Mostramos las 5 primeras filas
datos[:10]

[[7, 7, 8, 5, 4, 0, 0, 9, 2, 3],
 [0, 5, 9, 0, 4, 4, 6, 7, 8, 5],
 [0, 3, 1, 5, 9, 8, 2, 4, 1, 6],
 [4, 3, 6, 9, 4, 3, 9, 0, 6, 9],
 [2, 4, 1, 3, 9, 2, 1, 4, 6, 5],
 [8, 5, 7, 6, 2, 4, 4, 9, 8, 4],
 [0, 4, 9, 8, 0, 3, 6, 5, 1, 5],
 [5, 8, 1, 1, 3, 2, 3, 9, 9, 8],
 [6, 6, 0, 9, 2, 3, 8, 7, 0, 2],
 [6, 2, 6, 6, 0, 5, 5, 3, 4, 1]]

## Implementación sin paralelizar

Definimos la función `n_veces` que cuenta las veces que aparece un número dado en una fila. Luego iteramos la función sobre las filas de la matriz.

In [23]:
import time
# Implementación sin paralelizar
inicio = time.time()
def n_veces(fila, n):
    """Devuelve cuántas veces aparece `n` en una `fila` dada"""
    cont = 0
    for i in fila:
        if i == n :
            cont = cont + 1
    return cont

#Contamos con n=4
resultado = []
for fila in datos:
    resultado.append(n_veces(fila, n=4))

#Muestra cuántas veces está el número `4` en las 10 primeras filas                   
print(resultado[:10])

fin = time.time()

print('Tiempo (en segundos)', fin - inicio)

[1, 2, 1, 2, 2, 3, 1, 0, 0, 1]
Tiempo (en segundos) 0.0026564598083496094


## Paralelización con la clase `Process`

Con la clase `Process` podemos crear nuevos procesos. Para la transferencia de datos entre los procesos se pueden usar colas (`Queues`) o tuberías (`Pipes`). En este ejemplo se usa una cola. Es necesario modificar la función `n_veces` para añadir la cola como parámetro.

In [24]:
import time
# Paralelización con la clase Process

from multiprocessing import Process, Queue
inicio = time.time()
def n_veces_q(q, fila, n):
    """Devuelve cuántas veces aparece `n` en una `fila` dada"""
    cont = 0
    for i in fila:
        if i == n :
            cont = cont + 1
    q.put(cont) 

q = Queue()
resultado = []
for fila in datos:
    p = Process(target=n_veces_q, args=(q, fila, 4,))
    p.start()
    resultado.append(q.get())
    
p.join()

print(resultado[:10])

fin = time.time()

print('Tiempo (en segundos)', fin - inicio)

[1, 2, 1, 2, 2, 3, 1, 0, 0, 1]
Tiempo (en segundos) 10.456415891647339


## Paralelización con la clase `Pool`

La forma general de paralelizar es iniciar un `Pool` con `n` procesos y pasar la función con alguno de los métodos de paralelización.

El método `cpu_count` nos devuelve el número de *cores*.

`multiprocessing.Pool()` proporciona los métodos `apply()`, `map()` y `starmap()` para que una función se ejecute en paralelo. La diferencia entre `apply()` y `map()` es que `apply()` tiene el argumento *args* que acepta los parámetros pasados a la función a paralelizar como un argumento, mientras que `map()` admite únicamente un iterable como argumento.

### Paralelización mediante `Pool.apply()`

In [13]:
import time

# Paralelización mediante Pool.apply()

import multiprocessing as mp
inicio = time.time()
# Paso 1: Iniciar multiprocessing.Pool()
pool = mp.Pool(mp.cpu_count())

# Paso 2: `pool.apply` a `n_veces`
resultado = [pool.apply(n_veces, args=(fila, 4)) for fila in datos]

# Paso 3: Cerrar
pool.close()    

print(resultado[:10])

fin = time.time()
print('Tiempo (en segundos)', fin - inicio)

[0, 1, 1, 1, 2, 1, 2, 0, 1, 0]
Tiempo (en segundos) 0.1506049633026123


### Paralelización mediante `Pool.map()`
`Pool.map()` acepta un único iterable como argumento. Vamos a modificar la función `n_veces` asignando un valor por defecto a *n*. Así, la nueva función sólo acepta una lista iterable de filas como entrada.

In [14]:
import time
#  Paralelización mediante Pool.map()

inicio = time.time()
# Redefinir la función con un único argumento requerido.
def n_veces_un_arg(fila, n=4):
    cont = 0
    for i in fila:
        if i == n :
            cont = cont + 1
    return cont

pool = mp.Pool(mp.cpu_count())

resultado = pool.map(n_veces_un_arg, [fila for fila in datos])

pool.close()

print(resultado[:10])

fin = time.time()
print('Tiempo (en segundos)', fin - inicio)

[0, 1, 1, 1, 2, 1, 2, 0, 1, 0]
Tiempo (en segundos) 0.025881052017211914


### Paralelización mediante `Pool.starmap()`

En el ejemplo anterior hemos tenido que redefinir la función para que tuviera un único argumento requerido. En `Pool.starmap()`, cada elemento de su argumento iterable es también un iterable. Podemos pasarle los argumentos a la función a paralelizar en el mismo orden en este elemento iterable interno. De esta manera evitamos tener que modificar la función.

In [15]:
import time
# Parallelizing with Pool.starmap()
import multiprocessing as mp
inicio = time.time()
pool = mp.Pool(mp.cpu_count())

resultado = pool.starmap(n_veces, [(fila, 4) for fila in datos])

pool.close()

print(resultado[:10])
fin = time.time()
print('Tiempo (en segundos)', fin - inicio)

[0, 1, 1, 1, 2, 1, 2, 0, 1, 0]
Tiempo (en segundos) 0.03166842460632324


## Paralelización asíncrona

Los métodos equivalentes asíncronos `apply_async()`, `map_async()` y `starmap_async()` permiten ejecutar los procesos paralelos de forma asíncrona, En este caso, no se puede garantizar en qué orden se obtendrán los resultados.

### Paralelización mediante `Pool.apply_async()`
`apply_async()` es muy similar a `apply()` excepto que hay que proporcionar una función de retrollamada (*callback*) que avisa cuando los resultados estén disponibles.

Como los procesos no tienen por qué terminar en el mismo orden en el que se iniciaron, vamos a redefir la función `n_veces2` con un parámetro para controlar el número de iteración *j* y poder ordenar los resultados finales.

In [28]:
# Paralelización mediante Pool.apply_async()
import multiprocessing as mp
pool = mp.Pool(mp.cpu_count())

resultados = []
inicio = time.time()
# Redefinir para incluir el número de iteración `j`, 
def n_veces2(j, fila, n):
    """Devuelve cuántas veces aparece `n` en una `fila` dada"""
    cont = 0
    for i in fila:
        if i == n :
            cont = cont + 1
    return (j,cont)

# Definir la función callback para recoger los `resultados`
def recoger_resultados(res):
    global resultados
    resultados.append(res)

# Usar un bucle para paralelizar
for i, fila in enumerate(datos):
    pool.apply_async(n_veces2, args=(i, fila, 4), callback=recoger_resultados)

# Cerrar el Pool y permitir que terminen todos los procesos    
pool.close()
pool.join()  # garantiza que la siguiente línea se ejecute después de que terminen todos los procesos

# Ordenar resultados (opcional)
#resultados.sort(key=lambda x: x[0])

resultados_finales = [r for i, r in resultados]

print(resultados_finales[:10])
fin = time.time()
print('Tiempo (en segundos)', fin - inicio)

[1, 2, 1, 2, 2, 3, 1, 0, 0, 1]
Tiempo (en segundos) 0.10982918739318848


También se puede implementar sin utilizar una función *callback*. En este caso, es necesario usar el método `pool.ApplyResult.get()` para recoger el resultado final deseado

In [None]:
# Paralelización mediante Pool.apply_async() sin función callback

import multiprocessing as mp
pool = mp.Pool(mp.cpu_count())

resultados = []

# Llamar a apply_async() sin callback
lista_res = [pool.apply_async(n_veces2, args=(i, fila, 4)) for i, fila in enumerate(datos)]

# lista_res es una lista de objetos pool.ApplyResult
resultados = [r.get()[1] for r in lista_res]

# Cerrar el Pool y permitir que terminen todos los procesos
pool.close()
pool.join() # garantiza que la siguiente línea se ejecute después de que terminen todos los procesos

print(resultados[:10])

### Paralelización mediante `Pool.map_async()`

Versión asíncrona de `Pool.map()`

In [30]:
# Paralelización mediante Pool.map_async()

import multiprocessing as mp
inicio = time.time()
# Redefinir la función con un único argumento requerido.
def n_veces_un_arg(fila, n=4):
    cont = 0
    for i in fila:
        if i == n :
            cont = cont + 1
    return cont

pool = mp.Pool(mp.cpu_count())

resultados = []

# Com map, hay que usar `n_veces_un_arg`
resultados = pool.map_async(n_veces_un_arg, [fila for fila in datos]).get()

pool.close()

print(resultados[:10])
fin = time.time()
print('Tiempo (en segundos)', fin - inicio)

[1, 2, 1, 2, 2, 3, 1, 0, 0, 1]
Tiempo (en segundos) 0.02950453758239746


### Paralelización mediante Pool.starmap_async()

Versión asíncrona de `Pool.starmap()`

In [31]:
# Paralelización mediantePool.starmap_async()

import multiprocessing as mp
inicio = time.time()
pool = mp.Pool(mp.cpu_count())

resultados = []

resultados = pool.starmap_async(n_veces2, [(i, fila, 4) for i, fila in enumerate(datos)]).get()

pool.close()

resultados_finales = [r for i, r in resultados]
print(resultados_finales[:10])
fin = time.time()
print('Tiempo (en segundos)', fin - inicio)

[1, 2, 1, 2, 2, 3, 1, 0, 0, 1]
Tiempo (en segundos) 0.03327751159667969


# Ejemplo: Paralelizar un DataFrame de Pandas
Los DataFrame de Pandas y los arrays de Numpy son los objetos más usados para almacenar datos tabulados en Ciencia de Datos.

Cuando queramos paralelizar un DataFrame, podemos crear una función paralelizable que tenga como parámetro de entrada:

* una fila del dataframe
* una columna del dataframe
* el dataframe entero

Para los dos primeros casos, se puede usar el módulo **multiprocessing**.  Pero, para paralelizar el dataframe entero vamos a usar el paquete **pathos**.

Creamos primero un dataframe de tamaño 5x2 con valores entre 0 y 9.

In [32]:
import numpy as np
import pandas as pd
import multiprocessing as mp

df = pd.DataFrame(np.random.randint(1, 10, size=[5, 2]))
print(df.head())

   0  1
0  5  3
1  2  9
2  3  2
3  4  8
4  5  3


## Paralelizando un dataframe por filas

Vamos a aplicar la función *hipotenusa* a cada fila, ejecutando sobre 4 procesos en paralelo. Usaremos `df.itertuples(name=None)` para pasar cada fila del dataframe como una tupla a la función hipotenusa

In [34]:
# Operación sobre las filas

import multiprocessing as mp
inicio = time.time()
def hipotenusa(fila):
    return round(fila[1]**2 + fila[2]**2, 2)**0.5

cores=mp.cpu_count()

with mp.Pool(cores) as pool:
    resultado = pool.imap(hipotenusa, df.itertuples(name=None), chunksize=10)
    salida = [round(x, 2) for x in resultado]
    
print(salida) 
fin = time.time()
print('Tiempo (en segundos)', fin - inicio)

[5.83, 9.22, 3.61, 8.94, 5.83]
Tiempo (en segundos) 0.13171839714050293


## Paralelizando un dataframe por columnas
En este ejemplo, vamos a aplicar la función `suma_de_cuadrados` a cada columna, ejecutando sobre 2 procesos en paralelo. Usaremos el método `df.iteritems()` para pasar cada columna del dataframe a la función hipotenusa

In [36]:
# Operación sobre columnas
inicio = time.time()
def suma_de_cuadrados(columna):
    return sum([i**2 for i in columna[1]])

with mp.Pool(2) as pool:
    resultado = pool.imap(suma_de_cuadrados, df.iteritems(), chunksize=10)
    salida = [x for x in resultado]

print(salida)
fin = time.time()
print('Tiempo (en segundos)', fin - inicio)

[79, 167]
Tiempo (en segundos) 0.13555479049682617


## Paralelizando un dataframe entero
En este ejemplo se procesa un DataFrame entero usando el paquete **pathos**. Pathos sigue el patrón de multiprocesamiento:
Pool > Map > Close > Join > Clear

NOTA: Es necesario installar antes el paquete **pathos**: 

In [37]:
!pip install pathos

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pathos
  Downloading pathos-0.2.9-py3-none-any.whl (76 kB)
[K     |████████████████████████████████| 76 kB 3.4 MB/s 
Collecting ppft>=1.7.6.5
  Downloading ppft-1.7.6.5-py2.py3-none-any.whl (52 kB)
[K     |████████████████████████████████| 52 kB 1.6 MB/s 
[?25hCollecting multiprocess>=0.70.13
  Downloading multiprocess-0.70.13-py37-none-any.whl (115 kB)
[K     |████████████████████████████████| 115 kB 30.4 MB/s 
[?25hCollecting pox>=0.3.1
  Downloading pox-0.3.1-py2.py3-none-any.whl (28 kB)
Installing collected packages: ppft, pox, multiprocess, pathos
  Attempting uninstall: multiprocess
    Found existing installation: multiprocess 0.70.12.2
    Uninstalling multiprocess-0.70.12.2:
      Successfully uninstalled multiprocess-0.70.12.2
Successfully installed multiprocess-0.70.13 pathos-0.2.9 pox-0.3.1 ppft-1.7.6.5


In [39]:
import numpy as np
import pandas as pd
import multiprocessing as mp
from pathos.multiprocessing import ProcessingPool as Pool
fin = time.time()
# crear un dataframe 
df = pd.DataFrame(np.random.randint(1, 10, size=[1000, 2]))

# mostrar los primeros elementos del dataframe
print(df.head())

# Definir una función que devuelva las dimensiones de un dataframe
def func(df):
    return df.shape

# obtener el número de cores
cores=mp.cpu_count()

# dividir el DataFrame según el número de cores
df_split = np.array_split(df, cores, axis=0)

# Crear un pool de procesos
pool = Pool(cores)

# procesar el DataFrame mapeando la función func a cada df_split en el pool
df_salida = np.vstack(pool.map(func, df_split))

# cerrar el pool, join y clear
pool.close()
pool.join()
pool.clear()

print(df_salida)
fin = time.time()
print('Tiempo (en segundos)', fin - inicio)

   0  1
0  9  8
1  4  7
2  7  2
3  5  7
4  2  2
[[500   2]
 [500   2]]
Tiempo (en segundos) 184.13018798828125


# Ejercicios

1. Vuelve a ejecutar los ejemplos anteriores con distintos tamaños de la matriz *datos* y comprobando los tiempos de ejecución.
2. Dada la siguiente función de normalización, usa `pool.apply()` para normalizar, en cada fila, los valores de la matriz *datos* para que varíen entre 0 y 1.

In [41]:
def normalizar(lista):
    mini = min(lista)
    maxi = max(lista)
    return [(i - mini)/(maxi-mini) for i in lista]

In [108]:
import numpy as np
from time import time

# Preparamos los datos. Matriz de 1000x10
np.random.RandomState(20)
array = np.random.randint(0, 10, size=[1000, 10])
datos = array.tolist()

# Mostramos las 5 primeras filas
datos[:10]

[[9, 0, 4, 1, 9, 1, 6, 3, 5, 5],
 [3, 8, 8, 9, 6, 2, 6, 6, 8, 6],
 [6, 8, 0, 7, 4, 6, 1, 8, 4, 0],
 [9, 2, 4, 6, 3, 6, 4, 1, 5, 4],
 [5, 8, 0, 1, 6, 3, 7, 9, 3, 4],
 [9, 1, 3, 5, 9, 8, 4, 5, 9, 5],
 [8, 5, 5, 6, 2, 6, 9, 4, 1, 3],
 [4, 5, 2, 4, 4, 8, 5, 3, 3, 5],
 [6, 1, 6, 6, 6, 8, 3, 5, 9, 0],
 [2, 6, 9, 7, 6, 7, 8, 8, 1, 9]]

In [110]:
import time
import numpy as np
# Implementación sin paralelizar

inicio = time.time()
max_element = np.max(datos)
min_element = np.min(datos)

def normalizar(datos):
    return [(i - min_element)/(max_element-min_element) for i in datos]

resultado = []
for fila in datos:
    resultado.append(normalizar(datos))
#print(resultado[:10])

fin = time.time()
print('Tiempo (en segundos)', fin - inicio)

Tiempo (en segundos) 6.735282897949219


In [115]:
import time
import numpy as np

# Paralelización mediante Pool.apply()
import multiprocessing as mp

inicio = time.time()
max_element = np.max(datos)
min_element = np.min(datos)

def normalizar(datos):
    return [(i - min_element)/(max_element-min_element) for i in datos]

# Paso 1: Iniciar multiprocessing.Pool()
pool = mp.Pool(mp.cpu_count())

# Paso 2: `pool.apply` a `n_veces`
resultado = []
resultado = [pool.apply(normalizar, args=(datos)) for fila in datos]

# Paso 3: Cerrar
pool.close()    

#print(resultado[:10])

fin = time.time()
print('Tiempo (en segundos)', fin - inicio)

TypeError: ignored