
# 03 — Concurrence & Parallélisme en Python (Version **Université**)

> **Objectif pédagogique** : à la fin de ce chapitre, tu dois être capable d’**expliquer** (oral d’entretien) et **implémenter** (live coding) un pipeline concurrent **correct** en Python, choisir entre **threads**, **processus**, et **asyncio**, et justifier tes choix (I/O‑bound vs CPU‑bound, GIL, coûts de pickling, backpressure, timeouts).

### Plan du cours
1. **Motivation & vocabulaire** (concurrence vs parallélisme, latence vs débit)  
2. **Le GIL** (définition, conséquences, mythes)  
3. **Threading** (quand, pourquoi, design patterns : producer/consumer, backpressure, poison pill)  
4. **Multiprocessing** (vrai parallélisme CPU, pickling, chunking, shared memory)  
5. **Asyncio** (concurrence coopérative, event loop, cancellation, timeouts)  
6. **Étude de cas “feed → stratégie → OMS”** (combiner les modèles proprement)  
7. **Pièges, bonnes pratiques, checklist d’entretien**  
8. **Exercices guidés** + corrigés



## 1) Motivation & vocabulaire (5 min)

**Pourquoi la concurrence ?**  
Dans un moteur de trading, tu dois souvent **consommer** des données (ticks, carnets d’ordres), **calculer** des signaux, et **émettre** des ordres **en parallèle** pour minimiser la **latence** et maximiser le **débit**.

**Concurrence vs Parallélisme**  
- **Concurrence** : organiser des tâches qui se **chevauchent** dans le temps (pas nécessairement exécutées en même temps).  
- **Parallélisme** : exécuter **vraiment en même temps** sur plusieurs cœurs.  
Python (CPython) offre les deux, via des **threads** (concurrence utile en I/O), des **processus** (parallélisme CPU) et **asyncio** (concurrence coopérative mono‑thread).

**Latence vs Débit**  
- **Latence** = temps de réponse pour une tâche unique.  
- **Débit (throughput)** = nombre de tâches traitées par unité de temps.  
Un bon design doit préciser ce qu’on optimise.



## 2) Le GIL (Global Interpreter Lock) — ce qu’il faut dire à l’oral

- Le **GIL** est un verrou global de l’interpréteur **CPython** : à un instant donné, **un seul thread** exécute du **bytecode Python**.  
- **Implication** : pour du **CPU pur** (boucles Python lourdes), les threads **n’accélèrent pas** (ils se partagent le GIL).  
- **Mais** : les opérations **I/O** (réseau, disque) **libèrent** le GIL → les threads peuvent **vraiment** se chevaucher en I/O (bon pour HTTP, sockets, DB).  
- Beaucoup de libs natives (NumPy) exécutent du C qui **libère** le GIL pendant l’opération : un thread peut alors découpler Python↔C.

**Résumé décidable**  
- **I/O‑bound** → `threading` **ou** `asyncio`  
- **CPU‑bound** → `multiprocessing` (ou Numba/Cython)

**Mythes fréquents**  
- “Le GIL empêche toute concurrence” ❌ Faux : il empêche seulement l’exécution **CPU Python** simultanée, pas l’**overlap d’I/O**.



## 3) Threading — Producer/Consumer, backpressure, poison pill 

### Quand utiliser les **threads** ?
- Tâches **I/O‑bound** : lectures socket, appels réseau, écriture disque, DB.  
- Tu veux un modèle pragmatique, simple à lire, sans dépendre de l’event loop `asyncio`.

### Pourquoi une **queue** ?
- **Découpler** producteur(s) et consommateur(s).  
- Implémenter une **régulation (backpressure)** via `maxsize` : si les consommateurs ralentissent, le producteur **bloque** sur `put()`, évitant de saturer la mémoire.  
- Faciliter l’**arrêt propre** via **poison pill** (sentinelle `None`).

### Schéma mental
```
[Producer(s)] --put()--> [ queue.Queue(maxsize=M) ] --get()--> [Consumer(s)]
   ↑ backpressure si file pleine                  ↑ poison pill pour arrêter
```


In [1]:

# Threading : pipeline robuste avec backpressure & poison pill
import threading, time, queue, random

q = queue.Queue(maxsize=200)    # régulation
results = []
N_ITEMS = 2000

def producer(n=N_ITEMS):
    for i in range(n):
        q.put((i, random.random()))  # bloque si maxsize atteint
    for _ in range(4):               # 4 consommateurs => 4 pilules
        q.put(None)                  # poison pill

def consumer(cid):
    while True:
        item = q.get()               # bloque si vide
        if item is None:
            q.task_done()
            break
        i, x = item
        # Travail I/O simulé
        time.sleep(0.0005)
        results.append((cid, i, x*2))
        q.task_done()

threads = [threading.Thread(target=consumer, args=(k,)) for k in range(4)]
for t in threads: t.start()
producer()
q.join()                             # attend le traitement complet
for t in threads: t.join()

len(results), results[:5]


(2000,
 [(0, 0, 0.7405738344840216),
  (1, 3, 0.2692098905282778),
  (3, 2, 0.32296949252872076),
  (2, 1, 1.1266421518297678),
  (2, 7, 1.8436315658140712)])


### Sections critiques & **race conditions**
Dès qu’on **partage** un état (ex : un compteur), deux threads peuvent écrire **en même temps** → **race condition**.

**Remède** : un **verrou** (`threading.Lock`) entourant la **plus petite** portion de code qui manipule l’état (section critique).


In [2]:

# Démonstration lock minimal
import threading

counter = 0
lock = threading.Lock()

def incr(n=10000):
    global counter
    for _ in range(n):
        # Protéger seulement ce qui est nécessaire
        with lock:
            counter += 1

threads = [threading.Thread(target=incr) for _ in range(8)]
[t.start() for t in threads]
[t.join() for t in threads]
counter  # doit être 80000 si tout va bien


80000


**Bonnes pratiques Threading (à citer)**
- Préférer la **communication par messages** (queues) aux verrous sophistiqués.  
- Les traitements doivent être **idempotents** (rejouables sans casser l’état).  
- Gérer l’**arrêt** (poison pill), les **timeouts** et la **journalisation** (logs).

---



## 4) Multiprocessing — vrai parallélisme CPU (12 min)

### Quand utiliser **multiprocessing** ?
- Tâches **CPU‑bound** (calcul numérique pur en Python) qui ne bénéficient pas d’une lib C optimisée.  
- Exploiter **plusieurs cœurs** sans GIL partagé.

### Coûts/Contraintes
- **Pickling** : les données échangées entre process sont **sérialisées** → coût non négligeable.  
- **Spawn/Fork** : lancement de process coûteux (surtout `spawn` sur Windows/macOS).  
- Favoriser des **gros “chunks”** de travail (coarse‑grained) pour amortir les coûts.

### Exemple


In [None]:

from multiprocessing import Pool
import math, time

def cpu_heavy(n):
    s=0.0
    for i in range(10_000):
        s += math.sqrt((i*n) % 97)
    return s

t0 = time.perf_counter()
with Pool(4) as p:
    res = p.map(cpu_heavy, [10,11,12,13])
elapsed = time.perf_counter()-t0
len(res), f"{elapsed:.3f}s"



**Tips “entretien”**
- Les objets envoyés à un worker doivent être **picklables**.  
- **Chunking** : grouper des tâches plutôt que d’envoyer 1 micro‑tâche = 1 message.  
- En data science, voir aussi **shared_memory** (tableaux partagés) pour limiter les copies.

---



## 5) Asyncio — Concurrence coopérative mono‑thread (15 min)

### Pourquoi **asyncio** ?
- Des milliers de **petites I/O** concurrentes (HTTP, websockets) avec un **faible overhead**.  
- Contrôle fin : **timeouts**, **cancellation**, **gather**/`TaskGroup` (3.11+).

### Schéma mental
```
          +-------------------+
          |   Event Loop      |
await --> |   planifie/ordonne| --> d'autres coroutines progressent
          +-------------------+
```

**Idée clé** : à chaque `await`, la coroutine **rend la main**. Si elle attend I/O, **une autre** peut avancer.


In [None]:

# Timeout global avec asyncio.wait_for + gather
import asyncio, random

async def fetch(symbol):
    await asyncio.sleep(random.random()/5)  # I/O simulé
    return symbol, 100 + random.random()

async def main():
    syms = [f"SYM{i}" for i in range(10)]
    try:
        res = await asyncio.wait_for(
            asyncio.gather(*[fetch(s) for s in syms]),
            timeout=2.0
        )
        print("OK", len(res))
    except asyncio.TimeoutError:
        print("Timeout global")

asyncio.run(main())



### Queue `asyncio` : producer/consumers
- Très proche du modèle `queue.Queue`, mais **non bloquant**.  
- Parfait pour un **collecteur de ticks** qui pousse dans un bus interne.


In [None]:

import asyncio, random

async def producer(ch, n=50):
    for i in range(n):
        await ch.put((i, random.random()))
    for _ in range(3):
        await ch.put(None)

async def consumer(ch, cid):
    while True:
        item = await ch.get()
        if item is None:
            ch.task_done()
            break
        await asyncio.sleep(0.002)  # I/O simulé
        ch.task_done()

async def run():
    ch = asyncio.Queue(maxsize=20)
    prod = asyncio.create_task(producer(ch, 100))
    cons = [asyncio.create_task(consumer(ch, k)) for k in range(3)]
    await asyncio.gather(prod)
    await ch.join()
    for c in cons:
        await c

asyncio.run(run())



**À citer en entretien**
- `await` = point de suspension coopératif ; pas de préemption.  
- **Timeouts** via `asyncio.wait_for`, **cancellation** via `task.cancel()`.  
- **Pas** de data race Python (mono‑thread), mais attention au **partage mutable** entre coroutines.

---



## 6) Étude de cas : **Feed → Stratégie → OMS** (10 min)

**Contexte** : tu reçois des ticks (I/O), tu calcules un signal (léger CPU), tu routes un ordre (I/O).  
**Design recommandé** (simple & robuste) :
```
[Ticker Thread/Async] --(queue)--> [Strategy Worker(s)] --(queue)--> [OMS Adapter (I/O)]
```
- **Entrée** (I/O) : `threading` **ou** `asyncio`  
- **Traitement** (léger CPU) : **threads** conviennent (ou même synchrone si simple)  
- **Sortie** (I/O vers broker) : **Adapter** + `threading`/`asyncio`

**Pourquoi pas `multiprocessing` ici ?**  
- Le calcul est léger. Coûts de **pickling** > gains.  
- Réserve `multiprocessing` aux tâches **CPU lourdes** (pricing massif offline, backtests).



## 7) Pièges, bonnes pratiques, checklist d’entretien (8 min)

**Pièges classiques**
- Oublier la **backpressure** (file non bornée → OOM).  
- Pas de **poison pill** → threads zombies à l’arrêt.  
- Verrous trop larges → **contention** + perf dégradée.  
- Pas de **timeout** sur I/O → blocages invisibles.  
- Mélanger trop de modèles (threads + async + process) sans raison.

**Bonnes pratiques**
- **Simplicité d’abord** : un seul modèle si possible.  
- **Messages/queues** > partage d’état + verrous.  
- **Idempotence** des handlers, **logs** clairs, **metrics** (latence, taille de file, débit).  
- Tests : **déterministes** (stubs pour I/O), pas de `sleep` arbitraires (utiliser des queues/événements).

**Checklist entretien (à recaser)**
- “GIL ⇒ threads pour I/O, `multiprocessing` pour CPU”  
- “Pipeline queue avec **maxsize** (backpressure) + **poison pill**”  
- “**Timeouts** & **cancellation** en `asyncio`”  
- “**Pickling** entre process, penser **chunking** / **shared_memory**”



## 8) Exercices guidés (avec corrigés)

### Exercice A — Pipeline threads avec mesure de débit
**Tâche** : étends l’exemple “producer/consumer” pour afficher le **débit (items/s)** et la **latence moyenne** par item.  
**Indice** : timestamp à la production + calcul à la consommation.

> 👉 Corrigé ci‑dessous.


In [None]:

# Corrigé Exercice A (simplifié)
import time, threading, queue, statistics, random

q = queue.Queue(maxsize=500)
times = []
N = 3000

def prod():
    for i in range(N):
        q.put((i, time.perf_counter()))
    for _ in range(4): q.put(None)

def cons():
    while True:
        item = q.get()
        if item is None:
            q.task_done(); break
        i, t0 = item
        # Simule un petit travail
        time.sleep(0.0003)
        times.append(time.perf_counter()-t0)
        q.task_done()

t0 = time.perf_counter()
ts = [threading.Thread(target=cons) for _ in range(4)]
for t in ts: t.start()
prod()
q.join()
for t in ts: t.join()
elapsed = time.perf_counter()-t0
throughput = N/elapsed
lat = statistics.mean(times)
throughput, lat



### Exercice B — `multiprocessing` avec chunking
**Tâche** : transforme une liste de 400 “jobs” en 8 chunks de 50 et distribue-les via `Pool.map`.  
**But** : illustrer l’amortissement des coûts de messaging/pickling.

*(À faire en autonomie — vérifie le temps total avec et sans chunking.)*

### Exercice C — `asyncio` + timeout par requête
**Tâche** : lance 50 `fetch()` en parallèle avec un **timeout par requête** (pas juste global).  
**Indice** : enveloppe **chaque** coroutine avec `wait_for` et gère `TimeoutError` **individuellement**.

---

## Glossaire express
- **Backpressure** : mécanisme qui ralentit la production quand la consommation ne suit pas.  
- **Poison pill** : sentinelle pour arrêter proprement un worker.  
- **Idempotence** : répéter une action produit le même état final.  
- **Pickling** : sérialisation Python pour IPC entre process.  
- **Cancellation** : annuler une coroutine/Task en `asyncio`.
