# Multiprocessing

Combien ai-je de processeurs ?

In [78]:
from multiprocessing import cpu_count

cpu_count()

8

## Exemple d'une map

In [79]:
def delayed_square(x):
    sleep(1)
    return x * x


data = list(range(8))
data

[0, 1, 2, 3, 4, 5, 6, 7]

In [80]:
%time sum(delayed_square(x) for x in data)

CPU times: user 686 μs, sys: 3.28 ms, total: 3.97 ms
Wall time: 8 s


140

In [81]:
%time sum(map(delayed_square,data))

CPU times: user 1.1 ms, sys: 2.02 ms, total: 3.12 ms
Wall time: 8 s


140

Nous pouvons traiter chaque appel à `delayed_square` de manière indépendante et en parallèle. Pour ce faire, nous appliquerons cette fonction à tous les éléments de la liste en parallèle en utilisant plusieurs processus.

## Multiprocessing

- Le multiprocessing permet au programmeur de tirer pleinement parti de plusieurs processeurs.
- L'objet `Pool` parallélise l'exécution d'une fonction sur plusieurs valeurs d'entrée.
- La partie `if __name__ == '__main__'` est nécessaire.
⚠️ Le programme suivant ne fonctionne pas dans une cellule, vous devez l'enregistrer et l'exécuter avec python dans un terminal.

```bash
python3 pool.py
```

In [82]:
%%file pool.py

from time import time, sleep

from multiprocessing import Pool


def delayed_square(x):
    sleep(1)
    return x * x


if __name__ == '__main__':  # Executed only on main process.
    start = time()
    data = list(range(8))
    with Pool() as process_pool:
        result = sum(process_pool.map(delayed_square, data))
    stop = time()
    print(f"result = {result} - Elapsed time {stop - start}")

Overwriting pool.py


In [83]:
!{sys.executable} pool.py

result = 140 - Elapsed time 1.042806625366211


On voit que l'on passe de 8 secondes à 1 seconde pour le calcul.
L'exécution est donc bien concurrente.

### Exercice 1

Développer le script `cube.py` qui calcule la somme des cubes des 100 premiers entiers en utilisant un pool de processus.
Le script ne doit utiliser que 4 CPU en parallèle maximum.

- Attention aux commentaires
- Attention aux noms de variables.  
*Faites valider votre script ainsi que son exécution*. 

On voit que `multiprocessing.Pool.map` est utile car il permet de retourner les résultats dans l'ordre des arguments.
Ci-dessous les process s'exécutent en parallèle et le résultat n'est pas retourné.

In [84]:
from multiprocessing import Process
from time import time, sleep


class DelayedSquareProcess(Process):
    def __init__(self, x):
        super().__init__()
        self.x = x
        self.res = 0

    def run(self):
        sleep(1)
        self.res = self.x * self.x


if __name__ == '__main__':
    start = time()
    processes = [DelayedSquareProcess(x) for x in range(8)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
    result = sum(process.res for process in processes)
    stop = time()
    print(f"result = {result} - Elapsed time {stop - start}")

result = 0 - Elapsed time 1.0556557178497314


Cette syntaxe n'est donc utile que pour les effets secondaires, appels réseaux, écriture de fichiers, etc.

Comment donc faire utiliser les résultats d'un process et faire communiquer ces derniers entre eux ?

## Queue

- `multiprocessing.Queue` est une structure de données partagée entre les processus.
- Il est utilisé pour la communication entre les processus.
- Il est similaire à la classe `queue.Queue` de la bibliothèque standard et son API est similaire.
- Il permet d'avoir le retour d'un process

Par exemple ci-dessous 2 process cherchent en parallèle des informations sur des cryptomonnaies.
Ils le font en parallèle pour réduire le temps de réponse.
Ils stockent les résultats dans une queue.

In [85]:
import requests
from multiprocessing import Process, Queue


def fetch_coincap(queue: Queue):
    response = requests.get("https://api.coincap.io/v2/assets")
    queue.put(response.json())
    # Pas de return, le résultat dans la queue


def fetch_coindesk(queue: Queue):
    response = requests.get("https://api.coindesk.com/v1/bpi/currentprice.json")
    queue.put(response.json())
    # Pas de return, le résultat dans la queue


if __name__ == '__main__':
    queue = Queue()
    coincap_process = Process(target=fetch_coincap, args=(queue,))
    coindesk_process = Process(target=fetch_coindesk, args=(queue,))
    coincap_process.start()
    coindesk_process.start()
    # Join after both starts !!!!!
    coincap_process.join()
    coindesk_process.join()
    print(queue.get())
    print(queue.get())

{'time': {'updated': 'Oct 12, 2024 09:22:17 UTC', 'updatedISO': '2024-10-12T09:22:17+00:00', 'updateduk': 'Oct 12, 2024 at 10:22 BST'}, 'disclaimer': 'This data was produced from the CoinDesk Bitcoin Price Index (USD). Non-USD currency data converted using hourly conversion rate from openexchangerates.org', 'chartName': 'Bitcoin', 'bpi': {'USD': {'code': 'USD', 'symbol': '&#36;', 'rate': '62,756.654', 'description': 'United States Dollar', 'rate_float': 62756.6537}, 'GBP': {'code': 'GBP', 'symbol': '&pound;', 'rate': '48,019.446', 'description': 'British Pound Sterling', 'rate_float': 48019.4459}, 'EUR': {'code': 'EUR', 'symbol': '&euro;', 'rate': '57,348.097', 'description': 'Euro', 'rate_float': 57348.097}}}
{'data': [{'id': 'bitcoin', 'rank': '1', 'symbol': 'BTC', 'name': 'Bitcoin', 'supply': '19766531.0000000000000000', 'maxSupply': '21000000.0000000000000000', 'marketCapUsd': '1239658689885.2565149018558218', 'volumeUsd24Hr': '13614759305.6892261848141137', 'priceUsd': '62715.0353

## Les attributs d'un objet process

Il a entre autre un attribut `pid` qui est l'identifiant du process.

In [86]:
from multiprocessing import Process

p = Process(target=lambda: print("Hello"))
p.start()
for att in dir(p):
    if not att.startswith("_"):
        print(att, "has value: ", getattr(p, att), " and callable: ", callable(getattr(p, att)))

p.join()

Hello
authkey has value:  b'\x9e\\\xd1we\xda\xbc\x13\x94\x81\x03?\x95],\x89\x8a\xb9/\xdc\x1e\x1f\xaaO\xbdI\x19e;\xaau\x15'  and callable:  False
close has value:  <bound method BaseProcess.close of <Process name='Process-198' pid=821903 parent=86504 started>>  and callable:  True
daemon has value:  False  and callable:  False
exitcode has value:  None  and callable:  False
ident has value:  821903  and callable:  False
is_alive has value:  <bound method BaseProcess.is_alive of <Process name='Process-198' pid=821903 parent=86504 started>>  and callable:  True
join has value:  <bound method BaseProcess.join of <Process name='Process-198' pid=821903 parent=86504 started>>  and callable:  True
kill has value:  <bound method BaseProcess.kill of <Process name='Process-198' pid=821903 parent=86504 started>>  and callable:  True
name has value:  Process-198  and callable:  False
pid has value:  821903  and callable:  False
run has value:  <bound method BaseProcess.run of <Process name='Process

### Exercice 2

Développer le script `cube_queue.py` qui calcule les cubes des 100 premiers entiers les stock dans une queue et calcul la somme à partir de la queue.

- Attention aux commentaires
- Attention aux noms de variables.  
*Faites valider votre script ainsi que son exécution*. 