Developer: Mouhite ADEBO

***English version***
#### Topic
Create an ***ETL (Extract, Transform, Load)*** pipeline to process data from ***the [aviationstack API](https://aviationstack.com/)***, which lists ***flights, airports and airlines worldwide***.

The API is a paid service; I used the trial period to carry out the project. With the plan I subscribed to, I can retrieve only 100 records and make 10,000 requests per month.
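
With a 100-record cap, a larger extract would have to be fetched page by page while staying inside the monthly request budget. The sketch below only plans the requests; it assumes the API accepts `limit` and `offset` query parameters (to be checked against the aviationstack documentation), and `plan_offsets` is a hypothetical helper, not part of this notebook.

```python
def plan_offsets(total_records, page_size=100, max_requests=10_000):
    """Return the offset to send with each request needed to cover total_records.

    page_size and max_requests mirror the plan limits quoted above; the list is
    cut short if the request budget would be exceeded.
    """
    if total_records <= 0 or page_size <= 0:
        return []
    offsets = list(range(0, total_records, page_size))
    return offsets[:max_requests]


# Example: 250 records at 100 per request need three calls
for offset in plan_offsets(250):
    # 'limit' and 'offset' are assumed parameter names
    page_params = {"limit": 100, "offset": offset}
    print(page_params)
```

Planning the offsets up front makes it easy to see how many of the monthly requests a full extract would consume before any call is made.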

#### Tasks
- Data extraction with the Python library "requests"
- Data transformation:
  - Data preparation: 
    1. Create a dataframe for each extracted dataset
    2. Explode my dataframe df_flights: [Explanation: some of my columns contained nested data structures]
    3. Display the data size and the first element: [Explanation: to know the size of the extracted data and get a view of what it looks like]
    4. Data cleaning
  
        4.1. Convert the type of each column [Explanation: every column had a default type, but some data, such as dates, needed their real type]

        4.2. Fill in the NaN values [Explanation: to make the data easier to process]

        4.3. Split the departure_timezone and arrival_timezone columns into continent and city. [Explanation: it is easier to work with the two elements separately, because queries on them give different results than queries on the combined value]

        4.4. Calculate the flight duration in seconds and in hours [Explanation: this could be useful for our processing, or later for data analysts, for example]
  - Query : 
    1. The company with the most flights in progress
    2. For each continent, the airline with the most active regional flights (continent of origin == continent of destination)
    3. The current flight with the longest route
    4. For each continent, the average flight length
    5. The aircraft manufacturer with the most active flights
    6. For each airline country, the top 3 aircraft models in use
- Save the extracted data

***NB: I commented all my code, so the reader can understand what I did***

In [0]:
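# config to get data
# the access key is read from an environment variable (the variable name is chosen here) instead of being hard-coded
import os

params = {
  'access_key': os.environ['AVIATIONSTACK_ACCESS_KEY']
}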

In [0]:
# endpoint used to fetch the flights
url_flights = "http://api.aviationstack.com/v1/flights"

## Extraction data

In [0]:
import requests

In [0]:
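# Fetch the 'data' payload from an API endpoint; return an empty list on failure
def get_data(url, type_data):
    try:
        # a timeout avoids hanging forever; raise_for_status turns HTTP errors into exceptions
        response = requests.get(url, params=params, timeout=30)
        response.raise_for_status()
        return response.json()['data']
    except Exception as e:
        print(f'Error: could not get the {type_data} data. See error: [{e}]')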
        return []
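
The function above treats every failure the same way. A slightly stricter way to read the JSON body could look like the sketch below. It assumes that a failed call returns an object with an `error` key holding a `message`, which should be checked against the API documentation; `extract_records` is a hypothetical helper, not part of the notebook.

```python
def extract_records(payload):
    """Return the list stored under 'data', or raise if the payload reports an error."""
    if not isinstance(payload, dict):
        raise ValueError("unexpected payload type")
    if "error" in payload:
        # Assumed error shape: {"error": {"code": ..., "message": ...}}
        error = payload["error"]
        message = error.get("message", error) if isinstance(error, dict) else error
        raise RuntimeError(f"API error: {message}")
    data = payload.get("data")
    # Anything other than a list is treated as "no records"
    return data if isinstance(data, list) else []


print(len(extract_records({"data": [{"flight_date": "2023-05-19"}]})))
```

Raising on an explicit error object keeps "the API refused the request" distinct from "the API returned zero flights", which both end up as an empty list otherwise.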

In [0]:
# Extract the raw (not yet cleaned) flights data
data_flights = get_data(url_flights, "flights")

In [0]:
# Display the size of the extracted data and its first element
def describe_data_extract(data, type_data):
    if len(data) > 0:
        print(f"""
        [\n
            'Type data': {type_data};\n
            'Size':{len(data)};\n
            'First_element': {data[0]}
        \n]
        """)
    else:
        print(f"[\n'Type data': {type_data};\n'Size':{len(data)};\n'First_element': {data}\n]\n")

# Describe the extracted flights data
describe_data_extract(data_flights, "flights")


        [

            'Type data': flights;

            'Size':100;

            'First_element': {'flight_date': '2023-05-19', 'flight_status': 'scheduled', 'departure': {'airport': 'Shashi', 'timezone': 'Asia/Shanghai', 'iata': 'SHS', 'icao': 'ZHJZ', 'terminal': None, 'gate': None, 'delay': 13, 'scheduled': '2023-05-19T13:40:00+00:00', 'estimated': '2023-05-19T13:40:00+00:00', 'actual': None, 'estimated_runway': None, 'actual_runway': None}, 'arrival': {'airport': 'Jinan', 'timezone': 'Asia/Shanghai', 'iata': 'TNA', 'icao': 'ZSJN', 'terminal': 'T1', 'gate': None, 'baggage': None, 'delay': None, 'scheduled': '2023-05-19T15:45:00+00:00', 'estimated': '2023-05-19T15:45:00+00:00', 'actual': None, 'estimated_runway': None, 'actual_runway': None}, 'airline': {'name': 'Shenzhen Airlines', 'iata': 'ZH', 'icao': 'CSZ'}, 'flight': {'number': '5439', 'iata': 'ZH5439', 'icao': 'CSZ5439', 'codeshared': {'airline_name': 'kunming airlines', 'airline_iata': 'ky', 'airline_icao': 'kna', 'flight_nu

## Data Transformation

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, MapType
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Row

In [0]:
# Create spark session
spark = SparkSession.builder \
    .appName("FlightRadarETL_Test_Technique") \
    .getOrCreate()

spark

**1. Create a dataframe for each extracted dataset**

In [0]:
# Step 1: define the dataframe schema
schema_flights = StructType([
    StructField("flight_date", StringType(), True),
    StructField("flight_status", StringType(), True),
    StructField("departure", StructType([
        StructField("airport", StringType(), True),
        StructField("timezone", StringType(), True),
        StructField("iata", StringType(), True),
        StructField("icao", StringType(), True),
        StructField("terminal", StringType(), True),
        StructField("gate", StringType(), True),
        StructField("delay", StringType(), True),
        StructField("scheduled", StringType(), True),
        StructField("estimated", StringType(), True),
        StructField("actual", StringType(), True),
        StructField("estimated_runway", StringType(), True),
        StructField("actual_runway", StringType(), True),
    ]), True),
    StructField("arrival", StructType([
        StructField("airport", StringType(), True),
        StructField("timezone", StringType(), True),
        StructField("iata", StringType(), True),
        StructField("icao", StringType(), True),
        StructField("terminal", StringType(), True),
        StructField("gate", StringType(), True),
        StructField("baggage", StringType(), True),
        StructField("delay", StringType(), True),
        StructField("scheduled", StringType(), True),
        StructField("estimated", StringType(), True),
        StructField("actual", StringType(), True),
        StructField("estimated_runway", StringType(), True),
        StructField("actual_runway", StringType(), True),
    ]), True),
    StructField("airline", StructType([
        StructField("name", StringType(), True),
        StructField("iata", StringType(), True),
        StructField("icao", StringType(), True),
    ]), True),
    StructField("flight", StructType([
        StructField("number", StringType(), True),
        StructField("iata", StringType(), True),
        StructField("icao", StringType(), True),
        StructField("codeshared", StringType(), True),
    ]), True),
    StructField("aircraft", StructType([
        StructField("registration", StringType(), True),
        StructField("iata", StringType(), True),
        StructField("icao", StringType(), True),
        StructField("icao24", StringType(), True),
    ]), True),
    StructField("live", StructType([
        StructField("updated", StringType(), True),
        StructField("latitude", StringType(), True),
        StructField("longitude", StringType(), True),
        StructField("altitude", StringType(), True),
        StructField("direction", StringType(), True),
        StructField("speed_horizontal", StringType(), True),
        StructField("speed_vertical", StringType(), True),
        StructField("is_ground", StringType(), True),        
    ]), True)
])

In [0]:
# Step 2: create an RDD from the extracted data, then build my dataframe from it
rdd_flights = spark.sparkContext.parallelize(data_flights)
df_flights = spark.createDataFrame(rdd_flights, schema_flights)
df_flights.toPandas().head()

Unnamed: 0,flight_date,flight_status,departure,arrival,airline,flight,aircraft,live
0,2023-05-19,scheduled,"{'airport': 'Shashi', 'timezone': 'Asia/Shangh...","{'airport': 'Jinan', 'timezone': 'Asia/Shangha...","{'name': 'Shenzhen Airlines', 'iata': 'ZH', 'i...","{'number': '5439', 'iata': 'ZH5439', 'icao': '...",,
1,2023-05-19,scheduled,"{'airport': 'Shashi', 'timezone': 'Asia/Shangh...","{'airport': 'Jinan', 'timezone': 'Asia/Shangha...","{'name': 'Air China LTD', 'iata': 'CA', 'icao'...","{'number': '5719', 'iata': 'CA5719', 'icao': '...",,
2,2023-05-19,scheduled,"{'airport': 'Shashi', 'timezone': 'Asia/Shangh...","{'airport': 'Jinan', 'timezone': 'Asia/Shangha...","{'name': 'China Express Air', 'iata': 'G5', 'i...","{'number': '8827', 'iata': 'G58827', 'icao': '...",,
3,2023-05-19,scheduled,"{'airport': 'Jinan', 'timezone': 'Asia/Shangha...","{'airport': 'Xiamen', 'timezone': 'Asia/Shangh...","{'name': 'Tibet Airlines', 'iata': 'TV', 'icao...","{'number': '7139', 'iata': 'TV7139', 'icao': '...",,
4,2023-05-19,scheduled,"{'airport': 'Kunming', 'timezone': 'Asia/Shang...","{'airport': 'Cangyuan Washan Airport ', 'time...","{'name': 'China Eastern Airlines', 'iata': 'MU...","{'number': '5919', 'iata': 'MU5919', 'icao': '...",,


**2. Explode my dataframe df_flights**

In [0]:
# flatten the nested structures: each sub-field becomes a top-level column named <parent>_<field>
df_flights = df_flights.select(
    col("flight_date"),
    col("flight_status"),
    
    col("departure.airport").alias("departure_airport"),
    col("departure.timezone").alias("departure_timezone"),
    col("departure.iata").alias("departure_iata"),
    col("departure.icao").alias("departure_icao"),
    col("departure.terminal").alias("departure_terminal"),
    col("departure.gate").alias("departure_gate"),
    col("departure.delay").alias("departure_delay"),
    col("departure.scheduled").alias("departure_scheduled"),
    col("departure.estimated").alias("departure_estimated"),
    col("departure.actual").alias("departure_actual"),
    col("departure.estimated_runway").alias("departure_estimated_runway"),
    col("departure.actual_runway").alias("departure_actual_runway"),

    col("arrival.airport").alias("arrival_airport"),
    col("arrival.timezone").alias("arrival_timezone"),
    col("arrival.iata").alias("arrival_iata"),
    col("arrival.icao").alias("arrival_icao"),
    col("arrival.terminal").alias("arrival_terminal"),
    col("arrival.gate").alias("arrival_gate"),
    col("arrival.baggage").alias("arrival_baggage"),
    col("arrival.delay").alias("arrival_delay"),
    col("arrival.scheduled").alias("arrival_scheduled"),
    col("arrival.estimated").alias("arrival_estimated"),
    col("arrival.actual").alias("arrival_actual"),
    col("arrival.estimated_runway").alias("arrival_estimated_runway"),
    col("arrival.actual_runway").alias("arrival_actual_runway"),

    col("airline.name").alias("airline_name"),
    col("airline.iata").alias("airline_iata"),
    col("airline.icao").alias("airline_icao"),

    col("flight.number").alias("flight_number"),
    col("flight.iata").alias("flight_iata"),
    col("flight.icao").alias("flight_icao"),
    col("flight.codeshared").alias("flight_codeshared"),
    
    col("aircraft.registration").alias("aircraft_registration"),
    col("aircraft.iata").alias("aircraft_iata"),
    col("aircraft.icao").alias("aircraft_icao"),
    col("aircraft.icao24").alias("aircraft_icao24"),

    col("live.updated").alias("live_updated"),
    col("live.latitude").alias("live_latitude"),
    col("live.longitude").alias("live_longitude"),
    col("live.altitude").alias("live_altitude"),
    col("live.direction").alias("live_direction"),
    col("live.speed_horizontal").alias("live_speed_horizontal"),
    col("live.speed_vertical").alias("live_speed_vertical"),
    col("live.is_ground").alias("live_is_ground"),
)
# display new dataset
df_flights.toPandas().head(100)

Unnamed: 0,flight_date,flight_status,departure_airport,departure_timezone,departure_iata,departure_icao,departure_terminal,departure_gate,departure_delay,departure_scheduled,...,aircraft_icao,aircraft_icao24,live_updated,live_latitude,live_longitude,live_altitude,live_direction,live_speed_horizontal,live_speed_vertical,live_is_ground
0,2023-05-19,scheduled,Shashi,Asia/Shanghai,SHS,ZHJZ,,,13,2023-05-19T13:40:00+00:00,...,,,,,,,,,,
1,2023-05-19,scheduled,Shashi,Asia/Shanghai,SHS,ZHJZ,,,13,2023-05-19T13:40:00+00:00,...,,,,,,,,,,
2,2023-05-19,scheduled,Shashi,Asia/Shanghai,SHS,ZHJZ,,,13,2023-05-19T13:40:00+00:00,...,,,,,,,,,,
3,2023-05-19,scheduled,Jinan,Asia/Shanghai,TNA,ZSJN,,,70,2023-05-19T15:30:00+00:00,...,,,,,,,,,,
4,2023-05-19,scheduled,Kunming,Asia/Shanghai,KMG,ZPPP,,F06-,60,2023-05-19T15:55:00+00:00,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,2023-05-19,scheduled,Lanzhou Zhongchuan Airport,,LHW,ZLLL,T2,,,2023-05-19T20:00:00+00:00,...,,,,,,,,,,
96,2023-05-19,scheduled,Lanzhou Zhongchuan Airport,,LHW,ZLLL,T2,,,2023-05-19T19:55:00+00:00,...,,,,,,,,,,
97,2023-05-19,scheduled,Lanzhou Zhongchuan Airport,,LHW,ZLLL,T2,108,,2023-05-19T19:55:00+00:00,...,,,,,,,,,,
98,2023-05-19,scheduled,Lanzhou Zhongchuan Airport,,LHW,ZLLL,T2,108,,2023-05-19T19:55:00+00:00,...,,,,,,,,,,
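
The renaming done by the `select` above follows one rule: each nested field becomes a column named `<parent>_<field>`. As a plain-Python illustration of that rule (not the Spark code used in this notebook), the sketch below flattens one level of a record; `flatten_record` and the sample values are hypothetical.

```python
def flatten_record(record, sep="_"):
    """Flatten one level of nested dicts: {'departure': {'iata': 'SHS'}} -> {'departure_iata': 'SHS'}."""
    flat = {}
    for key, value in record.items():
        if isinstance(value, dict):
            # Each sub-field becomes its own top-level key
            for sub_key, sub_value in value.items():
                flat[f"{key}{sep}{sub_key}"] = sub_value
        else:
            # Scalars and missing structures (None) are kept as they are
            flat[key] = value
    return flat


sample = {
    "flight_date": "2023-05-19",
    "departure": {"airport": "Shashi", "iata": "SHS"},
    "live": None,
}
print(flatten_record(sample))
```

Unlike the Spark version, which always produces every `live_*` column filled with nulls, this sketch keeps a missing structure as a single `live: None` entry.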


**3. Describe my dataframe df_flights**

In [0]:
# Show the dataframe, its columns, summary statistics and the number of missing values per column
def describe_dataframe(df):
    print("Display my dataframe : ")
    df.show()
    print()
    
    print(f"List column of dataframe flights is : {df.columns}\n")
    
    print('Describe of my dataframe df_flights : ')
    df.describe().show()
    print()
    
    # use the dataframe passed as argument, not the global df_flights
    print('Count number of miss value in my dataframe : ')
    df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

describe_dataframe(df_flights)

Display my dataframe : 
+-----------+-------------+--------------------+------------------+--------------+--------------+------------------+--------------+---------------+--------------------+--------------------+----------------+--------------------------+-----------------------+--------------------+----------------+------------+------------+----------------+------------+---------------+-------------+--------------------+--------------------+--------------+------------------------+---------------------+--------------------+------------+------------+-------------+-----------+-----------+--------------------+---------------------+-------------+-------------+---------------+------------+-------------+--------------+-------------+--------------+---------------------+-------------------+--------------+
|flight_date|flight_status|   departure_airport|departure_timezone|departure_iata|departure_icao|departure_terminal|departure_gate|departure_delay| departure_scheduled| departure_estimated|d
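
The last step of `describe_dataframe` counts the nulls in every column. The same idea in plain Python, on a list of already flattened records, could look like this sketch (`count_missing` and the sample rows are hypothetical):

```python
def count_missing(rows):
    """Count the None values per key across a list of flat dicts."""
    missing = {}
    for row in rows:
        for key, value in row.items():
            # Every key is registered, so fully populated columns show up with 0
            missing[key] = missing.get(key, 0) + (1 if value is None else 0)
    return missing


rows = [
    {"departure_gate": None, "departure_iata": "SHS"},
    {"departure_gate": "F06-", "departure_iata": "KMG"},
    {"departure_gate": None, "departure_iata": "LHW"},
]
print(count_missing(rows))
```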

**4. Cleaning dataframe df_flights**

4.1. Convert the type of each column

In [0]:
# Cast the flight date to a real date type
df_flights = df_flights.withColumn("flight_date", col("flight_date").cast("date"))

# Columns holding ISO-8601 timestamps are converted with to_timestamp
timestamp_columns = [
    "departure_scheduled", "departure_estimated", "departure_actual",
    "departure_estimated_runway", "departure_actual_runway",
    "arrival_scheduled", "arrival_estimated", "arrival_actual",
    "arrival_estimated_runway", "arrival_actual_runway",
    "live_updated",
]
for c in timestamp_columns:
    df_flights = df_flights.withColumn(c, to_timestamp(col(c)))

# Every remaining column stays a string; null and NaN values are normalised to null
string_columns = [c for c in df_flights.columns if c not in timestamp_columns + ["flight_date"]]
for c in string_columns:
    df_flights = df_flights.withColumn(
        c,
        when(col(c).isNull() | isnan(col(c)), None).otherwise(col(c).cast("string")))

# Print the schema to verify the changes
df_flights.printSchema()


root
 |-- flight_date: date (nullable = true)
 |-- flight_status: string (nullable = true)
 |-- departure_airport: string (nullable = true)
 |-- departure_timezone: string (nullable = true)
 |-- departure_iata: string (nullable = true)
 |-- departure_icao: string (nullable = true)
 |-- departure_terminal: string (nullable = true)
 |-- departure_gate: string (nullable = true)
 |-- departure_delay: string (nullable = true)
 |-- departure_scheduled: timestamp (nullable = true)
 |-- departure_estimated: timestamp (nullable = true)
 |-- departure_actual: timestamp (nullable = true)
 |-- departure_estimated_runway: timestamp (nullable = true)
 |-- departure_actual_runway: timestamp (nullable = true)
 |-- arrival_airport: string (nullable = true)
 |-- arrival_timezone: string (nullable = true)
 |-- arrival_iata: string (nullable = true)
 |-- arrival_icao: string (nullable = true)
 |-- arrival_terminal: string (nullable = true)
 |-- arrival_gate: string (nullable = true)
 |-- arrival_baggage: 
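
To make the conversion concrete, here is what the same casts look like on a single record in plain Python. This is only an illustration of the target types, not the Spark code above; `parse_flight_fields` is a hypothetical helper and handles only three of the fields.

```python
from datetime import date, datetime, timezone


def parse_flight_fields(record):
    """Return a copy with flight_date as a date and the scheduled times as aware datetimes."""
    parsed = dict(record)
    if parsed.get("flight_date"):
        parsed["flight_date"] = date.fromisoformat(parsed["flight_date"])
    for key in ("departure_scheduled", "arrival_scheduled"):
        # Missing values stay None, like the nulls produced by to_timestamp
        if parsed.get(key):
            parsed[key] = datetime.fromisoformat(parsed[key])
    return parsed


raw = {
    "flight_date": "2023-05-19",
    "departure_scheduled": "2023-05-19T13:40:00+00:00",
    "arrival_scheduled": None,
}
print(parse_flight_fields(raw))
```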

4.2. Fill NaN values

In [0]:
# Replace every null/NaN value with the string "No data <column_name>"
def fill_nan_value(df, fill_value="No data"):
    for column in df.columns:
        try:
            # isnan fails on the date and timestamp columns; that case is handled below
            if df.filter(col(column).isNull() | isnan(col(column))).count() > 0:
                # fillna with a string value only affects string columns
                df = df.fillna(fill_value + " " + column, subset=[column])
        except Exception as e:
            print(f'I can not check with function isnan this column {column}')
            # no effect on non-string columns: their nulls are left as they are
            df = df.fillna("", subset=[column])

    return df

df_flights = fill_nan_value(df_flights)

I can not check with function isnan this column flight_date
I can not check with function isnan this column departure_scheduled
I can not check with function isnan this column departure_estimated
I can not check with function isnan this column departure_actual
I can not check with function isnan this column departure_estimated_runway
I can not check with function isnan this column departure_actual_runway
I can not check with function isnan this column arrival_scheduled
I can not check with function isnan this column arrival_estimated
I can not check with function isnan this column arrival_actual
I can not check with function isnan this column arrival_estimated_runway
I can not check with function isnan this column arrival_actual_runway
I can not check with function isnan this column live_updated
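
The messages above show that the `isnan` check fails on the date and timestamp columns, so only the string columns end up filled. The rule actually applied can be summarised on one record in plain Python; `fill_missing_strings` and the sample row are hypothetical.

```python
def fill_missing_strings(row, string_columns, fill_value="No data"):
    """Replace None in the given string columns with 'No data <column>'; leave other columns untouched."""
    filled = dict(row)
    for column in string_columns:
        if column in filled and filled[column] is None:
            filled[column] = f"{fill_value} {column}"
    return filled


row = {"departure_gate": None, "departure_iata": "SHS", "departure_actual": None}
# departure_actual is a timestamp column, so it is not listed and keeps its None
print(fill_missing_strings(row, ["departure_gate", "departure_iata"]))
```

Passing the list of string columns explicitly avoids probing each column and catching the failure afterwards.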


4.3. Split the departure_timezone and arrival_timezone columns into continent and city

In [0]:
def seperate_timezone(df):
    # split each timezone column into two columns: (departure, arrival)_continent and (departure, arrival)_city
    df = df.withColumn("departure_continent", split(df["departure_timezone"], "/").getItem(0)) \
           .withColumn("departure_city", split(df["departure_timezone"], "/").getItem(1)) \
           .withColumn("arrival_continent", split(df["arrival_timezone"], "/").getItem(0)) \
           .withColumn("arrival_city", split(df["arrival_timezone"], "/").getItem(1))
    
    # handle missing values: a null continent or city is replaced by "No data <column_name>"
    df = df.withColumn("departure_continent", 
                       when(df["departure_continent"].isNull(), "No data departure_continent") \
                       .otherwise(df["departure_continent"])) \
          .withColumn("departure_city", 
                      when(df["departure_city"].isNull(), "No data departure_city") \
                      .otherwise(df["departure_city"])) \
          .withColumn("arrival_continent", when(df["arrival_continent"].isNull(), "No data arrival_continent") \
                      .otherwise(df["arrival_continent"])) \
          .withColumn("arrival_city", when(df["arrival_city"].isNull(), "No data arrival_city") \
                      .otherwise(df["arrival_city"]))
    return df
    
df_flights = seperate_timezone(df_flights)    
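
Two edge cases are worth keeping in mind with this split. A value without a `/`, such as the "No data departure_timezone" placeholder written in step 4.2, lands entirely in the continent column. For a three-part name such as `America/Argentina/Buenos_Aires`, `getItem(1)` returns the middle part. The plain-Python sketch below shows one possible way to handle both; `split_timezone` is a hypothetical helper, and keeping the last part as the city is a design choice, not what the notebook does.

```python
def split_timezone(tz_name):
    """Split a name such as 'Asia/Shanghai' into (continent, city).

    Returns (None, None) when the value is missing or has no '/' separator,
    so a placeholder string is never mistaken for a continent.
    """
    if not tz_name or "/" not in tz_name:
        return None, None
    continent, _, rest = tz_name.partition("/")
    # For three-part names, keep the last part as the city
    city = rest.rsplit("/", 1)[-1]
    return continent, city


for name in ["Asia/Shanghai", "America/Argentina/Buenos_Aires", "No data departure_timezone", None]:
    print(name, "->", split_timezone(name))
```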

4.4. Calculate the flight duration in seconds and in hours

In [0]:
def calculate_duration_in_sec_and_hour(df):
    # Calculate flight duration
    df = df.withColumn('duration', 
                       when(col('arrival_scheduled').isNull(), None) \
                       .otherwise(col('arrival_scheduled').cast('long') - col('departure_scheduled').cast('long')))
    # Format the duration as HH:mm:ss (from_unixtime renders it in the session time zone and wraps after 24 hours)
    df = df.withColumn("duration_in_hour", from_unixtime(col("duration"), "HH:mm:ss"))
    
    return df

df_flights = calculate_duration_in_sec_and_hour(df_flights)
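
Because `from_unixtime` formats the number of seconds as a moment in the session time zone, the HH:mm:ss value is only a true duration when the session runs in UTC and the flight lasts under 24 hours. A plain-Python sketch of the same calculation without those two limits is shown below; `flight_duration` is a hypothetical helper working on the ISO-8601 strings returned by the API.

```python
from datetime import datetime


def flight_duration(departure_iso, arrival_iso):
    """Return (seconds, 'HH:MM:SS') between two ISO-8601 timestamps, or (None, None) if either is missing."""
    if not departure_iso or not arrival_iso:
        return None, None
    delta = datetime.fromisoformat(arrival_iso) - datetime.fromisoformat(departure_iso)
    seconds = int(delta.total_seconds())
    # Hours are not wrapped at 24, so long flights keep their full length
    hours, remainder = divmod(abs(seconds), 3600)
    minutes, secs = divmod(remainder, 60)
    sign = "-" if seconds < 0 else ""
    return seconds, f"{sign}{hours:02d}:{minutes:02d}:{secs:02d}"


# Scheduled times of the first flight in the extract
print(flight_duration("2023-05-19T13:40:00+00:00", "2023-05-19T15:45:00+00:00"))
```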

In [0]:
# display dataframe after cleaning
describe_dataframe(df_flights)

Display my dataframe : 
+-----------+-------------+--------------------+------------------+--------------+--------------+--------------------+--------------------+--------------------+-------------------+-------------------+----------------+--------------------------+-----------------------+--------------------+--------------------+------------+------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+--------------+------------------------+---------------------+--------------------+------------+------------+-------------+-----------+-----------+--------------------+---------------------+--------------------+--------------------+--------------------+------------+--------------------+--------------------+--------------------+--------------------+---------------------+--------------------+--------------------+-------------------+--------------+--------------------+--------------------+--------+----------------

In [0]:
df_flights.toPandas()

Unnamed: 0,flight_date,flight_status,departure_airport,departure_timezone,departure_iata,departure_icao,departure_terminal,departure_gate,departure_delay,departure_scheduled,...,live_direction,live_speed_horizontal,live_speed_vertical,live_is_ground,departure_continent,departure_city,arrival_continent,arrival_city,duration,duration_in_hour
0,2023-05-19,scheduled,Shashi,Asia/Shanghai,SHS,ZHJZ,No data departure_terminal,No data departure_gate,13,2023-05-19 13:40:00,...,No data live_direction,No data live_speed_horizontal,No data live_speed_vertical,No data live_is_ground,Asia,Shanghai,Asia,Shanghai,7500,02:05:00
1,2023-05-19,scheduled,Shashi,Asia/Shanghai,SHS,ZHJZ,No data departure_terminal,No data departure_gate,13,2023-05-19 13:40:00,...,No data live_direction,No data live_speed_horizontal,No data live_speed_vertical,No data live_is_ground,Asia,Shanghai,Asia,Shanghai,7500,02:05:00
2,2023-05-19,scheduled,Shashi,Asia/Shanghai,SHS,ZHJZ,No data departure_terminal,No data departure_gate,13,2023-05-19 13:40:00,...,No data live_direction,No data live_speed_horizontal,No data live_speed_vertical,No data live_is_ground,Asia,Shanghai,Asia,Shanghai,7500,02:05:00
3,2023-05-19,scheduled,Jinan,Asia/Shanghai,TNA,ZSJN,No data departure_terminal,No data departure_gate,70,2023-05-19 15:30:00,...,No data live_direction,No data live_speed_horizontal,No data live_speed_vertical,No data live_is_ground,Asia,Shanghai,Asia,Shanghai,7800,02:10:00
4,2023-05-19,scheduled,Kunming,Asia/Shanghai,KMG,ZPPP,No data departure_terminal,F06-,60,2023-05-19 15:55:00,...,No data live_direction,No data live_speed_horizontal,No data live_speed_vertical,No data live_is_ground,Asia,Shanghai,No data arrival_timezone,No data arrival_city,4800,01:20:00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,2023-05-19,scheduled,Lanzhou Zhongchuan Airport,No data departure_timezone,LHW,ZLLL,T2,No data departure_gate,No data departure_delay,2023-05-19 20:00:00,...,No data live_direction,No data live_speed_horizontal,No data live_speed_vertical,No data live_is_ground,No data departure_timezone,No data departure_city,Asia,Shanghai,10800,03:00:00
96,2023-05-19,scheduled,Lanzhou Zhongchuan Airport,No data departure_timezone,LHW,ZLLL,T2,No data departure_gate,No data departure_delay,2023-05-19 19:55:00,...,No data live_direction,No data live_speed_horizontal,No data live_speed_vertical,No data live_is_ground,No data departure_timezone,No data departure_city,Asia,Shanghai,7200,02:00:00
97,2023-05-19,scheduled,Lanzhou Zhongchuan Airport,No data departure_timezone,LHW,ZLLL,T2,108,No data departure_delay,2023-05-19 19:55:00,...,No data live_direction,No data live_speed_horizontal,No data live_speed_vertical,No data live_is_ground,No data departure_timezone,No data departure_city,Asia,Shanghai,7800,02:10:00
98,2023-05-19,scheduled,Lanzhou Zhongchuan Airport,No data departure_timezone,LHW,ZLLL,T2,108,No data departure_delay,2023-05-19 19:55:00,...,No data live_direction,No data live_speed_horizontal,No data live_speed_vertical,No data live_is_ground,No data departure_timezone,No data departure_city,Asia,Shanghai,7800,02:10:00


## Queries on my dataframe

**Q1: The airline with the most flights in progress**

In [0]:
number_flights_by_airline = df_flights.groupBy('airline_name').count().orderBy(desc("count"))
print(f"The airline with the most flights is {number_flights_by_airline.first()}.")
number_flights_by_airline.toPandas()

The airline with the most flights is Row(airline_name='Philippine Airlines', count=8).


Unnamed: 0,airline_name,count
0,Philippine Airlines,8
1,Singapore Airlines,8
2,China Eastern Airlines,4
3,Cebu Pacific Air,4
4,Vietnam Airlines,4
5,Air New Zealand,4
6,Shenzhen Airlines,3
7,Qatar Airways,3
8,AirAsia,3
9,Malaysia Airlines,3
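
The table shows two airlines tied at 8 flights, while `first()` reports only one of them. As a rough sketch of tie-aware logic, the plain-Python function below returns every airline that reaches the maximum count. The function name and the shortened sample list are assumptions made for illustration.

```python
from collections import Counter


def airlines_with_most_flights(airline_names):
    """Return (sorted list of airlines tied for first place, their flight count)."""
    counts = Counter(airline_names)
    if not counts:
        return [], 0
    top = max(counts.values())
    leaders = sorted(name for name, n in counts.items() if n == top)
    return leaders, top


# Shortened sample reusing the three largest counts from the table above
sample = (
    ["Philippine Airlines"] * 8
    + ["Singapore Airlines"] * 8
    + ["China Eastern Airlines"] * 4
)
leaders, flights = airlines_with_most_flights(sample)
print(leaders, flights)  # ['Philippine Airlines', 'Singapore Airlines'] 8
```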


**Q2: For each continent, the airline with the most active regional flights (origin continent == destination continent)**

In [0]:
# version 1
# Keep only regional flights (same departure and arrival continent)
regional_flights = df_flights.filter(col("departure_continent") == col("arrival_continent"))

# Count the regional flights of each airline within each continent
regional_flights_count = regional_flights.groupBy("departure_continent", "airline_name") \
    .agg(count("*").alias("flight_count"))

# Find the highest flight count within each continent
max_regional_flights_count = regional_flights_count.groupBy("departure_continent") \
    .agg(max("flight_count").alias("max_flight"))

# Join back to "regional_flights_count" and keep the airlines that reach the maximum
max_regional_flights_by_timezone = max_regional_flights_count.join(regional_flights_count, ["departure_continent"], "inner") \
    .filter(col("flight_count") == col("max_flight"))

# Collect the airlines tied for the maximum into one list per continent
max_regional_flights_by_timezone = max_regional_flights_by_timezone.groupBy("departure_continent", "max_flight") \
    .agg(collect_list("airline_name").alias("airline_names"))

max_regional_flights_by_timezone.toPandas().head(100)

Unnamed: 0,departure_continent,max_flight,airline_names
0,Asia,8,[Philippine Airlines]
1,Pacific,2,[Aloha Air Cargo]
2,Europe,1,"[Corendon Air, SAS, Air Austral, Kenya Airways..."
3,America,2,[Qatar Airways]
4,Africa,1,"[Ethiopian Airlines, Afrijet, empty]"


In [0]:
# version 2: when several airlines are tied, keep the first one in alphabetical order
max_regional_flights_by_timezone = max_regional_flights_by_timezone.withColumn("max_airline_name", array_min(col("airline_names")))
max_regional_flights_by_timezone = max_regional_flights_by_timezone.drop("airline_names")
max_regional_flights_by_timezone.toPandas().head(100)

Unnamed: 0,departure_continent,max_flight,max_airline_name
0,Asia,8,Philippine Airlines
1,Pacific,2,Aloha Air Cargo
2,Europe,1,Air Austral
3,America,2,Qatar Airways
4,Africa,1,Afrijet


**Q3: The flight in progress with the longest journey**

In [0]:
# Find the maximum duration
max_duration = df_flights.agg(max("duration").alias("max_duration")).select("max_duration").first()[0]

# Keep the flights whose duration equals the maximum
flights_with_max_duration = df_flights.filter(col('duration') == max_duration).select("duration_in_hour", "flight_number", "arrival_airport", 'departure_airport', "departure_scheduled", "arrival_scheduled")

flights_with_max_duration.toPandas()


Unnamed: 0,duration_in_hour,flight_number,arrival_airport,departure_airport,departure_scheduled,arrival_scheduled
0,05:30:00,568,Shanghai Pudong International,Singapore Changi,2023-05-19 16:35:00,2023-05-19 22:05:00


**Q4: For each continent, the average flight duration**

In [0]:
# Calculate the average duration for each departure continent
average_flight_duration = df_flights.groupBy("departure_continent") \
                                    .avg("duration") \
                                    .withColumnRenamed("avg(duration)", "average_duration_seconds")

# Format the average duration of each continent as HH:mm:ss
average_flight_duration = average_flight_duration.withColumn("average_duration", from_unixtime(col("average_duration_seconds"), "HH:mm:ss"))

average_flight_duration.toPandas()

Unnamed: 0,departure_continent,average_duration_seconds,average_duration
0,Asia,8295.616438,02:18:15
1,Pacific,2520.0,00:42:00
2,America,6000.0,01:40:00
3,Europe,9309.230769,02:35:09
4,Africa,5200.0,01:26:40
5,No data departure_timezone,8300.0,02:18:20


**Q5: The airport with the most active flights**

In [0]:
# Count flights per departure airport and per arrival airport, in descending order, and display the first row of each
number_flights_by_departure_airport = df_flights.groupBy('departure_airport').count().orderBy(desc("count"))
number_flights_by_arrival_airport = df_flights.groupBy('arrival_airport').count().orderBy(desc("count"))

print(f"The departure airport with the most flights is {number_flights_by_departure_airport.first()}.")
print(f"The arrival airport with the most flights is {number_flights_by_arrival_airport.first()}.")

number_flights_by_departure_airport.show(100)
number_flights_by_arrival_airport.show(100)

The departure airport with the most flights is Row(departure_airport='Singapore Changi', count=26).
The arrival airport with the most flights is Row(arrival_airport='Noibai International', count=9).
+--------------------+-----+
|   departure_airport|count|
+--------------------+-----+
|    Singapore Changi|   26|
|Ninoy Aquino Inte...|   19|
|Tan Son Nhat Inte...|    9|
|             Kunming|    8|
|   Charles De Gaulle|    6|
|Lanzhou Zhongchua...|    5|
|             Fukuoka|    4|
|    Istanbul Airport|    4|
|              Shashi|    3|
| Tri-cities Regional|    3|
|Honolulu Internat...|    2|
| Franceville/Mvengue|    2|
|Abu Dhabi Interna...|    2|
|               Jinan|    1|
|Nikos Kazantzakis...|    1|
|            Lalibela|    1|
|No data departure...|    1|
|               Mulu,|    1|
|               Souda|    1|
|            Chabeuil|    1|
+--------------------+-----+

+--------------------+-----+
|     arrival_airport|count|
+--------------------+-----+
|Noibai International|    9|
|Kua

**Q6: For each airline country, the top 3 aircraft models in use**

In [0]:
# Group by departure_city, airline_name and aircraft_registration to count flights
grouped_df = df_flights.groupBy('departure_city', "airline_name", 'aircraft_registration').count().orderBy(desc("count"))

# Rank the rows within each departure_city by number of flights
window_spec = Window.partitionBy('departure_city').orderBy(col('count').desc())
ranked_df = grouped_df.withColumn('rank', row_number().over(window_spec))

# Keep only the top 3 rows of each departure_city
top3_airplanes_used_df = ranked_df.filter(col('rank') <= 3)

# Sort by departure_city and rank
top3_airplanes_used_by_country_df = top3_airplanes_used_df.orderBy("departure_city", 'rank')

top3_airplanes_used_by_country_df.toPandas()


Unnamed: 0,departure_city,airline_name,aircraft_registration,count,rank
0,Addis_Ababa,Ethiopian Airlines,No data aircraft_registration,1,1
1,Athens,Corendon Air,No data aircraft_registration,1,1
2,Athens,SAS,No data aircraft_registration,1,2
3,Dubai,Air Arabia,No data aircraft_registration,1,1
4,Dubai,flynas,No data aircraft_registration,1,2
5,Ho_Chi_Minh,Vietnam Airlines,No data aircraft_registration,3,1
6,Ho_Chi_Minh,VietJet Air,No data aircraft_registration,2,2
7,Ho_Chi_Minh,No data airline_name,No data aircraft_registration,2,3
8,Honolulu,Aloha Air Cargo,No data aircraft_registration,2,1
9,Istanbul,SA AVIANCA,No data aircraft_registration,1,1
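
Because the window is ordered by `count` only, `row_number()` may rank rows with equal counts in any order. The sketch below reproduces the "top N per group" idea in plain Python with an explicit tie-breaker on the name, which keeps the ranking deterministic. The function name is hypothetical, and the last sample row ("Hypothetical Air") is made up to show the cut-off.

```python
from collections import defaultdict


def top_n_per_group(rows, n=3):
    """rows: iterable of (group, item, count).

    Returns {group: [(rank, item, count), ...]} with at most n entries per group.
    """
    by_group = defaultdict(list)
    for group, item, count in rows:
        by_group[group].append((item, count))

    ranked = {}
    for group, items in by_group.items():
        # Highest count first; the item name breaks ties deterministically
        items.sort(key=lambda pair: (-pair[1], pair[0]))
        ranked[group] = [
            (rank, item, count)
            for rank, (item, count) in enumerate(items[:n], start=1)
        ]
    return ranked


rows = [
    ("Ho_Chi_Minh", "Vietnam Airlines", 3),
    ("Ho_Chi_Minh", "VietJet Air", 2),
    ("Ho_Chi_Minh", "No data airline_name", 2),
    ("Ho_Chi_Minh", "Hypothetical Air", 1),  # made-up row, dropped by the top-3 cut
    ("Athens", "Corendon Air", 1),
    ("Athens", "SAS", 1),
]
result = top_n_per_group(rows)
print(result["Ho_Chi_Minh"])
```

In Spark, the analogous change would be to add a second column to the window's `orderBy`.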


**Q7: Which airport has the largest difference between the number of outgoing flights and the number of incoming flights?**

In [0]:
# Number of outgoing flights per departure airport
outgoing_flights = df_flights.groupBy('departure_airport').count().withColumnRenamed('count', 'outgoing_count')

# Number of incoming flights per arrival airport
incoming_flights = df_flights.groupBy('arrival_airport').count().withColumnRenamed('count', 'incoming_count')

# Join the two dataframes with an outer join so that no airport is dropped
airport_flights_diff = outgoing_flights.join(incoming_flights, outgoing_flights['departure_airport'] == incoming_flights['arrival_airport'], 'outer')

# Calculate the absolute difference between outgoing and incoming flights
# Note: flight_diff is null when an airport appears on only one side of the join
airport_flights_diff = airport_flights_diff.withColumn('flight_diff', abs(col('outgoing_count') - col('incoming_count')))


airport_with_max_diff = airport_flights_diff.orderBy(col('flight_diff').desc())

print(f"The airport with the largest difference between the number of outgoing flights and the number of incoming flights is: {airport_with_max_diff.first()}")
airport_with_max_diff.limit(1).show()
airport_with_max_diff.toPandas()

The airport with the largest difference between the number of outgoing flights and the number of incoming flights is: Row(departure_airport='Singapore Changi', outgoing_count=26, arrival_airport='Singapore Changi', incoming_count=3, flight_diff=23)
+-----------------+--------------+----------------+--------------+-----------+
|departure_airport|outgoing_count| arrival_airport|incoming_count|flight_diff|
+-----------------+--------------+----------------+--------------+-----------+
| Singapore Changi|            26|Singapore Changi|             3|         23|
+-----------------+--------------+----------------+--------------+-----------+



Unnamed: 0,departure_airport,outgoing_count,arrival_airport,incoming_count,flight_diff
0,Singapore Changi,26.0,Singapore Changi,3.0,23.0
1,Ninoy Aquino International,19.0,Ninoy Aquino International,2.0,17.0
2,Kunming,8.0,Kunming,2.0,6.0
3,Tan Son Nhat International,9.0,Tan Son Nhat International,3.0,6.0
4,Jinan,1.0,Jinan,6.0,5.0
...,...,...,...,...,...
57,,,Taiwan Taoyuan International (Chiang Kai Shek ...,5.0,
58,Tri-cities Regional,3.0,,,
59,,,Urumqi,1.0,
60,,,Vienna International,6.0,
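
In the last rows of the table, airports that appear on only one side of the outer join end up with an empty `flight_diff`, so they never compete for the largest difference. A minimal plain-Python sketch of the same computation, treating a missing count as zero, could look like this. The function name and the sample routes are assumptions for illustration and are not taken from the dataset.

```python
from collections import Counter


def flight_imbalance(flights):
    """flights: iterable of (departure_airport, arrival_airport) pairs.

    Returns [(airport, abs(outgoing - incoming)), ...], largest difference first.
    """
    flights = list(flights)  # allow a single-pass iterable to be read twice
    outgoing = Counter(dep for dep, _ in flights)
    incoming = Counter(arr for _, arr in flights)
    airports = set(outgoing) | set(incoming)
    # A Counter returns 0 for a missing key, so one-sided airports are kept
    diffs = {a: abs(outgoing[a] - incoming[a]) for a in airports}
    return sorted(diffs.items(), key=lambda kv: (-kv[1], kv[0]))


# Hypothetical routes between airports named in the tables above
sample_flights = [
    ("Singapore Changi", "Vienna International"),
    ("Singapore Changi", "Urumqi"),
    ("Singapore Changi", "Vienna International"),
    ("Vienna International", "Singapore Changi"),
]
imbalance = flight_imbalance(sample_flights)
print(imbalance)
# [('Singapore Changi', 2), ('Urumqi', 1), ('Vienna International', 1)]
```

In the Spark version, wrapping both counts in `coalesce(..., lit(0))` before subtracting would have a comparable effect.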


## Save data

In [0]:
from datetime import datetime
# Add columns recording the date and the time at which the data is saved
df_flights = df_flights.withColumn('date_save', lit(datetime.today().strftime('%Y-%m-%d')))
df_flights = df_flights.withColumn('hour_save', lit(datetime.today().strftime('%H:%M:%S')))

**Save flights**

In [0]:
# In CSV
from datetime import datetime
df_flights.write.format("csv").mode("overwrite").save(f"Flights/rawzone/tech_year={datetime.today().strftime('%Y')}/tech_month={datetime.today().strftime('%Y-%m')}/tech_day={datetime.today().strftime('%Y-%m-%d')}/flights{datetime.today().strftime('%Y%m%d%H%M%S')}.csv")


In [0]:
# In PARQUET
df_flights.write.format("parquet").mode("overwrite").save(f"Flights/rawzone/tech_year={datetime.today().strftime('%Y')}/tech_month={datetime.today().strftime('%Y-%m')}/tech_day={datetime.today().strftime('%Y-%m-%d')}/flights{datetime.today().strftime('%Y%m%d%H%M%S')}.parquet")

**Save others data**

In [0]:
# Each result is saved in a folder named after its DataFrame variable
# Q1
number_flights_by_airline.write.format("csv").mode("overwrite").save(f"Flights/results_requests/number_flights_by_airline/tech_year={datetime.today().strftime('%Y')}/tech_month={datetime.today().strftime('%Y-%m')}/tech_day={datetime.today().strftime('%Y-%m-%d')}/flights{datetime.today().strftime('%Y%m%d%H%M%S')}.csv")

# Q2
max_regional_flights_by_timezone.write.format("csv").mode("overwrite").save(f"Flights/results_requests/max_regional_flights_by_timezone/tech_year={datetime.today().strftime('%Y')}/tech_month={datetime.today().strftime('%Y-%m')}/tech_day={datetime.today().strftime('%Y-%m-%d')}/flights{datetime.today().strftime('%Y%m%d%H%M%S')}.csv")

# Q3
flights_with_max_duration.write.format("csv").mode("overwrite").save(f"Flights/results_requests/flights_with_max_duration/tech_year={datetime.today().strftime('%Y')}/tech_month={datetime.today().strftime('%Y-%m')}/tech_day={datetime.today().strftime('%Y-%m-%d')}/flights{datetime.today().strftime('%Y%m%d%H%M%S')}.csv")

# Q4
average_flight_duration.write.format("csv").mode("overwrite").save(f"Flights/results_requests/average_flight_duration/tech_year={datetime.today().strftime('%Y')}/tech_month={datetime.today().strftime('%Y-%m')}/tech_day={datetime.today().strftime('%Y-%m-%d')}/flights{datetime.today().strftime('%Y%m%d%H%M%S')}.csv")

# Q5
number_flights_by_departure_airport.write.format("csv").mode("overwrite").save(f"Flights/results_requests/number_flights_by_departure_airport/tech_year={datetime.today().strftime('%Y')}/tech_month={datetime.today().strftime('%Y-%m')}/tech_day={datetime.today().strftime('%Y-%m-%d')}/flights{datetime.today().strftime('%Y%m%d%H%M%S')}.csv")

number_flights_by_arrival_airport.write.format("csv").mode("overwrite").save(f"Flights/results_requests/number_flights_by_arrival_airport/tech_year={datetime.today().strftime('%Y')}/tech_month={datetime.today().strftime('%Y-%m')}/tech_day={datetime.today().strftime('%Y-%m-%d')}/flights{datetime.today().strftime('%Y%m%d%H%M%S')}.csv")

# Q6
top3_airplanes_used_by_country_df.write.format("csv").mode("overwrite").save(f"Flights/results_requests/top3_airplanes_used_by_country_df/tech_year={datetime.today().strftime('%Y')}/tech_month={datetime.today().strftime('%Y-%m')}/tech_day={datetime.today().strftime('%Y-%m-%d')}/flights{datetime.today().strftime('%Y%m%d%H%M%S')}.csv")

# Q7
airport_with_max_diff.write.format("csv").mode("overwrite").save(f"Flights/results_requests/airport_with_max_diff/tech_year={datetime.today().strftime('%Y')}/tech_month={datetime.today().strftime('%Y-%m')}/tech_day={datetime.today().strftime('%Y-%m-%d')}/flights{datetime.today().strftime('%Y%m%d%H%M%S')}.csv")
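
Every save call repeats the same `tech_year` / `tech_month` / `tech_day` layout and calls `datetime.today()` several times, so the pieces of one path could in principle straddle a change of second or day. As a sketch only, a small helper can build the path from a single timestamp. The name `build_output_path` and its parameters are hypothetical and not part of the notebook.

```python
from datetime import datetime


def build_output_path(root, name, when, extension="csv"):
    """Build a partitioned output path from one timestamp so all parts agree."""
    return (
        f"{root}/{name}"
        f"/tech_year={when.strftime('%Y')}"
        f"/tech_month={when.strftime('%Y-%m')}"
        f"/tech_day={when.strftime('%Y-%m-%d')}"
        f"/flights{when.strftime('%Y%m%d%H%M%S')}.{extension}"
    )


# Fixed timestamp used only to make the example reproducible
run_time = datetime(2023, 5, 19, 13, 40, 0)
path = build_output_path("Flights/results_requests", "number_flights_by_airline", run_time)
print(path)
# Flights/results_requests/number_flights_by_airline/tech_year=2023/tech_month=2023-05/tech_day=2023-05-19/flights20230519134000.csv
```

In the notebook, the timestamp would be captured once with `datetime.today()` and the returned string passed to `.save(...)`.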

In [0]:
spark.stop()

------------------------------------------------------------------------------ END ------------------------------------------------------------------------------