![logo](../files/misc/logo.png)
<h1 style="color:#872325">Concurrency</h1>

### Trabajando en paralelo

_**concurrencia**_: Acción de concurrir distintas personas, sucesos o cosas en un mismo lugar o tiempo.

En resumidas cuentas, los transistores son _amplificadores de señal_ que pueden ser usados como apagadores (ON/OFF) en un circuito. Al conectar transistores creamos circuitos lógicos.

* A medida que contamos con mayor cantidad de transistores, los circuitos se pueden hacer cada vez más complejos
* A medida que el tamaño de los transistores disminuye, más rápidos son en procesar la información.

A medida que ha pasado el tiempo, el número de transistores dentro de las computadoras han ido aumentando. De acuerdo a la [ley de Moore](https://en.wikipedia.org/wiki/Moore%27s_law), el número de transistores en un microchip se duplica cada dos años

![timeline_number_transistors](../files/lec08/transistors_timeline.png)

## Subprocesos

Un hilo (_thread_ en inglés) o subproceso, es una forma de dividir (partir) un programa en dos o mas tareas simultáneamente o pseudo-simultáneamente.

### Un Proceso Simultáneo
![Proceso Simultáneo](../files/lec08/simultaneous-sprocess.png)

En un proceso simultáneo, un mismo programa coordina dos núcleos de un mismo cpu. Estos pueden compartir los recursos disponibles y que cada uno cree.

### Un Proceso Pseudo-Simultáneo
![Proceso Simultáneo](../files/lec08/pseudo-simultaneous.png)

En un proceso pseudo-simultáneo, un mimso programa coordina dos subprocesos de un mismo programa. Estos no necesariamente viven en núcleos diferentes. 

----
## El GIL

En procesadores multinúcleo, una colección de subprocesos dentro de un proceso (un único programa de computación) se puede correr en paralelo. Esto permite hacer uso de la memoria RAM simúltáneamente entre dos subprocesos.

Esta última idea sería equivalente a correr un único programa de Python que pueda hacer uso de múltiples núcleos de nuestro CPU.

La implementación de Python se escribió de tal manera que **prohibe** la ejecución de un proceso púramente simultáneo. A esta implementación se le conoce como el _GIL_

>  **El GIL hace de un programa de python correr un único subproceso.**

Podemos pensar el GIL cómo una bandera: un único subproceso puede tener el GIL a cada punto de tiempo. Cualquier otro subproceso no puede puede avanzar en ejecución mientras quien tenga la bandera no haya terminado.

En efecto, el GIL hace de cualquier programa (multinúcleo o no), dependiente de un único núcleo a cada parte del tiempo

# Concurrencia en Python
Aunque el GIL no nos permite correr procesos completamente simúltaneos, Python cuenta con librerías que nos permiten trabajar de una manera simultánea y pseudo-simultánea.

Saber la libería a ocupar depende del proceso del cuál dependa el problema a resolver. En general, podemos hablar de dos clases de procesos concurrentes: _I/O Bound_ y _CPU Bound_.

**I/O Bound**  
Se dice que un proceso es de tipo I/O (_I/O bound_) si el tiempo para completar una tarea está determinado principalmentete por el período de espera de operaciones de ingreso y egreso.

**CPU Bound**  
Se dice que unn proceso es tipo CPU (_CPU Bound_) si el tiempo para completarr una tarea está determinado principalente por operaciones que se lleven a cabo dentro de CPU.

# I/O Bound

Al tener un proceso de tipo I/O, el poder computacional de nuestra máquina afecta marginalmente al tiempo en el cuál el programa se terminará de correr. Este tipo de procesos se presentan principalmente al momento de tener que esperar la respuesta de una maquina ajena a la nuestra para poder continuar con el proceso.

<h2 style="color:teal">Ejemplo</h2>

Supongamos queremos extrer todos los _tags_ `h1`, `h2` y `h3` de una lísta de páginas. Para lograr esto, introduciremos una nueva librería de la librería estándard

¿De qué manera podríamos lograr esto?

```HTML
<h1>Esto es un h1</h1>
<h2>Esto es un h2</h2>
<h3>Esto es un h3</h3>
<h4>Esto es un h4</h4>
```

<h1>Esto es un h1</h1>
<h2>Esto es un h2</h2>
<h3>Esto es un h3</h3>
<h4>Esto es un h4</h4>

In [1]:
from urllib.request import urlopen
from string import punctuation
from time import time
import re

def extract_headers(url):
    regexp = re.compile(r"<h[1-4](?:.*)>([ A-Za-z0-9\?]+)<.*>")
    init = time()
    with urlopen(url) as res:
        html = res.read().decode("utf8")
    print(f"{url} took {time() - init:0.2f} seconds")
    headers = regexp.findall(html)
    return headers

def batch_extract_headers(list_urls):
    headers = []
    for url in list_urls:
        headers.extend(extract_headers(url))
    return headers

In [53]:
urls = [
    "https://projecteuler.net/",
    "https://nabla.mx",
    "https://ocw.mit.edu/index.htm",
    "https://distill.pub/",
    "https://github.com",
    "https://www.python.org/",
    "https://nips.cc/",
    "https://scikit-learn.org/stable/",
    "https://arxiv.org/",
    "https://www.nature.com/",
    "https://www.springer.com/gp/book/9780387310732",
    "https://docs.scipy.org/doc/numpy/index.html",
    "https://online.stanford.edu/",
    "https://keras.io/",
    "https://math.stackexchange.com/",
    "https://lexfridman.com/"
]

In [3]:
%%time
headers = batch_extract_headers(urls)

https://projecteuler.net/ took 0.79 seconds
https://nabla.mx took 18.95 seconds
https://ocw.mit.edu/index.htm took 0.72 seconds
https://distill.pub/ took 0.58 seconds
https://github.com took 0.67 seconds
https://www.python.org/ took 0.45 seconds
https://nips.cc/ took 0.94 seconds
https://scikit-learn.org/stable/ took 0.59 seconds
https://arxiv.org/ took 2.46 seconds
https://www.nature.com/ took 2.69 seconds
https://www.springer.com/gp/book/9780387310732 took 1.68 seconds
https://docs.scipy.org/doc/numpy/index.html took 1.11 seconds
https://online.stanford.edu/ took 0.72 seconds
https://keras.io/ took 2.52 seconds
https://math.stackexchange.com/ took 17.87 seconds
https://lexfridman.com/ took 0.65 seconds
CPU times: user 375 ms, sys: 59.4 ms, total: 435 ms
Wall time: 53.4 s


In [4]:
len(headers)

168

# Threads

La librería `threads` de la librería estándard nos permite correr subprocesos pseudo-simultáneos dentro de python. El objeto principal para ejecutar subprocesos asincronicamente es `ThreadPoolExecutor`, el cuál

> [...] es una subclase de `Executor` tal que considera un conjunto de subprocesos y los ejectuta asincrónicamente.

Una instancia de `ThreadPoolExecutor` contiene el método `.map` el cual, al igual que la función `map` tiene como parámetros una función y una lista de iterables.

In [10]:
from concurrent.futures import ThreadPoolExecutor
def concurrent_extract_headers(list_urls):
    with ThreadPoolExecutor(max_workers=16) as executor:
            res = executor.map(extract_headers, list_urls)
    return res

In [11]:
%%time
headers = concurrent_extract_headers(urls)

https://scikit-learn.org/stable/ took 0.25 seconds
https://keras.io/ took 0.26 seconds
https://www.python.org/ took 0.38 seconds
https://docs.scipy.org/doc/numpy/index.html took 0.40 seconds
https://ocw.mit.edu/index.htm took 0.75 seconds
https://projecteuler.net/ took 0.97 seconds
https://nips.cc/ took 0.91 seconds
https://lexfridman.com/ took 0.89 seconds
https://distill.pub/ took 1.01 seconds
https://arxiv.org/ took 1.00 seconds
https://online.stanford.edu/ took 0.98 seconds
https://github.com took 1.10 seconds
https://math.stackexchange.com/ took 1.64 seconds
https://nabla.mx took 2.47 seconds
https://www.nature.com/ took 3.54 seconds
https://www.springer.com/gp/book/9780387310732 took 4.37 seconds
CPU times: user 332 ms, sys: 52.6 ms, total: 385 ms
Wall time: 4.47 s


Para este ejemplo, el programa con subprocesos asincrónicos tomó al rededor de 80% menos tiempo que la versión sincrónica

### El problema con `threads`
La más grande desventaja de trabajar con threads son problemas conocidos como _race conditions_

Otra de las limitantes de la librería `threads` es el uso intensivo del CPU a fin de poder cambiar tareas

The limit on total threads is the total CPU power minus the cost of task switches and synchronization overhead.

Using threading is ok as long as our i/o bound program is **not** CPU intensive. If we do not need 100% of all of the CPU power, using threads is a pass.

# Asyncio

Asyncio es una librería diseñanda para escribir código concurrente por medio del uso de la sintáxis `async` `await`.

Asyncio es la manera recomendada de correr procesos de tipo I/O dentro de Python dada su eficiente ejecución al momento de cambiar tareas, al contrario de _threading_, el cuál suele ser intensivo para el CPU.

In [4]:
import asyncio

### Corutinas y el _event loop_

Asyncio provee un mecanismo de ejecución llamado el **event loop** el cual se encarga de correr y coordinar las tareas tareas que se correran de manera concurrente.

Otro componente importante en un programa con asyncio es una corutina. Una **corutina** es un versión especializada de un generador de Python la cual puede suspender su ejecución antes de finalizar una función a fin de ceder el control del programa a otra corutina en otro momento. 

El event loop, entonces, se encarga de dos tareas principales:
1. Llevar un registro de las tareas que han terminado su ejecución, las tareas que aún esperan ser ejecutadas, las tareas que estan en espera de reanudar operación y la tarea que tiene el actual control.
2. Ceder control a tareas que esperan ser ejecutadas o a tareas que esperan reanudar su operación.

---

Nota: `asyncio` no funciona óptimamente en Jupyter notebooks, por lo que ejecutaremos scripts externos que nos permitan entender su uso.

---

### Definiendo corutinas

Dos _keywords_ escenciales para definir una corutina son:
* `async`
* `await`

La funcionaliad básica de **async** es informarle a Python que la función a definir será de tipo asincrónica, i.e., una corutina; por otro lado, **await** sede el control del programa.

Decimos que una función es de tipo _awaitable_ si podemos usarlo dentro de una expresión `await`, i.e, podemos llamar el código de tipo

```python
await smth
```

Es entonces, que la estructura básica para definir una corutina dentro de Python es de la siguiente manera:

```python
async def fn():
    # ... process
    await awaitable
```

### Ejecutando Corutinas

En código para Python 3.6 en adelante, la manera de ejecutar una corutina es mediante una llamada al _event loop_ para posteriormente ejectutar
```python
# Python 3.6+
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(corutine)
```

La manera [provisional](https://docs.python.org/3/glossary.html#term-provisional-api) de ejecutar una corutina en python es por medio de la función `run` dentro de la librería `asyncio`. 
```python
# Python 3.7+
asyncio.run(corutine)
```

<h2 style="color:teal">Ejemplo</h2>

Definiendo un primer programa con `asyncio`.

In [45]:
%%writefile ../files/lec08/first_async.py
import asyncio
async def greet():
    print("Hello")
    await asyncio.sleep(2) # Un proceso de tipo I/O
    print("World")

if __name__ == "__main":
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(greet())

Overwriting ../files/lec08/first_async.py


In [46]:
%%time
!python ../files/lec08/first_async.py

CPU times: user 3.62 ms, sys: 6.17 ms, total: 9.78 ms
Wall time: 234 ms


El programa anterior tomó el mismo tiempo que le hubiese tomado a un programa sincrónico correr su ejecución. **¿A qué se deberá esto?**

<h2 style="color:teal">Ejemplo</h2>

Supongamos tenemos el siguiente programa el cuál imprime en la pantalla `"Hello"` seguido de `"World"`.

Supongamos de igual manerra que entre la primera y la segunda impresión a la pantalla existe un proceso de tipo _I/O_ el cuál tarda 2 segundos en terminar de completarse.

De necesitar tener que correr este proceso tres veces. Una manera sincrónica de escribir este programa sería de la siguiente manera:

In [43]:
%%writefile ../files/lec08/hello.py
import time

def greeting():
    print("Hello")
    # Proceso de tipo I/O que toma 2 segundos en regresar una respuesta
    time.sleep(2)
    print("AsyncIO")

def main():
    for _ in range(3):
        greeting()

if __name__ == "__main__":
    main()

Overwriting ../files/lec08/hello.py


In [44]:
%%time
!python ../files/lec08/hello.py

Hello
AsyncIO
Hello
AsyncIO
Hello
AsyncIO
CPU times: user 124 ms, sys: 43.5 ms, total: 167 ms
Wall time: 6.18 s


En caso de tener dos o más corutinas a ejecutar dentro de una corutina, programamos su ejecución concurrente creando un _task_ por medio de la función 

```python
# Python 3.6+
asyncio.ensure_future(cor())
```

```python
# Python 3.7+
asyncio.create_task(cor())
```

In [32]:
%%writefile ../files/lec08/hello_async0.py
import asyncio
async def greet():
    print("Hello")
    await asyncio.sleep(2) # Un proceso de tipo I/O
    print("World")


async def main():
    t1 = asyncio.ensure_future(greet()) # asyncio.create_task(cor()) 3.7+
    t2 = asyncio.ensure_future(greet()) # asyncio.create_task(cor()) 3.7+
    t3 = asyncio.ensure_future(greet()) # asyncio.create_task(cor()) 3.7+
    await t1
    await t2
    await t3
    
if __name__ == "__main__":
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(main())

Overwriting ../files/lec08/hello_async0.py


In [33]:
%%time
!python ../files/lec08/hello_async0.py

Hello
Hello
Hello
World
World
World
CPU times: user 44.7 ms, sys: 19.4 ms, total: 64.2 ms
Wall time: 2.24 s


Una manera alternativa de correr una seríe de corutinas hechas tareas dentro de una corutina es mediante la función

```python
asyncio.gather(cor0(), cor1(), ..., corn())
```

In [50]:
%%writefile ../files/lec08/hello_async1.py
import asyncio
async def greet():
    print("Hello")
    await asyncio.sleep(2) # Un proceso de tipo I/O
    print("World")

async def main():
    tasks = [greet(), greet(), greet()]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(main())

Overwriting ../files/lec08/hello_async1.py


In [51]:
%%time
!python ../files/lec08/hello_async1.py

Hello
Hello
Hello
World
World
World
CPU times: user 44.8 ms, sys: 18.6 ms, total: 63.4 ms
Wall time: 2.23 s


# CPU Bound

Hasta ahora nos hemos enfocado en procesos de tipo _I/O_. Pero, ¿Qué sucede si nuestro problema requiere de un proceso tipo _CPU_?

Como mencionamos anteriormente, el _GIL_ prohibe a un proceso de Python correr una colección de subprocesos. La forma de eludir este problema dentro de un proceso de Python es mediante la librería `multiprocessing`.

<h2 style="color:teal">Ejemplo</h2>

Extrayendo el número de carácteres del código fuente desde una lista de páginas. (Necesario tener instalada la librería `aiohttp` para ejecutar este ejemplo)

In [None]:
!pip install aiohttp

In [115]:
%%writefile ../files/lec08/async_fetch.py
import asyncio
import aiohttp

async def get(sess, url):
    async with sess.get(url) as r:
        return await r.text()

async def main(urls):
    async with aiohttp.ClientSession() as sess:
        tasks = [get(sess, url) for url in urls]
        responses = await asyncio.gather(*tasks)
    print([len(web) for web in responses])
        
if __name__ == "__main__":
    urls = ['https://projecteuler.net/',
            'https://nabla.mx',
            'https://ocw.mit.edu/index.htm',
            'https://distill.pub/',
            'https://github.com',
            'https://www.python.org/',
            'https://nips.cc/',
            'https://scikit-learn.org/stable/',
            'https://arxiv.org/',
            'https://www.nature.com/',
            'https://www.springer.com/gp/book/9780387310732',
            'https://docs.scipy.org/doc/numpy/index.html',
            'https://online.stanford.edu/',
            'https://keras.io/',
            'https://math.stackexchange.com/',
            'https://lexfridman.com/']
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(main(urls))

Overwriting ../files/lec08/async_fetch.py


In [116]:
%%time
!python ../files/lec08/async_fetch.py

[5194, 253691, 54877, 39473, 93492, 48433, 48345, 23735, 26543, 190614, 54551, 6612, 47573, 23812, 272144, 43367]
CPU times: user 44.1 ms, sys: 20.9 ms, total: 64.9 ms
Wall time: 2.22 s


# Multiprocessing

In [None]:
import multiprocessing

La manera en la que `multiprocessing` evade el GIL es creando nuevos procesos independientes del proceso actual.

## `multiprocessing.map`

La función `map` es una de las maneras más convenientes de introducir paralelísmo dentro de Python.

<h2 style="color:teal">Ejemplo</h2>

Consideremos la función `f(x)` dentro del siguiente programa que espera dos segundos antes de regresar el valor `x ** 2`. El programa `multiprocessing.py` tomará una lista de enteros y calculará el valor al cuadrado de cada uno de estos considerando un _pool_ con 4 procesos.

In [93]:
%%writefile ../files/lec08/multiprocessing0.py
from multiprocessing import Pool
from time import sleep

def f(x):
    sleep(2)
    return x ** 2

if __name__ == "__main__":
    import sys
    values = [int(v) for v in sys.argv[1:]]
    # Creamos una colección de 4 procesos a correr
    pool = Pool(processes=4)
    res = pool.map(f, values)
    print(res)

Overwriting ../files/lec08/multiprocessing0.py


Si corremos el programa con 4 elementos, se crean los 4 subprocesos y el programa tarda al rededor de dos segundos en correr.

In [94]:
%%time
!python ../files/lec08/multiprocessing0.py 1 2 3 4

[1, 4, 9, 16]
CPU times: user 49.9 ms, sys: 21.1 ms, total: 70.9 ms
Wall time: 2.25 s


Si introducimos un elemento adicional, el tiempo de respuesta se duplica debido al elemento adicional que necesitamos procesar, es decir, durante los primeros dos segundos, todos los procesos se encuentran ocupados y no es sino hasta que terminamos de correr el proceso que podemos considerar el siguiente elemento.

In [95]:
%%time
!python ../files/lec08/multiprocessing0.py 1 2 3 4 5

[1, 4, 9, 16, 25]
CPU times: user 92.4 ms, sys: 34.6 ms, total: 127 ms
Wall time: 4.31 s


Se considera una buena practica ocupar como número de procesos el número total de núcleos dentro de nuestra computadora.

In [84]:
import os
os.cpu_count()

8

<h2 style="color:crimson">Ejercicios</h2>

1. Menciona tres procesos de tipo _CPU_

---

2. Menciona tres procesos de tipo _I/O_

---

3. Usando `asyncio`, escribe la versión asincrónica de `extract_headers` y guardalo dentro del archivo `async_h1_extract.py` ¿Cuánto tiempo toma ejecutar el programa?

---

4. Usando multiprocessing, crea el archivo `mult_h1_extract.py` que extraiga los encabezados de `urls`. ¿Cuánto tiempo toma ejecutar el programa?

--- 

5. Dado los últimos dos ejercicios, ¿cuál consideras es una mejor manera de extraer los encabezados de una página? ¿por qué?

---

6. Se quiere escribir un programa que ejecute procesos `A1`, `A2`, `A3`, `B` y `C` de la siguiente manera:

```
+-----+
| A1  +---------+
+-----+         |
                |
+-----+      +--v--+      +-----+
| A2  +----->+  B  +----->+  C  |
+-----+      +--^--+      +-----+
                |
+-----+         |
| A3  +---------+
+-----+
```

Describe en tus palabras cómo usarías herramientas de concurrencia para ejecutar el proceso de manera óptima.

## Referencias
1. https://pybay.com/site_media/slides/raymond2017-keynote/index.html
2. https://asyncio.readthedocs.io/en/latest/hello_world.html
3. https://www.youtube.com/watch?v=KVKufdTphKs
4. https://realpython.com/python-concurrency/
5. https://stackoverflow.com/a/38145183/11278455
6. https://docs.python.org/3/library/concurrent.futures.html
7. https://docs.python.org/3.6/library/multiprocessing.html?highlight=multithreading