# Lab 7: Dask DataFrame and Distributed Machine Learning

## Exercise 1: Data Persistence

The `data/` directory contains a series of Parquet files. Each file holds information about houses, with the following columns: `surface`, `chambres` (bedrooms), `distance_centre` (distance to the city centre), `anciennete` (age of the house), and `prix` (price).
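
The original data files are not included here. To try the exercise anyway, a synthetic dataset with the same column names can be generated. The sketch below is only an illustration: the value ranges, the pricing rule, and the helper names `make_houses` and `write_houses` are assumptions, not part of the lab. Only the column names and the `houses_<n>.parquet` file naming come from the notebook.

```python
from pathlib import Path

import numpy as np
import pandas as pd

# Column names taken from the exercise statement
COLUMNS = ["surface", "chambres", "distance_centre", "anciennete", "prix"]


def make_houses(n_rows: int, seed: int = 0) -> pd.DataFrame:
    """Build a synthetic table with the five columns named in the exercise."""
    rng = np.random.default_rng(seed)
    surface = rng.uniform(20, 250, n_rows)        # hypothetical range
    chambres = rng.integers(1, 7, n_rows)         # 1 to 6 bedrooms (assumed)
    distance_centre = rng.uniform(0, 30, n_rows)  # hypothetical range
    anciennete = rng.integers(0, 100, n_rows)     # hypothetical range
    # Hypothetical pricing rule, for illustration only
    prix = (
        150_000
        + 3_000 * surface
        + 15_000 * chambres
        - 2_000 * distance_centre
        - 500 * anciennete
        + rng.normal(0, 20_000, n_rows)
    )
    return pd.DataFrame(
        {
            "surface": surface,
            "chambres": chambres,
            "distance_centre": distance_centre,
            "anciennete": anciennete,
            "prix": prix,
        },
        columns=COLUMNS,
    )


def write_houses(directory, sizes):
    """Write one Parquet file per size (requires a Parquet engine such as pyarrow)."""
    out = Path(directory)
    out.mkdir(parents=True, exist_ok=True)
    paths = []
    for n in sizes:
        path = out / f"houses_{n}.parquet"
        make_houses(n).to_parquet(path, index=False)
        paths.append(path)
    return paths


sample = make_houses(1_000)
print(sample["prix"].agg(["mean", "std", "max"]))
```

Calling `write_houses("data", [10_000, 20_000, 100_000])` would then create files named like the ones that appear in the recorded outputs.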

The following code loads each Parquet file in turn and computes the mean, standard deviation, and maximum of the price.

In [156]:
import pandas as pd
import dask.dataframe as dd
import time
from pathlib import Path
from dask.distributed import Client,LocalCluster
import seaborn as sns
import matplotlib.pyplot as plt

In [157]:
cluster = LocalCluster()
# Instantiate the client and connect it to the local cluster
client = Client(cluster)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 34083 instead


In [158]:
# Directory containing the Parquet files
file_path = 'data'

In [159]:
p=Path(file_path)
fich=[i for i in p.glob('*',case_sensitive=False)]

In [160]:
%%time
# Load each Parquet file with pandas

for file_path in fich:
    print(f'-------fichier {file_path}--------------')
    %time df = pd.read_parquet(file_path)

    # Compute the mean, standard deviation and maximum of the price
    %time mean_price = df['prix'].mean()
    %time std_price = df['prix'].std()
    %time max_price = df['prix'].max()

   

-------fichier data/houses_10000.parquet--------------
CPU times: user 4.16 ms, sys: 1.04 ms, total: 5.2 ms
Wall time: 2.76 ms
CPU times: user 263 μs, sys: 0 ns, total: 263 μs
Wall time: 239 μs
CPU times: user 231 μs, sys: 0 ns, total: 231 μs
Wall time: 192 μs
CPU times: user 77 μs, sys: 0 ns, total: 77 μs
Wall time: 71 μs
-------fichier data/houses_20000.parquet--------------
CPU times: user 2.43 ms, sys: 1.89 ms, total: 4.32 ms
Wall time: 2.22 ms
CPU times: user 239 μs, sys: 73 μs, total: 312 μs
Wall time: 292 μs
CPU times: user 451 μs, sys: 0 ns, total: 451 μs
Wall time: 329 μs
CPU times: user 132 μs, sys: 0 ns, total: 132 μs
Wall time: 105 μs
-------fichier data/houses_100000.parquet--------------
CPU times: user 6.48 ms, sys: 1.75 ms, total: 8.23 ms
Wall time: 3.1 ms
CPU times: user 370 μs, sys: 0 ns, total: 370 μs
Wall time: 376 μs
CPU times: user 670 μs, sys: 0 ns, total: 670 μs
Wall time: 651 μs
CPU times: user 518 μs, sys: 0 ns, total: 518 μs
Wall time: 464 μs
-------fichier d

In [161]:
# Display the results (values from the last file processed by the loop)
print(f"Mean price: {mean_price}")
print(f"Price standard deviation: {std_price}")
print(f"Maximum price: {max_price}")

Mean price: 432926.85517698183
Price standard deviation: 234926.1272435667
Maximum price: 895962.8566820562


### Question 1: Parallelization with Dask

1. Perform the equivalent processing with Dask DataFrame.

In [189]:
%%time
for file_path in fich:
    # read_parquet is lazy: it only builds the task graph
    %time df = dd.read_parquet(file_path)
    print(f'-------fichier {file_path}--------------')
    # Each compute() call executes the graph, including the file read
    %time mean_price = df['prix'].mean().compute()
    %time std_price = df['prix'].std().compute()
    %time max_price = df['prix'].max().compute()

CPU times: user 3.97 ms, sys: 654 μs, total: 4.63 ms
Wall time: 4.09 ms
-------fichier data/houses_10000.parquet--------------
CPU times: user 11.7 ms, sys: 78 μs, total: 11.8 ms
Wall time: 11 ms
CPU times: user 14.4 ms, sys: 640 μs, total: 15 ms
Wall time: 14.3 ms
CPU times: user 7.9 ms, sys: 186 μs, total: 8.09 ms
Wall time: 7.19 ms
CPU times: user 2.11 ms, sys: 992 μs, total: 3.1 ms
Wall time: 2.82 ms
-------fichier data/houses_20000.parquet--------------
CPU times: user 10.7 ms, sys: 1.23 ms, total: 11.9 ms
Wall time: 10.7 ms
CPU times: user 12.1 ms, sys: 1.75 ms, total: 13.8 ms
Wall time: 12.8 ms
CPU times: user 8.45 ms, sys: 357 μs, total: 8.8 ms
Wall time: 7.46 ms
CPU times: user 5.1 ms, sys: 0 ns, total: 5.1 ms
Wall time: 4.73 ms
-------fichier data/houses_100000.parquet--------------
CPU times: user 13.6 ms, sys: 0 ns, total: 13.6 ms
Wall time: 12.8 ms
CPU times: user 19.6 ms, sys: 0 ns, total: 19.6 ms
Wall time: 17.2 ms
CPU times: user 11.2 ms, sys: 0 ns, total: 11.2 ms
Wall 

In [190]:
# Display the results (values from the last file processed by the loop)
print(f"Mean price: {mean_price}")
print(f"Price standard deviation: {std_price}")
print(f"Maximum price: {max_price}")

Mean price: 432926.85517698183
Price standard deviation: 234926.1272435667
Maximum price: 895962.8566820562


2. Compare the response times of the pandas and Dask versions for increasing data volumes.

### Question 2: Optimization with `persist()`

1. Use the `persist()` method to avoid loading the same data repeatedly.

In [191]:
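for file_path in fich:
    # persist() keeps the loaded partitions in memory so later computations reuse them
    df = dd.read_parquet(file_path).persist()
    # persist() on an aggregate returns a Dask scalar, not a Python float
    mean_price = df['prix'].mean().persist()
    std_price = df['prix'].std().persist()
    max_price = df['prix'].max().persist()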

In [198]:
# Display the results
# The persisted aggregates are still Dask scalars, so their repr is printed;
# call .compute() on each one to obtain the numeric value.
print(f"Mean price: {mean_price}")
print(f"Price standard deviation: {std_price}")
print(f"Maximum price: {max_price}")

Mean price: <dask_expr.expr.Scalar: expr=FromGraph(e9d3f1d), dtype=float64>
Price standard deviation: <dask_expr.expr.Scalar: expr=FromGraph(d985838), dtype=float64>
Maximum price: <dask_expr.expr.Scalar: expr=FromGraph(e95ba78), dtype=float64>


2. Compare the response times of the Dask version optimized with `persist()` against the previous versions.

In [199]:
%%time
for file_path in fich: 
    %time df = dd.read_parquet(file_path)
    print(f'-------fichier {file_path}--------------')
    %time mean_price = df['prix'].mean().compute()
    %time std_price = df['prix'].std().compute()
    %time max_price = df['prix'].max().compute()

CPU times: user 5.09 ms, sys: 755 μs, total: 5.84 ms
Wall time: 4.68 ms
-------fichier data/houses_10000.parquet--------------
CPU times: user 11.5 ms, sys: 587 μs, total: 12.1 ms
Wall time: 11.4 ms
CPU times: user 13.2 ms, sys: 0 ns, total: 13.2 ms
Wall time: 12.9 ms
CPU times: user 6.32 ms, sys: 1.65 ms, total: 7.96 ms
Wall time: 7.45 ms
CPU times: user 3.62 ms, sys: 0 ns, total: 3.62 ms
Wall time: 3.56 ms
-------fichier data/houses_20000.parquet--------------
CPU times: user 9.19 ms, sys: 3.41 ms, total: 12.6 ms
Wall time: 10.9 ms
CPU times: user 14.5 ms, sys: 0 ns, total: 14.5 ms
Wall time: 13.4 ms
CPU times: user 6.68 ms, sys: 0 ns, total: 6.68 ms
Wall time: 6.46 ms
CPU times: user 3.96 ms, sys: 188 μs, total: 4.15 ms
Wall time: 3.39 ms
-------fichier data/houses_100000.parquet--------------
CPU times: user 12.8 ms, sys: 1.23 ms, total: 14 ms
Wall time: 12.1 ms
CPU times: user 13.9 ms, sys: 0 ns, total: 13.9 ms
Wall time: 13.4 ms
CPU times: user 8.42 ms, sys: 0 ns, total: 8.42 ms


In [200]:
%%time
for file_path in fich:
    # Note: df itself is not persisted here, only the three aggregates are
    %time df = dd.read_parquet(file_path)
    print(f'-------fichier {file_path}--------------')
    %time mean_price = df['prix'].mean().persist()
    %time std_price = df['prix'].std().persist()
    %time max_price = df['prix'].max().persist()

CPU times: user 4.07 ms, sys: 0 ns, total: 4.07 ms
Wall time: 3.3 ms
-------fichier data/houses_10000.parquet--------------
CPU times: user 8.39 ms, sys: 562 μs, total: 8.95 ms
Wall time: 8.35 ms
CPU times: user 11.2 ms, sys: 0 ns, total: 11.2 ms
Wall time: 11 ms
CPU times: user 4.31 ms, sys: 1.47 ms, total: 5.78 ms
Wall time: 5.3 ms
CPU times: user 2.29 ms, sys: 65 μs, total: 2.36 ms
Wall time: 2.19 ms
-------fichier data/houses_20000.parquet--------------
CPU times: user 7.27 ms, sys: 1.03 ms, total: 8.3 ms
Wall time: 8 ms
CPU times: user 9.19 ms, sys: 769 μs, total: 9.96 ms
Wall time: 9.43 ms
CPU times: user 5 ms, sys: 0 ns, total: 5 ms
Wall time: 4.84 ms
CPU times: user 2.56 ms, sys: 0 ns, total: 2.56 ms
Wall time: 2.48 ms
-------fichier data/houses_100000.parquet--------------
CPU times: user 6.16 ms, sys: 978 μs, total: 7.14 ms
Wall time: 6.88 ms
CPU times: user 11.1 ms, sys: 778 μs, total: 11.9 ms
Wall time: 11.3 ms
CPU times: user 5.06 ms, sys: 844 μs, total: 5.91 ms
Wall time: