# <center>**Dask pour le traitement des données**</center>

Le traitement des données, en particulier pour les grands jeux de données, nécessite des outils puissants. Dask et PySpark sont deux des outils les plus populaires de l'écosystème Python pour le traitement scalable des données. Dans ce notebook, nous allons explorer Dask, en vous fournissant des exemples pratiques et des réflexions sur quand et comment l'utiliser.

# **Requirements**

**Fichiers de données :** Aircraft_01.h5.zip, Aircraft_02.h5.zip, Aircraft_03.h5.zip, vol01.csv

# **Concepts de base**

### **1. Introduction**

**Qu'est-ce que Dask ?**

Dask est une bibliothèque de calcul parallèle flexible pour l'analyse des données, permettant aux utilisateurs d'exploiter toute la puissance de leur CPU et de leur mémoire sans avoir besoin d'un cluster.

**Caractéristiques principales :**

- Planification dynamique des tâches.
- S'intègre bien avec les bibliothèques Python populaires telles que Pandas et Numpy.
- Permet d'utiliser Dask DataFrame, qui est similaire à Pandas mais fonctionne sur des ensembles de données plus grands que la mémoire.

**Installer Dask :**

In [4]:
#!pip install dask[complete]

**Importer Dask :**

In [5]:
import dask.dataframe as dd

### **2. Dask Client et Ordonnanceurs (Schedulers)**

#### **Dask Client :**
Lorsque vous initialisez un `Client` sans arguments, cela met en place un cluster Dask local utilisant l'ordonnanceur distribué. Cela signifie que, même s'il est local (sur votre machine), il tire parti des avantages des capacités de calcul distribué de Dask. Voici deux scénarios différents d'utilisation de l'ordonnanceur distribué de Dask :
   
##### **Cluster Local (Machine unique) :**
Dans ce contexte, "distribué" ne signifie pas que les tâches sont réparties entre différentes machines, mais plutôt que les tâches peuvent être distribuées sur tous les cœurs ou threads de votre machine unique. Cela permet des fonctionnalités de parallélisme avancées, des calculs asynchrones et donne accès à un tableau de bord web pour des informations diagnostiques. C'est ce qui se passe lorsque vous utilisez `Client()` sans vous connecter à un cluster externe.

##### **Cluster Distribué (Plusieurs machines) :**
Dask peut être configuré pour fonctionner sur un cluster de plusieurs machines, où les calculs sont vraiment distribués sur différents nœuds du cluster. Ceci est utile pour des jeux de données très volumineux ou des tâches nécessitant beaucoup de calculs, où les ressources d'une seule machine seraient insuffisantes.

#### **Paramètres par défaut avec `Client()` :**
- **Ouvriers (Workers)** : Équivalent au nombre de cœurs physiques sur votre machine.
- **Threads par ouvrier** : 1 thread par défaut.
- **Limite de mémoire** : Une fraction de votre mémoire totale disponible est utilisée pour chaque ouvrier.

#### **Sans l'initialisation du Client :**
Dask utilise par défaut l'ordonnanceur à base de threads pour les opérations. Il s'agit d'un ordonnanceur interne utilisant un pool de threads de taille fixe. Cela signifie que vous n'utilisez pas les capacités distribuées, mais exécutez plutôt des calculs en parallèle en utilisant des threads sur votre machine locale.

#### **Ordonnanceurs :**
Dask prend en charge divers ordonnanceurs (à base de threads, multiprocessing, distribué). Le choix de l'ordonnanceur détermine comment Dask exécute les calculs parallèles. En utilisant un `Client`, vous optez pour l'ordonnanceur distribué, même si tous les calculs sont effectués uniquement sur votre machine locale.

En utilisant un `Client`, vous obtenez non seulement l'accès à l'ordonnanceur distribué, mais aussi à des outils supplémentaires comme le tableau de bord diagnostique de Dask, qui est utile pour surveiller et débuguer des calculs parallèles.

In [6]:
import os 
from dask.distributed import Client

num_cores = os.cpu_count()
print(f"Vous avez {num_cores} coeurs disponibles.")

# Créer un client Dask avec autant de workers que de cœurs
client = Client(n_workers=num_cores)

# OU spécifier d'autres paramètres
#client = Client(n_workers=num_cores, threads_per_worker=1, memory_limit="2GB")

print(client)

Vous avez 10 coeurs disponibles.
<Client: 'tcp://127.0.0.1:60474' processes=10 threads=10, memory=16.00 GiB>


### **3. Usage basique**

**Lire un fichier CSV :**

In [7]:
df = dd.read_csv('vol01.csv')

**Les opérations sont paresseuses (elles ne sont pas calculées immédiatement)**

In [8]:
result = df.groupby('ALT [ft]').mean()

**Calculer le résultat :**

In [9]:
result.compute()

Unnamed: 0_level_0,EGT_1 [deg C],EGT_2 [deg C],FMV_1 [mm],FMV_2 [mm],HPTACC_1 [%],HPTACC_2 [%],M [Mach],N1_1 [% rpm],N1_2 [% rpm],N2_1 [% rpm],...,VIB_AN1_1 [mils],VIB_AN1_2 [mils],VIB_AN2_1 [ips],VIB_AN2_2 [ips],VIB_BN1_1 [mils],VIB_BN1_2 [mils],VIB_BN2_1 [ips],VIB_BN2_2 [ips],VSV_1 [mm],VSV_2 [mm]
ALT [ft],Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
-23.183203,624.089720,646.255495,4.184419,4.167181,3.955259,34.638180,0.000000,23.638659,23.268110,54.129175,...,0.072908,0.000000,0.000000,0.255179,0.072908,0.000000,0.000000,0.255179,54.841671,54.904192
-18.546562,522.915270,629.356460,3.721907,4.281759,13.576883,34.436766,0.000000,20.930621,23.371085,47.964212,...,0.063935,0.007852,0.000000,0.215360,0.038137,0.007852,0.016264,0.235550,54.722880,54.625251
-13.909922,455.030296,604.080006,3.340158,4.318302,18.761545,34.371200,0.000000,18.924909,23.482675,42.733648,...,0.025414,0.004583,0.026664,0.180813,0.020831,0.002916,0.018331,0.218100,54.650832,54.309286
-9.273281,459.165128,515.484695,3.519823,3.805518,18.757023,33.914843,0.000797,20.859828,21.854212,43.861708,...,0.035070,0.009229,0.019842,0.157507,0.037223,0.017535,0.021534,0.259794,53.792965,53.728201
-4.636641,151.221978,167.811658,0.664343,0.761684,9.812895,34.463276,0.000818,7.567465,7.771416,12.989454,...,0.030615,0.011362,0.003314,0.039768,0.045765,0.019884,0.007101,0.082061,54.843205,54.842708
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
39420.717939,673.867921,673.967144,9.430265,9.448990,89.403080,98.857785,0.637469,101.819794,101.913563,80.391251,...,1.464987,0.163126,0.043011,0.014424,2.405977,1.091528,0.079989,0.127983,9.043635,8.363996
39425.354580,677.533867,678.365084,9.499597,9.533730,89.571474,98.847483,0.637688,102.516471,102.669763,80.599093,...,1.534494,0.150374,0.043859,0.015949,2.507365,1.172230,0.084300,0.118476,8.699101,8.066561
39429.991220,675.734082,676.188765,9.460387,9.497088,85.362855,98.886037,0.637105,102.185496,102.268745,80.431121,...,1.441343,0.151425,0.053279,0.014021,2.445236,1.144101,0.067300,0.123383,8.617530,7.927390
39434.627861,683.831762,686.364994,9.602262,9.625997,98.830620,98.864523,0.640898,103.247826,103.299922,80.862150,...,1.864372,0.114570,0.036454,0.036454,2.499717,1.156119,0.088532,0.130194,8.318321,7.797309


**Filtrage des données :**

In [10]:
filtered_data = df[df['ALT [ft]'] > 1000].compute()

**Jointure des données :**

In [11]:
df1 = df[df['ALT [ft]'] > 1000]
df2 = df[df['ALT [ft]'] < 1500]
joined_data = df1.merge(df2, on='ALT [ft]').compute()

In [12]:
joined_data

Unnamed: 0,ALT [ft],EGT_1 [deg C]_x,EGT_2 [deg C]_x,FMV_1 [mm]_x,FMV_2 [mm]_x,HPTACC_1 [%]_x,HPTACC_2 [%]_x,M [Mach]_x,N1_1 [% rpm]_x,N1_2 [% rpm]_x,...,VIB_AN1_1 [mils]_y,VIB_AN1_2 [mils]_y,VIB_AN2_1 [ips]_y,VIB_AN2_2 [ips]_y,VIB_BN1_1 [mils]_y,VIB_BN1_2 [mils]_y,VIB_BN2_1 [ips]_y,VIB_BN2_2 [ips]_y,VSV_1 [mm]_y,VSV_2 [mm]_y
0,1038.607483,816.685679,822.596553,13.613771,13.689681,98.881474,34.608516,0.205416,109.035445,108.88252,...,1.239443,0.072908,0.036454,0.036454,3.062153,1.895618,0.218725,0.182271,13.285793,14.202774
1,1080.337248,816.685679,822.596553,13.613771,13.689681,98.881474,34.549187,0.205416,108.964865,108.88252,...,1.239443,0.072908,0.036454,0.036454,3.062153,1.895618,0.218725,0.218725,13.285793,13.942268
2,1080.337248,816.685679,822.596553,13.613771,13.689681,98.881474,34.549187,0.205416,108.964865,108.88252,...,1.239443,0.364542,0.036454,0.072908,1.531076,0.656176,0.036454,0.182271,29.176644,27.290583
3,1080.337248,596.998217,600.938799,9.798227,9.852861,35.597331,35.597331,0.188983,71.980569,73.380419,...,1.239443,0.072908,0.036454,0.036454,3.062153,1.895618,0.218725,0.218725,13.285793,13.942268
4,1080.337248,596.998217,600.938799,9.798227,9.852861,35.597331,35.597331,0.188983,71.980569,73.380419,...,1.239443,0.364542,0.036454,0.072908,1.531076,0.656176,0.036454,0.182271,29.176644,27.290583
5,1108.157091,816.685679,822.596553,13.613771,13.689681,98.881474,34.608516,0.213633,108.964865,108.88252,...,1.166534,0.072908,0.036454,0.036454,3.062153,1.895618,0.182271,0.218725,13.160751,14.067311
6,1168.433418,816.685679,823.581698,13.613771,13.689681,98.822145,34.549187,0.213633,108.964865,108.964865,...,1.166534,0.072908,0.036454,0.036454,2.989244,1.895618,0.182271,0.218725,13.285793,14.004789
7,1210.163183,816.685679,823.581698,13.544608,13.689681,98.881474,34.549187,0.213633,108.964865,108.88252,...,1.166534,0.072908,0.036454,0.072908,2.989244,1.895618,0.182271,0.182271,13.223272,14.004789
8,1247.256307,814.715388,821.611407,13.475444,13.550582,98.762816,34.489858,0.213633,107.8591,108.223767,...,1.166534,0.072908,0.036454,0.072908,2.989244,1.895618,0.182271,0.218725,13.869326,14.723785
9,1307.532635,812.745097,818.65597,13.475444,13.550582,98.881474,34.608516,0.205416,107.635594,107.494433,...,1.093626,0.0,0.036454,0.072908,2.843428,1.82271,0.182271,0.218725,13.744283,14.463279


**Barre de progression avec le scheduler distribué :**

In [13]:
from dask.distributed import progress

# Lire CSV
df_dask = dd.read_csv('vol01.csv')

# Filter, GroupBy and Calculer la moyenne (mean)
result_dask = df_dask[df_dask['ALT [ft]'] > 1500].groupby('ALT [ft]')["M [Mach]"].mean()

# Calculer le résultat
result_dask = result_dask.persist()

progress(result_dask)

VBox()

**Barre de progression sur un scheduler à machine unique (Vous ne verrez pas la barre de progression si avez initialisez le Client) :**

In [15]:
from dask.diagnostics import ProgressBar

with ProgressBar():
    # Lire CSV
    df_dask = dd.read_csv('vol01.csv')

    # Filter, GroupBy and Calculer la moyenne (mean)
    result_dask = df_dask[df_dask['ALT [ft]'] > 1500].groupby('ALT [ft]')["M [Mach]"].mean()

    # Calculer le résultat
    result_dask = result_dask.persist()

### **4. Dask Delayed**

**Qu'est-ce que Dask Delayed ?**

Dask Delayed est un décorateur et une fonction. Il "retarde" l'exécution d'une fonction, au lieu de l'exécuter immédiatement. Lorsqu'une fonction est retardée, elle n'est pas encore calculée. Au lieu de cela, une représentation symbolique (ou tâche) du calcul est créée. Ces calculs symboliques peuvent ensuite être exécutés en parallèle.

**Comment ça marche?**

- Envelopper les calculs : En enveloppant les fonctions ou les calculs avec delayed, vous indiquez à Dask de ne pas les exécuter et de garder une trace des tâches pour un calcul parallèle ultérieur.

- Construire des graphes de tâches : Dask construit en interne un graphique de tâches à partir de ces opérations différées. Un graphique de tâches est une représentation visuelle de la séquence et des dépendances des opérations.

- Exécution parallèle : Une fois que votre calcul entier est construit en utilisant des fonctions retardées, vous pouvez calculer le résultat en parallèle en appelant la méthode `.compute()`.

**Pourquoi utiliser Delayed ?**

1. Algorithmes parallèles personnalisés : Bien que Dask fournisse des équivalents parallèles prêts à l'emploi pour de nombreuses opérations Python, Pandas et Numpy standard (comme Dask Array ou Dask DataFrame), vous pouvez rencontrer des scénarios dans lesquels vous avez besoin d'un parallélisme plus personnalisé. Dask Delayed vous permet de concevoir vos algorithmes parallèles sans avoir à vous plonger dans les complexités de la programmation parallèle.

2. Flexibilité : Vous pouvez combiner des collections Dask (comme Dask Array ou DataFrame) avec des fonctions retardées, ce qui offre un environnement flexible pour construire des pipelines complexes.

**Voyons un exemple:**

In [16]:
from dask import delayed

# Quelques fonctions simples
def inc(x):
    return x + 1

def add(x, y):
    return x + y

# Sans delayed
result = add(inc(1), inc(2))
print(result)  # Output: 5

5


**Maintenant, utilisons delayed:**

In [17]:
# Avec delayed
inc_delayed = delayed(inc)
add_delayed = delayed(add)

# Ils ne calculent pas encore le résultat, mais construisent le graphe de calcul.
a = inc_delayed(1)
b = inc_delayed(2)
result_delayed = add_delayed(a, b)

# Calcul en parallèle (dans ce cas simple, il n'y a pas grand-chose à paralléliser)
result = result_delayed.compute()
print(result)  # Output: 5

5


**Lorsque vous retardez une fonction (delayed), elle ne calcule pas immédiatement son résultat. Au lieu de cela, elle garde une trace du calcul dans un graphe de tâches, que vous pouvez visualiser:**

In [14]:
#conda install -c anaconda graphviz

In [15]:
#conda install -c conda-forge ipycytoscape

In [18]:
result_delayed.visualize()

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…

# **Exercices**

## **Exercice 1 : Extraction de fichiers Zip**

### **Objectif :** 
Écrire une fonction qui extrait tous les fichiers ZIP d'un répertoire source vers un répertoire de destination.

#### **Consignes :**

- La fonction doit être nommée extraire_fichiers_zip.
- La fonction doit accepter deux arguments :
    - repertoire_source : Le répertoire dans lequel rechercher les fichiers ZIP.
    - repertoire_destination : Le répertoire où les fichiers ZIP doivent être extraits.
- Si le répertoire de destination n'existe pas, la fonction doit le créer.
- Après l'extraction de chaque fichier ZIP, affichez un message informant l'utilisateur que le fichier a été extrait avec succès.

#### **Bonus :**

- Gérez les exceptions potentielles qui pourraient survenir lors de l'ouverture de fichiers ZIP corrompus.
- Créez une fonction de logs pour enregistrer chaque action réalisée (par exemple, la création d'un répertoire, l'extraction réussie d'un fichier, etc.).

#### **Astuces :**

- Utilisez le module os pour interagir avec le système de fichiers.
- Le module zipfile vous sera très utile pour gérer les fichiers ZIP.


Bonne chance !

In [2]:
import pandas as pd
import numpy as np
import zipfile, os
import dask.dataframe as dd

In [3]:
os.listdir()

['TD4.ipynb', '.DS_Store', 'mydask.html', 'data', 'vol01.csv']

In [28]:
pwd

'/Users/abanishbaskaradevan/Dossier Pages Sur Ordinateur/MACS 3/TD Statistique/MACS3-Statistiques-Descriptives-TDs/TD4'

In [20]:
def extraire_fichiers_zip(repertoire_source, repertoire_destination):
    """
    Extrait tous les fichiers zip du répertoire source vers le répertoire de destination.

    Paramètres:
    - repertoire_source (str): Le répertoire dans lequel rechercher les fichiers zip.
    - repertoire_destination (str): Le répertoire où les fichiers zip doivent être extraits.

    Retourne:
    - None
    """
    
    # Créer le répertoire de destination s'il n'existe pas
    if not os.path.exists(repertoire_destination):
        os.makedirs(repertoire_source)

    # Liste des fichiers dans le répertoire source
    fichiers = os.listdir(repertoire_source)

    # Parcourir tout les fichiers 
    for fichier in fichiers: 
        chemin_fichier_source=os.path.join(repertoire_source,fichier)

        # Vérifier si le fichier est un fichier ZIP
        if zipfile.is_zipfile(chemin_fichier_source):
            #Extrait le fichier ZIP
            with ZipFile(chemin_fichier_source,'r') as zip_ref:
                #Extrait tous les fichiers dans le répertoire source 
                zip_ref.extractal(repertoire_destination)

        # Affiche un message indiquant que le fichier a été extrait avec succès
        print(f"Le fichier {fichier} a été extrait avec succés dans {repertoire_destination}")
         
# Définir le répertoire où les fichiers zip sont stockés
#repertoire_source = 'data/data_zip'
repertoire_source ="data/data_dask"
# Définir le répertoire où les fichiers doivent être extraits

#repertoire_destination = 'data/data_extracted'
repertoire_destination = 'data/data_extracted'
# Print la liste des fichiers dans le répertoire source
print("Fichiers dans repertoire_source:", os.listdir(repertoire_source))

# Appelez la fonction pour extraire tous les fichiers zip du source vers le répertoire de destination
extraire_fichiers_zip(repertoire_source, repertoire_destination)

# Print la liste des fichiers dans le répertoire de destination après extraction
print("Fichiers dans repertoire_destination après extraction:", os.listdir(repertoire_destination))

Fichiers dans repertoire_source: ['Aircraft_01_dask.h5']
Le fichier Aircraft_01_dask.h5 a été extrait avec succés dans data/data_extracted
Fichiers dans repertoire_destination après extraction: ['Aircraft_01.h5', 'Aircraft_02.h5', 'Aircraft_03.h5']


## **Exercice 2 : Lecture d'un fichier HDF5 avec Dask**

Lisez le fichier HDF5 du premier avion "Aircraft 01" à l'aide de la fonction `read_hdf` de Dask.

In [25]:
nom_du_fichier = 'Aircraft_01.h5'
chemin_fichier = os.path.join(repertoire_destination, nom_du_fichier)
ddf = dd.read_hdf(chemin_fichier,'*')

TypeError: An error occurred while calling the read_hdf method registered to the pandas backend.
Original Message: 
This HDFStore is not partitionable and can only be use monolithically with
pandas.  In the future when creating HDFStores use the ``format='table'``
option to ensure that your dataset can be parallelized

### **L'Erreur**

Lors de la tentative de lecture d'un fichier HDF5 stocké au format fixe à l'aide de la fonction `read_hdf` de Dask, vous pourriez rencontrer l'erreur ci-dessus.
Cette erreur indique que le fichier HDF5 est dans un format qui ne lui permet pas d'être facilement partitionné pour un traitement parallèle par Dask.


### **Pourquoi?**

HDF5 est un format de stockage de données populaire capable de stocker de grands ensembles de données. Au sein du format HDF5, il existe deux principales façons de stocker des données :

1. **Format Fixe** : C'est le format de stockage par défaut dans lequel les données sont stockées sous forme de bloc monolithique.
2. **Format Tableau** : Dans ce format, les données sont stockées par morceaux, ce qui les rend essentiellement "partitionnées" et adaptées au traitement hors-mémoire et parallèle.

Dask est une bibliothèque de calcul parallèle qui permet des opérations efficaces sur de grands ensembles de données en travaillant simultanément sur de petits morceaux ou partitions. Lors de la tentative de lecture d'un fichier HDF5 avec Dask, si les données sont stockées au format fixe, cela peut conduire à des inefficacités et des erreurs potentielles car le fichier n'est pas intrinsèquement partitionné.



### **Objectif**

- Votre tâche est d'écrire une fonction capable de gérer cette erreur. La fonction devrait :

    1. Essayer de lire le fichier HDF5 à l'aide de Dask.
    2. S'il rencontre l'erreur mentionnée ci-dessus, il devrait créer une version partitionnée du fichier au format `tableau` et la stocker dans un dossier "data_dask".
    3. Une fois la version partitionnée créée, elle devrait lire cette version à l'aide de Dask et renvoyer le DataFrame Dask résultant.

### **Bonus**

- Améliorez la fonction pour vérifier d'abord si une version partitionnée du fichier existe déjà, et si c'est le cas, lisez cette version directement sans essayer de lire le fichier non partitionné.

### **Astuce**

- N'oubliez pas d'utiliser l'argument format='table' lors de la création d'une version partitionnée du fichier HDF5 avec pandas.

Bonne chance !


In [26]:
def lire_hdf_dask(nom_fichier, repertoire='data/data_extracted'):
    """
    Charge un fichier HDF5 dans un DataFrame Dask.
    
    Si le fichier HDF5 n'est pas partitionnable et qu'il n'y a pas déjà une version partitionnée,
    une version partitionnée sera créée dans le répertoire 'data/data_dask'.
    
    Paramètres:
    - nom_fichier (str): Nom du fichier HDF5.
    - repertoire (str, facultatif): Répertoire contenant le fichier HDF5. Par défaut, 'data/data_extracted'.

    Retour:
    - ddf: DataFrame Dask
    """

        #repertoire_dask='TD4'
    repertoire_dask='data/data_dask'
    # Vérifiez si le répertoire dask existe; sinon, créez le 
    if not os.path.exists(repertoire_dask):
        os.makedirs(repertoire_dask)

    fichier_partitionne=os.path.join(repertoire_dask,f"{nom_fichier[:-3]}_dask.h5")

    #Vérifier si une version partitionné du fichier existe
    if os.path.exists(fichier_partitionne):
        return dd.read_hdf(fichier_partitionne,'*')
    
    chemin_fichier=os.path.join(repertoire,nom_fichier)

    try: 
        return dd.read_hdf(chemin_fichier,'*')
    except TypeError:
        with pd.HDFStore(chemin_fichier) as store:
            keys_store=store.keys()
            if not keys_store:
                raise ValueError(f"Aucun jeu de données trouvé dans HDFStore : {nom_fichier}")
            
            print(f"Ce HDFStore n'est pas partitionnable et ne peut être utilisé de manière monolotique qu'avec pandas.")
            print(f"Création d'un nouveau fichier de données: {nom_fichier[:-3]}_dask.h5")

            #Créez une version partitionnée du fichier 
            with pd.HDFStore(fichier_partitionne,mode="w") as h:
                for key in keys_store:
                    h.put(key,store[key],format='table')
            print(f"Lecture fichier de données: '{nom_fichier}_dask.h5'")
            return dd.read_hdf(fichier_partitionne,'*')
    

# Testez la fonction
nom_du_fichier = 'Aircraft_01.h5'
ddf = lire_hdf_dask(nom_du_fichier)
print(f"DataFrame Dask du fichier {nom_du_fichier} a été chargé dans 'ddf'.")

DataFrame Dask du fichier Aircraft_01.h5 a été chargé dans 'ddf'.


In [27]:
ddf

Unnamed: 0_level_0,ALT [ft],EGT_1 [deg C],EGT_2 [deg C],FMV_1 [mm],FMV_2 [mm],HPTACC_1 [%],HPTACC_2 [%],M [Mach],N1_1 [% rpm],N1_2 [% rpm],N2_1 [% rpm],N2_2 [% rpm],NAIV_1 [bool],NAIV_2 [bool],P0_1 [psia],P0_2 [psia],PRV_1 [bool],PRV_2 [bool],PS3_1 [psia],PS3_2 [psia],PT2_1 [mbar],PT2_2 [mbar],P_OIL_1 [psi],P_OIL_2 [psi],Q_1 [lb/h],Q_2 [lb/h],T1_1 [deg C],T1_2 [deg C],T2_1 [deg C],T2_2 [deg C],T3_1 [deg C],T3_2 [deg C],T5_1 [deg C],T5_2 [deg C],TAT [deg C],TBV_1 [%],TBV_2 [%],TCASE_1 [deg C],TCASE_2 [deg C],TLA_1 [deg],TLA_2 [deg],T_OIL_1 [deg C],T_OIL_2 [deg C],VBV_1 [mm],VBV_2 [mm],VIB_AN1_1 [mils],VIB_AN1_2 [mils],VIB_AN2_1 [ips],VIB_AN2_2 [ips],VIB_BN1_1 [mils],VIB_BN1_2 [mils],VIB_BN2_1 [ips],VIB_BN2_2 [ips],VSV_1 [mm],VSV_2 [mm]
npartitions=1001,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1,Unnamed: 27_level_1,Unnamed: 28_level_1,Unnamed: 29_level_1,Unnamed: 30_level_1,Unnamed: 31_level_1,Unnamed: 32_level_1,Unnamed: 33_level_1,Unnamed: 34_level_1,Unnamed: 35_level_1,Unnamed: 36_level_1,Unnamed: 37_level_1,Unnamed: 38_level_1,Unnamed: 39_level_1,Unnamed: 40_level_1,Unnamed: 41_level_1,Unnamed: 42_level_1,Unnamed: 43_level_1,Unnamed: 44_level_1,Unnamed: 45_level_1,Unnamed: 46_level_1,Unnamed: 47_level_1,Unnamed: 48_level_1,Unnamed: 49_level_1,Unnamed: 50_level_1,Unnamed: 51_level_1,Unnamed: 52_level_1,Unnamed: 53_level_1,Unnamed: 54_level_1,Unnamed: 55_level_1
,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


## **Exercice 3 : Exploration de Dask DataFrame**

Dans cet exercice, vous allez explorer les caractéristiques et les propriétés de base d'un Dask DataFrame (ddf). L'objectif est de vous familiariser avec les composants clés et les méthodes associées à un Dask DataFrame.

### **Objectifs :**

1. **Nombre de Partitions** : 
   - Déterminez combien de partitions sont présentes dans le Dask DataFrame `ddf`.

2. **Noms des Indices** : 
   - Utilisez la méthode `map_partitions` pour obtenir le nom des indices de chaque partition de `ddf`.
   - Affichez les 10 premiers noms d'index.
   - Comptez le nombre total de noms d'index que vous avez récupérés.

3. **Colonnes du DataFrame** :
   - Récupérez toutes les colonnes de `ddf` et stockez-les dans une variable.
   - Comptez le nombre de colonnes.

4. **Types de Données** :
   - Examinez les types de données de chaque colonne dans `ddf`.

5. **Aperçu d'une Partition** :
   - Affichez les premières lignes de la première partition de `ddf`.

### **Astuces :**

- La méthode `map_partitions` peut être utilisée pour appliquer une fonction à chaque partition d'un Dask DataFrame.
- La méthode `compute()` force l'évaluation d'une opération Dask. Souvenez-vous que Dask est paresseux par défaut, c'est-à-dire qu'il ne calcule pas les résultats tant que vous ne le lui demandez pas.
- Vous pouvez accéder à une partition spécifique en utilisant `ddf.partitions[index]`.

### **Bonus :**

- Essayez d'afficher les dernières lignes de la dernière partition de `ddf`.
- Explorez d'autres méthodes et propriétés associées à `ddf` pour obtenir une meilleure compréhension de vos données.

Bonne chance !

**1. Nombre de Partitions :**

In [28]:
ddf.npartitions

1001

**2. Noms des Indices :**

In [29]:
indexes=ddf.map_partitions(lambda df:df.index.name)

In [30]:
indexes_names=indexes.compute()
indexes_names[:10]

0    record_00
1    record_01
2    record_02
3    record_03
4    record_04
5    record_05
6    record_06
7    record_07
8    record_08
9    record_09
dtype: object

In [31]:
len(indexes_names)

1001

**3. Colonnes du DataFrame :**

In [32]:
ddf.columns

Index(['ALT [ft]', 'EGT_1 [deg C]', 'EGT_2 [deg C]', 'FMV_1 [mm]',
       'FMV_2 [mm]', 'HPTACC_1 [%]', 'HPTACC_2 [%]', 'M [Mach]',
       'N1_1 [% rpm]', 'N1_2 [% rpm]', 'N2_1 [% rpm]', 'N2_2 [% rpm]',
       'NAIV_1 [bool]', 'NAIV_2 [bool]', 'P0_1 [psia]', 'P0_2 [psia]',
       'PRV_1 [bool]', 'PRV_2 [bool]', 'PS3_1 [psia]', 'PS3_2 [psia]',
       'PT2_1 [mbar]', 'PT2_2 [mbar]', 'P_OIL_1 [psi]', 'P_OIL_2 [psi]',
       'Q_1 [lb/h]', 'Q_2 [lb/h]', 'T1_1 [deg C]', 'T1_2 [deg C]',
       'T2_1 [deg C]', 'T2_2 [deg C]', 'T3_1 [deg C]', 'T3_2 [deg C]',
       'T5_1 [deg C]', 'T5_2 [deg C]', 'TAT [deg C]', 'TBV_1 [%]', 'TBV_2 [%]',
       'TCASE_1 [deg C]', 'TCASE_2 [deg C]', 'TLA_1 [deg]', 'TLA_2 [deg]',
       'T_OIL_1 [deg C]', 'T_OIL_2 [deg C]', 'VBV_1 [mm]', 'VBV_2 [mm]',
       'VIB_AN1_1 [mils]', 'VIB_AN1_2 [mils]', 'VIB_AN2_1 [ips]',
       'VIB_AN2_2 [ips]', 'VIB_BN1_1 [mils]', 'VIB_BN1_2 [mils]',
       'VIB_BN2_1 [ips]', 'VIB_BN2_2 [ips]', 'VSV_1 [mm]', 'VSV_2 [mm]'],
      dtyp

In [33]:
len(ddf.columns)

55

**4. Types de Données :**

In [34]:
ddf.dtypes

ALT [ft]            float64
EGT_1 [deg C]       float64
EGT_2 [deg C]       float64
FMV_1 [mm]          float64
FMV_2 [mm]          float64
HPTACC_1 [%]        float64
HPTACC_2 [%]        float64
M [Mach]            float64
N1_1 [% rpm]        float64
N1_2 [% rpm]        float64
N2_1 [% rpm]        float64
N2_2 [% rpm]        float64
NAIV_1 [bool]       float64
NAIV_2 [bool]       float64
P0_1 [psia]         float64
P0_2 [psia]         float64
PRV_1 [bool]        float64
PRV_2 [bool]        float64
PS3_1 [psia]        float64
PS3_2 [psia]        float64
PT2_1 [mbar]        float64
PT2_2 [mbar]        float64
P_OIL_1 [psi]       float64
P_OIL_2 [psi]       float64
Q_1 [lb/h]          float64
Q_2 [lb/h]          float64
T1_1 [deg C]        float64
T1_2 [deg C]        float64
T2_1 [deg C]        float64
T2_2 [deg C]        float64
T3_1 [deg C]        float64
T3_2 [deg C]        float64
T5_1 [deg C]        float64
T5_2 [deg C]        float64
TAT [deg C]         float64
TBV_1 [%]           

**5. Aperçu d'une Partition :**

In [35]:
ddf.partitions[1].head()

Unnamed: 0_level_0,ALT [ft],EGT_1 [deg C],EGT_2 [deg C],FMV_1 [mm],FMV_2 [mm],HPTACC_1 [%],HPTACC_2 [%],M [Mach],N1_1 [% rpm],N1_2 [% rpm],...,VIB_AN1_1 [mils],VIB_AN1_2 [mils],VIB_AN2_1 [ips],VIB_AN2_2 [ips],VIB_BN1_1 [mils],VIB_BN1_2 [mils],VIB_BN2_1 [ips],VIB_BN2_2 [ips],VSV_1 [mm],VSV_2 [mm]
record_01,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
0,194.738903,64.034462,64.034462,-0.726222,-0.730271,34.420641,34.737062,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,56.665211,56.59227
1,194.738903,64.034462,64.034462,-0.726222,-0.730271,34.420641,34.737062,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,56.665211,56.59227
2,194.738903,64.034462,64.034462,-0.726222,-0.730271,34.420641,34.796391,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,56.665211,56.59227
3,194.738903,64.034462,64.034462,-0.726222,-0.730271,34.420641,34.796391,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,56.665211,56.59227
4,194.738903,64.034462,64.034462,-0.726222,-0.730271,34.489858,34.796391,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,56.665211,56.59227


## **Exercice 4 : Manipulation avancée avec Dask et comparaison avec Pandas**

Après avoir exploré les bases de `Dask DataFrame`, poursuivons avec des opérations plus avancées. L'objectif est de comprendre et de comparer le comportement de Dask par rapport à Pandas, notamment en termes de performances.

### **Objectifs :**

1. **Maximum de la colonne Vitesse** :
   - Utilisez `map_partitions` pour trouver la vitesse maximale (`SPEED`) de chaque partition dans `ddf`.
   - Calculez le temps d'exécution avec `%time`.

2. **Extraction de la ligne avec la vitesse maximale** :
   - Sélectionnez une partition spécifique de `ddf` (par exemple, la 676ème partition) et trouvez la ligne ayant la vitesse maximale.
   - Utilisez une fonction personnalisée pour obtenir la ligne avec la vitesse maximale pour chaque partition.

3. **Comparaison avec Pandas** :
   - Lisez le fichier HDF5 du premier avion avec Pandas et obtenez la liste des clés.
   - Comptez le nombre d'enregistrements et vérifiez les DataFrames vides.
   - Supprimez les enregistrements vides.
   - Mesurez le temps d'exécution pour trouver la ligne avec la vitesse maximale pour chaque DataFrame dans votre fichier avec Pandas.

### **Astuces :**

- La méthode `idxmax()` donne l'index de la première occurrence de la valeur maximale.
- Vous pouvez mesurer le temps d'exécution d'une cellule dans un notebook Jupyter avec la commande magique `%time`.

### **Bonus :**

- Essayez d'optimiser vos fonctions pour réduire le temps d'exécution.
- Comparez les résultats obtenus avec Dask et Pandas pour vérifier leur cohérence.
- Réfléchissez à des scénarios où l'utilisation de Dask serait préférable à celle de Pandas et vice-versa.

Bonne chance !

1. **Maximum de la colonne Vitesse :** 

In [48]:
V_max=ddf.map_partitions(lambda df:df['M [Mach]'].max())
print(V_max.compute())
%time

0       0.640898
1       0.649114
2       0.624464
3       0.632681
4       0.599814
          ...   
996     0.583381
997     0.640898
998     0.624464
999     0.632681
1000    0.632681
Length: 1001, dtype: float64
CPU times: user 2 µs, sys: 1e+03 ns, total: 3 µs
Wall time: 5.25 µs


2. **Extraction de la ligne avec la vitesse maximale :**

In [49]:
ddf.map_partitions(lambda df:df['M [Mach]'].idmax())

ValueError: Metadata inference failed in `lambda`.

You have supplied a custom function and Dask is unable to 
determine the type of output that that function returns. 

To resolve this please provide a meta= keyword.
The docstring of the Dask function you ran should have more information.

Original error is below:
------------------------
AttributeError("'Series' object has no attribute 'idmax'")

Traceback:
---------
  File "/Users/abanishbaskaradevan/anaconda3/lib/python3.11/site-packages/dask/dataframe/utils.py", line 193, in raise_on_meta_error
    yield
  File "/Users/abanishbaskaradevan/anaconda3/lib/python3.11/site-packages/dask/dataframe/core.py", line 6876, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/var/folders/fl/71zh2c117r9f2nwvtqmpm3nm0000gn/T/ipykernel_9219/1108032554.py", line 1, in <lambda>
    ddf.map_partitions(lambda df:df['M [Mach]'].idmax())
                                 ^^^^^^^^^^^^^^^^^^^^
  File "/Users/abanishbaskaradevan/anaconda3/lib/python3.11/site-packages/pandas/core/generic.py", line 6204, in __getattr__
    return object.__getattribute__(self, name)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


In [None]:
...

3. **Comparaison avec Pandas :**

In [None]:
# !pip install tables
...

In [None]:
...

In [None]:
...

In [None]:
...

In [None]:
# Il est obligatoire d'éliminer les vols qui n'ont pas de CEOD pour faire le calcul sans rencontrer d'erreurs (contrairement à DASK qui les éliminent automatiquement)
...

In [None]:
...

In [None]:
######################## Avec Tabata ########################
#############################################################

...

## **Exercice 5 : Analyse et Visualisation des Données avec Dask et Plotly**

Dans cette section, nous allons explorer davantage les fonctionnalités offertes par `Dask DataFrame` pour le traitement des données et nous plonger dans la visualisation à l'aide de `matplotlib` et `plotly`.

### **Objectifs :**

1. **Suppression des colonnes** :
   - L'avion est équipé de deux moteurs. Supprimez les colonnes du moteur 2 à l'aide d'une liste prédéfinie.
   - Vérifiez les colonnes restantes après suppression.

2. **Visualisation de séries temporelles** :
   - Définissez une fonction pour visualiser les séries temporelles de colonnes spécifiques d'une partition spécifique.
   - Testez cette fonction en visualisant certaines séries temporelles.

3. **Calcul de la longueur des vols** :
   - Calculez la longueur de chaque vol (nombre d'enregistrements) dans `ddf`.
   - Identifiez le vol ayant la durée maximale et minimale.

4. **Visualisation de la distribution de la longueur des vols** :
   - Utilisez `plotly` pour afficher la distribution de la longueur des vols à l'aide d'un graphique en boîte.

### **Astuces :**

- La méthode `drop` permet de supprimer des colonnes ou des lignes d'un DataFrame Dask.
- Utilisez `map_partitions` pour appliquer des fonctions à chaque partition de `ddf`.
- `plotly` est une bibliothèque de visualisation interactive qui peut être utilisée pour afficher une variété de types de graphiques.

### **Bonus :**

- Essayez d'autres types de visualisations avec `plotly` pour explorer davantage les données.
- Réfléchissez à des techniques pour optimiser le traitement et la visualisation des données à l'aide de Dask.
- Comparez les temps d'exécution entre différentes opérations pour mieux comprendre les avantages et inconvénients de Dask.

Bonne chance !


**1. Suppression des colonnes :**

In [None]:
list_var_e2 = [
                   'FMV_2 [mm]', 'HPTACC_2 [%]', 
                   'N1_2 [% rpm]','N2_2 [% rpm]',
                   'NAIV_2 [bool]', 'P0_2 [psia]',
                   'PRV_2 [bool]', 'PS3_2 [psia]',
                   'PT2_2 [mbar]', 'P_OIL_2 [psi]',
                   'Q_2 [lb/h]', 'T1_2 [deg C]',
                   'T2_2 [deg C]', 'T3_2 [deg C]',
                   'T5_2 [deg C]', 'TBV_2 [%]',
                   'TCASE_2 [deg C]', 'TLA_2 [deg]',
                   'T_OIL_2 [deg C]', 'VBV_2 [mm]',
                   'EGT_2 [deg C]','VIB_AN1_2 [mils]',
                   'VIB_AN2_2 [ips]', 'VIB_BN1_2 [mils]',
                   'VIB_BN2_2 [ips]', 'VSV_2 [mm]'
                ]

In [None]:
...

In [None]:
...

**2. Visualisation de séries temporelles :**

In [None]:
import matplotlib.pyplot as plt 

def tracer_serie_temporelle(ddf, numero_partition, colonnes):
    """
    Trace la série temporelle pour les colonnes spécifiées à partir d'une table donnée dans le fichier HDF5.
    
    Paramètres:
    - ddf : dask dataframe
    - numero_partition: Le numéro de la table à charger à partir du fichier HDF5.
    - colonnes: Une liste de colonnes à tracer.
    """

    ...


# Testing the function 
tracer_serie_temporelle(ddf_e1, 0, ['ALT [ft]', 'EGT_1 [deg C]', 'FMV_1 [mm]', 'HPTACC_1 [%]', 'M [Mach]'])

**3. Calcul de la longueur des vols :**

In [None]:
...

In [None]:
...

print(f"{vol_avec_longueur_max} a la plus grande longueur de vol qui est =", longueurs_vols[vol_avec_longueur_max])
print(f"{vol_avec_longueur_min} a la plus petite longueur de vol qui est =", longueurs_vols[vol_avec_longueur_min])

4. **Visualisation de la distribution de la longueur des vols :**

In [None]:
import plotly.express as px
...