# Parallelismo in Python

Esploreremo i principali strumenti per il calcolo parallelo in Python:

## Perché utilizzare il parallelismo?

- Sfruttare tutte le risorse hardware disponibili
- Velocizzare operazioni CPU-bound o I/O-bound
- Gestire più attività simultaneamente

Esistono due approcci principali:
- **Multithreading**: condivisione memoria, ideale per operazioni I/O-bound
- **Multiprocessing**: processi separati, ideale per operazioni CPU-bound

## Il problema del GIL (Global Interpreter Lock)

- Il GIL impedisce l'esecuzione parallela di thread Python sulla stessa CPU
- Un solo thread può eseguire codice Python alla volta
- Limita il parallelismo reale nel multithreading
- Il multiprocessing bypassa questo limite creando interpreti Python separati

## Funzione di test

Per confrontare i vari metodi, useremo una funzione che calcola i numeri primi fino a un certo limite

In [1]:
def is_prime(n):
    """Verifica se un numero è primo. Generato da Claude 3.7 Sonnet"""
    if n <= 1:
        return False
    elif n <= 3:
        return True
    elif n % 2 == 0 or n % 3 == 0:
        return False
    i = 5
    while i * i <= n:
        if n % i == 0 or n % (i + 2) == 0:
            return False
        i += 6
    return True

In [2]:
def count_primes(start, end):
    """Conta i numeri primi in un intervallo"""
    count = 0
    for n in range(start, end):
        if is_prime(n):
            count += 1
    return count

def sequential_primes(ranges):
    """Esecuzione sequenziale"""
    results = []
    for start, end in ranges:
        results.append(count_primes(start, end))
    return results

## Approccio Sequenziale (baseline)

In [3]:
# Definiamo gli intervalli da calcolare
ranges = [(i*1000000, (i+1)*1000000) for i in range(3)]

In [4]:
%%time
sequential_primes(ranges)

CPU times: total: 16.8 s
Wall time: 17.9 s


[78498, 70435, 67883]

# 1. Multithreading

## Multithreading con `threading`

- Libreria standard di Python
- Crea thread condividendo lo stesso spazio di memoria
- Soggetto alle limitazioni del GIL
- Ideale per task I/O-bound (networking, file I/O)
- Non ottimale per task CPU-bound (calcoli)

In [5]:
import threading

def threading_primes(ranges):
    results = [0] * len(ranges)
    
    def worker(idx, start, end):
        results[idx] = count_primes(start, end)
    
    threads = []
    for i, (start, end) in enumerate(ranges):
        thread = threading.Thread(target=worker, args=(i, start, end))
        threads.append(thread)
        thread.start()
    
    for thread in threads:
        thread.join()
    
    return results

In [6]:
%%time
threading_primes(ranges)

CPU times: total: 16.1 s
Wall time: 16.9 s


[78498, 70435, 67883]

## Concurrent.futures con ThreadPoolExecutor

- API più moderna e semplice
- Gestisce automaticamente il pool di thread
- Offre pattern come map/submit per esecuzioni parallele
- Sempre con le limitazioni del GIL

In [7]:
from concurrent.futures import ThreadPoolExecutor

def concurrent_threading_primes(ranges):
    with ThreadPoolExecutor() as executor:
        # Uso map per applicare la funzione a tutti gli elementi
        results = list(executor.map(lambda r: count_primes(r[0], r[1]), ranges))
    return results

In [8]:
%%time
concurrent_threading_primes(ranges)

CPU times: total: 17.5 s
Wall time: 18.4 s


[78498, 70435, 67883]

# 2. Multiprocessing

## Multiprocessing con `multiprocessing`

- Crea processi Python separati
- Ogni processo ha il suo interprete Python e memoria
- Bypassa il GIL
- Ideale per task CPU-bound
- Overhead maggiore per la creazione dei processi

In [9]:
import multiprocessing as mp

def multiprocessing_primes(ranges):
    with mp.Pool() as pool:
        # Uso starmap per passare gli argomenti espansi
        results = pool.starmap(count_primes, ranges)
    return results

Multiprocessing non è supportato negli Jupyter Notebook, quindi eseguiremo un file esterno con lo stesso codice tramite subprocess

In [10]:
%%time
import subprocess

process = subprocess.run(
    ["python", "test_multiprocessing.py", "--method", "multiprocessing"],
    stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
process.stdout.decode().strip()

CPU times: total: 31.2 ms
Wall time: 2.35 s


'Risultati (multiprocessing): [78498, 70435, 67883]'

## Concurrent.futures con ProcessPoolExecutor

- Stessa API di ThreadPoolExecutor ma usa processi
- Interfaccia più moderna e semplice
- Sotto il cofano usa multiprocessing

In [11]:
from concurrent.futures import ProcessPoolExecutor

def concurrent_processing_primes(ranges):
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(lambda r: count_primes(r[0], r[1]), ranges))
    return results

Anche ProcessPoolExecutor non è supportato negli Jupyter Notebook, quindi eseguiremo un file esterno con lo stesso codice tramite subprocess

In [12]:
%%time
import subprocess

process = subprocess.run(
    ["python", "test_multiprocessing.py", "--method", "concurrent"],
    stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
process.stdout.decode().strip()

CPU times: total: 31.2 ms
Wall time: 2.31 s


'Risultati (concurrent): [78498, 70435, 67883]'

# 3. Joblib

- Libreria popolare nell'ecosistema scientifico (scikit-learn)
- Supporta sia multiprocessing che multithreading
- Offre caching e pipeline di calcolo
- Gestisce automaticamente la parallelizzazione
- Backend intercambiabili (loky, multiprocessing, threading, dask)

In [13]:
from joblib import Parallel, delayed

def joblib_primes(ranges):
    # n_jobs=-1 utilizza tutti i core disponibili
    results = Parallel(n_jobs=-1)(delayed(count_primes)(start, end) for start, end in ranges)
    return results

In [14]:
%%time
joblib_primes(ranges)

CPU times: total: 46.9 ms
Wall time: 2.59 s


[78498, 70435, 67883]

# 4. Ray

- Framework moderno per il calcolo parallelo e distribuito
- Può scalare da un singolo computer a un cluster
- Particolarmente popolare nel ML/AI
- Offre funzionalità avanzate come actor model e scheduling
- Overhead iniziale ma buone prestazioni su task complessi

In [None]:
import ray
ray.init()

@ray.remote
def ray_count_primes(start, end):
    return count_primes(start, end)

def ray_primes(ranges):
    # Lancio tutti i task in parallelo
    futures = [ray_count_primes.remote(start, end) for start, end in ranges]
    # Raccolgo i risultati
    return ray.get(futures)

  from .autonotebook import tqdm as notebook_tqdm
2025-05-20 10:07:47,206	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.


In [18]:
%%time
ray_primes(ranges)

CPU times: total: 156 ms
Wall time: 2.4 s


[78498, 70435, 67883]