# 4.2 - Procesos paralelos


![parallel](images/parallel.png)

$$$$

### Multiprocessing

Veamos en primer lugar [multiprocessing](https://docs.python.org/es/3.9/library/multiprocessing.html). Es una librería de Python que nos permite manejar hilos y procesos. La diferencia entre hilo y proceso es que un hilo ocurre dentro del espacio de memoria de un programa y un proceso es una copia completa del programa, por esta razón, los hilos son rápidos de crear y destruir además de que consumen poca memoria y los procesos son lentos de crear y destruir además de que requieren clonar el espacio de memoria del programa en otro lugar de la RAM, y esto es lento. Ejemplos de esto serían, subrutinas que recogen mensajes de un puerto de comunicaciones y los usan para actuar sobre emails almacenados en un servidor, desde el punto de vista del servidor, el cliente de correo sólo necesita usar el servidor durante un corto plazo de tiempo, porque envía un mensaje al servidor donde le indica lo que el usuario desea hacer, saber si hay mensajes nuevos, borrar un correo, moverlo... El servidor abre un hilo para atender a ese usuario y el hilo sólo vive mientras dure la conexión del usuario, una vez el usuario ha terminado el cliente de correo desconecta hasta nueva acción. Este proceso que he descrito es rápido, ocurre en milisegundos y generalmente se resuelve con hilos porque es más ligero para el sistema operativo y su vida media es especialmente corta, además de que el sistema podrá aceptar ciento o miles de conexiones por segundo y será ligero, rápido y eficiente en esta tarea.

La tendencia actual entre los desarrolladores es hacer una aplicaciones que sean rápidas en un sólo hilo y luego escalar a tantas instancias como sea necesario para cubrir nuestros objetivos de aprovechamiento, estos servidores pueden atender en un sólo proceso a miles o decena de miles de conexiones.

Si queremos realizar un programa que aproveche las diferentes CPUs y pueda realizar múltiples tareas a la vez tenemos muchos mecanismos para llevar esta tarea a cabo. Dependiendo del uso que se quiera dar probablemente queramos usar hilos o procesos, es aquí donde querremos escribir nuestro código con hilos o procesos.

**Hola Mundo**

In [2]:
import warnings
warnings.filterwarnings('ignore')

In [None]:
def cuadrado(x):
    return x**2

In [None]:
datos=[i for i in range(100000000)]

In [None]:
%%time

seq=[cuadrado(x) for x in datos]

seq[:10]

In [None]:
%%time

map(cuadrado, datos)

In [None]:
import multiprocessing as mp

In [None]:
mp.cpu_count()

In [None]:
mp.get_start_method()   # movida del mac M1, para otros no hace falta

In [None]:
# movida del mac M1, para otros no hace falta

from multiprocessing import get_context

In [None]:
%%time

pool=get_context('fork').Pool(mp.cpu_count())  # grupo con todos los cores, si es otro pc seria mp.Pool(mp.cpu_count())

res=pool.map(cuadrado, datos)
pool.close()

res[:10]

In [None]:
pool

**multiprocessing asíncrono**

`map` consume su iterable convirtiendo el iterable en una lista, dividiéndolo en fragmentos y enviando esos fragmentos a los procesos de trabajo en el Pool. Dividir el iterable en fragmentos funciona mejor que pasar cada elemento en el iterable entre procesos un elemento a la vez, especialmente si el iterable es grande. Sin embargo, convertir el iterable en una lista para dividirlo puede tener un costo de memoria muy alto, ya que la lista completa deberá mantenerse en la memoria.

`imap`/`map_async` no convierte el iterable que le da en una lista, ni lo divide en trozos. Itera sobre el elemento de uno en uno y los envia a un proceso de trabajo distinto. Esto significa que no se toma el golpe de memoria de convertir todo el iterable en una lista, pero también que el rendimiento es más lento para los iterables grandes, debido a la falta de fragmentación. Esto se puede mitigar aumentando el valor predeterminado de 1 en el `chunksize`. Otra gran diferencia de `imap` es que puede comenzar a recibir resultados de los trabajadores tan pronto como estén listos, en lugar de tener que esperar a que todos terminen. 




In [None]:
%%time
pool=get_context('fork').Pool(mp.cpu_count())   

res=pool.map_async(cuadrado, datos).get()
pool.close()
res[:10]

In [None]:
%%time
pool=get_context('fork').Pool(mp.cpu_count())   

res=pool.imap(cuadrado, datos)
pool.close()
res

```python
%%time
pool=mp.Pool(mp.cpu_count())   

for x in pool.imap(cuadrado, datos):
    print(x)
    
pool.close()
```

$$$$

$$$$

## Joblib

![joblib](images/joblib.svg)

$$$$

$$$$


[Joblib](https://joblib.readthedocs.io/en/latest/) es una librería de Python que también nos permite paralelizar un programa. En este caso a través de procesos, lo cuál implica, como vimos antes, cierto tiempo para construir el Pool. Lo usaremos principalmente para realizar un bucle sobre una función.

Veamos el Hola Mundo.

**Hola Mundo**

In [None]:
%pip install joblib

In [3]:
from joblib import Parallel, delayed

In [None]:
%%time

funcion=lambda x: x**0.5  # funcion raiz cuadrada

Parallel(n_jobs=4, verbose=True)(delayed(funcion)(i**2) for i in range(10))

### Ejemplo ESPN

Volvamos de nuevo al ejemplo de scrapeo de la págine de ESPN. Usaremos joblib para realizar una extracción en paralelo de la información.

In [4]:
from selenium import webdriver

import time

import pandas as pd

from webdriver_manager.chrome import ChromeDriverManager

PATH=ChromeDriverManager().install()



Current google-chrome version is 100.0.4896
Get LATEST chromedriver version for 100.0.4896 google-chrome
Driver [/Users/iudh/.wdm/drivers/chromedriver/mac64_m1/100.0.4896.60/chromedriver] found in cache


In [5]:
url='https://www.espn.com/soccer/competitions'


driver=webdriver.Chrome(PATH)
driver.get(url)
    
time.sleep(2)

aceptar=driver.find_element_by_xpath('//*[@id="onetrust-accept-btn-handler"]')
aceptar.click()

time.sleep(2)

equipos=driver.find_element_by_css_selector('#fittPageContainer > div.page-container.cf > div > div.layout__column.layout__column--1 > div > div:nth-child(3) > div:nth-child(1) > div > div:nth-child(5) > div > section > div > div > span:nth-child(2) > a')
equipos.click()

time.sleep(2)

equipos_stats_urls=driver.find_elements_by_css_selector('a.AnchorLink')

equipos_stats_urls=[e.get_attribute('href') for e in equipos_stats_urls 
                    if 'team/stats' in e.get_attribute('href')]


equipos_stats_urls


['https://www.espn.com/soccer/team/stats/_/id/103/ac-milan',
 'https://www.espn.com/soccer/team/stats/_/id/104/as-roma',
 'https://www.espn.com/soccer/team/stats/_/id/105/atalanta',
 'https://www.espn.com/soccer/team/stats/_/id/107/bologna',
 'https://www.espn.com/soccer/team/stats/_/id/2925/cagliari',
 'https://www.espn.com/soccer/team/stats/_/id/2574/empoli',
 'https://www.espn.com/soccer/team/stats/_/id/109/fiorentina',
 'https://www.espn.com/soccer/team/stats/_/id/3263/genoa',
 'https://www.espn.com/soccer/team/stats/_/id/119/hellas-verona',
 'https://www.espn.com/soccer/team/stats/_/id/110/internazionale',
 'https://www.espn.com/soccer/team/stats/_/id/111/juventus',
 'https://www.espn.com/soccer/team/stats/_/id/112/lazio',
 'https://www.espn.com/soccer/team/stats/_/id/114/napoli',
 'https://www.espn.com/soccer/team/stats/_/id/3240/salernitana',
 'https://www.espn.com/soccer/team/stats/_/id/2734/sampdoria',
 'https://www.espn.com/soccer/team/stats/_/id/3997/sassuolo',
 'https://www

In [6]:
driver.quit()

In [7]:
def extraer(url):
    
    driver=webdriver.Chrome(PATH)
    driver.get(url)

    time.sleep(2)

    aceptar=driver.find_element_by_xpath('//*[@id="onetrust-accept-btn-handler"]')
    aceptar.click()

    time.sleep(2)

    disciplina=driver.find_element_by_xpath('//*[@id="fittPageContainer"]/div[2]/div[5]/div/div/section/div/div[2]/nav/ul/li[2]/a')
    disciplina.click()

    time.sleep(1)

    tabla=driver.find_element_by_tag_name('tbody')

    filas=tabla.find_elements_by_tag_name('tr')

    datos=[[e.text for e in f.find_elements_by_tag_name('td')]+[url.split('/')[-1]] for f in filas]
    
    

    cabeceras=driver.find_element_by_tag_name('thead')

    cabeceras=[c.text for c in cabeceras.find_elements_by_tag_name('th')]+['TEAM']
    
    driver.quit()
    
    return pd.DataFrame(datos, columns=cabeceras)

In [10]:
%%time

lst_df=Parallel(n_jobs=8, verbose=True)(delayed(extraer)(url) for url in equipos_stats_urls[:2])

[Parallel(n_jobs=8)]: Using backend LokyBackend with 8 concurrent workers.


CPU times: user 20 ms, sys: 42.5 ms, total: 62.4 ms
Wall time: 14.1 s


[Parallel(n_jobs=8)]: Done   2 out of   2 | elapsed:   14.1s remaining:    0.0s
[Parallel(n_jobs=8)]: Done   2 out of   2 | elapsed:   14.1s finished


In [12]:
lst_df[0].head()

Unnamed: 0,RK,NAME,P,YC,RC,PTS,TEAM
0,1,Theo Hernández,26,6,2,12,ac-milan
1,2,Alessio Romagnoli,18,4,1,7,ac-milan
2,3,Sandro Tonali,30,6,0,6,ac-milan
3,4,Ismaël Bennacer,26,5,0,5,ac-milan
4,5,Brahim Díaz,27,4,0,4,ac-milan


**Tip:** https://pypi.org/project/tqdm/

In [13]:
%pip install tqdm

Note: you may need to restart the kernel to use updated packages.


In [14]:
from tqdm.notebook import tqdm  # from tqdm import tqdm para .py

In [15]:
lst=[1, 23, 4, 5]

for e in tqdm(lst):
    print(e**2)

  0%|          | 0/4 [00:00<?, ?it/s]

1
529
16
25


In [20]:
%%time

funcion=lambda x: x**0.5  # funcion raiz cuadrada

Parallel(n_jobs=4, verbose=True)(delayed(funcion)(i**2) for i in tqdm(range(10)))

  0%|          | 0/10 [00:00<?, ?it/s]

CPU times: user 31.2 ms, sys: 4.54 ms, total: 35.7 ms
Wall time: 33 ms


[Parallel(n_jobs=4)]: Using backend LokyBackend with 4 concurrent workers.
[Parallel(n_jobs=4)]: Done  10 out of  10 | elapsed:    0.0s finished


[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]