##### API"Links":"https://aviationstack.com/documentation
##### "Build"Data"lake"from"scratch":"https://towardsdatascience.com/how-to-build-a-data-lake-from-scratch-part-1-the-setup-34ea1665a06e

In [83]:
from dotenv import load_dotenv
import os
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pandas as pd
import logging
from datetime import datetime
import json
from pyspark.sql.types import *

In [84]:
load_dotenv("../properties/.env")

True

In [85]:
logging.basicConfig(format="%(name)s - %(levelname)s - %(asctime)s - %(message)s", level=logging.INFO)

In [86]:
def request():
    params = {
    "access_key" : str(os.environ["API_KEY"])
}
    raw_data = requests.get("http://api.aviationstack.com/v1/flights", params=params).json()
    return raw_data["data"]

In [87]:
def pandas(data):
    pd.set_option("display.max_columns", None)
    df = pd.DataFrame.from_dict(pd.json_normalize(data))
    return df.head(1)

In [88]:
spark = SparkSession.builder.master("local[*]").appName("flights-app").config("spark.sql.warehouse.dir", "../data/spark-warehouse").getOrCreate()

23/11/27 16:24:27 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [89]:
location_schema = StructType([
    StructField("airport", StringType(), True),
    StructField("timezone", StringType(), True),
    StructField("iata", StringType(), True),
    StructField("icao", StringType(), True),
    StructField("terminal", StringType(), True),
    StructField("gate", StringType(), True),
    StructField("delay", IntegerType(), True),
    StructField("scheduled", TimestampType(), True),
    StructField("estimated", TimestampType(), True),
    StructField("actual", TimestampType(), True),
    StructField("estimated_runway", TimestampType(), True),
    StructField("actual_runway", TimestampType(), True),
])

# Define the schema for the airline section
airline_schema = StructType([
    StructField("name", StringType(), True),
    StructField("iata", StringType(), True),
    StructField("icao", StringType(), True),
])

# Define the schema for the flight section
flight_schema = StructType([
    StructField("number", StringType(), True),
    StructField("iata", StringType(), True),
    StructField("icao", StringType(), True),
    StructField("codeshared", StructType([
        StructField("airline_name", StringType(), True),
        StructField("airline_iata", StringType(), True),
        StructField("airline_icao", StringType(), True),
        StructField("flight_number", StringType(), True),
        StructField("flight_iata", StringType(), True),
        StructField("flight_icao", StringType(), True),
    ]), True),
])

# Define the main schema for the entire JSON
main_schema = StructType([
    StructField("flight_date", TimestampType(), True),
    StructField("flight_status", StringType(), True),
    StructField("departure", location_schema, True),
    StructField("arrival", location_schema, True),
    StructField("airline", airline_schema, True),
    StructField("flight", flight_schema, True),
])


In [90]:
def write_data(data):
    with open(f'../data/bronze/{datetime.today().strftime("%Y-%m-%d")}_data.json', 'w') as f:
        return json.dump(data, f)

In [91]:
if not os.path.isfile(f'../data/bronze/{datetime.today().strftime("%Y-%m-%d")}_data.json'):
    print("File not present...")
    data=request()
    write_data(data)

File not present...


In [92]:
df = spark.read.option("multiline", "true").json(f"../data/bronze/{datetime.today().strftime('%Y-%m-%d')}_data.json", schema=main_schema)
# df = spark.read.option("multiline", "true").json("../data/2023-11-25_data.json", schema=main_schema)

In [93]:
def arrange_schema(df):
     return df.withColumn("dep_airport",col("arrival.airport"))\
                .withColumn("dep_timezone",col("departure.timezone"))\
                .withColumn("dep_iata",col("departure.iata"))\
                .withColumn("dep_icao",col("departure.icao"))\
                .withColumn("dep_terminal",col("departure.terminal"))\
                .withColumn("dep_gate",col("departure.gate"))\
                .withColumn("dep_delay",col("departure.delay"))\
                .withColumn("dep_scheduled",col("departure.scheduled"))\
                .withColumn("dep_estimated",col("departure.estimated"))\
                .withColumn("dep_actual",col("departure.actual"))\
                .withColumn("dep_estimated_runway",col("departure.estimated_runway"))\
                .withColumn("dep_actual_runway",col("departure.actual_runway"))\
                .withColumn("arr_timezone",col("arrival.timezone"))\
                .withColumn("arr_iata",col("arrival.iata"))\
                .withColumn("arr_icao",col("arrival.icao"))\
                .withColumn("arr_terminal",col("arrival.terminal"))\
                .withColumn("arr_gate",col("arrival.gate"))\
                .withColumn("arr_delay",col("arrival.delay"))\
                .withColumn("arr_scheduled",col("arrival.scheduled"))\
                .withColumn("arr_estimated",col("arrival.estimated"))\
                .withColumn("arr_actual",col("arrival.actual"))\
                .withColumn("arr_estimated_runway",col("arrival.estimated_runway"))\
                .withColumn("arr_actual_runway",col("arrival.actual_runway"))\
                .withColumn("airline_name",col("airline.name"))\
                .withColumn("airline_iata",col("airline.iata"))\
                .withColumn("airline_icao",col("airline.icao"))\
                .withColumn("flight_number",col("flight.number"))\
                .withColumn("flight_iata",col("flight.iata"))\
                .withColumn("flight_icao",col("flight.icao"))\
                .withColumn("codeshared_airline_name",col("flight.codeshared.airline_name"))\
                .withColumn("codeshared_airline_iata",col("flight.codeshared.airline_iata"))\
                .withColumn("codeshared_airline_icao",col("flight.codeshared.airline_icao"))\
                .withColumn("codeshared_flight_number",col("flight.codeshared.flight_number"))\
                .withColumn("codeshared_flight_iata",col("flight.codeshared.flight_iata"))\
                .withColumn("codeshared_flight_icao",col("flight.codeshared.flight_icao"))\
                .drop("departure", "arrival", "arrival", "flight", "airline")

In [94]:
df = arrange_schema(df)

In [95]:
df.show(5)

+-------------------+-------------+--------------------+------------------+--------+--------+------------+--------+---------+-------------------+-------------------+----------+--------------------+-----------------+------------------+--------+--------+------------+--------+---------+-------------------+-------------------+----------+--------------------+-----------------+-------------------+------------+------------+-------------+-----------+-----------+-----------------------+-----------------------+-----------------------+------------------------+----------------------+----------------------+
|        flight_date|flight_status|         dep_airport|      dep_timezone|dep_iata|dep_icao|dep_terminal|dep_gate|dep_delay|      dep_scheduled|      dep_estimated|dep_actual|dep_estimated_runway|dep_actual_runway|      arr_timezone|arr_iata|arr_icao|arr_terminal|arr_gate|arr_delay|      arr_scheduled|      arr_estimated|arr_actual|arr_estimated_runway|arr_actual_runway|       airline_name|airl

In [96]:
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show() 

+-----------+-------------+-----------+------------+--------+--------+------------+--------+---------+-------------+-------------+----------+--------------------+-----------------+------------+--------+--------+------------+--------+---------+-------------+-------------+----------+--------------------+-----------------+------------+------------+------------+-------------+-----------+-----------+-----------------------+-----------------------+-----------------------+------------------------+----------------------+----------------------+
|flight_date|flight_status|dep_airport|dep_timezone|dep_iata|dep_icao|dep_terminal|dep_gate|dep_delay|dep_scheduled|dep_estimated|dep_actual|dep_estimated_runway|dep_actual_runway|arr_timezone|arr_iata|arr_icao|arr_terminal|arr_gate|arr_delay|arr_scheduled|arr_estimated|arr_actual|arr_estimated_runway|arr_actual_runway|airline_name|airline_iata|airline_icao|flight_number|flight_iata|flight_icao|codeshared_airline_name|codeshared_airline_iata|codeshared_a

In [97]:
df.count()

100

In [98]:
df.select(min("dep_delay")).show()

+--------------+
|min(dep_delay)|
+--------------+
|             4|
+--------------+



In [99]:
df.select("dep_terminal").distinct().show()

+------------+
|dep_terminal|
+------------+
|           3|
|           5|
|           M|
|           D|
|           1|
|           I|
|           2|
|        NULL|
+------------+



In [100]:
df.schema["dep_estimated_runway"].dataType

TimestampType()

In [102]:
def handle_nulls(df): # 0999-12-27 00:50:39 = Null value
    df_no_nulls = df.fillna("Uknown")\
        .fillna(0, subset=["dep_delay", "arr_delay"])\
        .fillna("AZERTY", subset="dep_terminal")
    
    col_to_fill=["dep_estimated_runway", "dep_actual", "dep_actual_runway", "arr_actual_runway", "arr_estimated_runway", "arr_actual"]

    for colonne in col_to_fill:
        df_no_nulls = df_no_nulls.withColumn(
            colonne,
            when(
                col(colonne).isNull(),
                datetime(1945, 1, 1, 0, 0, 0)
            ).otherwise(col(colonne).cast(TimestampType()))
        )
        
    return df_no_nulls

In [103]:
df_no_nulls = handle_nulls(df)

In [104]:
df_no_nulls.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show() 

+-----------+-------------+-----------+------------+--------+--------+------------+--------+---------+-------------+-------------+----------+--------------------+-----------------+------------+--------+--------+------------+--------+---------+-------------+-------------+----------+--------------------+-----------------+------------+------------+------------+-------------+-----------+-----------+-----------------------+-----------------------+-----------------------+------------------------+----------------------+----------------------+
|flight_date|flight_status|dep_airport|dep_timezone|dep_iata|dep_icao|dep_terminal|dep_gate|dep_delay|dep_scheduled|dep_estimated|dep_actual|dep_estimated_runway|dep_actual_runway|arr_timezone|arr_iata|arr_icao|arr_terminal|arr_gate|arr_delay|arr_scheduled|arr_estimated|arr_actual|arr_estimated_runway|arr_actual_runway|airline_name|airline_iata|airline_icao|flight_number|flight_iata|flight_icao|codeshared_airline_name|codeshared_airline_iata|codeshared_a

In [106]:
df_no_nulls.printSchema()

root
 |-- flight_date: timestamp (nullable = true)
 |-- flight_status: string (nullable = false)
 |-- dep_airport: string (nullable = false)
 |-- dep_timezone: string (nullable = false)
 |-- dep_iata: string (nullable = false)
 |-- dep_icao: string (nullable = false)
 |-- dep_terminal: string (nullable = false)
 |-- dep_gate: string (nullable = false)
 |-- dep_delay: integer (nullable = true)
 |-- dep_scheduled: timestamp (nullable = true)
 |-- dep_estimated: timestamp (nullable = true)
 |-- dep_actual: timestamp (nullable = true)
 |-- dep_estimated_runway: timestamp (nullable = true)
 |-- dep_actual_runway: timestamp (nullable = true)
 |-- arr_timezone: string (nullable = false)
 |-- arr_iata: string (nullable = false)
 |-- arr_icao: string (nullable = false)
 |-- arr_terminal: string (nullable = false)
 |-- arr_gate: string (nullable = false)
 |-- arr_delay: integer (nullable = true)
 |-- arr_scheduled: timestamp (nullable = true)
 |-- arr_estimated: timestamp (nullable = true)
 |-- 

In [107]:
df_no_nulls.show()

+-------------------+-------------+--------------------+-------------------+--------+--------+------------+--------+---------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+--------+--------+------------+--------+---------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+------------+------------+-------------+-----------+-----------+-----------------------+-----------------------+-----------------------+------------------------+----------------------+----------------------+
|        flight_date|flight_status|         dep_airport|       dep_timezone|dep_iata|dep_icao|dep_terminal|dep_gate|dep_delay|      dep_scheduled|      dep_estimated|         dep_actual|dep_estimated_runway|  dep_actual_runway|       arr_timezone|arr_iata|arr_icao|arr_terminal|arr_gate|arr_delay|      arr_scheduled|      arr_estimated|         arr_actual|arr_estimated_run