# Calcolo parallelo

# Indice

1. [Thread e processi](#thread_processi) <br>
2. [Web scraping](#scraping)<br>
3. [Programmazione sequenziale](#sequenziale)<br>
4. [Programmazione concorrente - *multithreading*](#multithreading)<br>
    3.1 [La classe `Thread`](#thread)<br>
    3.2 [Confrontare i tempi di esecuzione](#tempi)<br>
5. [Programmazione concorrente - *multiprocessing*](#multiprocessing)<br>
    4.1 [La classe `Process`](#process)<br>
    4.2 [La classe `Pool`](#pool)<br>
6. [Scegliere gli iperparametri ottimali](#iperparametri)<br>

In [1]:
import inspect
import multiprocessing as mp
import numpy as np
import os
import pandas as pd
import threading
import time

%load_ext autoreload
%autoreload 2

# 1. Thread e processi <a id=thread_processi> </a>

Il **processo** è l'istanza di esecuzione di un'applicazione. Ad esempio, quando eseguiamo `jupyter notebook` da terminale, stiamo avviando un processo. Ciascun processo risiede in aree di memoria differenti e, per comunicare tra loro, devono utilizzare canali come *file, pipe, code o socket*.

Ciascun processo può contenere più **thread**, di cui uno viene chiamato *thread primario*. I thread afferenti ad uno stesso processo condividono la medesima area di memoria, possono leggere e scrivere le stesse variabili e di conseguenza possono interferire l'uno con l'altro. 

Tipicamente, se si vuole parallelizzare l'esecuzione di un problema, si utilizza il *multithreading*, per permettere la cooperazione dei singoli thread nella stessa area di memoria. L'esecuzione *multithreading* generalmente garantisce performance migliori rispetto al *multiprocessing* (area di memoria condivisa e non duplicata, comunicazione inter-thread più veloce), tuttavia bisogna stare attenti a come i singoli *thread* accedono alle variabili, per evitare sovrascritture.

# 2. Web scraping <a id=scraping> </a>

In [2]:
# caricamento delle librerie per il web scraping
import requests
from bs4 import BeautifulSoup

### Estrarre il contenuto di una pagina

In [3]:
url = "https://www.didattica.unipd.it/off/2016/LT/SC/SC2094/000ZZ/SCP4063754/N0"

id_processo = os.getpid()
nome_processo = mp.current_process().name
nome_thread = threading.current_thread().name
print("[INIZIO]\nURL: {}\nID processo: {}\nNome processo: {}\nNome"
        "thread: {}\n".format(url, id_processo, nome_processo, nome_thread))

risposta = requests.get(url)
contenuto = BeautifulSoup(risposta.content, "lxml").get_text()

[INIZIO]
URL: https://www.didattica.unipd.it/off/2016/LT/SC/SC2094/000ZZ/SCP4063754/N0
ID processo: 2912
Nome processo: MainProcess
Nomethread: MainThread



In [4]:
N = 5
righe_non_vuote = [c for c in contenuto.split("\n") if c.strip()]

print("Prime {} righe non vuote:".format(N))
for riga in righe_non_vuote[:N]:
    print("{}".format(riga))


Prime 5 righe non vuote:
Didattica - Università degli Studi di Padova
Vai alla navigazione
Unipd.it
Rubrica
IT


# 2. Programmazione sequenziale <a id=sequenziale> </a>

In [5]:
URLS = [
    "https://www.didattica.unipd.it/off/2016/LT/SC/SC2094/000ZZ/SCP4063754/N0",
    "https://stackoverflow.com/questions/3044580/multiprocessing-vs-threading-python",
    "https://docs.python.org/3.6/library/threading.html",
    "https://docs.python.org/3.6/library/multiprocessing.html",
]

Con il comando successivo, estraiamo il contenuto delle URL inserite nella lista precedente e ne cronometriamo il tempo di esecuzione (utilizzando un ciclo for).

In [6]:
inizio = time.time()
contenuti = list()

for url in URLS:
    risposta = requests.get(url)
    contenuto = BeautifulSoup(risposta.content, "lxml").get_text()
    contenuti.append(contenuto)

fine = time.time()
print("Durata: {:.2f}s".format(fine - inizio))

Durata: 2.91s


Ripetiamo la stessa operazione utilizzando la *list comprehension*.

In [None]:
inizio = time.time()

contenuti = [BeautifulSoup(requests.get(url).content, "lxml").get_text() for url in URLS]

fine = time.time()
print("Durata: {:.2f}s".format(fine - inizio))

# 3. Programmazione concorrente - *multithreading* <a id=multithreading> </a>

## 3.1 La classe [Thread](https://docs.python.org/3.6/library/threading.html#threading.Thread) <a id=thread> </a>

Otteniamo il contenuto delle url elencate al punto precedente utilizzando la programmazione in *multithreading*, tramite la classe `Thread`. Per farlo, avremo bisogno di inserire all'interno di una funzione i comandi che effettuano il *web scraping* della singola url.

In [None]:
def contenuto_url(url, coda=None, verboso=True):
    id_processo = os.getpid()
    nome_processo = mp.current_process().name
    nome_thread = threading.current_thread().name

    if verboso:
        print("[INIZIO]\nURL: {}\nID processo: {}\nNome processo: {}\nNome"
        "thread: {}\n".format(url, id_processo, nome_processo, nome_thread))
        
    # il blocco try/Except consente di "intercettare" eccezioni/errori
    # nell'esecuzione, in modo da indicare le cause dell'arresto.
    try:
        risposta = requests.get(url)
        testo = BeautifulSoup(risposta.content, "lxml").get_text()
    except requests.exceptions.HTTPError as err:
        print("HTTP Error:", err)
    except requests.exceptions.ConnectionError as err:
        print("Connection Error:", err)
    except requests.exceptions.Timeout as err:
        print("Timeout Error:", err)
    except requests.exceptions.RequestException as err:
        print("Request Error", err)

    if verboso:
        print("[FINE]\nURL: {}\nID processo: {}\nNome processo: {}\nNome"
        "thread: {}\n".format(url, id_processo, nome_processo, nome_thread))

    if coda is None:
        return testo
    else:
        coda.put(testo)

In [None]:
verboso = True

inizio = time.time()

# istanziamo una coda, alla quale tutti i thread hanno accesso 
# e possono depositare i risultati ottenuti
coda = mp.Queue()

# istanziamo N=5 threads
threads = [threading.Thread(target=contenuto_url, args=(url,),
    kwargs={"coda": coda, "verboso": verboso}) for url in URLS]

# avviamo ciascun thread
for t in threads:
    t.start()

# 
contenuti = [coda.get() for t in threads]

for t in threads:
    t.join() # blocca il MainThread finché t non è completato

fine = time.time()
print("Durata: {:.2f}s".format(fine - inizio))

## 3.2 Confrontare i tempi di esecuzione <a id=tempi> </a>

### Tempo di esecuzione approccio sequenziale

In [None]:
def contenuto_urls_sequenziale(urls, verboso=False):
    for url in urls:
        contenuti = [contenuto_url(url, verboso=verboso) for url in urls]
    return contenuti

In [None]:
%timeit -r 5 -n 1 contenuto_urls_sequenziale(URLS, verboso=False)

### Tempo di esecuzione approccio multithreading

In [None]:
def contenuto_urls_threading(urls, verboso=True):
    coda = mp.Queue()

    threads = [threading.Thread(target=contenuto_url, args=(url,),
        kwargs={"coda": coda, "verboso": verboso}) for url in urls]

    for t in threads:
        t.start()

    contenuti = [coda.get() for t in threads]

    for t in threads:
        t.join() # blocca il MainThread finché t non è completato

    return contenuti

In [None]:
%timeit -r 5 -n 1 contenuto_urls_threading(URLS, verboso=False)

# 4. Programmazione concorrente - *multiprocessing* <a id=multiprocessing> </a>

Sia la classe `multiprocessing.Process` che la `multiprocessing.Pool` utilizzano diversi processi per parallelizzare l'esecuzione.
 
* La `Pool` distribuisce i task da eseguire su una coda FIFO (First In First Out), mappa l'input ricevuto sui processori disponibili e colleziona gli output in forma di lista o array. Solo i processi attualmente in esecuzione sono contenuti in memoria.

* La `Process` istanzia tutti i processi in memoria.

Quando è meglio usare l'uno o l'altro?
* Nel caso in cui dovessimo eseguire molti task ripetitivi, è consigliabile utilizzare `Pool`, che si occuperà di associare ogni task ad un processore;
* Se abbiamo un numero ridotto di task, ciascuno dei quali viene eseguito solo una volta, ha senso usare un `Process` per ciascuno di essi.

In [None]:
N_CPU = mp.cpu_count()

print("# CPU: {}".format(N_CPU))

## 4.1 La classe [Process](https://docs.python.org/3.6/library/multiprocessing.html?highlight=process#multiprocessing.Process) <a id=process> </a>

In [None]:
from msbd.scraping import contenuto_url, contenuto_urls_multiprocessing
print(inspect.getsource(contenuto_urls_multiprocessing))

In [None]:
inizio = time.time()

contenuti = contenuto_urls_multiprocessing(URLS)

fine = time.time()
print("Durata: {:.2f}s".format(fine - inizio))

Utilizziamo il *comando magico* `%timeit` per calcolare il tempo di esecuzione nel caso di approccio *multiprocess* con la funzione `contenuto_urls_multiprocessing(URLS, verboso=False)`

In [None]:
%timeit -r 5 -n 1 contenuto_urls_multiprocessing(URLS, verboso=False)

## 4.2 La classe [Pool](https://docs.python.org/3.6/library/multiprocessing.html?highlight=process#multiprocessing.pool.Pool) <a id=pool> </a>

In [None]:
inizio = time.time()

pool = mp.Pool(processes=N_CPU)
contenuti = [pool.map(contenuto_url, URLS)]
pool.close()
pool.join()

fine = time.time()
print("Durata: {:.2f}s".format(fine - inizio))

In [None]:
from nltk.tokenize import TweetTokenizer
from nltk.stem.snowball import SnowballStemmer

In [None]:
# leggere il data set
tweets = pd.read_csv("datasets/twitter/train.csv", encoding="latin")["SentimentText"].tolist()
# creare il tokenizer
tokenizer = TweetTokenizer(preserve_case=False, reduce_len=True, strip_handles=True)
# creare lo stemmer
stemmer = SnowballStemmer("english")
# creare una funzione per dividere il tweet in token ridotti alla radice
def tweet_analyzer(tweet): return [stemmer.stem(t) for t in tokenizer.tokenize(tweet)]

### Approccio sequenziale

In [None]:
inizio = time.time()

tweets_preproc = [tweet_analyzer(tweet) for tweet in tweets]

fine = time.time()
print("Durata: {:.2f}s".format(fine - inizio))

print(tweets_preproc[42])

### Approccio parallelo

Utilizziamo la classe `Pool` per parallelizzare l'analisi dei tweet.

In [None]:
inizio = time.time()

pool = mp.Pool(processes=N_CPU)
tweets_preproc = pool.map(tweet_analyzer, tweets)
pool.close()
pool.join()

fine = time.time()
print("Durata: {:.2f}s".format(fine - inizio))

print(tweets_preproc[42])

### Linee guida generali sulla scelta della classe più appropriata

>1. **Thread**: numero di task medio-basso, molte operazioni di I/O, utilizzo della CPU relativamente basso;
>2. **Process**: numero di task medio-basso, utilizzo intensivo della CPU;
>3. **Pool**: numero di task alto, utilizzo intensivo della CPU.

# 5. Scegliere gli iperparametri ottimali <a id=iperparametri> </a>

Confronteremo ora le performance dell'approccio sequenziale e quelle dell'approccio multiprocesso nella ricerca degli iperparametri ottimali, da ricercare all'interno di una griglia da noi specificata.

Questa è un'operazione facilmente parallelizzabile che sfrutta la potenza del calcolo parallelo.

In [None]:
from msbd.preprocessamento import OttenereDummy
from sklearn.impute import SimpleImputer
from sklearn.metrics import accuracy_score
from sklearn.model_selection import ParameterGrid
from sklearn.model_selection import ShuffleSplit
from sklearn.pipeline import Pipeline
from sklearn.tree import DecisionTreeClassifier

In [None]:
# leggere il data set
dati = pd.read_csv("datasets/titanic/train.csv")

# dividere la X dalla y
X, y = dati.drop(columns="Survived").copy(), dati["Survived"].copy()

# definire una pipeline di classificazione
clf = Pipeline([
    ("ottenere_dummy", OttenereDummy(drop_first=True)),
    ("imputer", SimpleImputer(strategy="mean")), 
    ("tree", DecisionTreeClassifier())
])

# griglia su cui eseguire la ricerca
griglia = {
    'tree__max_depth': np.arange(1, 18),
    'tree__min_samples_leaf': 2 ** np.arange(9),
}

# dividere i dati in training e test
splitter = ShuffleSplit(n_splits=1, test_size=0.25, random_state=42)
train_indices, val_indices = next(splitter.split(X, y))
X_train, y_train = X.iloc[train_indices], y.iloc[train_indices]
X_val, y_val = X.iloc[val_indices], y.iloc[val_indices]

### Approccio sequenziale

Effettuiamo una *grid search* come visto nel notebook [Alberi di decisione](11_alberi_di_decisione.ipynb).

In [None]:
import tqdm
inizio = time.time()

risultati = []

for params in tqdm.tqdm(ParameterGrid(griglia)):
    clf.set_params(**params)
    clf.fit(X_train, y_train)
    y_pred = clf.predict(X_val)
    params["accuracy_score"] = accuracy_score(y_val, y_pred)
    risultati.append(params)

fine = time.time()
print("Durata: {:.2f}s".format(fine - inizio))

risultati = pd.DataFrame(risultati)
risultati.sort_values("accuracy_score", ascending=False, inplace=True)
risultati.reset_index(drop=True, inplace=True)

risultati.head(5)

### Approccio parallelo

Molti dei metodi offerti da `sklearn` sfruttano il calcolo parallelo senza il bisogno di specificare esplicitamente *processi*, *thread* o *pool*, ma semplicemente valorizzando alcuni loro parametri.

Ad esempio, `sklearn.model_selection.GridSearchCV` permette di specificare tramite il parametro `n_jobs` il numero di processi da utilizzare per la ricerca degli iperparametri ottimali, utilizzando quanto già fatto finora, ad esempio all'interno di una *pipeline* o di un classificatore.

In [None]:
from sklearn.model_selection import GridSearchCV

In [None]:
inizio = time.time()

gscv = GridSearchCV(
    estimator=clf, 
    param_grid=griglia,
    scoring="accuracy",
    cv=splitter, 
    n_jobs=N_CPU, 
    return_train_score=False
)

gscv.fit(X, y)

fine = time.time()
print("Durata: {:.2f}s".format(fine - inizio))

risultati = pd.DataFrame(gscv.cv_results_)
risultati.sort_values("split0_test_score", ascending=False, inplace=True)
risultati.reset_index(drop=True, inplace=True)

risultati.head(5)