<p>
<font size='5' face='Georgia, Arial'>IIC-2233 Apunte Programación Avanzada</font><br>
<font size='1'>Equipo Docente IIC2233 2018-1, editado el 2018-2 y 2019-2. Contiene partes de una creación de &copy; Karim Pichara - Christian Pieringer del año 2015 (Todos los derechos reservados).</font>
</p>

## Sincronización

En el *notebook* anterior, vimos qué eran los *threads*, como crearlos, y hablamos sobre situaciones donde podrían ser útiles. Podemos hacer cosas **pseudo-paralelas**, ¿qué podría salir mal?

Hagamos dos *threads* que aumenten un contador $10^6$ veces. Lo que esperaríamos es que el valor final sea $2 \times 10^6$, ¿no es así?

In [1]:
import threading


class Contador: 
    def __init__(self):
        self.valor = 0

        
def sumador(contador):
    for _ in range(10 ** 6):
        contador.valor += 1


contador = Contador()        
t1 = threading.Thread(target=sumador, args=(contador,))
t2 = threading.Thread(target=sumador, args=(contador,))

t1.start()
t2.start()
t1.join()
t2.join()

print("Listo, nuestro contador vale", contador.valor)

Listo, nuestro contador vale 1638765


😱¿Qué pasó ahí? Como dice un viejo refrán:

> Un programador tenía un problema y decidió usar *threads*. Tiene él problemas. Ahora dos.

En este ejemplo, intentamos modificar **al mismo tiempo (concurrentemente)** un mismo valor o recurso con dos *threads* distintos. Para entender por qué eso no siempre resulta como pensábamos, tomemos en cuenta que:

- Las operaciones de los *threads* pueden ser pausadas en cualquier momento para dar paso al otro *thread*.
- Es imposible saber cómo se interlevan (entremezclan) los *threads*. Por ejemplo, es **incorrecto** pensar que el sistema operativo va a hacer una operación del *thread* 1, luego una del _thread_ 2, y así sucesivamente.

Descompongamos en un pseudocódigo – lo más granular posible – lo que hace la máquina en una iteración en cada `sumador`:

    1. Leer el valor de contador.valor
    2. Sumar 1 al valor anterior
    3. Almacenar el valor obtenido en contador.valor 
    
Ahora veamos un escenario posible:

    - T1 lee 0 de contador.valor
    - T1 suma 1 => 1
    - T1 guarda 1 en contador.valor
    - T1 lee 1 de contador.valor
    - T1 se pausa
    - T2 lee 1 de contador.valor
    - T2 suma 1 => 2
    - T2 guarda 2 en contador.valor
    - T2 lee 2 de contador.valor
    - T2 suma 1 => 3
    - T2 guarda 3 en contador.valor
    - T2 se pausa
    - T1 se reanuda
    - T1 suma 1 => 2 (😨)
    - T1 guarda 2 en contador.valor (😨😨😨)
    - ...

In [2]:
import time

def sumador(contador):
    nombre = threading.current_thread().name
    for _ in range(10):
        valor = contador.valor
        print(f"{nombre}: lee {valor}")
        nuevo_valor = valor + 1
        print(f"{nombre}: suma 1 => {nuevo_valor}")
        contador.valor = nuevo_valor
        print(f"{nombre}: guarda {nuevo_valor}")
        time.sleep(1)

contador = Contador()        
t1 = threading.Thread(name="T1", target=sumador, args=(contador,))
t2 = threading.Thread(name="T2", target=sumador, args=(contador,))

t1.start()
t2.start()
t1.join()
t2.join()

print("Listo, nuestro contador vale", contador.valor)

T1: lee 0
T1: suma 1 => 1
T1: guarda 1
T2: lee 1
T2: suma 1 => 2
T2: guarda 2
T1: lee 2
T1: suma 1 => 3
T1: guarda 3
T2: lee 3
T2: suma 1 => 4
T2: guarda 4
T1: lee 4
T1: suma 1 => 5
T1: guarda 5
T2: lee 5
T2: suma 1 => 6
T2: guarda 6
T1: lee 6
T1: suma 1 => 7
T1: guarda 7
T2: lee 7
T2: suma 1 => 8
T2: guarda 8
T1: lee 8T2: lee 8
T2: suma 1 => 9
T2: guarda 9

T1: suma 1 => 9
T1: guarda 9
T2: lee 9
T2: suma 1 => 10
T2: guarda 10
T1: lee 10
T1: suma 1 => 11
T1: guarda 11
T2: lee 11
T2: suma 1 => 12
T2: guarda 12
T1: lee 12
T1: suma 1 => 13
T1: guarda 13
T2: lee 13
T2: suma 1 => 14
T2: guarda 14
T1: lee 14
T1: suma 1 => 15
T1: guarda 15
T2: lee 15
T2: suma 1 => 16
T2: guarda 16
T1: lee 16
T1: suma 1 => 17
T1: guarda 17
T1: lee 17T2: lee 17
T1: suma 1 => 18
T1: guarda 18

T2: suma 1 => 18
T2: guarda 18
Listo, nuestro contador vale 18


La situación anterior nos enseña que **deberíamos asegurarnos** de que la operación de aumentar el contador (`contador.valor += 1`) sea **atómica**, es decir, que un *thread* no la pueda iniciar a menos que ningún otro la esté haciendo. Un conjunto de instrucciones que debe ser **atómico** se denomina **sección crítica**.

La situación que vimos es una de muchas donde más de un *thread* debe compartir el acceso a determinados recursos, como son archivos, variables, etc. En estos escenarios, **solo uno** de los *threads* debe tener acceso al recurso y el resto debe quedar en espera para su uso. Cuando existe **concurrencia** múltiple a un recurso es posible controlar el acceso a este mediante mecanismos de **sincronización** entre los *threads*.

## Mecanismos de sincronización

Ahora, veremos dos formas de coordinar nuestros *threads*.

### *Lock*

El *lock* es una primitiva de sincronización de *threads*, provista por la clase `Lock` de la librería `threading`. Se utiliza para que sólo un *thread* pueda estar en una misma sección crítica a la vez. En otras palabras, el *lock* permite la sincronización para el acceso a los recursos compartidos entre dos o más *threads*. 


Un *lock* puede estar **bloqueado** o **desbloqueado** (inicialmente está desbloqueado). Si un *thread* quiere entrar a una sección crítica, primero debe adquirir el *lock* mediante la operación `acquire()`. Una vez que el *thread* consigue adquirir el *lock*, lo deja bloqueado, haciendo que otros *threads* que quieran adquirir el mismo *lock* deban esperar. Cuando el *thread* quiera salir de la sección crítica, debe liberar el lock mediante `release()`, con lo que el *lock* queda desbloqueado, permitiendo que otro *thread* pueda adquirirlo.

![lock](imgs/lock.png)

Abajo, se ejemplifica el modo de usar un *lock*.

In [3]:
import threading

lock_global = threading.Lock()

def sumador_con_seccion_critica(contador):
    for _ in range(10 ** 6):
        # Pedimos el lock antes de entrar a la sección crítica.
        lock_global.acquire()
        # --- Sección crítica ---. 
        # Está garantizado que en estas líneas sólo habrá un thread a la vez.
        contador.valor += 1
        # --- Fin de la sección crítica ---.
        # Liberamos el lock luego de salir de la sección crítica.
        lock_global.release()

Nuestro ejemplo del contador funcionará correctamente 👍.

In [4]:
contador = Contador()        
t1 = threading.Thread(target=sumador_con_seccion_critica, args=(contador,))
t2 = threading.Thread(target=sumador_con_seccion_critica, args=(contador,))

t1.start()
t2.start()
t1.join()
t2.join()

print("Listo, nuestro contador vale", contador.valor)

Listo, nuestro contador vale 2000000


Afortunadamente en Python los *locks* también pueden funcionar dentro de un *context manager* a través de la sentencia `with`. En este caso es el mismo `with` el que se encarga de llamar los métodos `acquire()` y `release()`. De esta forma el *lock* usado en la funcióm `sumador_con_seccion_critica` quedaría como se indica a continuación:

In [5]:
lock_global = threading.Lock()

def sumador_con_seccion_critica(contador):
    for _ in range(10 ** 6):
        with lock_global:
            # --- Sección crítica ---. 
            # Está garantizado que en estas líneas sólo habrá un thread a la vez.
            contador.valor += 1
            # --- Fin de la sección crítica ---.

### Señales entre *threads*

Otro patrón común que aparece en coordinación entre *threads* es comunicación de sucesos. Esto se refiere a la idea de que un *thread* sea capaz de esperar a cierto suceso ocurra para continuar sus operaciones, por ejemplo, un *thread* que debe esperar a que ciertos computos se hayan completado previamente para poder operar usar esos resultados. El método `join` podría ayudar a este cometido, el *thread* que espera puede usar `join` del(de los) *thead(s)* que espera. Pero `join` solo funciona para esperar el cumplimiento completo de un *thread*. ¿Qué ocurre si se desea esperar un suceso que ocurre durante la vida de otro *thread*? ¿Podemos tener un comportamiento similar pero donde el segundo *thread* no tenga que terminar necesariamente? Es decir, ¿cómo podemos hacer que un *thread* espere a que otro le avise cuando continuar?

Para ello tenemos los objetos `Event`. Un *event* es uno de los mecanismos más simples de comunicación entre *threads*: un *thread* hace una señal, y otros *threads* esperan a que esa señal ocurra. Los `Event` tienen un *flag* interno, que toma el valor `True` cuando la señal está activa, y `False` cuando no.

Un *thread* puede esperar una señal llamando al método `wait()` del `Event`, con ello, el *thread* quedará en pausa hasta que otro *thread* haga la señal correspondiente. En caso de que la señal ya haya estado activa antes de hacer `wait()`, el *thread* puede seguir inmediatamente sin esperar.

Para hacer la señal, un *thread* debe llamar al método `set()`, que dejará el *flag* interno del objeto `Event` en `True`. Finalmente, un *thread* cualquiera puede **resetear** la señal llamando a `clear()` del objeto `Event`, dejando el *flag* en `False`.

![](imgs/event.png)

Un ejemplo es cuando queremos reproducir un audio y un video de la forma más sincronizada posible. Supongamos que tenemos un *thread* encargado de leer el audio, y otro de leer el video. El *thread* encargado del audio debería esperar a que el *thread* encargado del video esté listo para empezar, y viceversa:

In [6]:
# Ejemplo sacado de http://zulko.github.io/blog/2013/09/19/a-basic-example-of-threads-synchronization-in-python/

import threading
import time

# Tenemos dos eventos o señales.
# Esta es para avisar que el video ya está listo para ser reproducido.
video_cargado = threading.Event()
# Esta es para avisar que el audio ya está listo para ser reproducido.
audio_cargado = threading.Event()

def reproducir_video(nombre):
    print(f"Cargando video {nombre} en t={time.time():.6f}")
    # Supongamos que se demora 3 segundos
    time.sleep(3)
    print(f"¡Video cargado! en t={time.time():.6f}")
    # Avisamos que el video ya está cargado
    video_cargado.set()
    # Esperamos a que el audio ya se haya cargado
    audio_cargado.wait()
    # ¡Listo!
    print(f"Reproduciendo video en t={time.time():.6f}")
    
    
def reproducir_audio(nombre):
    print(f"Cargando audio {nombre} en t={time.time():.6f}")
    # Supongamos que se demora 5 segundos
    time.sleep(5)
    print(f"¡Audio cargado! en t={time.time():.6f}")
    # Avisamos que el audio ya está cargado
    audio_cargado.set()
    # Esperamos a que el video ya se haya cargado
    video_cargado.wait()
    # ¡Listo!
    print(f"Reproduciendo audio en t={time.time():.6f}")
    
    
t1 = threading.Thread(target=reproducir_audio, args=("'No Te Enamores' - Paloma Mami",))
t2 = threading.Thread(target=reproducir_video, args=("'No Te Enamores' - Paloma Mami",))

t1.start()
t2.start()

t1.join()
t2.join()

Cargando audio 'No Te Enamores' - Paloma Mami en t=1567573799.890604
Cargando video 'No Te Enamores' - Paloma Mami en t=1567573799.892780
¡Video cargado! en t=1567573802.893165
¡Audio cargado! en t=1567573804.891166
Reproduciendo audio en t=1567573804.891388
Reproduciendo video en t=1567573804.892818


En el ejemplo anterior, gracias a la coordinación de *threads* con *events*, conseguimos que el audio y el video se empiecen a reproducir **casi** simultáneamente. De otra manera, el video habría empezado mucho antes, puesto que demoró solo 3 segundos en cargar, mientras que el audio tardó 5 segundos.

### Otros métodos de coordinación entre *threads*

En Python existen otras maneras de coordinar *threads*, que son adecuadas para otras situaciones. No las veremos en detalle en este curso. Si gustas, puedes verlas en la [documentación](https://docs.python.org/3/library/threading.html#lock-objects), donde encontrarás información sobre objetos `RLock` (*lock* reentrante), objetos condición, objetos semáforo, objetos barrera y más.

## *Deadlocks*

Introdujimos formas de coordinar *threads*, o de hacer que un *thread* espere al otro. Existen situaciones en que – por error – dos o más *threads* se esperan mutuamente, sin que ninguno finalmente avance. A este tipo de situaciones se le llama ***deadlock*** o **interbloqueo**, aunque hay nombres *menos afortunados*...

![Abrazo mortal](imgs/abrazo-mortal.png)

Veamos dos ejemplos concretos de ***deadlocks***.

Ejemplo con *locks*:

In [7]:
import threading
import time


lock_1 = threading.Lock()
lock_2 = threading.Lock()


def master():
    time.sleep(2)
    print("Master: adquiriendo lock_1")
    with lock_1:
        time.sleep(2)
        print("Master: adquiriendo lock_2")
        with lock_2:
            print("Master: trabajando!")


def worker():
    time.sleep(1.5)
    print("Worker: adquiriendo lock_2")
    with lock_2:
        time.sleep(2)
        print("Worker: adquiriendo lock_1")
        with lock_1:
            print("Worker: trabajando!")


t1 = threading.Thread(target=master)
t2 = threading.Thread(target=worker)

t1.start()
t2.start()

En el ejemplo anterior, tenemos dos *threads* y dos *locks*. El *thread master* alcanza a adquirir el `lock_1`, y el *thread worker* alcanza a adquirir el `lock_2`. Luego, *master* trata de adquirir `lock_2`, por lo que debe esperar que *worker* lo libere. Sin embargo, *worker* no liberará el `lock_2` sin antes poder adquirir `lock_1` 💀.

Ejemplo con *events*. Nota que es el mismo que el de la sección de [señales entre _threads_](#Señales-entre-threads), solo que cambiamos el orden en que se revisan/levantan las señales:

In [8]:
import threading
import time


video_cargado = threading.Event()
audio_cargado = threading.Event()

def reproducir_video(nombre):
    print(f"Cargando video {nombre} en t={time.time():.6f}")
    time.sleep(3)
    print(f"¡Video cargado! en t={time.time():.6f}")
    audio_cargado.wait()
    video_cargado.set()
    print(f"Reproduciendo video en t={time.time():.6f}")
    
    
def reproducir_audio(nombre):
    print(f"Cargando audio {nombre} en t={time.time():.6f}")
    time.sleep(5)
    print(f"¡Audio cargado! en t={time.time():.6f}")
    video_cargado.wait()
    audio_cargado.set()
    print(f"Reproduciendo audio en t={time.time():.6f}")
    
    
t1 = threading.Thread(target=reproducir_audio, args=("'No Te Enamores' - Paloma Mami",))
t2 = threading.Thread(target=reproducir_video, args=("'No Te Enamores' - Paloma Mami",))

t1.start()
t2.start()

Cargando audio 'No Te Enamores' - Paloma Mami en t=1567573812.277125
Cargando video 'No Te Enamores' - Paloma Mami en t=1567573812.277899


Esencialmente, en el ejemplo anterior estamos esperando que el otro *thread* avise que hizo su trabajo, antes de avisar que el *thread* actual hizo el suyo. Esto produce que `reproducir_video` espere a que la señal de `audio_cargado` se active, antes de activar `video_cargado`. Sin embargo, `reproducir_audio` está esperando que `video_cargado` se active para luego activar `audio_cargado` 💀. La que puede, puede, pero estos *threads* no pueden.

Lo importante – para ti como programador(a) – es saber que los ***deadlocks*** pueden ocurrir, y que debes tener cuidado al programar para que esto no te pase.