# Python para el análisis de datos -  UNAV
---

# Paralelización

## Paralelización en procesado de datos

El consumo de grandes cantidades de datos hace que cada vez surja más información sobre cómo paralelizar tareas en el ámbito de Data Science.

Dentro de toda esta información, existen dos grandes bloques:

- **Multiprocessing**: paralelización cuando tienes varios procesadores y por lo tanto no comparten muchos de los recursos entre ellos. Suele utilizarse cuando hay un uso intensivo de CPU y no hay IO o interacción con el usuario. Quieres lanzar procesos paralelos y te da importan las colisiones que pueda ver, pero no tienen recursos comunes. No tienen cosas en comun. 

- **Threading**: son hilos que corren dentro de los mismos procesadores y que, por tanto, comparten recursos. Suele ser utilizado con aplicaciones con interacción con usuarios pero, en Data Science, con web scrapping. Por ejemplo, Tensor Flow lo utiliza nativamente. Te da igual las colisiones que pueda a ver por acceder a los mismos recursos de memoria. Puedes tener varaibles iguales. 

Es más difícil encontrar casos de uso del multiprocessing sobre todo si estás trabajando en la nube, por tanto, nosotros sólo analizaremos la creación de hilos. Pero todas las técnicas son bastante análogas.

## Módulo thread/threading

En python tenemos dos módulos que nos permiten tratar con hilos.

El módulo thread (_thread en python 3) es el de más bajo nivel y tiene algunas funciones que nos permiten trabajar con hilos, generalmente no hay necesidad de hacer uso de él. 

El módulo threading es de más alto nivel y tiene la clase Thread, que representa un hilo. Veremos ejemplos sólo, por eso, de este módulo, los ejemplos sobre el otro son análogos.

Podéis encontrar otros módulos en python por similares propósitos.

## Arrancando un hilo

Un módulo de alto nivel que nos permite hacer hilos en python es threading. Este módulo, entre otras cosas, tiene una clase Thread, que es la que representa el hilo. Para hacer un hilo, debemos heredar de ella y definir el método run(). Lo que pongamos en ese método se ejecutará en un hilo aparte. Para arrancar el hilo, debemos instanciar la clase hija de Thread que hayamos hecho y llamar a su método start(). Los pasos básicos con estos

In [1]:
from threading import Thread

class MiHilo(Thread):
    def run(self):
        print("pasando")

# Arranque del hilo
hilo = MiHilo()
hilo.start()


pasando


A la clase también le podemos poner parámetros, en el constructor, como a cualquier otra:

In [2]:
class MiHilo(Thread):
    def __init__(self, texto):
        Thread.__init__(self)
        self.texto= texto
    def run(self):
        print("pasando " + self.texto)

# Arranque del hilo
hilo = MiHilo("ahora")
hilo.start()

pasando ahora


Hay que aclarar que el "main" podrá acabar pero el programa seguirá vivo hasta que todos los hilos terminen

## Ejemplo para ver ejecución

Este ejemplo crea un número de hilos por tarea y va escribiendo cuando pasa por ellas, se puede ver que las tareas aparecen desordenadas, por tanto se están lanzando en paralelo, AVISO: NUNCA PARARÁN SI NO LAS MATAS

In [None]:
import threading
import random
import time

# how many threads we want to start
THREADS = { 'task1':2 , 'task2':3, 'task3':2}

def task1(threadName):
    while True:
        print("I am %s and I execute task1" % threadName)
        time.sleep(random.randint(1, 10))

def task2(threadName):
    while True:
        print("I am %s and I execute task2" % threadName)
        time.sleep(random.randint(1, 10))

def task3(threadName):
    while True:
        print("I am %s and I execute task3" % threadName)
        time.sleep(random.randint(1, 10))

def generic_workflow(threadName, task_type):
    if task_type == 'task1':
        task1(threadName)
    elif task_type == 'task2':
        task2(threadName)
    elif task_type == 'task3':
        task3(threadName)

class Thread_task(threading.Thread):
    def __init__(self, task_type):
        threading.Thread.__init__(self)
        self.task_type = task_type
    def run(self):
        threadName = threading.currentThread().getName()
        generic_workflow(threadName, task_type)

print('Checking for threads for every task...')

total_threads = 3
for task in THREADS:
    total_threads += THREADS[task]

for task in THREADS:
    print(" ** Starting %d threads for %s **" % (THREADS[task], task))
    for i in range(THREADS[task]):
        task_type = task
        td = Thread_task(task)
        td.start()

## Lock y RLock

Dentro del módulo threading, tenemos las clase Lock y RLock para el bloqueo de hilos y hacer que unos esperen por otros (por ejemplo, para acceder a un recurso compartido).

Lock es la clase más simple. Llamando a Lock.acquire() el hilo bloqueará el Lock, de forma que el siguiente hilo que llame a Lock.acquire() se quedará a la espera de que el Lock se desbloquee. La llamada a Lock.release() desbloquea el Lock, haciendo que el hilo que estaba en espera continúe.


In [3]:
from threading import Thread, Lock
import time


class MiHilo(Thread):
    # Se le pasa como ultimo parametro el Lock
    def __init__(self, inicio, fin, bloqueo):
        Thread.__init__(self)
        self.inicio = inicio
        self.fin = fin
        self.bloqueo = bloqueo

    # Metodo que se ejecutara en un hilo    
    def run(self):
        # Espera por Lock
        bloqueo.acquire()
        # Comienza la cuenta
        for i in range(self.inicio, self.fin):
            print("contador = " + str(i))
            time.sleep(1)
        bloqueo.release()


# Creacion del Lock y bloqueo del mismo
bloqueo = Lock()
bloqueo.acquire()

# Arranque del hilo
hilo = MiHilo(0, 10, bloqueo)
hilo.start()

# Hacemos esperar al hilo
time.sleep(1)
# Un bucle
for i in range(10, 20):
    print("main = " + str(i))
    time.sleep(0.5)

# Liberamos el bloqueo para que el hilo comience
bloqueo.release()

# Hacemos esperar al hilo
time.sleep(1)

print("esperando...")
bloqueo.acquire()

# Un bucle
for i in range(20, 30):
    print("main = " + str(i))
    time.sleep(1)

# Liberamos el bloqueo para que el hilo comience
bloqueo.release()

main = 10
main = 11
main = 12
main = 13
main = 14
main = 15
main = 16
main = 17
main = 18
main = 19
contador = 0
contador = 1esperando...

contador = 2
contador = 3
contador = 4
contador = 5
contador = 6
contador = 7
contador = 8
contador = 9
main = 20
main = 21
main = 22
main = 23
main = 24
main = 25
main = 26
main = 27
main = 28
main = 29


Esta clase Lock es muy simple. Cualquier hilo, haya sido él el que ha bloqueado el Lock o no, puede liberarlo. Si un hilo llama él mismo dos veces a acquire(), se queda bloqueado en la segunda llamada.

Para sincronización más compleja, tenemos también RLock. Este lock sí tiene en cuenta quién es el propietario del bloqueo, de forma que sólo puede release() el hilo que haya hecho el acquire(). También tiene en cuenta el número de llamadas a acquire(). Un mismo hilo puede llamar varias veces a acquire() sin quedarse bloqueado, pero tiene que hacer el mismo número de llamadas a release() para desbloquearlo.

In [4]:
from threading import Thread, RLock
import time


class MiHilo(Thread):
    # Se le pasa como ultimo parametro el Lock
    def __init__(self, inicio, fin, bloqueo):
        Thread.__init__(self)
        self.inicio = inicio
        self.fin = fin
        self.bloqueo = bloqueo

    # Metodo que se ejecutara en un hilo
    def run(self):
        # Espera por Lock
        bloqueo.acquire()
        # Comienza la cuenta
        for i in range(self.inicio, self.fin):
            print("contador = " + str(i))
            time.sleep(1)
        bloqueo.release()


# Creacion del Lock y bloqueo del mismo
bloqueo = RLock()
bloqueo.acquire()

# Arranque del hilo
hilo = MiHilo(0, 10, bloqueo)
hilo.start()

# Hacemos esperar al hilo
time.sleep(1)
# Un bucle y llamamos a acquire() muchas veces
for i in range(0, 10):
    print("bloqueando = " + str(i))
    bloqueo.acquire()
    time.sleep(1)

# Liberamos el bloqueo una sola vez, el hilo no
# comienza
bloqueo.release()
print("El hilo todavia no ha comenzado")

# LLamamos a tantos release() con acquire() antes
for i in range(0, 10):
    print("desbloqueando = " + str(i))
    bloqueo.release()
    time.sleep(1)

bloqueando = 0
bloqueando = 1
bloqueando = 2
bloqueando = 3
bloqueando = 4
bloqueando = 5
bloqueando = 6
bloqueando = 7
bloqueando = 8
bloqueando = 9
El hilo todavia no ha comenzado
desbloqueando = 0
desbloqueando = 1
desbloqueando = 2
desbloqueando = 3
desbloqueando = 4
desbloqueando = 5
desbloqueando = 6
desbloqueando = 7
desbloqueando = 8
desbloqueando = 9
contador = 0
contador = 1


## Condition

Uno de los usos habituales de los hilos es tener un hilo esperando por unos datos para tratarlos. Otro hilo es el encargado de proporcionar esos datos y avisar al primer hilo de que ya están disponibles. Para facilitar este tipo de uso tenemos la clase threading.Condition.

En primer lugar, creamos la Condition. El hilo que debe esperar por los datos, debe llamar al método Condition.acquire() y luego al Condition.wait(). Para llamar a wait() es obligatorio ser el propietario de la Condition, cosa que se consigue llamando a acquire(). La llamada a wait() libera la Condition, pero deja al hilo bloqueado hasta que alguien llame a Condition.notify().

El hilo encargado de suministrar los datos, debe llamar a Condition.acquire() para hacerse dueño de la Condition y cuando los datos estén disponibles, llamar a Condition.notify() y luego a Condition.release(). Estas dos llamadas juntas despertarán al hilo a la espera de datos. La llamada a notify() no libera la Condition, por lo que el hilo que está en el wait() será notifiado, pero no comenzará su ejecución hasta que se llame a release().

El resumen de esto puede ser el siguiente. En algún lado se crea la Condition

In [4]:
from threading import Thread, Condition
import time


# Hilo que espera los datos.
class MiHilo(Thread):
    # Se le pasa una lista de datos y la Condition
    # para espera de los mismos.
    def __init__(self, lista, condicion):
        Thread.__init__(self)
        self.lista = lista
        self.condicion = condicion
        # Para saber cuando terminar el hilo
        self.fin = False

    def run(self):
        self.condicion.acquire()
        # Mientras no haya que terminar
        while not self.fin:
            # Esperar por datos
            self.condicion.wait()
            # Si no hay que terminar se escriben los datos
            # que haya en la lista
            if not self.fin:
                while len(lista) > 0:
                    print(self.lista.pop(0))
        self.condicion.release()


# Creacion de la lista, la Condition y el hilo
lista = []
condicion = Condition()
hilo = MiHilo(lista, condicion)
hilo.start()

# Bucle
for i in range(0, 10):
    # Cogemos la condicion
    condicion.acquire()
    # Ponemos un par de datos en la lista
    lista.append(i)
    lista.append("numero " + str(i))
    time.sleep(0.2)
    # Avisamos que estan listos
    condicion.notify()
    # Y dejamos al hilo continuar
    condicion.release()

    time.sleep(2)

# Mandamos al hilo que termine
hilo.fin = True
condicion.acquire()
condicion.notify()
condicion.release()

0
numero 0
1
numero 1
2
numero 2
3
numero 3
4
numero 4
5
numero 5
6
numero 6
7
numero 7
8
numero 8
9
numero 9


## Semaphore

Uno de los mecanismos más antiguos de sincronización de hilos son los semáforos. Un semáforo permite acceder a un determinado recurso a un número máximo de hilos simultáneamente. Si hay más hilos que el máximo permitido, los pone en espera y los va dejando pasar según van terminando los que están activos. Un semáforo actúa como un contador con un valor inicial. Cada vez que un hilo llama a Semaphore.acquire(), el contador se decrementa en 1 y se deja pasar al hilo. En el momento que el contador se hace cero, NO se deja pasar al siguiente hilo que llame a acquire(), sino que lo deja bloqueado. Cada vez que se llama a Semaphore.release(), el contador se incrementa en 1. Si se hace igual a cero, libera al siguiente hilo en la cola de espera.

Los semáforos sirven para permitir el acceso a un recurso que admite un número máximo de hilos simultáneos. Por ejemplo, si cada hilo abre su conexión a base de datos y sólo queremos un máximo de cinco conexiones abiertas simultáneamente, un semáforo puede ser una opción.

En el código, se debe crear el semáforo indicando el valor inicial del contador (número máximo de hilos que pueden estar activos simultáneamente)

In [5]:
from threading import Thread, Semaphore
import time, random


# Hilo a arrancar
class MiHilo(Thread):
    # Se le pasa un numero identificador del hilo y un semaforo
    def __init__(self, numero_hilo, semaforo):
        Thread.__init__(self)
        self.semaforo = semaforo
        self.numero_hilo = numero_hilo

    def run(self):
        # Espera al semaforo
        semaforo.acquire()
        print("Entra hilo " + str(self.numero_hilo))
        # Pierde un tiempo aleatorio
        time.sleep(5)
        print("Fin hilo " + str(self.numero_hilo))
        # Pone verde el semaforo para el siguiente y
        # termina
        semaforo.release()


random.seed()
# Semaforo que permite pasar a cinco simultaneamente
semaforo = Semaphore(5)
# Se arrancan diez hilos
for i in range(0, 20):
    hilo = MiHilo(i, semaforo)
    hilo.start()
    print("Arrancado hilo " + str(i))

Entra hilo 0Arrancado hilo 0
Entra hilo 1

Arrancado hilo 1
Entra hilo 2
Arrancado hilo 2
Entra hilo 3
Arrancado hilo 3
Entra hilo 4
Arrancado hilo 4
Arrancado hilo 5
Arrancado hilo 6
Arrancado hilo 7
Arrancado hilo 8
Arrancado hilo 9
Arrancado hilo 10
Arrancado hilo 11
Arrancado hilo 12
Arrancado hilo 13
Arrancado hilo 14
Arrancado hilo 15
Arrancado hilo 16
Arrancado hilo 17
Arrancado hilo 18
Arrancado hilo 19
Fin hilo 1Fin hilo 2
Fin hilo 3
Fin hilo 4
Fin hilo 0
Entra hilo 6
Entra hilo 7
Entra hilo 8

Entra hilo 5
Entra hilo 9
Fin hilo 6Fin hilo 8
Entra hilo 10

Entra hilo 11
Fin hilo 9
Entra hilo 12
Fin hilo 7
Entra hilo 13
Fin hilo 5
Entra hilo 14
Fin hilo 10Fin hilo 12
Fin hilo 14
Fin hilo 13
Fin hilo 11
Entra hilo 15
Entra hilo 16
Entra hilo 17
Entra hilo 18

Entra hilo 19
Fin hilo 15Fin hilo 17
Fin hilo 16
Fin hilo 18
Fin hilo 19



## Event

La forma más fácil de hacer que un hilo espere a que otro hilo le avise es por medio de Event. El Event tiene un flag interno que indica si un hilo puede continuar o no. Un hilo llama al método Event.wait() y se queda bloqueado en espera hasta que el flag interno de Event se ponga a True. Otro hilo llame a Event.set() para poner el flag a True o bien a Event.clear() para ponerlo a False.

En el código, símplemente se instancia el Event

In [8]:
from threading import Thread, Event
import time


# Hilo a arrancar
class MiHilo(Thread):
    # Se le pasa un numero identificador del hilo y un event
    def __init__(self, evento):
        Thread.__init__(self)
        self.evento = evento

    # Espera al event
    def run(self):
        self.evento.wait()
        print("Entra hilo ")


# Crea el evento
evento = Event()
# Arranca el hilo
hilo = MiHilo(evento)
print("Creo el hilo")

hilo.start()
# Espera dos segundos y activa el evento
time.sleep(2)
print("Hago evento.set()")
evento.set()

Creo el hilo
Hago evento.set()
Entra hilo 


## Enlaces de interés

MUCHOS DE LOS EJEMPLOS EXTRAÍDOS Y ADAPTADOS DE:

https://github.com/chuidiang/chuidiang-ejemplos-google-code/tree/master/python-hilos/src


Comparativa entre multiprocessing y multithreading para Data Science:

https://analyticsindiamag.com/optimization-in-data-science-using-multiprocessing-and-multithreading/


Documentación oficial de python, esta vez bastante fácil de entender:

https://docs.python.org/es/3.8/library/threading.html


Gran artículo sobre la paralelización para Data Scientists:

https://blog.floydhub.com/multiprocessing-vs-threading-in-python-what-every-data-scientist-needs-to-know/







