<p>
<font size='5' face='Georgia, Arial'>IIC-2233 Apunte Programaci√≥n Avanzada</font><br>
<font size='1'>Equipo Docente IIC2233 2018-1, editado el 2018-2. Contiene partes de una creaci√≥n de &copy; Karim Pichara - Christian Pieringer del a√±o 2015 (Todos los derechos reservados).</font>
</p>

# Sincronizaci√≥n

## Necesidad de sincronizar _threads_

En el _notebook_ anterior, vimos qu√© eran los _threads_, como crearlos, y hablamos sobre situaciones donde podr√≠an ser √∫tiles. Podemos hacer cosas _pseudo-paralelas_, ¬øqu√© podr√≠a salir mal?

Hagamos dos _threads_ que aumenten un contador $10^6$ veces. Lo que esperar√≠amos es que el valor final sea $2 \times 10^6$, ¬øno es as√≠?

In [1]:
import threading


class Counter: 
    def __init__(self):
        self.value = 0

        
def worker(counter):
    for _ in range(10 ** 6):
        counter.value += 1


counter = Counter()        
t1 = threading.Thread(target=worker, args=(counter,))
t2 = threading.Thread(target=worker, args=(counter,))

t1.start()
t2.start()
t1.join()
t2.join()

print("Listo, nuestro contador vale", counter.value)

Listo, nuestro contador vale 1417177


üò±¬øQu√© pas√≥ ah√≠? Como dice un viejo refr√°n:

> Un programador ten√≠a un problema y decidi√≥ usar _threads_. Tiene √©l problemas. Ahora dos.

En este ejemplo, intentamos modificar **concurrentemente** un mismo valor o recurso con dos _threads_ distintos. Para entender por qu√© eso no siempre resulta como pens√°bamos, tomemos en cuenta que:

- Las operaciones de los _threads_ pueden ser pausadas en cualquier momento para dar paso al otro _thread_.
- Es imposible saber c√≥mo se interlevan los _threads_. Por ejemplo, es **incorrecto** pensar que el sistema operativo va a hacer una operaci√≥n del _thread_ 1, luego una del _thread_ 2, y as√≠ sucesivamente.

Descompongamos en un pseudoc√≥digo ‚Äì lo m√°s granular posible ‚Äì lo que hace la m√°quina en una iteraci√≥n en cada _worker_:

    1. Leer el valor de counter.value
    2. Sumar 1 al valor anterior
    3. Almacenar el valor obtenido en counter.value 
    
Ahora veamos un escenario posible:

    - T1 lee 0 de counter.value
    - T1 suma 1 => 1
    - T1 guarda 1 en counter.value
    - T1 lee 1 de counter.value
    - T1 se pausa
    - T2 lee 1 de counter.value
    - T2 suma 1 => 2
    - T2 guarda 2 en counter.value
    - T2 lee 2 de counter.value
    - T2 suma 1 => 3
    - T2 guarda 3 en counter.value
    - T2 se pausa
    - T1 se reanuda
    - T1 suma 1 => 2 (üò®)
    - T1 guarda 2 en counter.value (üò®üò®üò®)
    - ...

La situaci√≥n anterior nos ense√±a que **deber√≠amos asegurarnos** de que la operaci√≥n de aumentar el contador (`counter.value += 1`) sea **at√≥mica**, es decir, que un _thread_ no la pueda iniciar a menos que ning√∫n otro la est√© haciendo. Un conjunto de instrucciones que debe ser **at√≥mico** se denomina **secci√≥n cr√≠tica**.

La situaci√≥n que vimos es una de muchas donde m√°s de un _thread_ debe compartir el acceso a determinados recursos, como son archivos, variables, etc. En estos escenarios, **solo uno** de los _threads_ debe tener acceso al recurso y el resto debe quedar en espera para su uso. Cuando existe **concurrencia** m√∫ltiple a un recurso es posible controlar el acceso a este mediante mecanismos de **sincronizaci√≥n** entre los _threads_.

## Mecanismos de sincronizaci√≥n

Ahora, veremos dos formas de coordinar nuestros _threads_.

### Lock

El _lock_ es una primitiva de sincronizaci√≥n de _threads_, provista por la clase `Lock` de la librer√≠a `threading`. Se utiliza para que s√≥lo un _thread_ pueda estar en una misma secci√≥n cr√≠tica a la vez. En otras palabras, el _lock_ permite la sincronizaci√≥n para el acceso a los recursos compartidos entre dos o m√°s _threads_. 


Un _lock_ puede estar **bloqueado** o **desbloqueado** (parte desbloqueado). Si un _thread_ quiere entrar a una secci√≥n cr√≠tica, primero debe adquirir el _lock_ mediante la operaci√≥n `acquire()`. Una vez que el _thread_ consigue adquirir el _lock_, lo deja bloqueado, haciendo que otros _threads_ que quieran adquirir el mismo _lock_ deban esperar. Cuando el _thread_ quiera salir de la secci√≥n cr√≠tica, debe liberar el lock mediante `release()`, con lo que el _lock_ queda desbloqueado, permitiendo que otro _thread_ pueda adquirirlo.

![lock](imgs/lock.png)

Abajo, se ejemplifica el modo de usar un _lock_.

In [2]:
import threading

global_lock = threading.Lock()

def worker_con_seccion_critica(counter):
    for _ in range(10 ** 6):
        # Pedimos el lock antes de entrar a la secci√≥n cr√≠tica.
        global_lock.acquire()
        # --- Secci√≥n cr√≠tica ---. 
        # Est√° garantizado que en estas l√≠neas s√≥lo habr√° un thread a la vez.
        counter.value += 1
        # --- Fin de la secci√≥n cr√≠tica ---.
        # Liberamos el lock luego de salir de la secci√≥n cr√≠tica.
        global_lock.release()

Nuestro ejemplo del contador funcionar√° correctamente üëç.

In [3]:
counter = Counter()        
t1 = threading.Thread(target=worker_con_seccion_critica, args=(counter,))
t2 = threading.Thread(target=worker_con_seccion_critica, args=(counter,))

t1.start()
t2.start()
t1.join()
t2.join()

print("Listo, nuestro contador vale", counter.value)

Listo, nuestro contador vale 2000000


Afortunadamente en Python los _locks_ tambi√©n pueden funcionar dentro de un _context manager_ a trav√©s de la sentencia `with`. En este caso es el mismo `with` el que se encarga de llamar los m√©todos `acquire()` y `release()`. De esta forma el _lock_ usado en el m√©todo `worker_con_secci√≥n_cr√≠tica` quedar√≠a como se indica a continuaci√≥n:

In [4]:
global_lock = threading.Lock()

def worker_con_seccion_critica(counter):
    for _ in range(10 ** 6):
        with global_lock:
            # --- Secci√≥n cr√≠tica ---. 
            # Est√° garantizado que en estas l√≠neas s√≥lo habr√° un thread a la vez.
            counter.value += 1
            # --- Fin de la secci√≥n cr√≠tica ---.

### Se√±ales entre _threads_

Vimos el _lock_, que nos permit√≠a que nuestros _threads_ pudieran compartir un mismo recurso. Pero, ¬øc√≥mo podemos hacer que un _thread_ espere a que otro le diga cuando continuar? Para ello tenemos los `Event`. Un _event_ es uno de los mecanismos m√°s simples de comunicaci√≥n entre _threads_: un _thread_ hace una se√±al, y otros _threads_ esperan a que esa se√±al ocurra. Los `Event` tienen un _flag_ interno, que toma el valor `True` cuando la se√±al est√° activa, y `False` cuando no.

Un _thread_ puede esperar una se√±al llamando al m√©todo `wait()` del `Event`, con ello, el _thread_ quedar√° en pausa hasta que otro _thread_ haga la se√±al correspondiente. En caso de que la se√±al ya haya estado activa antes de hacer `wait()`, el _thread_ puede seguir inmediatamente sin esperar.

Para hacer la se√±al, un _thread_ debe llamar al m√©todo `set()`, que dejar√° el _flag_ interno del objeto `Event` en `True`. Finalmente, un _thread_ cualquiera puede _resetear_ la se√±al llamando a `clear()` del objeto `Event`, dejando el _flag_ en `False`.

Un ejemplo es cuando queremos reproducir un audio y un video de la forma m√°s sincronizada posible. Supongamos que tenemos un _thread_ encargado de leer el audio, y otro de leer el video. El _thread_ encargado del audio deber√≠a esperar a que el _thread_ encargado del video est√© listo para empezar, y viceversa:

In [1]:
# Ejemplo sacado de http://zulko.github.io/blog/2013/09/19/a-basic-example-of-threads-synchronization-in-python/

import threading
import time

# Tenemos dos eventos o se√±ales.
# Esta es para avisar que el video ya est√° listo para ser reproducido.
video_cargado = threading.Event()
# Esta es para avisar que el audio ya est√° listo para ser reproducido.
audio_cargado = threading.Event()

def reproducir_video(nombre):
    print(f"Cargando video {nombre} en t={time.time():.6f}")
    # Supongamos que se demora 3 segundos
    time.sleep(3)
    print(f"Video cargado! en t={time.time():.6f}")
    # Avisamos que el video ya est√° cargado
    video_cargado.set()
    # Esperamos a que el audio ya se haya cargado
    audio_cargado.wait()
    # Listo!
    print(f"Reproduciendo video en t={time.time():.6f}")
    
    
def reproducir_audio(nombre):
    print(f"Cargando audio {nombre} en t={time.time():.6f}")
    # Supongamos que se demora 5 segundos
    time.sleep(10)
    print(f"Audio cargado! en t={time.time():.6f}")
    # Avisamos que el audio ya est√° cargado
    audio_cargado.set()
    # Esperamos a que el video ya se haya cargado
    video_cargado.wait()
    # Listo!
    print(f"Reproduciendo audio en t={time.time():.6f}")
    
    
t1 = threading.Thread(target=reproducir_audio, args=("dummy",))
t2 = threading.Thread(target=reproducir_video, args=("dummy",))

t1.start()
t2.start()

t1.join()
t2.join()

Cargando audio dummy en t=1539108900.156606
Cargando video dummy en t=1539108900.165581
Video cargado! en t=1539108903.175079
Audio cargado! en t=1539108910.166115
Reproduciendo audio en t=1539108910.167128
Reproduciendo video en t=1539108910.169122


En el ejemplo anterior, gracias a la coordinaci√≥n de _threads_ con _events_, conseguimos que el audio y el video se empiecen a reproducir _casi_ simult√°neamente. De otra manera, el video habr√≠a empezado mucho antes, puesto que demor√≥ solo 3 segundos en cargar, mientras que el audio tard√≥ 5 segundos.

### Otros m√©todos de coordinaci√≥n entre _threads_

En Python existen otras maneras de coordinar _threads_, que son adecuadas para otras situaciones. No las veremos en detalle en este curso. Si gustas, puedes verlas en la [documentaci√≥n](https://docs.python.org/3/library/threading.html#lock-objects)

## _Deadlocks_

Introdujimos formar de coordinar _threads_, o de hacer que un _thread_ espere al otro. Existen situaciones ‚Äì por error ‚Äì donde dos o m√°s _threads_ se esperan mutuamente, sin que ninguno finalmente avance. A este tipo de situaciones se le llama _**deadlock**_ o **interbloqueo**, aunque hay nombres _menos afortunados_...

![Abrazo mortal](imgs/abrazo-mortal.png)

Veamos dos ejemplos concretos de _**deadlocks**_.

Ejemplo con _locks_:

In [6]:
import threading
import time


lock_1 = threading.Lock()
lock_2 = threading.Lock()


def master():
    time.sleep(2)
    print("Master: adquiriendo lock_1")
    with lock_1:
        time.sleep(2)
        print("Master: adquiriendo lock_2")
        with lock_2:
            print("Master: trabajando!")


def worker():
    time.sleep(1.5)
    print("Worker: adquiriendo lock_2")
    with lock_2:
        time.sleep(2)
        print("Worker: adquiriendo lock_1")
        with lock_1:
            print("Worker: trabajando!")


t1 = threading.Thread(target=master)
t2 = threading.Thread(target=worker)

t1.start()
t2.start()

Worker: adquiriendo lock_2
Master: adquiriendo lock_1
Worker: adquiriendo lock_1
Master: adquiriendo lock_2


En el ejemplo anterior, tenemos dos _threads_ y dos _locks_. El _thread_ _master_ alcanza a adquirir el `lock_1`, y el _thread_ _worker_ alcanza a adquirir el `lock_2`. Luego, _master_ trata de adquirir `lock_2`, por lo que debe esperar que _worker_ lo libere. Sin embargo, _worker_ no liberar√° el `lock_2` sin antes poder adquirir `lock_1` üíÄ.

Ejemplo con _events_. Nota que es el mismo que el de la secci√≥n de [se√±ales entre _threads_](#Se√±ales-entre-threads), solo que cambiamos el orden en que se revisan/levantan las se√±ales:

In [7]:
import threading
import time


video_cargado = threading.Event()
audio_cargado = threading.Event()

def reproducir_video(nombre):
    print(f"Cargando video {nombre} en t={time.time():.6f}")
    time.sleep(3)
    print(f"Video cargado! en t={time.time():.6f}")
    audio_cargado.wait()
    video_cargado.set()
    print(f"Reproduciendo video en t={time.time():.6f}")
    
    
def reproducir_audio(nombre):
    print(f"Cargando audio {nombre} en t={time.time():.6f}")
    time.sleep(5)
    print(f"Audio cargado! en t={time.time():.6f}")
    video_cargado.wait()
    audio_cargado.set()
    print(f"Reproduciendo audio en t={time.time():.6f}")
    
    
t1 = threading.Thread(target=reproducir_audio, args=("dummy",))
t2 = threading.Thread(target=reproducir_video, args=("dummy",))

t1.start()
t2.start()

Cargando audio dummy en t=1534132765.326378
Cargando video dummy en t=1534132765.326894
Video cargado! en t=1534132768.332214
Audio cargado! en t=1534132770.329270


Esencialmente, en el ejemplo anterior estamos esperando que el otro _thread_ avise que hizo su trabajo, antes de avisar que el _thread_ actual hizo el suyo. Esto produce que `reproducir_video` espere a que la se√±al de `audio_cargado` se active, antes de activar `video_cargado`. Sin embargo, `reproducir_audio` est√° esperando que `video_cargado` se active para luego activar `audio_cargado` üíÄ.

Lo importante ‚Äì para ti como programador(a) ‚Äì es saber que los _**deadlocks**_ pueden ocurrir, y que debes tener cuidado al programar para que esto no te pase.

## M√°s ejemplos y aplicaciones

### _Lock_ como variable de una subclase de `Thread`

Como recordar√°s, una manera de crear nuestros _threads_ consiste en subclasear `Thread` y sobreescribir el m√©todo `run` e `__init__`. Podemos aprovechar de colocar los _locks_ que necesitemos como variable de clase. De esta manera, tendremos un mismo _lock_ para todos los _threads_ de nuestra clase y organizaremos mejor nuestro c√≥digo.

En el siguiente ejemplo, escribiremos en un mismo archivo con varios _threads_.

In [7]:
import threading
import time
from random import random


class EscritorArchivo(threading.Thread):
    """
    Esta clase modela un thread. Dentro creamos un objeto para bloqueo dentro de la clase. 
    El Lock es una variable independiente de cada thread
    """
    
    lock = threading.Lock()
    
    def __init__(self, numero, archivo):
        super().__init__()
        self.name = f"EscritorArchivo n√∫mero {numero}"
        self.numero = numero
        self.archivo = archivo
    
    def run(self):
        # Recordemos que la forma de buscar atributos es:
        # 1. Buscar en la instancia (en este caso, no existe).
        # 2. Si no existe, buscar en la clase (en este caso, lo encuentra).
        # 3. Si no existe, error.
        with self.lock:
            try:
                self.archivo.write(f"L√≠nea escrita por thread # {self.numero}\n")
            finally:
                # Hacemos que se demore una cantidad random uniforme [0, 1)
                time.sleep(8*random())
                

# Creamos un archivo para escribir una salida. 
# Luego creamos los threads que escribir√°n dentro del archivo
with open("files/salida.txt", "w", encoding="UTF-8") as archivo:
    # Creamos los threads
    cantidad_threads = 7
    threads = [EscritorArchivo(i, archivo) for i in range(cantidad_threads)]
    
    # Hacemos partir los threads
    for thread in threads:
        thread.start()

    # Esperamos a todos los threads antes de cerrar el archivo
    for thread in threads:
        thread.join()

### Patr√≥n productor-consumidor

Un problema com√∫n en programaci√≥n concurrente es el patr√≥n **productor-consumidor**. Este se origina cuando dos o m√°s _threads_, conocidos como **productores** y **consumidores**, acceden a un mismo espacio de almacenamiento o _**buffer**_.

Bajo este esquema, los productores ponen √≠tems en el _buffer_ y los consumidores sacan elementos del _buffer_. Este modelo permite la comunicaci√≥n entre distintos _threads_. Por lo general el _buffer_ compartido en este modelo se implementa mediante una **cola sincronizada** o una **cola _thread-safe_**. 

Si bien los `deque` permiten agregar y sacar elementos desde ambos extremos en forma segura con _threads_, **nada nos asegura** que si vimos que hab√≠a un objeto para sacar, ese objeto todav√≠a est√© cuando queramos sacarlo. Por lo tanto, tenemos que asegurarnos nosotros mismos ‚Äì _v√≠a locks_ ‚Äì que ver si hab√≠a algo y sacarlo sea una operaci√≥n at√≥mica.

Por ejemplo, supongamos que el productor es un panadero, y los consumidores son clientes de un supermercado. El panadero colocar√° piezas de pan cuando las tenga listas, y los consumidores sacar√°n estas piezas cuando est√©n disponibles. Implementemos esto:

In [12]:
from collections import deque
from random import choices
import threading
import time

piezas_de_pan = deque()

def panadero():
    # El panadero har√° 2 veces pan
    for partida in range(2):
        # En cada vez, producir√° 5 piezas de pan.
        # Se demorar√° 5 segundos por vez (que r√°pido :D)
        time.sleep(5)
        print("Produje 5 piezas de pan en la partida", partida)
        piezas_de_pan.extend(choices(["Marraqueta", "Baguette", "Hallulla"], k=5))


lock_sacar_pan = threading.Lock()

def cliente():
    while True:
        # El cliente verifica si hay pan antes de sacarlo.
        # Necesitamos asegurarnos que si vio que hab√≠a pan, nadie se lo quite.
        # Para eso, ponemos un lock para que la operaci√≥n 
        # de ver si hab√≠a pan y luego sacarlo sea at√≥mica.
        lol = threading.current_thread()
        with lock_sacar_pan:
            if piezas_de_pan:
                mi_pan = piezas_de_pan.popleft()
                print(f"Cliente {lol.name}: Saqu√© mi {mi_pan}!")
                break


thread_panadero = threading.Thread(target=panadero)
threads_clientes = [threading.Thread(target=cliente, name=i) for i in range(10)]

thread_panadero.start()
for thread_cliente in threads_clientes:
    thread_cliente.start()

Produje 5 piezas de pan en la partida 0
Cliente Thread-85: Saqu√© mi Hallulla!
Cliente 9: Saqu√© mi Hallulla!
Cliente 3: Saqu√© mi Marraqueta!
Cliente 7: Saqu√© mi Hallulla!
Cliente 4: Saqu√© mi Baguette!
Produje 5 piezas de pan en la partida 1
Cliente 8: Saqu√© mi Hallulla!
Cliente 5: Saqu√© mi Hallulla!
Cliente 2: Saqu√© mi Marraqueta!
Cliente 6: Saqu√© mi Hallulla!
Cliente 1: Saqu√© mi Hallulla!


La implementaci√≥n anterior tiene un problema: los clientes gastan CPU en forma innecesaria verificando si hay pan o no, ya que si no hay pan vuelven a revisar de inmediato. Podr√≠amos evitar este gasto si el panadero enviara una se√±al a los clientes cuando √©l tenga pan listo. No obstante, los clientes tendr√°n que verificar de igual manera que todav√≠a quede pan, porque podr√≠a haber m√°s clientes que piezas de pan disponibles en ese momento.

Afortunadamente, en Python existe una librer√≠a optimizada para manejar este tipo de casos.

#### `Queue`

La librer√≠a `queue` tiene implementada una cola hecha para situaciones donde hay varios _threads_. Tiene m√©todos que la hacen un poco diferente a la implementada en `collections`:

- `put()`: Agrega un √≠tem al final de la cola (_push_)
- `get()`: Remueve y retorna un √≠tem de la cola (_pop_). Lo interesante es que este m√©todo **espera** hasta que exista algo para sacar de la cola.
- `task_done()`: Requiere ser llamado cada vez que un √≠tem extra√≠do de la cola ha sido procesado.
- `join()`: El _thread_ que llame a este m√©todo queda en pausa hasta que todos los √≠tems de la cola hayan sido procesados.

Volvamos al ejemplo anterior, ahora usando `Queue`.

In [10]:
from queue import Queue
from random import choice
import threading
import time

piezas_de_pan = Queue()

def panadero():
    # El panadero har√° 2 veces pan
    for partida in range(2):
        # En cada vez, producir√° 5 piezas de pan.
        # Se demorar√° 5 segundos por vez (que r√°pido :D)
        time.sleep(5)
        print("Produje 5 piezas de pan en la partida", partida)
        for _ in range(5):
            piezas_de_pan.put(choice(["Marraqueta", "Baguette", "Hallulla"]))


def cliente():
    mi_pan = piezas_de_pan.get()
    print(f"Saqu√© mi {mi_pan}!")
    piezas_de_pan.task_done()


thread_panadero = threading.Thread(target=panadero)
threads_clientes = [threading.Thread(target=cliente) for _ in range(10)]

thread_panadero.start()
for thread_cliente in threads_clientes:
    thread_cliente.start()

Produje 5 piezas de pan en la partida 0
Saqu√© mi Baguette!
Saqu√© mi Hallulla!
Saqu√© mi Hallulla!
Saqu√© mi Marraqueta!
Saqu√© mi Baguette!
Produje 5 piezas de pan en la partida 1
Saqu√© mi Baguette!Saqu√© mi Hallulla!
Saqu√© mi Hallulla!
Saqu√© mi Marraqueta!

Saqu√© mi Marraqueta!


Con `Queue` tambi√©n podr√≠amos comunicar _threads_ para que hagan ciertas tareas. En ese caso, en vez de pasar objetos cualquiera podr√≠amos pasar mensajes con cierto formato que todos puedan entender.

### Los _prints_ est√°n extra√±os

Si eres una persona que se fija en los detalles, te habr√°s dado cuenta que en este cap√≠tulo ‚Äì a veces ‚Äì faltan o sobran saltos de l√≠nea en los _prints_. La pregunta es, ¬øtienen que ver los _threads_ en esto? La respuesta es s√≠.

De acuerdo a [este post en StackOverflow](https://stackoverflow.com/questions/31142566/print-skipping-newline) esto se produce porque ‚Äì a nivel de m√°quina ‚Äì escribir el texto del `print` y escribir el salto de l√≠nea son dos instrucciones distintas. Por lo tanto, es posible que un _thread_ imprima el texto, se pause ese _thread_, luego otro _thread_ imprima su texto y su respectivo salto de l√≠nea, y luego al volver al primer _thread_ este imprima el salto de l√≠nea que faltaba.