# 4.2 - Procesos paralelos


![parallel](images/parallel.png)

$$$$

### Multiprocessing

Veamos en primer lugar [multiprocessing](https://docs.python.org/es/3.9/library/multiprocessing.html). Es una librería de Python que nos permite manejar hilos y procesos. La diferencia entre hilo y proceso es que un hilo ocurre dentro del espacio de memoria de un programa y un proceso es una copia completa del programa, por esta razón, los hilos son rápidos de crear y destruir además de que consumen poca memoria y los procesos son lentos de crear y destruir además de que requieren clonar el espacio de memoria del programa en otro lugar de la RAM, y esto es lento. Ejemplos de esto serían, subrutinas que recogen mensajes de un puerto de comunicaciones y los usan para actuar sobre emails almacenados en un servidor, desde el punto de vista del servidor, el cliente de correo sólo necesita usar el servidor durante un corto plazo de tiempo, porque envía un mensaje al servidor donde le indica lo que el usuario desea hacer, saber si hay mensajes nuevos, borrar un correo, moverlo... El servidor abre un hilo para atender a ese usuario y el hilo sólo vive mientras dure la conexión del usuario, una vez el usuario ha terminado el cliente de correo desconecta hasta nueva acción. Este proceso que he descrito es rápido, ocurre en milisegundos y generalmente se resuelve con hilos porque es más ligero para el sistema operativo y su vida media es especialmente corta, además de que el sistema podrá aceptar ciento o miles de conexiones por segundo y será ligero, rápido y eficiente en esta tarea.

La tendencia actual entre los desarrolladores es hacer una aplicaciones que sean rápidas en un sólo hilo y luego escalar a tantas instancias como sea necesario para cubrir nuestros objetivos de aprovechamiento, estos servidores pueden atender en un sólo proceso a miles o decena de miles de conexiones.

Si queremos realizar un programa que aproveche las diferentes CPUs y pueda realizar múltiples tareas a la vez tenemos muchos mecanismos para llevar esta tarea a cabo. Dependiendo del uso que se quiera dar probablemente queramos usar hilos o procesos, es aquí donde querremos escribir nuestro código con hilos o procesos.

**Hola Mundo**

In [None]:
import warnings
warnings.filterwarnings('ignore')

In [None]:
def cuadrado(x):
    return x**2

In [None]:
data = [i for i in range(10_000_000)]

In [None]:
%%time

seq = [cuadrado(x) for x in data]

seq[:5]

In [None]:
%%time

map(cuadrado, data)

In [None]:
%%time

seq=list(map(cuadrado, data))

seq[:5]

In [None]:
import multiprocessing as mp

In [None]:
# movida del Mac M1

mp.get_start_method()

In [None]:
# movida del mac M1, para otros no hace falta

from multiprocessing import get_context

In [None]:
mp.cpu_count()    # nº de nucleos

In [None]:
%%time

#pool = mp.Pool(mp.cpu_count())    # usar todos los nucleos (Intel o AMD)

pool = get_context('fork').Pool(mp.cpu_count())  # ARM M1  (en vez de ir de 1en1 va de 8en8)
 
seq = pool.map(cuadrado, data)

pool.close()

seq[:5]

**multiprocessing asíncrono**

`map` consume su iterable convirtiendo el iterable en una lista, dividiéndolo en fragmentos y enviando esos fragmentos a los procesos de trabajo en el Pool. Dividir el iterable en fragmentos funciona mejor que pasar cada elemento en el iterable entre procesos un elemento a la vez, especialmente si el iterable es grande. Sin embargo, convertir el iterable en una lista para dividirlo puede tener un costo de memoria muy alto, ya que la lista completa deberá mantenerse en la memoria.

`imap`/`map_async` no convierte el iterable que le da en una lista, ni lo divide en trozos. Itera sobre el elemento de uno en uno y los envia a un proceso de trabajo distinto. Esto significa que no se toma el golpe de memoria de convertir todo el iterable en una lista, pero también que el rendimiento es más lento para los iterables grandes, debido a la falta de fragmentación. Esto se puede mitigar aumentando el valor predeterminado de 1 en el `chunksize`. Otra gran diferencia de `imap` es que puede comenzar a recibir resultados de los trabajadores tan pronto como estén listos, en lugar de tener que esperar a que todos terminen. 




In [None]:
%%time

#pool=mp.Pool(mp.cpu_count())

pool=get_context('fork').Pool(6)  # grupo con 6 cores

res=pool.map_async(cuadrado, data).get()

pool.close()

res[:5]

```python
%%time
pool=mp.Pool(mp.cpu_count())   

for x in pool.imap(cuadrado, datos):
    print(x)
    
pool.close()
```

$$$$

$$$$

## Joblib

![joblib](images/joblib.svg)

$$$$

$$$$


[Joblib](https://joblib.readthedocs.io/en/latest/) es una librería de Python que también nos permite paralelizar un programa. En este caso a través de procesos, lo cuál implica, como vimos antes, cierto tiempo para construir el Pool. Lo usaremos principalmente para realizar un bucle sobre una función.

Veamos el Hola Mundo.

**Hola Mundo**

In [None]:
%pip install joblib

In [None]:
from joblib import Parallel, delayed

In [None]:
%%time

paralelo = Parallel(n_jobs=-1,   # n_jobs es el nº de nuncleos que usas, -1 = todos
                    verbose=True
                   )


seq = paralelo(delayed(cuadrado)(e) for e in data)

seq[:5]

### Ejemplo ESPN

Volvamos de nuevo al ejemplo de scrapeo de la págine de ESPN. Usaremos joblib para realizar una extracción en paralelo de la información.

In [None]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import Select

import time

import pandas as pd

PATH='driver/chromedriver'

In [None]:
url='https://www.espn.com/soccer/competitions'


driver=webdriver.Chrome(PATH)
driver.get(url)
    
time.sleep(2)

aceptar=driver.find_element(By.XPATH, '//*[@id="onetrust-accept-btn-handler"]')
aceptar.click()

time.sleep(4)

equipos=driver.find_element(By.CSS_SELECTOR, '#fittPageContainer > div.page-container.cf > div > div.layout__column.layout__column--1 > div > div:nth-child(3) > div:nth-child(1) > div > div:nth-child(5) > div > section > div > div > span:nth-child(2) > a')
equipos.click()


time.sleep(2)

equipos_stats_urls=driver.find_elements(By.CSS_SELECTOR, 'a.AnchorLink')

equipos_stats_urls=[e.get_attribute('href') for e in equipos_stats_urls 
                    if 'team/stats' in e.get_attribute('href')]


equipos_stats_urls[:20]

In [None]:
driver.quit()

In [None]:
ERRORES = []

In [None]:
def extraer(url):

    # inicia el driver
    driver=webdriver.Chrome(PATH)
    driver.get(url)

    time.sleep(2)

    # acepta cookies
    aceptar=driver.find_element(By.XPATH, '//*[@id="onetrust-accept-btn-handler"]')
    aceptar.click()

    time.sleep(2)
    
    data=[]
    cabeceras=[]
    
    try:
        # dropdown
        dropdown = driver.find_element(By.XPATH, '//*[@id="fittPageContainer"]/div[2]/div[5]/div/div/section/div/div[4]/select[1]')
        select = Select(dropdown)
        try:
            select.select_by_visible_text('2022-23')
        except:
            select.select_by_visible_text('2023-24')



        time.sleep(1)

        # disciplina
        dis=driver.find_element(By.XPATH, '//*[@id="fittPageContainer"]/div[2]/div[5]/div/div[1]/section/div/div[2]/nav/ul/li[2]/a')
        dis.click()

        time.sleep(2)

        tabla=driver.find_element(By.TAG_NAME, 'tbody')

        filas=tabla.find_elements(By.TAG_NAME, 'tr')

        for f in filas:

            elementos=f.find_elements(By.TAG_NAME, 'td') 

            tmp=[]

            for e in elementos:

                tmp.append(e.text)

            tmp.append(url.split('/')[-1])  # nombre del equipo
            data.append(tmp)


        cabeceras=driver.find_element(By.TAG_NAME, 'thead')

        cabeceras=[c.text for c in cabeceras.find_elements(By.TAG_NAME, 'th')]+['TEAM']

    except:
        ERRORES.append(url.split('/')[-1])
        time.sleep(0.1)
        
    driver.quit()
    
    return pd.DataFrame(data, columns=cabeceras)

In [None]:
paralelo = Parallel(n_jobs=6, verbose=True)


lst_df = paralelo(delayed(extraer)(url) for url in equipos_stats_urls[:10])


In [None]:
lst_df = lst_df[1:]

In [None]:
df = pd.concat(lst_df)

In [None]:
df.shape

In [None]:
df.TEAM.unique()

In [None]:
len(df.TEAM.unique())

In [None]:
ERRORES

**Tip:** https://pypi.org/project/tqdm/

**Para barras de progreso**

In [None]:
%pip install tqdm

In [None]:
from tqdm.notebook import tqdm

In [None]:
paralelo = Parallel(n_jobs=6, verbose=False)


lst_df = paralelo(delayed(extraer)(url) for url in tqdm(equipos_stats_urls[:10]))


df = pd.concat(lst_df)


df.shape

In [None]:
len(df.TEAM.unique())