# Flight data using pyspark
> _Abdelmajid EL HOU_

Source des données : [ici](https://raw.githubusercontent.com/AbdelmajidLh/datasets/main/flight_data.csv)

## Étape 1 : Initialisation de la SparkSession et chargement des données

```Python
# Initialize the SparkSession (needed only when running outside Databricks).
# The class is `SparkSession` (capital S); `sparkSession` does not exist and
# the import would fail.
from pyspark.sql import SparkSession

# `SparkSession.builder` is an attribute holding a Builder, not a method:
# calling it as `builder()` raises a TypeError.
spark = SparkSession.builder \
  .appName("Flight Data Analysys") \
  .getOrCreate()
```


In [0]:
# Load the flight CSV file into a DataFrame.
file_path = "/FileStore/tables/flight_data.csv"
# Reader options:
# - `sep` is the option name Spark's CSV reader recognises. The former
#   "separator" key was silently ignored and only worked because "," is the
#   default.
# - the former "treatEmptyValuesAsNulls" key is not a built-in Spark CSV
#   option (it was ignored), so it is dropped.
# NOTE: the raw file encodes missing values as the text "NA". Those tokens
# are not covered by the options below, so columns containing them (DepTime,
# ArrDelay, DepDelay, ...) are inferred as strings. `.option("nullValue", "NA")`
# would turn them into NULLs at load time. It is not applied here because the
# following cells deal with "NA" explicitly.
flight_df = spark.read.format("csv") \
  .option("header", "true") \
  .option("sep", ",") \
  .option("inferSchema", "true") \
  .option("nullValue", None) \
  .option("emptyValue", None) \
  .load(file_path)

In [0]:
# Preview the first 5 rows of the raw DataFrame
flight_df.show(n=5)

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2008|    1|         3|        4|   2003|      1955|   2211|      2225|           WN|      335

In [0]:
# Print the schema inferred from the CSV (column names, types, nullability)
flight_df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- Ca

## Étape 2 : Analyse des Données

In [0]:
# Total number of rows in the dataset (triggers a full Spark job)
nb_lignes = flight_df.count()
print(f"Nombre de lignes : {nb_lignes}")


Nombre de lignes : 100000


In [0]:
# List the DataFrame's column names
colonnes = flight_df.columns
print(f"Colonnes : {colonnes}")

Colonnes : ['Year', 'Month', 'DayofMonth', 'DayOfWeek', 'DepTime', 'CRSDepTime', 'ArrTime', 'CRSArrTime', 'UniqueCarrier', 'FlightNum', 'TailNum', 'ActualElapsedTime', 'CRSElapsedTime', 'AirTime', 'ArrDelay', 'DepDelay', 'Origin', 'Dest', 'Distance', 'TaxiIn', 'TaxiOut', 'Cancelled', 'CancellationCode', 'Diverted', 'CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay']


In [0]:
# Summary statistics (count/mean/stddev/min/max) for the first five columns.
# DepTime is a string column (it contains "NA"), so its min/max are compared
# lexically, which is why "NA" shows up as the max.
premieres_colonnes = flight_df.columns[:5]
flight_df.select(*premieres_colonnes).describe().show()

+-------+------+------+-----------------+------------------+-----------------+
|summary|  Year| Month|       DayofMonth|         DayOfWeek|          DepTime|
+-------+------+------+-----------------+------------------+-----------------+
|  count|100000|100000|           100000|            100000|           100000|
|   mean|2008.0|   1.0|         17.08786|           3.89004|1355.200722248073|
| stddev|   0.0|   0.0|8.356363976550425|1.9529994727263515|464.0896318178067|
|    min|  2008|     1|                1|                 1|                1|
|    max|  2008|     1|               31|                 7|               NA|
+-------+------+------+-----------------+------------------+-----------------+



In [0]:
# Show only a few date-related columns, selected by name
cols_date = ["Year", "Month", "DayofMonth", "DayOfWeek"]
flight_df.select(cols_date).show(10)

+----+-----+----------+---------+
|Year|Month|DayofMonth|DayOfWeek|
+----+-----+----------+---------+
|2008|    1|         3|        4|
|2008|    1|         3|        4|
|2008|    1|         3|        4|
|2008|    1|         3|        4|
|2008|    1|         3|        4|
|2008|    1|         3|        4|
|2008|    1|         3|        4|
|2008|    1|         3|        4|
|2008|    1|         3|        4|
|2008|    1|         3|        4|
+----+-----+----------+---------+
only showing top 10 rows



In [0]:
# Same selection, but by position: the first four columns
flight_df.select(*flight_df.columns[0:4]).show(n=10)

+----+-----+----------+---------+
|Year|Month|DayofMonth|DayOfWeek|
+----+-----+----------+---------+
|2008|    1|         3|        4|
|2008|    1|         3|        4|
|2008|    1|         3|        4|
|2008|    1|         3|        4|
|2008|    1|         3|        4|
|2008|    1|         3|        4|
|2008|    1|         3|        4|
|2008|    1|         3|        4|
|2008|    1|         3|        4|
|2008|    1|         3|        4|
+----+-----+----------+---------+
only showing top 10 rows



In [0]:
# Show rows where a specific column is NULL
from pyspark.sql.functions import *
# NOTE: this wildcard import shadows Python builtins such as min, max and sum
# with their Spark column-function counterparts.
flight_df.where(flight_df["CarrierDelay"].isNull()).show(5)  # isnan() is the NaN counterpart of isNull()

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------

In [0]:
# Rows where CarrierDelay holds the literal text "NA" (missing value stored as a
# string), restricted to a few columns
na_mask = col("CarrierDelay") == "NA"
flight_df.filter(na_mask) \
    .select("Year", "Month", "DayofMonth", "DayOfWeek", "CarrierDelay") \
    .show(10)

+----+-----+----------+---------+------------+
|Year|Month|DayofMonth|DayOfWeek|CarrierDelay|
+----+-----+----------+---------+------------+
|2008|    1|         3|        4|          NA|
|2008|    1|         3|        4|          NA|
|2008|    1|         3|        4|          NA|
|2008|    1|         3|        4|          NA|
|2008|    1|         3|        4|          NA|
|2008|    1|         3|        4|          NA|
|2008|    1|         3|        4|          NA|
|2008|    1|         3|        4|          NA|
|2008|    1|         3|        4|          NA|
|2008|    1|         3|        4|          NA|
+----+-----+----------+---------+------------+
only showing top 10 rows



In [0]:
# Replace the literal "NA" marker in CarrierDelay with a real NULL and keep
# every other value unchanged.
# regexp_replace(..., 'NA', None) must not be used here: a NULL replacement
# makes the result NULL for every row, which wipes out the valid delays too.
newDf = flight_df.withColumn(
    "CarrierDelay",
    when(col("CarrierDelay") == "NA", None).otherwise(col("CarrierDelay")),
)
# To replace "NA" in every column at once, use flight_df.na.replace("NA", None).

In [0]:
# Check: no "NA" text should remain in CarrierDelay (expect an empty result)
newDf.where(newDf["CarrierDelay"] == "NA") \
    .select("Year", "Month", "DayofMonth", "DayOfWeek", "CarrierDelay") \
    .show(10)

+----+-----+----------+---------+------------+
|Year|Month|DayofMonth|DayOfWeek|CarrierDelay|
+----+-----+----------+---------+------------+
+----+-----+----------+---------+------------+



In [0]:
# Flights between 20 and 31 January 2008.
# Year/Month/DayofMonth are integer columns, so compare them with integer
# literals rather than strings. between() is inclusive on both bounds.
(newDf
    .filter((col("Year") == 2008)
            & (col("Month") == 1)
            & col("DayofMonth").between(20, 31))
    .select("Year", "Month", "DayofMonth", "DayOfWeek")
    .show(6))

+----+-----+----------+---------+
|Year|Month|DayofMonth|DayOfWeek|
+----+-----+----------+---------+
|2008|    1|        20|        7|
|2008|    1|        20|        7|
|2008|    1|        20|        7|
|2008|    1|        20|        7|
|2008|    1|        20|        7|
|2008|    1|        20|        7|
+----+-----+----------+---------+
only showing top 6 rows



In [0]:
# Keep valid flights only: not cancelled (Cancelled == 0) and not diverted (Diverted == 0)
is_valid = (col("Cancelled") == 0) & (col("Diverted") == 0)
valid_flight = newDf.where(is_valid)
valid_flight.show(5)

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2008|    1|         3|        4|   2003|      1955|   2211|      2225|           WN|      335

In [0]:
# Check: Diverted should only take the value 0 among valid flights
valid_flight.select("Diverted").distinct().show()

+--------+
|Diverted|
+--------+
|       0|
+--------+



In [0]:
# Add a TotalDelay column = ArrDelay + DepDelay.
# Both source columns were inferred as strings (they contain "NA"), so cast
# them explicitly to double instead of relying on Spark's implicit
# string-arithmetic coercion. With ANSI mode off, "NA" casts to NULL, and so
# does the sum.
flight_df = newDf.withColumn(
    "TotalDelay",
    col("ArrDelay").cast("double") + col("DepDelay").cast("double"),
)


In [0]:
# Flag long-haul flights (Distance strictly greater than 1000).
# coalesce(..., False) maps a NULL comparison (NULL Distance) to False.
flight_df = flight_df.withColumn(
    "LongHaul",
    coalesce(col("Distance") > 1000, lit(False)),
)

In [0]:
# Sort ALL flights (not only the valid ones) by TotalDelay, largest first.
# Rows with a NULL TotalDelay sort last.
flight_df.select("UniqueCarrier", "TotalDelay") \
    .orderBy(desc("TotalDelay")) \
    .show(10)

+-------------+----------+
|UniqueCarrier|TotalDelay|
+-------------+----------+
|           WN|    1008.0|
|           WN|    1002.0|
|           XE|     896.0|
|           XE|     887.0|
|           WN|     881.0|
|           WN|     846.0|
|           XE|     834.0|
|           XE|     832.0|
|           WN|     793.0|
|           WN|     792.0|
+-------------+----------+
only showing top 10 rows



In [0]:
# Alternative: sort() is an alias of orderBy() and accepts ascending=False
flight_df.sort("TotalDelay", ascending=False) \
    .select("UniqueCarrier", "TotalDelay") \
    .show(10)

+-------------+----------+
|UniqueCarrier|TotalDelay|
+-------------+----------+
|           WN|    1008.0|
|           WN|    1002.0|
|           XE|     896.0|
|           XE|     887.0|
|           WN|     881.0|
|           WN|     846.0|
|           XE|     834.0|
|           XE|     832.0|
|           WN|     793.0|
|           WN|     792.0|
+-------------+----------+
only showing top 10 rows



In [0]:
# Long-haul flights only. LongHaul is already a boolean column, so filter on it
# directly instead of comparing it with the string 'True' (implicit cast).
flight_df.filter(col("LongHaul")).select("Year", "DepDelay", "ArrDelay", "TotalDelay", "Distance", "LongHaul").show(5)

+----+--------+--------+----------+--------+--------+
|Year|DepDelay|ArrDelay|TotalDelay|Distance|LongHaul|
+----+--------+--------+----------+--------+--------+
|2008|      67|      57|     124.0|    1591|    true|
|2008|      -1|     -18|     -19.0|    1591|    true|
|2008|      27|      15|      42.0|    1489|    true|
|2008|       9|     -15|      -6.0|    1489|    true|
|2008|      87|      47|     134.0|    1093|    true|
+----+--------+--------+----------+--------+--------+
only showing top 5 rows



In [0]:
# valid_flight was derived from newDf before TotalDelay/LongHaul were added,
# so those columns are absent here.
print(valid_flight.schema.names)

['Year', 'Month', 'DayofMonth', 'DayOfWeek', 'DepTime', 'CRSDepTime', 'ArrTime', 'CRSArrTime', 'UniqueCarrier', 'FlightNum', 'TailNum', 'ActualElapsedTime', 'CRSElapsedTime', 'AirTime', 'ArrDelay', 'DepDelay', 'Origin', 'Dest', 'Distance', 'TaxiIn', 'TaxiOut', 'Cancelled', 'CancellationCode', 'Diverted', 'CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay']


## Étape 3 : Agrégations et Analyses

In [0]:
# Average arrival delay per carrier, highest first
# (avg() coerces the string ArrDelay column to double; "NA" values are ignored as NULL)
retard_moyen_par_compagnie = (
    flight_df
    .groupBy("UniqueCarrier")
    .agg(avg("ArrDelay").alias("AvgArrDelay"))
    .orderBy(col("AvgArrDelay").desc())
)
retard_moyen_par_compagnie.show(5)
  

+-------------+-----------------+
|UniqueCarrier|      AvgArrDelay|
+-------------+-----------------+
|           WN|5.790533334769635|
|           XE|4.771433441281745|
+-------------+-----------------+



In [0]:
# Render the per-carrier average delay with the notebook's display() helper
display(retard_moyen_par_compagnie)

UniqueCarrier,AvgArrDelay
WN,5.790533334769635
XE,4.771433441281745


In [0]:
# Number of flights per day of month, busiest days first
nbVols_jour = (
    flight_df
    .groupBy("DayofMonth")
    .count()
    .withColumnRenamed("count", "Nbr")
    .orderBy(desc("Nbr"))
)
nbVols_jour.show(5)

+----------+----+
|DayofMonth| Nbr|
+----------+----+
|        11|3639|
|        10|3636|
|         8|3635|
|         7|3635|
|         9|3632|
+----------+----+
only showing top 5 rows



In [0]:
# Render the per-day flight counts with the notebook's display() helper
display(nbVols_jour)

DayofMonth,Nbr
11,3639
10,3636
8,3635
7,3635
9,3632
29,3632
25,3630
24,3630
31,3630
17,3629


Databricks visualization. Run in Databricks to view.

In [0]:
# Distance statistics per origin airport (min / max / mean), highest mean first.
# Use the qualified functions module so min/max unambiguously refer to Spark's
# aggregate functions rather than relying on a wildcard import shadowing the
# Python builtins.
from pyspark.sql import functions as F

df_stats = (
    flight_df
    .groupBy("Origin")
    .agg(
        F.min("Distance").alias("MinDistance"),
        F.max("Distance").alias("MaxDistance"),
        F.avg("Distance").alias("AvgDistance"),
    )
    .orderBy(F.col("AvgDistance").desc())
)

df_stats.show(10)

+------+-----------+-----------+-----------------+
|Origin|MinDistance|MaxDistance|      AvgDistance|
+------+-----------+-----------+-----------------+
|   CRW|        975|        975|            975.0|
|   DAY|        929|        929|            929.0|
|   CLT|        913|        913|            913.0|
|   MCO|        133|       2039| 848.926586102719|
|   MDW|        162|       1855|847.9305116866709|
|   ISP|        220|       2283| 838.712858926342|
|   COS|        789|        911|837.7182320441989|
|   SEA|        224|       1977|820.7069625095639|
|   OMA|        342|       1313|820.3773913043478|
|   IAD|        577|       2066|795.5351170568562|
+------+-----------+-----------+-----------------+
only showing top 10 rows



In [0]:
# visualiser les resultats
display(df_stats)

Origin,MinDistance,MaxDistance,AvgDistance
CRW,975,975,975.0
DAY,929,929,929.0
CLT,913,913,913.0
MCO,133,2039,848.926586102719
MDW,162,1855,847.9305116866709
ISP,220,2283,838.712858926342
COS,789,911,837.7182320441989
SEA,224,1977,820.7069625095639
OMA,342,1313,820.3773913043478
IAD,577,2066,795.5351170568562


## Exporter les résultats

Le format Parquet est un format de stockage colonnaire, compressé et optimisé pour les systèmes Big Data, permettant des analyses rapides et efficaces tout en réduisant la taille des fichiers.

In [0]:
# Write df_stats as Snappy-compressed Parquet, replacing any previous output.
# NOTE: df_stats has exactly one row per Origin (it is grouped by Origin), so
# partitioning by Origin creates one directory and one tiny file per airport.
# Writing without partitionBy would avoid many small files for a table this size.
output_parquet = "/FileStore/tables/df_stats"
df_stats.write.parquet(
    output_parquet,
    mode="overwrite",
    partitionBy="Origin",
    compression="snappy",
)
print(f"Les données sauvegardées à : {output_parquet}")

Les données sauvegardées à : /FileStore/tables/df_stats


In [0]:
# List the files written to the Parquet output directory.
# Reuse output_parquet instead of repeating the hard-coded path, so the listing
# always matches where the previous cell wrote.
display(dbutils.fs.ls(f"{output_parquet}/"))


path,name,size,modificationTime
dbfs:/FileStore/tables/df_stats/Origin=ABQ/,Origin=ABQ/,0,0
dbfs:/FileStore/tables/df_stats/Origin=ALB/,Origin=ALB/,0,0
dbfs:/FileStore/tables/df_stats/Origin=AMA/,Origin=AMA/,0,0
dbfs:/FileStore/tables/df_stats/Origin=AUS/,Origin=AUS/,0,0
dbfs:/FileStore/tables/df_stats/Origin=BDL/,Origin=BDL/,0,0
dbfs:/FileStore/tables/df_stats/Origin=BFL/,Origin=BFL/,0,0
dbfs:/FileStore/tables/df_stats/Origin=BHM/,Origin=BHM/,0,0
dbfs:/FileStore/tables/df_stats/Origin=BNA/,Origin=BNA/,0,0
dbfs:/FileStore/tables/df_stats/Origin=BOI/,Origin=BOI/,0,0
dbfs:/FileStore/tables/df_stats/Origin=BTR/,Origin=BTR/,0,0


In [0]:
# Read the partitioned Parquet output back. Origin is rebuilt from the
# partition directory names, which is why it appears as the last column.
df_pq = spark.read.parquet(output_parquet)
df_pq.show()

+-----------+-----------+------------------+------+
|MinDistance|MaxDistance|       AvgDistance|Origin|
+-----------+-----------+------------------+------+
|        975|        975|             975.0|   CRW|
|        174|       1984| 773.6413908671973|   TPA|
|        226|        404|341.32986767485824|   SNA|
|        337|       1855| 569.1251682368775|   SFO|
|        328|        328|             328.0|   MKE|
|        438|       1195| 776.1111111111111|   IAH|
|        287|        749|436.58851674641147|   BOI|
|        913|        913|             913.0|   CLT|
|        277|        758| 364.0820189274448|   AMA|
|        187|        187|             187.0|   CRP|
|        577|       2066| 795.5351170568562|   IAD|
|        256|       2279| 726.9035560344828|   PHX|
|        180|       1959| 550.3550265890099|   OAK|
|        359|        888|           572.712|   JAN|
|        255|       2027| 622.8699910952805|   RDU|
|        220|       2283|  838.712858926342|   ISP|
|        111