<p>
<font size='5' face='Georgia, Arial'>IIC-2233 Apunte Programación Avanzada</font><br>
<font size='1'>&copy; 2015 Karim Pichara - Christian Pieringer. Todos los derechos reservados.</font>
</p>

## Sincronización

En el _notebook_ anterior, vimos qué eran los _threads_, como crearlos, y hablamos sobre situaciones donde podrían ser útiles. Podemos hacer cosas _pseudo-paralelas_, ¿qué podría salir mal?

Hagamos dos _threads_ que aumenten un contador $10^6$ veces. Lo que esperaríamos es que el valor final sea $2 \times 10^6$, ¿no es así?

In [15]:
import threading


class Counter: 
    def __init__(self):
        self.value = 0

        
def worker(counter):
    for _ in range(10 ** 6):
        counter.value += 1


counter = Counter()        
t1 = threading.Thread(target=worker, args=(counter,))
t2 = threading.Thread(target=worker, args=(counter,))

t1.start()
t2.start()
t1.join()
t2.join()

print("Listo, nuestro contador vale", counter.value)

Listo, nuestro contador vale 1255299


😱¿Qué pasó ahí? Como dice un viejo refrán:

> Un programador tenía un problema y decidió usar _threads_. Tiene él problemas. Ahora dos.

En este ejemplo, intentamos modificar **concurrentemente** un mismo valor o recurso con dos _threads_ distintos. Para entender por qué eso no siempre resulta como pensábamos, tomemos en cuenta que:

- Las operaciones de los _threads_ pueden ser pausadas en cualquier momento para dar paso al otro _thread_.
- Es imposible saber cómo se interlevan los _threads_. Por ejemplo, es **incorrecto** pensar que el sistema operativo va a hacer 1 operación del _thread_ 1, luego 1 del _thread_ 2, y así sucesivamente.

Descompongamos en un pseudocódigo – lo más granular posible – lo que hace una iteración en cada _worker_:

    1. Leer el valor de counter.value
    2. Sumar 1 al valor anterior
    3. Almacenar el valor obtenido en counter.value 
    
Ahora veamos un escenario posible:

    - T1 lee 0 de counter.value
    - T1 suma 1 => 1
    - T1 guarda 1 en counter.value
    - T1 lee 1 de counter.value
    - T1 se pausa
    - T2 lee 1 de counter.value
    - T2 suma 1 => 2
    - T2 guarda 2 en counter.value
    - T2 lee 2 de counter.value
    - T2 suma 1 => 3
    - T2 guarda 3 en counter.value
    - T2 se pausa
    - T1 se reanuda
    - T1 suma 1 => 2 (😨)
    - T1 guarda 2 en counter.value (😨😨😨)
    - ...

La situación anterior nos enseña que **deberíamos asegurarnos** de que la operación de aumentar el contador (`counter.value += 1`) sea **atómica**. Es decir, que un _thread_ no la pueda iniciar a menos que ningún otro la esté haciendo. Eso nos exige **coordinar o sincronizar** los _threads_.

Esta es una de las muchas situaciones donde más de un _thread_ debe compartir el acceso a determinados recursos, como son archivos, variables, etc. Durante este proceso, **solo uno** de los _threads_ debe tener acceso al recurso y el resto debe quedar en espera para su uso. Cuando existe **concurrencia** múltiple a un recurso es posible controlar el acceso a este mediante mecanismos de **sincronización** entre los _threads_.

<h2>Bloqueo</h2>

El <b>bloqueo</b> permite la sincronización para el acceso a los recursos compartidos entre dos o más threads. La librería <i>Threading</i> nos provee de la clase <b>Lock</b> que permite esta sincronización. Un bloqueo tiene dos estados: <b>bloqueado</b> y <b>desbloqueado</b>. El estado por defecto es desbloqueado. Para bloquear la ejecución de otros threads durante su ejecución se utiliza el método <b>acquire()</b> mediante el cual el thread actual que llega hasta ese punto del programa toma control del recurso. Durante este tiempo el resto de los threads detiene la ejecución cuando trata de tomar control del recurso hasta que se libera el acceso mediante el método <b>release()</b> llamado desde el thread que mantiene el bloqueo. En la siguiente figura vemos un esquema general de sincronización entre threads usan bloqueos.

![lock](imgs/lock.png)

In [1]:
import threading
import time
from random import random


class MiThread(threading.Thread):
    # Esta clase modela un thread.
    
    def __init__(self, i, archivo, lock_escritura=None):
        super().__init__()
        self.i = i
        self.archivo = archivo
        self.lock_escritura = lock_escritura
    
    def run(self):
        # El método run() maneja que debe hacer el thread durante la ejecución 
        # cada vez que se llama al método start()
        
        # bloquea la ejecución de los demas threads al intentar escribir en el archivo
        self.lock_escritura.acquire() 
        try:
            self.archivo.write('Esta linea fue escrita por el thread # {}\n'.format(self.i))
        finally:
            # devuelve el control del recurso a los threads en espera
            time.sleep(random())
            self.lock_escritura.release()
            
                
if __name__ == '__main__':
    num_threads = 7
    threads = []
    
    # Creamos un archivo para escribir una salida. Luego creamos los threads 
    # que escribirán dentro del archivo
    lock_escritura = threading.Lock()

    with open('salida.txt', 'w') as archivo:
        for i in range(num_threads):
            # se crea el thread pasando sus parámetros, pasando el lock como referencia
            my_thread = MiThread(i, archivo, lock_escritura) 
            
            # Se inicializa el thread. Se ejecuta lo que tiene el método run()
            my_thread.start()
            
            threads.append(my_thread)

        # Evita que el archivo sea cerrado antes que los threads terminen de escribir
        for thread in threads:
            thread.join()

Otra variante del mismo ejemplo. Podemos también crear el Lock como variable de clase, de esta forma el lock sigue siendo independiente al thread que lo usará.

In [2]:
import threading
import time
from random import random


class MiThread(threading.Thread):
    # Esta clase modela un thread. Dentro creamos un objeto para bloqueo dentro de la clase
    # El Lock es una variable independiente de cada thread
    lock = threading.Lock()
    
    def __init__(self, i, archivo):
        super().__init__()
        self.i = i
        self.archivo = archivo
    
    def run(self):
        # El método run() maneja que debe hacer el thread durante la ejecución 
        # cada vez que se llama al método start()
        
        # bloquea la ejecución de los demas threads al intentar escribir en el archivo
        MiThread.lock.acquire() 
        try:
            self.archivo.write('Esta linea fue escrita por el thread # {}\n'.format(self.i))
        finally:
            # devuelve el control del recurso a los threads en espera
            time.sleep(random())
            MiThread.lock.release()
            
                
if __name__ == '__main__':
    num_threads = 7
    threads = []
    
    # Creamos un archivo para escribir una salida. Luego creamos los threads 
    # que escribirán dentro del archivo
    
    with open('salida.txt', 'w') as archivo:
        for i in range(num_threads):
            my_thread = MiThread(i, archivo) # se crea el thread pasando sus parámetros
            my_thread.start() # Se inicializa el thread. Se ejecuta lo que tiene el método run()
            threads.append(my_thread)
        
        # Evita que el archivo sea cerrado antes que los threads terminen de escribir
        for thread in threads:
            thread.join()

Afortunadamente en Python los bloqueos pueden también funcionar dentro de un <i>context manager</i> a través de la sentencia <b>with</b>. En este caso es el mismo <i>with</i> el que se encarga de llamar los métodos <i>acquire()</i> y <i>release()</i>. De esta forma el bloqueo usado en el método <i>run</i> del ejemplo anterior quedaría implementado como se muestra a continuación.

In [3]:
def run(self):
    with MiThread.lock:
        self.archivo.write('Esta linea fue escrita por el thread # {}\n'.format(self.i))

Un problema común en programación concurrente es el patrón <b>Productor-Consumidor</b>. Este se origina cuando dos o más threads, conocidos como <b>productores</b> y <b>consumidores</b>, acceden a un mismo espacio almacenamiento o <b>buffer</b>. Bajo este esquema, los productores ponen ítems en el <i>buffer</i> y los consumidores sacan elementos del <i>buffer</i>. Este modelo permite la comunicación entre distintos threads. Por lo general el <i>buffer</i> compartido en este modelo se implementa mediante una <b>cola sincronizada</b> o <b>cola segura</b>. 

Por ejemplo, supongamos que podemos separar un programa que realiza el procesamiento de un archivo de texto en dos procesos independientes implementados mediante threads. Donde, el primer thread se encargará de la lectura del archivo y procesamiento de las líneas; y el segundo thread de almacenar en otro archivo el resultado de la suma de ambos valores leídos. Comunicaremos ambos threads mediante una cola sincronizada implementada como se muestra a continuación.

In [4]:
import collections

class MiDeque(collections.deque):
    # Para crear la cola heredamos un deque desde el modulo collections 
    # y agregaremos los mecanismos de bloqueo para asegurar la sincronización 
    # entre los threads.

    def __init__(self):
        super().__init__()
        self.lock = threading.Lock() # agregamos el seguro a la cola

    def agregar(self, elemento):
        # Como mencionamos anteriormente, los bloqueos pueden ser usados
        # dentro de un context-manager

        with self.lock:
            self.append(elemento)
            print('[AGREAGAR] cola tiene {} elementos'.format(len(self)))

    def obtener(self):
        with self.lock:
            print('[SACAR] la cola tiene {} elementos'.format(len(self)))
            return self.popleft()

Veamos ahora el resto de la implementación del productor y el consumidor. Como recomendación, probar los ejemplos directamente en un terminal o desde un IDE como PyCharm.

In [None]:
import threading
import time


class Productor(threading.Thread):
    
    def __init__(self, cola):
        super().__init__()
        self.cola = cola

    def run(self):
        # Abrimos un contexto para manejar el archivo de entrada y procesamos cada línea

        with open('lista_numeros.txt') as archivo:           
            for linea in archivo:
                valores = tuple(map(int, linea.strip().split(',')))
                self.cola.agregar(valores)


class Consumidor(threading.Thread):
    
    def __init__(self, cola):
        super().__init__()
        self.cola = cola

    def run(self):
        with open('numeros_procesados.txt', 'w') as archivo:
            while len(self.cola) > 0:
                numeros = self.cola.obtener()
                archivo.write('{}\n'.format(sum(numeros)))
                # ayuda a simular que el consumidor es más lento que el productor
                time.sleep(0.001) 
            


if __name__ == '__main__':

    cola = MiDeque()

    p = Productor(cola)
    p.start()
    
    c = Consumidor(cola)
    c.start()

<h2>Queue</h2>

Afortunadamente en Python existe una libería optimizada para el manejo de colas seguras en modelos <b>productor-consumidor</b>. La librería <b>queue</b> tiene implementada una cola que maneja múltiples concurrencias de forma segura. Es distinta a la cola implementada en <b>collections</b> usada para estructura de datos, la que no tiene ningún tipo de bloqueo para sincronización.

Los métodos principales de una cola de la librería Queue son:

- <b>put()</b>: agrega un ítem a la cola (push)
- <b>get()</b>: remueve y retorna un ítem desde la cola (pop)
- <b>task_done()</b>: require ser llamado cada vez que in ítem ha sido procesado
- <b>join()</b>: bloquea la cola hasta que todos los ítems han sido procesados

Volvamos al ejemplo anterior del procesamiento de un archivo de texto mediante dos threads independientes. El modelamiento quedaría de la siguiente manera.

In [5]:
import threading
import time
import queue
from random import randint


class Productor(threading.Thread):
    def __init__(self, cola):
        super().__init__()
        self.cola = cola

    def run(self):
        with open('lista_numeros.txt') as archivo:
            for linea in archivo:
                valores = tuple(map(int, linea.strip().split(',')))
                self.cola.put(valores)
                print('[Productor] la cola tiene {} elementos'.format(self.cola.qsize()))
                
                # ayuda a simular que los procesos son más pesados computacionalmente
                time.sleep(0.1) 
                
            # Detendra el consumidor una vez que termine de procesar el ultimo valor
            self.cola.put('STOP')
                
class Consumidor(threading.Thread):
    
    def __init__(self, cola):
        super().__init__()
        self.cola = cola
        
    def run(self):
        with open('numeros_procesados.txt', 'w') as archivo:
            
            while True:
                # Se utiliza try/except para revisar que haya elementos en la cola
                # Debemos chequear la condicion de termino de consumidor ('STOP'). 
                # De caso contrario el consumidor estaría ejecutándose infinitamente.

                try:
                    # si no hay más elementos en la cola levanta una 
                    # excepcion del tipo Empty desde queue.
                    numeros = self.cola.get(False) 

                except queue.Empty:
                    continue

                else:
                    if numeros == 'STOP':
                        print('[Consumidor] proceso finalizado')
                        break
                    
                    archivo.write('{}\n'.format(sum(numeros)))
                    self.cola.task_done()

                    # qsize() retorna el tamaño de la cola
                    print('[Consumidor] la cola ahora tiene {} elementos'.format(self.cola.qsize())) 

                    # Simula un proceso más pesado.                    
                    time.sleep(randint(1, 5)) 



if __name__ == '__main__':

    q = queue.Queue() # se crea una cola sincronizada desde la librería queue

    p = Productor(q) # se crea el productor que recibe como argumento una cola Q
    p.start()

    # se crea un thread con el consumidor. También recibe la cola.
    # Para implementarlo mediante un enfoque funcional:
    # threading.Thread(target=consumidor, args=(q,)) 
    c = Consumidor(q)
    c.start()

[Productor] la cola tiene 1 elementos
[Consumidor] la cola ahora tiene 0 elementos
[Productor] la cola tiene 1 elementos
[Productor] la cola tiene 2 elementos
[Productor] la cola tiene 3 elementos
[Consumidor] la cola ahora tiene 3 elementos
[Consumidor] la cola ahora tiene 2 elementos
[Consumidor] la cola ahora tiene 1 elementos
[Consumidor] proceso finalizado
