# 4.2 - Procesos paralelos


![parallel](images/parallel.png)

$$$$

### Multiprocessing

Veamos en primer lugar [multiprocessing](https://docs.python.org/es/3.9/library/multiprocessing.html). Es una librería de Python que nos permite manejar hilos y procesos. La diferencia entre hilo y proceso es que un hilo ocurre dentro del espacio de memoria de un programa y un proceso es una copia completa del programa, por esta razón, los hilos son rápidos de crear y destruir además de que consumen poca memoria y los procesos son lentos de crear y destruir además de que requieren clonar el espacio de memoria del programa en otro lugar de la RAM, y esto es lento. Ejemplos de esto serían, subrutinas que recogen mensajes de un puerto de comunicaciones y los usan para actuar sobre emails almacenados en un servidor, desde el punto de vista del servidor, el cliente de correo sólo necesita usar el servidor durante un corto plazo de tiempo, porque envía un mensaje al servidor donde le indica lo que el usuario desea hacer, saber si hay mensajes nuevos, borrar un correo, moverlo... El servidor abre un hilo para atender a ese usuario y el hilo sólo vive mientras dure la conexión del usuario, una vez el usuario ha terminado el cliente de correo desconecta hasta nueva acción. Este proceso que he descrito es rápido, ocurre en milisegundos y generalmente se resuelve con hilos porque es más ligero para el sistema operativo y su vida media es especialmente corta, además de que el sistema podrá aceptar ciento o miles de conexiones por segundo y será ligero, rápido y eficiente en esta tarea.

La tendencia actual entre los desarrolladores es hacer una aplicaciones que sean rápidas en un sólo hilo y luego escalar a tantas instancias como sea necesario para cubrir nuestros objetivos de aprovechamiento, estos servidores pueden atender en un sólo proceso a miles o decena de miles de conexiones.

Si queremos realizar un programa que aproveche las diferentes CPUs y pueda realizar múltiples tareas a la vez tenemos muchos mecanismos para llevar esta tarea a cabo. Dependiendo del uso que se quiera dar probablemente queramos usar hilos o procesos, es aquí donde querremos escribir nuestro código con hilos o procesos.

**Hola Mundo**

In [1]:
import warnings
warnings.filterwarnings('ignore')

In [2]:
def cuadrado(x):
    return x**2

In [3]:
datos=[i for i in range(10000000)]

In [4]:
%%time

seq=[cuadrado(x) for x in datos]

seq[:10]

CPU times: user 2.37 s, sys: 57.5 ms, total: 2.43 s
Wall time: 2.43 s


[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

In [5]:
%%time

map(cuadrado, datos)

CPU times: user 4 µs, sys: 1e+03 ns, total: 5 µs
Wall time: 7.87 µs


<map at 0x16926e280>

In [6]:
%%time

a=list(map(cuadrado, datos))

CPU times: user 2.13 s, sys: 42.4 ms, total: 2.17 s
Wall time: 2.17 s


In [7]:
import multiprocessing as mp

In [8]:
mp.get_start_method()   # movida del Mac M1, para otros no hace falta

'spawn'

In [9]:
# movida del mac M1, para otros no hace falta

from multiprocessing import get_context

In [10]:
mp.cpu_count()

8

In [14]:
%%time

#pool=mp.Pool(mp.cpu_count())   # grupo con todos los cores
pool=get_context('fork').Pool(mp.cpu_count())  # esto para el Mac M1


res=pool.map(cuadrado, datos)
pool.close()

res[:10]

CPU times: user 605 ms, sys: 332 ms, total: 937 ms
Wall time: 969 ms


[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

In [15]:
pool

<multiprocessing.pool.Pool state=CLOSE pool_size=8>

**multiprocessing asíncrono**

`map` consume su iterable convirtiendo el iterable en una lista, dividiéndolo en fragmentos y enviando esos fragmentos a los procesos de trabajo en el Pool. Dividir el iterable en fragmentos funciona mejor que pasar cada elemento en el iterable entre procesos un elemento a la vez, especialmente si el iterable es grande. Sin embargo, convertir el iterable en una lista para dividirlo puede tener un costo de memoria muy alto, ya que la lista completa deberá mantenerse en la memoria.

`imap`/`map_async` no convierte el iterable que le da en una lista, ni lo divide en trozos. Itera sobre el elemento de uno en uno y los envia a un proceso de trabajo distinto. Esto significa que no se toma el golpe de memoria de convertir todo el iterable en una lista, pero también que el rendimiento es más lento para los iterables grandes, debido a la falta de fragmentación. Esto se puede mitigar aumentando el valor predeterminado de 1 en el `chunksize`. Otra gran diferencia de `imap` es que puede comenzar a recibir resultados de los trabajadores tan pronto como estén listos, en lugar de tener que esperar a que todos terminen. 




In [16]:
%%time

#pool=mp.Pool(mp.cpu_count())
pool=get_context('fork').Pool(6)  # grupo con 6 cores

res=pool.map_async(cuadrado, datos).get()
pool.close()

res[:10]

CPU times: user 554 ms, sys: 380 ms, total: 934 ms
Wall time: 1 s


[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

```python
%%time
pool=mp.Pool(mp.cpu_count())   

for x in pool.imap(cuadrado, datos):
    print(x)
    
pool.close()
```

$$$$

$$$$

## Joblib

![joblib](images/joblib.svg)

$$$$

$$$$


[Joblib](https://joblib.readthedocs.io/en/latest/) es una librería de Python que también nos permite paralelizar un programa. En este caso a través de procesos, lo cuál implica, como vimos antes, cierto tiempo para construir el Pool. Lo usaremos principalmente para realizar un bucle sobre una función.

Veamos el Hola Mundo.

**Hola Mundo**

In [17]:
%pip install joblib

Note: you may need to restart the kernel to use updated packages.


In [18]:
from joblib import Parallel, delayed

In [19]:
%%time

lst=Parallel(n_jobs=6, verbose=True)(delayed(cuadrado)(e) for e in datos)

lst[:5]  # tarda más por el reordenado los datos, coloca en orden de la que calcula

[Parallel(n_jobs=6)]: Using backend LokyBackend with 6 concurrent workers.
[Parallel(n_jobs=6)]: Done  46 tasks      | elapsed:    0.5s
[Parallel(n_jobs=6)]: Done 229382 tasks      | elapsed:    2.2s
[Parallel(n_jobs=6)]: Done 4325382 tasks      | elapsed:   12.8s


CPU times: user 26.5 s, sys: 933 ms, total: 27.4 s
Wall time: 27.7 s


[Parallel(n_jobs=6)]: Done 9907822 tasks      | elapsed:   27.7s
[Parallel(n_jobs=6)]: Done 10000000 out of 10000000 | elapsed:   27.7s finished


[0, 1, 4, 9, 16]

### Ejemplo ESPN

Volvamos de nuevo al ejemplo de scrapeo de la págine de ESPN. Usaremos joblib para realizar una extracción en paralelo de la información.

In [20]:
from selenium import webdriver

import time

import pandas as pd

from webdriver_manager.chrome import ChromeDriverManager

PATH=ChromeDriverManager().install()



Current google-chrome version is 105.0.5195
Get LATEST chromedriver version for 105.0.5195 google-chrome
Driver [/Users/iudh/.wdm/drivers/chromedriver/mac64_m1/105.0.5195.52/chromedriver] found in cache


In [21]:
url='https://www.espn.com/soccer/competitions'


driver=webdriver.Chrome(PATH)
driver.get(url)
    
time.sleep(2)

aceptar=driver.find_element_by_xpath('//*[@id="onetrust-accept-btn-handler"]')
aceptar.click()

time.sleep(4)

equipos=driver.find_element_by_css_selector('#fittPageContainer > div.page-container.cf > div > div.layout__column.layout__column--1 > div > div:nth-child(3) > div:nth-child(1) > div > div:nth-child(5) > div > section > div > div > span:nth-child(2) > a')
equipos.click()


time.sleep(2)

equipos_stats_urls=driver.find_elements_by_css_selector('a.AnchorLink')

equipos_stats_urls=[e.get_attribute('href') for e in equipos_stats_urls 
                    if 'team/stats' in e.get_attribute('href')]


equipos_stats_urls


['https://www.espn.com/soccer/team/stats/_/id/598/1-fc-union-berlin',
 'https://www.espn.com/soccer/team/stats/_/id/131/bayer-leverkusen',
 'https://www.espn.com/soccer/team/stats/_/id/132/bayern-munich',
 'https://www.espn.com/soccer/team/stats/_/id/124/borussia-dortmund',
 'https://www.espn.com/soccer/team/stats/_/id/268/borussia-monchengladbach',
 'https://www.espn.com/soccer/team/stats/_/id/125/eintracht-frankfurt',
 'https://www.espn.com/soccer/team/stats/_/id/3841/fc-augsburg',
 'https://www.espn.com/soccer/team/stats/_/id/122/fc-cologne',
 'https://www.espn.com/soccer/team/stats/_/id/129/hertha-berlin',
 'https://www.espn.com/soccer/team/stats/_/id/2950/mainz',
 'https://www.espn.com/soccer/team/stats/_/id/11420/rb-leipzig',
 'https://www.espn.com/soccer/team/stats/_/id/126/sc-freiburg',
 'https://www.espn.com/soccer/team/stats/_/id/133/schalke-04',
 'https://www.espn.com/soccer/team/stats/_/id/7911/tsg-hoffenheim',
 'https://www.espn.com/soccer/team/stats/_/id/134/vfb-stuttgart

In [22]:
driver.quit()

In [23]:
def extraer(url):

    
    # inicia el driver
    driver=webdriver.Chrome(PATH)
    driver.get(url)

    time.sleep(2)

    # acepta cookies
    aceptar=driver.find_element_by_xpath('//*[@id="onetrust-accept-btn-handler"]')
    aceptar.click()

    time.sleep(2)
    
    # disciplina
    dis=driver.find_element_by_xpath('//*[@id="fittPageContainer"]/div[2]/div[5]/div/div[1]/section/div/div[2]/nav/ul/li[2]/a')
    dis.click()

    time.sleep(2)
    
    tabla=driver.find_element_by_tag_name('tbody')

    filas=tabla.find_elements_by_tag_name('tr')


    data=[]

    for f in filas:

        elementos=f.find_elements_by_tag_name('td') 

        tmp=[]

        for e in elementos:

            tmp.append(e.text)
            
        tmp.append(url.split('/')[-1])
        data.append(tmp)
        

    cabeceras=driver.find_element_by_tag_name('thead')

    cabeceras=[c.text for c in cabeceras.find_elements_by_tag_name('th')]+['TEAM']
    
    
    driver.quit()
    
    return pd.DataFrame(data, columns=cabeceras)

In [24]:
%%time

lst_df=Parallel(n_jobs=6, verbose=True)(delayed(extraer)(url) for url in equipos_stats_urls)

[Parallel(n_jobs=6)]: Using backend LokyBackend with 6 concurrent workers.








CPU times: user 98.1 ms, sys: 38.2 ms, total: 136 ms
Wall time: 48.9 s


[Parallel(n_jobs=6)]: Done  18 out of  18 | elapsed:   48.9s finished


In [25]:
len(lst_df)

18

In [26]:
len(equipos_stats_urls)

18

In [27]:
lst_df[0].head()

Unnamed: 0,RK,NAME,P,YC,RC,PTS,TEAM
0,1.0,Robin Knoche,4,1,0,1,1-fc-union-berlin
1,,Frederik Ronnow,4,1,0,1,1-fc-union-berlin
2,3.0,Sheraldo Becker,4,0,0,0,1-fc-union-berlin
3,,András Schäfer,3,0,0,0,1-fc-union-berlin
4,,Milos Pantovic,2,0,0,0,1-fc-union-berlin


**Tip:** https://pypi.org/project/tqdm/

In [28]:
%pip install tqdm

Note: you may need to restart the kernel to use updated packages.


In [29]:
from tqdm.notebook import tqdm    # from tqdm import tqdm   # para .py

In [31]:
lst=[1, 23, 45, 65, 6778, 8756, 32, 45, 67, 89, 99, 9990]

for e in lst:
    print(e**2)

1
529
2025
4225
45941284
76667536
1024
2025
4489
7921
9801
99800100


In [32]:
for e in tqdm(lst):
    print(e**2)

  0%|          | 0/12 [00:00<?, ?it/s]

1
529
2025
4225
45941284
76667536
1024
2025
4489
7921
9801
99800100


In [33]:
%%time

funcion=lambda x: x**0.5   # raiz cuadrada

lst=Parallel(n_jobs=6, verbose=True)(delayed(funcion)(i**2) for i in tqdm(range(1000000)))

lst[:10]

  0%|          | 0/1000000 [00:00<?, ?it/s]

[Parallel(n_jobs=6)]: Using backend LokyBackend with 6 concurrent workers.
[Parallel(n_jobs=6)]: Done 100 tasks      | elapsed:    0.0s
[Parallel(n_jobs=6)]: Done 524276 tasks      | elapsed:    2.7s


CPU times: user 3.24 s, sys: 309 ms, total: 3.54 s
Wall time: 3.53 s


[Parallel(n_jobs=6)]: Done 1000000 out of 1000000 | elapsed:    3.5s finished


[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]