In [11]:
%pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialiser la session Spark 
spark = SparkSession.builder.appName("ImportForSQLPrep").getOrCreate()

# 1. Charger le fichier CSV 
# Spark lira les donn√©es et attribuera des noms de colonnes g√©n√©riques (_c0, _c1, ...)
df_spark = spark.read.csv("flights_2020.csv", header=False, inferSchema=True)

# 2. D√©finir les noms de colonnes propres et compatibles SQL
# Ces noms sont ceux que nous utiliserons ensuite dans nos requ√™tes SQL
sql_compliant_column_names = [
    'YEAR', 'MONTH', 'DAY_OF_MONTH', 'DAY_OF_WEEK', 'OP_UNIQUE_CARRIER', 
    'ORIGIN_CITY_NAME', 'ORIGIN_STATE_ABR', 'DEST_CITY_NAME', 'DEST_STATE_ABR', 
    'CRS_DEP_TIME', 'DEP_DELAY_NEW', 'CRS_ARR_TIME', 'ARR_DELAY_NEW', 
    'CANCELLED', 'CANCELLATION_CODE', 'AIR_TIME', 'DISTANCE'
]

# 3. Renommer toutes les colonnes en une seule op√©ration avec Spark
df_spark = df_spark.toDF(*sql_compliant_column_names)

# 4. Afficher le sch√©ma pour v√©rifier que tout est propre
print("Sch√©ma final compatible SQL :")
df_spark.printSchema()

# 5. Afficher les donn√©es pour validation 
df_spark.show(5)


Sch√©ma final compatible SQL :
root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY_OF_MONTH: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- OP_UNIQUE_CARRIER: string (nullable = true)
 |-- ORIGIN_CITY_NAME: string (nullable = true)
 |-- ORIGIN_STATE_ABR: string (nullable = true)
 |-- DEST_CITY_NAME: string (nullable = true)
 |-- DEST_STATE_ABR: string (nullable = true)
 |-- CRS_DEP_TIME: string (nullable = true)
 |-- DEP_DELAY_NEW: string (nullable = true)
 |-- CRS_ARR_TIME: string (nullable = true)
 |-- ARR_DELAY_NEW: string (nullable = true)
 |-- CANCELLED: integer (nullable = true)
 |-- CANCELLATION_CODE: string (nullable = true)
 |-- AIR_TIME: string (nullable = true)
 |-- DISTANCE: integer (nullable = true)

+----+-----+------------+-----------+-----------------+----------------+----------------+--------------+--------------+------------+-------------+------------+-------------+---------+-----------------+--------+--

**Affichage Clair des Statistiques Descriptives**

In [3]:
print("--- Statistiques Descriptives pour toutes les colonnes (Affichage Vertical) ---")

# Calculer les statistiques descriptives
df_described = df_spark.describe()

# Afficher le r√©sultat avec l'option vertical=True pour afficher les statistiques par ligne
# Au lieu de 'df_described.show()', utilisez:
df_described.show(vertical=True, truncate=False)

--- Statistiques Descriptives pour toutes les colonnes (Affichage Vertical) ---
-RECORD 0--------------------------------
 summary           | count               
 YEAR              | 4316997             
 MONTH             | 4316997             
 DAY_OF_MONTH      | 4316997             
 DAY_OF_WEEK       | 4316997             
 OP_UNIQUE_CARRIER | 4316997             
 ORIGIN_CITY_NAME  | 4316997             
 ORIGIN_STATE_ABR  | 4316997             
 DEST_CITY_NAME    | 4316997             
 DEST_STATE_ABR    | 4316997             
 CRS_DEP_TIME      | 4316997             
 DEP_DELAY_NEW     | 4316997             
 CRS_ARR_TIME      | 4316997             
 ARR_DELAY_NEW     | 4316997             
 CANCELLED         | 4316997             
 CANCELLATION_CODE | 4316997             
 AIR_TIME          | 4316997             
 DISTANCE          | 4316997             
-RECORD 1--------------------------------
 summary           | mean                
 YEAR              | 2020.0           

**Affichage Ligne par Ligne des Valeurs Manquantes**

In [4]:
from pyspark.sql.functions import col, sum
import pandas as pd

# 1. Calcul des valeurs manquantes 
# Cette liste cr√©e les expressions d'agr√©gation pour toutes les colonnes
df_columns = df_spark.columns
null_counts_expressions = [sum(col(c).isNull().cast("int")).alias(f"null_count_{c}") for c in df_columns]

# Ex√©cuter l'agr√©gation
df_null_summary = df_spark.agg(*null_counts_expressions)

# 2. Conversion et Transposition pour un affichage clair

# Convertir la ligne de r√©sultat Spark en un DataFrame Pandas (plus facile √† manipuler en Python)
# Le .collect()[0] r√©cup√®re l'unique ligne de r√©sultat
null_counts_row = df_null_summary.collect()[0].asDict()

# Transposer le dictionnaire pour obtenir une colonne de Noms et une colonne de Comptes
# Exemple: {'null_count_YEAR': 0, 'null_count_MONTH': 0, ...}
# devient: [('YEAR', 0), ('MONTH', 0), ...]

results = []
for key, value in null_counts_row.items():
    # Nettoyer le nom de la colonne pour l'affichage (ex: enlever 'null_count_')
    clean_column_name = key.replace("null_count_", "")
    results.append((clean_column_name, value))

# 3. Affichage Ligne par Ligne des R√©sultats

print("--- üîç Nombre de Valeurs Manquantes par Variable ---")
print("{:<20} {:>10}".format("VARIABLE", "COUNT NULLS"))
print("-" * 33)

# Afficher chaque paire (nom de colonne, compte de NULLs) sur une ligne
for column, count in results:
    print("{:<20} {:>10}".format(column, count))

--- üîç Nombre de Valeurs Manquantes par Variable ---
VARIABLE             COUNT NULLS
---------------------------------
YEAR                          0
MONTH                         0
DAY_OF_MONTH                  0
DAY_OF_WEEK                   0
OP_UNIQUE_CARRIER             0
ORIGIN_CITY_NAME              0
ORIGIN_STATE_ABR              0
DEST_CITY_NAME                0
DEST_STATE_ABR                0
CRS_DEP_TIME                  0
DEP_DELAY_NEW                 0
CRS_ARR_TIME                  0
ARR_DELAY_NEW                 0
CANCELLED                     0
CANCELLATION_CODE             0
AIR_TIME                      0
DISTANCE                      0


**Traitement des Doublons (Duplicata)**

In [5]:
# 1. Obtenir le nombre total de lignes
total_rows = df_spark.count()

# 2. Obtenir le nombre de lignes uniques
distinct_rows = df_spark.distinct().count()

# 3. Calculer le nombre de doublons
num_duplicates = total_rows - distinct_rows

print(f"--- üîé V√©rification des Doublons ---")
print(f"Nombre total de lignes : {total_rows:,}")
print(f"Nombre de lignes distinctes : {distinct_rows:,}")
print(f"Nombre de doublons exacts trouv√©s : {num_duplicates:,}")

--- üîé V√©rification des Doublons ---
Nombre total de lignes : 4,316,997
Nombre de lignes distinctes : 4,316,977
Nombre de doublons exacts trouv√©s : 20


In [6]:
# Suppression des Doublons:
if num_duplicates > 0:
    # Cr√©er un nouveau DataFrame propre sans les doublons
    df_spark_cleaned = df_spark.dropDuplicates()
    
    # V√©rification (Action)
    print(f"\n--- ‚úÖ Suppression effectu√©e ---")
    print(f"Nouveau nombre de lignes (sans doublons) : {df_spark_cleaned.count():,}")
else:
    print("\n‚úÖ Aucune ligne dupliqu√©e trouv√©e. Le DataFrame est d√©j√† propre.")
    df_spark_cleaned = df_spark


--- ‚úÖ Suppression effectu√©e ---
Nouveau nombre de lignes (sans doublons) : 4,316,977


**G√©rer les types incorrects**

In [9]:
# Nous allons utiliser la fonction substring pour extraire les deux premiers caract√®res (l'heure) et cast pour 
# les convertir en IntegerType (concernant les 2 variables CRS_DEP_TIME + CRS_ARR_TIME).

from pyspark.sql.functions import col, substring

# Utiliser le DataFrame nettoy√© (apr√®s gestion des doublons et CANCELLATION_CODE)
df_temp = df_spark_cleaned 

# Cr√©er une nouvelle colonne pour l'heure de d√©part pr√©vue (HH)
df_temp = df_temp.withColumn(
    "CRS_DEP_HOUR", 
    substring(col("CRS_DEP_TIME"), 1, 2).cast("int")
)

# Cr√©er une nouvelle colonne pour l'heure d'arriv√©e pr√©vue (HH)
df_temp = df_temp.withColumn(
    "CRS_ARR_HOUR", 
    substring(col("CRS_ARR_TIME"), 1, 2).cast("int")
)

# Remplacer le DataFrame nettoy√© final par le DataFrame avec les nouvelles colonnes d'heure
df_spark_cleaned = df_temp

# V√©rification (Action)
print("--- üìù Sch√©ma apr√®s transformation des heures ---")
df_spark_cleaned.printSchema()

print("\n--- üßê Aper√ßu des nouvelles colonnes ---")
df_spark_cleaned.select("CRS_DEP_TIME", "CRS_DEP_HOUR", "CRS_ARR_TIME", "CRS_ARR_HOUR").limit(5).show()

--- üìù Sch√©ma apr√®s transformation des heures ---
root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY_OF_MONTH: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- OP_UNIQUE_CARRIER: string (nullable = true)
 |-- ORIGIN_CITY_NAME: string (nullable = true)
 |-- ORIGIN_STATE_ABR: string (nullable = true)
 |-- DEST_CITY_NAME: string (nullable = true)
 |-- DEST_STATE_ABR: string (nullable = true)
 |-- CRS_DEP_TIME: string (nullable = true)
 |-- DEP_DELAY_NEW: string (nullable = true)
 |-- CRS_ARR_TIME: string (nullable = true)
 |-- ARR_DELAY_NEW: string (nullable = true)
 |-- CANCELLED: integer (nullable = true)
 |-- CANCELLATION_CODE: string (nullable = true)
 |-- AIR_TIME: string (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- CRS_DEP_HOUR: integer (nullable = true)
 |-- CRS_ARR_HOUR: integer (nullable = true)


--- üßê Aper√ßu des nouvelles colonnes ---
+------------+------------+------------+------------+


In [15]:
# Corriger le Type de DEP_DELAY_NEW (il etait String --> Integer):
from pyspark.sql.functions import col, when

# 1. Remplacer la cha√Æne "NULL" par une vraie valeur null, puis caster en IntegerType
df_spark_cleaned = df_spark_cleaned.withColumn(
    "DEP_DELAY_NEW_CLEAN",
    when(col("DEP_DELAY_NEW").cast("string") == "NULL", None)
    .otherwise(col("DEP_DELAY_NEW"))
    .cast("integer")
)

# 2. V√©rification: Afficher le sch√©ma pour confirmer le type 'integer'
print("--- üìù Sch√©ma apr√®s correction de DEP_DELAY_NEW ---")
df_spark_cleaned.printSchema()

--- üìù Sch√©ma apr√®s correction de DEP_DELAY_NEW ---
root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY_OF_MONTH: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- OP_UNIQUE_CARRIER: string (nullable = true)
 |-- ORIGIN_CITY_NAME: string (nullable = true)
 |-- ORIGIN_STATE_ABR: string (nullable = true)
 |-- DEST_CITY_NAME: string (nullable = true)
 |-- DEST_STATE_ABR: string (nullable = true)
 |-- CRS_DEP_TIME: string (nullable = true)
 |-- DEP_DELAY_NEW: string (nullable = true)
 |-- CRS_ARR_TIME: string (nullable = true)
 |-- ARR_DELAY_NEW: string (nullable = true)
 |-- CANCELLED: integer (nullable = true)
 |-- CANCELLATION_CODE: string (nullable = true)
 |-- AIR_TIME: string (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- CRS_DEP_HOUR: integer (nullable = true)
 |-- CRS_ARR_HOUR: integer (nullable = true)
 |-- DEP_DELAY_NEW_CLEAN: integer (nullable = true)



**Transformations et actions (Analyse du Retard Moyen):**

> 1. Transformation:

In [18]:
from pyspark.sql.functions import col, avg, round, count
# Assurez-vous d'utiliser le DataFrame avec toutes les √©tapes de nettoyage (nettoyage CANCELLATION_CODE et conversion des Timestamps)
df_analysis = df_spark_cleaned

In [19]:
# Transformation 1 : Filter - Isoler les vols en retard (retard > 0)
df_delays = df_analysis.filter(col("DEP_DELAY_NEW_CLEAN") > 0)

# Transformation 2 : Select - R√©duire les colonnes √† analyser
df_selected = df_delays.select("OP_UNIQUE_CARRIER", "DAY_OF_WEEK", "DEP_DELAY_NEW_CLEAN")

# Transformation 3 & 4 : GroupBy et Agr√©gation (Calcul du Retard Moyen)
df_carrier_day_delay = df_selected.groupBy("OP_UNIQUE_CARRIER", "DAY_OF_WEEK").agg(
    round(avg(col("DEP_DELAY_NEW_CLEAN")), 2).alias("AVG_DEP_DELAY_MINUTES"),
    count(col("DEP_DELAY_NEW_CLEAN")).alias("TOTAL_DELAYED_FLIGHTS")
)

# Transformation 5 : OrderBy - Trier les r√©sultats (par les retards les plus √©lev√©s)
df_result = df_carrier_day_delay.orderBy(col("AVG_DEP_DELAY_MINUTES").desc())

> 2. Actions

In [20]:
# Action 1 : Show - Afficher les 10 pires combinaisons (Retard Moyen le plus √©lev√©)
print("--- ü•á Top 10 des Pires Retards Moyens (Transporteur/Jour) ---")
df_result.show(10)

--- ü•á Top 10 des Pires Retards Moyens (Transporteur/Jour) ---
+-----------------+-----------+---------------------+---------------------+
|OP_UNIQUE_CARRIER|DAY_OF_WEEK|AVG_DEP_DELAY_MINUTES|TOTAL_DELAYED_FLIGHTS|
+-----------------+-----------+---------------------+---------------------+
|               YV|          4|                62.82|                 3542|
|               G4|          7|                59.79|                 4068|
|               OO|          6|                55.01|                10224|
|               OO|          1|                 54.9|                12750|
|               OO|          5|                54.46|                13288|
|               OO|          4|                54.29|                12299|
|               OH|          4|                54.22|                 5708|
|               YV|          1|                53.72|                 3028|
|               OO|          2|                53.71|                10346|
|               9E|    

In [21]:
# Action 2 : Count - Compter le nombre de lignes du r√©sultat final
total_combinations = df_result.count()
print(f"\nNombre total de combinaisons (Transporteur/Jour de la semaine) analys√©es : {total_combinations}")



Nombre total de combinaisons (Transporteur/Jour de la semaine) analys√©es : 119


In [27]:
# Action 3 : Take - R√©cup√©rer les 5 premi√®res lignes du r√©sultat dans une structure Python
print(f"\n--- ü§è Action 3: Take (Afficher les 5 premiers r√©sultats comme objet Python) ---")
top_5_rows = df_result.take(5)

# Affichage des 5 lignes r√©cup√©r√©es via l'action take
for row in top_5_rows:
    print(f"Compagnie: {row['OP_UNIQUE_CARRIER']} | Jour: {row['DAY_OF_WEEK']} | Retard Moyen: {row['AVG_DEP_DELAY_MINUTES']} min")

print("\n‚úÖ Les 3 actions (show, count, take) sont compl√©t√©es.")


--- ü§è Action 3: Take (Afficher les 5 premiers r√©sultats comme objet Python) ---
Compagnie: YV | Jour: 4 | Retard Moyen: 62.82 min
Compagnie: G4 | Jour: 7 | Retard Moyen: 59.79 min
Compagnie: OO | Jour: 6 | Retard Moyen: 55.01 min
Compagnie: OO | Jour: 1 | Retard Moyen: 54.9 min
Compagnie: OO | Jour: 5 | Retard Moyen: 54.46 min

‚úÖ Les 3 actions (show, count, take) sont compl√©t√©es.


**Analyse du dataset**

> Analyse 1 : Performance Globale des Transporteurs (Q1)

In [28]:
from pyspark.sql.functions import col, avg, round, sum, desc

# 1. Calcul du Taux d'Annulation (Fiabilit√©)
# On compte le nombre total de vols annul√©s (CANCELLED = 1) par transporteur
df_cancellation_rate = df_analysis.groupBy("OP_UNIQUE_CARRIER").agg(
    round(avg(col("CANCELLED")) * 100, 2).alias("TAUX_ANNULATION_POURCENT"),
    sum(col("CANCELLED")).alias("TOTAL_ANNULATIONS"),
    count(col("OP_UNIQUE_CARRIER")).alias("TOTAL_VOLS")
).orderBy(col("TAUX_ANNULATION_POURCENT").asc()) # Tri croissant (Moins d'annulations = Meilleur)

print("--- ü•á TOP 5 des Transporteurs les plus FIABLES (Taux d'annulation le plus bas) ---")
df_cancellation_rate.show(5)

# 2. Calcul du Retard Moyen (Moins de fiabilit√©)
# Utilise DEP_DELAY_NEW_CLEAN (retards > 0 uniquement)
df_delay_rate = df_analysis.filter(col("DEP_DELAY_NEW_CLEAN") > 0).groupBy("OP_UNIQUE_CARRIER").agg(
    round(avg(col("DEP_DELAY_NEW_CLEAN")), 2).alias("RETARD_MOYEN_MINUTES"),
    count(col("DEP_DELAY_NEW_CLEAN")).alias("TOTAL_VOLS_EN_RETARD")
).orderBy(col("RETARD_MOYEN_MINUTES").desc()) # Tri d√©croissant (Retard le plus √©lev√© = Moins Bon)

print("--- üëé TOP 5 des Transporteurs les moins FIABLES (Retard Moyen le plus haut) ---")
df_delay_rate.show(5)

--- ü•á TOP 5 des Transporteurs les plus FIABLES (Taux d'annulation le plus bas) ---
+-----------------+------------------------+-----------------+----------+
|OP_UNIQUE_CARRIER|TAUX_ANNULATION_POURCENT|TOTAL_ANNULATIONS|TOTAL_VOLS|
+-----------------+------------------------+-----------------+----------+
|               NK|                    2.31|             2828|    122357|
|               9E|                    3.58|             6776|    189388|
|               AS|                    4.18|             5317|    127218|
|               OO|                     4.5|            24416|    542338|
|               YX|                    5.14|            10207|    198494|
+-----------------+------------------------+-----------------+----------+
only showing top 5 rows
--- üëé TOP 5 des Transporteurs les moins FIABLES (Retard Moyen le plus haut) ---
+-----------------+--------------------+--------------------+
|OP_UNIQUE_CARRIER|RETARD_MOYEN_MINUTES|TOTAL_VOLS_EN_RETARD|
+----------------

> Analyse 2 : Distribution du Retard par Heure de D√©part (Q2)

In [31]:
#Resolution de probleme de valeur NULL sur colonne "CRS_DEP_TIME"

from pyspark.sql.functions import col, concat, lpad, to_timestamp, lit, hour, when

# 1. Nettoyer la colonne de temps (CRS_DEP_TIME) pour s'assurer que "NULL" est un vrai null
df_analysis = df_analysis.withColumn(
    "CRS_DEP_TIME_CLEAN",
    when(col("CRS_DEP_TIME") == "NULL", None) # G√©rer la cha√Æne "NULL"
    .otherwise(col("CRS_DEP_TIME"))
)

# 2. Cr√©ation de la cha√Æne de date compl√®te (n√©cessaire pour le timestamp)
df_analysis = df_analysis.withColumn(
    "FLIGHT_DATE_STR", 
    concat(
        col("YEAR"), lit("-"), 
        lpad(col("MONTH"), 2, "0"), lit("-"),  
        lpad(col("DAY_OF_MONTH"), 2, "0")
    )
)

# 3. Cr√©ation du Timestamp de D√©part Pr√©vu (CRS_DEP_TS) en utilisant le temps nettoy√©
df_analysis = df_analysis.withColumn(
    "CRS_DEP_TS_CLEAN",
    to_timestamp(
        concat(col("FLIGHT_DATE_STR"), lit(" "), col("CRS_DEP_TIME_CLEAN")), 
        "yyyy-MM-dd HH:mm:ss"
    )
)

# 4. Cr√©er la Colonne d'Heure Propre (CRS_DEP_HOUR_CLEAN) √† partir du Timestamp
df_analysis = df_analysis.withColumn(
    "CRS_DEP_HOUR_CLEAN", 
    hour(col("CRS_DEP_TS_CLEAN"))
)

# Mettre √† jour le DataFrame final
df_spark_final_cleaned = df_analysis

print("--- üìù Sch√©ma et Aper√ßu apr√®s nettoyage complet des heures ---")
df_spark_final_cleaned.printSchema()
df_spark_final_cleaned.select("CRS_DEP_TIME", "CRS_DEP_TIME_CLEAN", "CRS_DEP_TS_CLEAN", "CRS_DEP_HOUR_CLEAN").limit(5).show(truncate=False)

--- üìù Sch√©ma et Aper√ßu apr√®s nettoyage complet des heures ---
root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY_OF_MONTH: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- OP_UNIQUE_CARRIER: string (nullable = true)
 |-- ORIGIN_CITY_NAME: string (nullable = true)
 |-- ORIGIN_STATE_ABR: string (nullable = true)
 |-- DEST_CITY_NAME: string (nullable = true)
 |-- DEST_STATE_ABR: string (nullable = true)
 |-- CRS_DEP_TIME: string (nullable = true)
 |-- DEP_DELAY_NEW: string (nullable = true)
 |-- CRS_ARR_TIME: string (nullable = true)
 |-- ARR_DELAY_NEW: string (nullable = true)
 |-- CANCELLED: integer (nullable = true)
 |-- CANCELLATION_CODE: string (nullable = true)
 |-- AIR_TIME: string (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- CRS_DEP_HOUR: integer (nullable = true)
 |-- CRS_ARR_HOUR: integer (nullable = true)
 |-- DEP_DELAY_NEW_CLEAN: integer (nullable = true)
 |-- CRS_DEP_TIME_CLEAN: string (n

In [35]:
# Distribution du Retard par Heure de D√©part (Q2)

from pyspark.sql.functions import col, avg, round, desc, count

# Filtration : Retards positifs ET heures valides
df_hourly_delay = df_analysis.filter(
    (col("DEP_DELAY_NEW_CLEAN") > 0) & col("CRS_DEP_HOUR_CLEAN").isNotNull()
).groupBy("CRS_DEP_HOUR_CLEAN").agg(
    round(avg(col("DEP_DELAY_NEW_CLEAN")), 2).alias("RETARD_MOYEN_MINUTES"),
    count(col("DEP_DELAY_NEW_CLEAN")).alias("TOTAL_VOLS_EN_RETARD")
).orderBy(col("CRS_DEP_HOUR_CLEAN").asc()) # Tri par l'heure de la journ√©e (0 √† 23)

print("--- üìä Distribution du Retard Moyen par Heure de D√©part (Cr√©neau 0-23h) ---")
df_hourly_delay.show(24)

--- üìä Distribution du Retard Moyen par Heure de D√©part (Cr√©neau 0-23h) ---
+------------------+--------------------+--------------------+
|CRS_DEP_HOUR_CLEAN|RETARD_MOYEN_MINUTES|TOTAL_VOLS_EN_RETARD|
+------------------+--------------------+--------------------+
|                 1|               21.31|                 142|
|                 2|               29.27|                 294|
|                 3|                34.8|                 480|
|                 4|               32.11|                 354|
|                 5|               51.88|                7120|
|                 6|               44.64|               29060|
|                 7|               42.11|               34473|
|                 8|               35.75|               40340|
|                 9|               31.88|               43940|
|                10|               32.62|               48114|
|                11|               31.71|               51616|
|                12|                33

> Analyse 3 : Distribution des Causes d'Annulation (Q3)

In [34]:
from pyspark.sql.functions import col, count, desc, round, lit, when

# --- √âTAPE DE NETTOYAGE : CR√âATION DE LA COLONNE CANCELLATION_CODE_CLEAN ---
# Cette √©tape est cruciale car la colonne de code d'annulation n'est remplie que si CANCELLED = 1.
# On s'assure que le code est propre avant l'analyse.

df_analysis = df_analysis.withColumn(
    "CANCELLATION_CODE_CLEAN",
    when(col("CANCELLATION_CODE").isNull() | (col("CANCELLATION_CODE") == ""), lit("NOT_APPLICABLE"))
    .otherwise(col("CANCELLATION_CODE"))
)
# Note : √âtant donn√© que nous filtrons sur CANCELLED = 1 ensuite, seuls les codes A, B, C, D seront conserv√©s.

# --- D√âBUT DE L'ANALYSE 3 ---

# Filtrer uniquement les vols annul√©s (CANCELLED = 1)
df_cancellations_only = df_analysis.filter(col("CANCELLED") == 1)

# Calcul du total des annulations pour les calculs de proportions
total_cancellations = df_cancellations_only.count()

# Compter la fr√©quence de chaque code d'annulation (A, B, C, D) et calculer la proportion
df_cause_distribution = df_cancellations_only.groupBy("CANCELLATION_CODE_CLEAN").agg(
    count(col("CANCELLATION_CODE_CLEAN")).alias("NOMBRE_ANNULATIONS")
).withColumn(
    "PROPORTION_POURCENT",
    round((col("NOMBRE_ANNULATIONS") / total_cancellations) * 100, 2)
).orderBy(col("NOMBRE_ANNULATIONS").desc())

print(f"--- üö´ Distribution des Causes d'Annulation (Total: {total_cancellations:,}) ---")
df_cause_distribution.show()


--- üö´ Distribution des Causes d'Annulation (Total: 277,176) ---
+-----------------------+------------------+-------------------+
|CANCELLATION_CODE_CLEAN|NOMBRE_ANNULATIONS|PROPORTION_POURCENT|
+-----------------------+------------------+-------------------+
|                      D|            236777|              85.42|
|                      A|             18291|                6.6|
|                      B|             17979|               6.49|
|                      C|              4129|               1.49|
+-----------------------+------------------+-------------------+

