<div align="center">
<div style="width:100%; aspect-ratio:16/9;">
  <img src="../img/welcome.png" alt="img" style="width:100%; height:auto; border-radius:8px;">
</div>
</div>

## 🚀 **Problem Statement**

Build a **scalable data processing pipeline** using PySpark to analyze flight delays across the United States.  
The objective is to simulate a real-world data engineering workflow involving:

- **Data ingestion**
- **Data cleaning**
- **Data transformation and enrichment**
- **Data analysis** (using both DataFrame and SQL APIs)
- **Output delivery**

I will work with a publicly available dataset and execute all tasks in a local development environment.  
This project is designed to:

- Assess and improve my proficiency in PySpark
- Practice efficient data processing techniques
- Integrate with external storage systems (e.g., AWS S3)
- Apply proper version control and documentation

> ✈️ **My mission:**  
> Uncover insights about flight delays and cancellations in the U.S. for 2015, answer some key questions, and extract further insights from the data.

## 🛫 **About the Dataset**

---

The **U.S. Department of Transportation's (DOT) Bureau of Transportation Statistics** tracks the on-time performance of domestic flights operated by large air carriers.

Summary information on the number of **on-time, delayed, canceled, and diverted flights** is published in DOT's monthly *Air Travel Consumer Report* and is included in this dataset of **2015 flight delays and cancellations**.

> 📊 This dataset provides a comprehensive view of flight performance across the United States, enabling analysis of delays, cancellations, and operational patterns for the year 2015.



### 📚 **Data Dictionary**

---

##### **1. `airlines.csv`**

| Column      | Description                              |
|-------------|------------------------------------------|
| `IATA_CODE` | Code that uniquely identifies an airline |
| `AIRLINE`   | Name of the airline                     |

---

##### **2. `airports.csv`**

| Column      | Description                                 |
|-------------|---------------------------------------------|
| `IATA_CODE` | Code that uniquely identifies an airport    |
| `AIRPORT`   | Name of the airport                        |
| `CITY`      | City where the airport is located          |
| `STATE`     | State where the airport is located         |
| `COUNTRY`   | Country where the airport is located       |
| `LATITUDE`  | Latitude of the airport                    |
| `LONGITUDE` | Longitude of the airport                   |

---

##### **3. `flights.csv`**

| Column                | Description                                                                 |
|-----------------------|-----------------------------------------------------------------------------|
| `YEAR`                | Year of the flight                                                          |
| `MONTH`               | Month of the flight                                                         |
| `DAY`                 | Day of the flight                                                           |
| `DAY_OF_WEEK`         | Day of the week of flight                                                   |
| `AIRLINE`             | IATA code of the airline (matches `IATA_CODE` in `airlines.csv`)            |
| `FLIGHT_NUMBER`       | Unique identifier for the flight                                            |
| `TAIL_NUMBER`         | Unique identifier for the aircraft                                          |
| `ORIGIN_AIRPORT`      | Airport code of the starting airport                                       |
| `DESTINATION_AIRPORT` | Airport code of the destination airport                                    |
| `SCHEDULED_DEPARTURE` | Scheduled departure time of the flight                                      |
| `DEPARTURE_TIME`      | Actual departure time of the flight                                         |
| `DEPARTURE_DELAY`     | Delay in departure of the flight                                            |
| `TAXI_OUT`            | Time elapsed between gate departure and wheels off                          |
| `WHEELS_OFF`          | Time when the aircraft's wheels leave the ground                            |
| `SCHEDULED_TIME`      | Planned duration of the flight                                              |
| `ELAPSED_TIME`        | AIR_TIME + TAXI_IN + TAXI_OUT                                               |
| `AIR_TIME`            | Time between wheels_off and wheels_on                                       |
| `DISTANCE`            | Distance between two airports                                               |
| `WHEELS_ON`           | Time when the wheels of the aircraft touch the ground                       |
| `TAXI_IN`             | Time elapsed between wheels-on and gate arrival at the destination airport  |
| `SCHEDULED_ARRIVAL`   | Planned arrival time of the flight                                          |
| `ARRIVAL_TIME`        | WHEELS_ON + TAXI_IN                                                         |
| `ARRIVAL_DELAY`       | ARRIVAL_TIME minus SCHEDULED_ARRIVAL                                        |
| `CANCELLED`           | Flight Cancelled (1 = cancelled)                                            |
| `DIVERTED`            | Aircraft landed at an unscheduled airport                                   |
| `CANCELLATION_REASON` | Reason for cancellation (A = carrier, B = weather, C = national air system, D = security) |
| `AIR_SYSTEM_DELAY`    | Delay due to the national air system                                        |
| `SECURITY_DELAY`      | Delay due to security                                                       |
| `AIRLINE_DELAY`       | Delay due to the airline                                                    |
| `LATE_AIRCRAFT_DELAY` | Delay due to the late arrival of the aircraft from a previous flight        |
| `WEATHER_DELAY`       | Delay due to the weather                                                    |
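
To make the derived columns in the table concrete, here is a small plain-Python sketch (not part of the pipeline) that checks the stated relationships on one made-up flight record. The sample values and the helpers `hhmm_to_minutes` and `minutes_to_hhmm` are illustrative assumptions, as is the assumption that clock columns hold `HHMM` integers while duration columns hold minutes.

```python
def hhmm_to_minutes(hhmm: int) -> int:
    """Convert an HHMM clock value (e.g. 1430) to minutes since midnight."""
    hours, minutes = divmod(hhmm, 100)
    return hours * 60 + minutes


def minutes_to_hhmm(total: int) -> int:
    """Convert minutes since midnight back to an HHMM clock value."""
    total %= 24 * 60  # wrap around midnight
    return (total // 60) * 100 + total % 60


# Hypothetical record, invented for illustration only.
flight = {
    "TAXI_OUT": 15, "AIR_TIME": 120, "TAXI_IN": 5,
    "WHEELS_ON": 1225, "SCHEDULED_ARRIVAL": 1240,
}

# ELAPSED_TIME = AIR_TIME + TAXI_IN + TAXI_OUT (all in minutes)
elapsed_time = flight["AIR_TIME"] + flight["TAXI_IN"] + flight["TAXI_OUT"]

# ARRIVAL_TIME = WHEELS_ON + TAXI_IN (clock time plus minutes)
arrival_time = minutes_to_hhmm(hhmm_to_minutes(flight["WHEELS_ON"]) + flight["TAXI_IN"])

# ARRIVAL_DELAY = ARRIVAL_TIME minus SCHEDULED_ARRIVAL (negative means early)
arrival_delay = hhmm_to_minutes(arrival_time) - hhmm_to_minutes(flight["SCHEDULED_ARRIVAL"])

print(elapsed_time, arrival_time, arrival_delay)  # 140 1230 -10
```

The clock values have to be converted to minutes before subtracting them: taking `1230 - 1240` directly happens to work here, but it breaks whenever the two times fall in different hours.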

#

##

### 📝 **Let's start the analysis**

---

In [1]:
# Add the parent directory to sys.path so that the src package can be imported
import sys
import os
sys.path.append(os.path.abspath(".."))


####  ⏳ **Loading the data and creating spark session**

---

In [2]:
# Start the Spark session using the helpers from the data ingestion module

from src.data_ingestion import get_spark_session, load_dataframes

spark, s3_bucket = get_spark_session()
spark.sparkContext.setLogLevel("ERROR")  # Only show errors



:: loading settings :: url = jar:file:/home/new-user/Documents/PySpark-EDA-Flights-Data/myvenv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/new-user/.ivy2/cache
The jars for the packages stored in: /home/new-user/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-2c6f8d73-ee2a-4520-9e2d-65d40294b8ab;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.2 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.1026 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 170ms :: artifacts dl 6ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.1026 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.2 from central in [default]
	org.wildfly.openssl#wildfly-openssl;1.0.7.Final from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------

In [3]:
spark

#### 🔬 **Basic Data Exploration**
---

In [4]:
# Load the three DataFrames and preview the first five rows of each
flights_df, airlines_df, airports_df = load_dataframes(spark, s3_bucket)

flights_df.show(5)
airlines_df.show(5)
airports_df.show(5)


                                                                                

+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+-

In [5]:
# Inspect the schema of the flights data
flights_df.printSchema()


root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: integer (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |-- DEPARTURE_TIME: integer (nullable = true)
 |-- DEPARTURE_DELAY: integer (nullable = true)
 |-- TAXI_OUT: integer (nullable = true)
 |-- WHEELS_OFF: integer (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- AIR_TIME: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- SCHEDULED_ARRIVAL: integer (nullable = true)
 |-- ARRIVAL_TIME: integer (nullable = true)
 |-- ARRIVAL_DELAY: integer (null

In [6]:
airlines_df.printSchema()

root
 |-- IATA_CODE: string (nullable = true)
 |-- AIRLINE: string (nullable = true)



In [7]:
airports_df.printSchema()

root
 |-- IATA_CODE: string (nullable = true)
 |-- AIRPORT: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- COUNTRY: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)



In [8]:
# View summary statistics for the flights data
flights_df.describe().show()



+-------+------+------+-----------------+-----------------+-------+------------------+-----------+--------------+-------------------+-------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+-----------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+------------------+-------------------+------------------+-------------------+------------------+
|summary|  YEAR| MONTH|              DAY|      DAY_OF_WEEK|AIRLINE|     FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|    DEPARTURE_TIME|   DEPARTURE_DELAY|          TAXI_OUT|        WHEELS_OFF|   SCHEDULED_TIME|      ELAPSED_TIME|          AIR_TIME|         DISTANCE|        WHEELS_ON|          TAXI_IN| SCHEDULED_ARRIVAL|     ARRIVAL_TIME|     ARRIVAL_DELAY|            DIVERTED|           CANCELLED|CANCELLA

                                                                                

In [9]:
airlines_df.describe().show()

+-------+---------+--------------------+
|summary|IATA_CODE|             AIRLINE|
+-------+---------+--------------------+
|  count|       14|                  14|
|   mean|     NULL|                NULL|
| stddev|     NULL|                NULL|
|    min|       AA|Alaska Airlines Inc.|
|    max|       WN|      Virgin America|
+-------+---------+--------------------+



In [10]:
airports_df.describe().show()

+-------+---------+--------------------+--------+-----+-------+------------------+------------------+
|summary|IATA_CODE|             AIRPORT|    CITY|STATE|COUNTRY|          LATITUDE|         LONGITUDE|
+-------+---------+--------------------+--------+-----+-------+------------------+------------------+
|  count|      322|                 322|     322|  322|    322|               319|               319|
|   mean|     NULL|                NULL|    NULL| NULL|   NULL|38.981243918495295|-98.37896445141064|
| stddev|     NULL|                NULL|    NULL| NULL|   NULL| 8.616735581018041|21.523492046498102|
|    min|      ABE|Aberdeen Regional...|Aberdeen|   AK|    USA|          13.48345|        -176.64603|
|    max|      YUM|Yuma Internationa...|    Yuma|   WY|    USA|          71.28545|         -64.79856|
+-------+---------+--------------------+--------+-----+-------+------------------+------------------+



#### 🧽  **Cleaning the data**
---

In [11]:
# Handle the nulls in the data
from src.data_cleaning import clean_nulls

airlines_df = clean_nulls(airlines_df)



Columns to drop (> 20% nulls): []
Columns to drop rows for (<20% nulls): ['IATA_CODE', 'AIRLINE']

Dropped rows with nulls in columns: ['IATA_CODE', 'AIRLINE']


In [12]:
airports_df = clean_nulls(airports_df)



Columns to drop (> 20% nulls): []
Columns to drop rows for (<20% nulls): ['IATA_CODE', 'AIRPORT', 'CITY', 'STATE', 'COUNTRY', 'LATITUDE', 'LONGITUDE']

Dropped rows with nulls in columns: ['IATA_CODE', 'AIRPORT', 'CITY', 'STATE', 'COUNTRY', 'LATITUDE', 'LONGITUDE']


In [13]:
flights_df = clean_nulls(flights_df)





Columns to drop (> 20% nulls): ['CANCELLATION_REASON', 'AIR_SYSTEM_DELAY', 'SECURITY_DELAY', 'AIRLINE_DELAY', 'LATE_AIRCRAFT_DELAY', 'WEATHER_DELAY']
Columns to drop rows for (<20% nulls): ['YEAR', 'MONTH', 'DAY', 'DAY_OF_WEEK', 'AIRLINE', 'FLIGHT_NUMBER', 'TAIL_NUMBER', 'ORIGIN_AIRPORT', 'DESTINATION_AIRPORT', 'SCHEDULED_DEPARTURE', 'DEPARTURE_TIME', 'DEPARTURE_DELAY', 'TAXI_OUT', 'WHEELS_OFF', 'SCHEDULED_TIME', 'ELAPSED_TIME', 'AIR_TIME', 'DISTANCE', 'WHEELS_ON', 'TAXI_IN', 'SCHEDULED_ARRIVAL', 'ARRIVAL_TIME', 'ARRIVAL_DELAY', 'DIVERTED', 'CANCELLED']

Dropped rows with nulls in columns: ['YEAR', 'MONTH', 'DAY', 'DAY_OF_WEEK', 'AIRLINE', 'FLIGHT_NUMBER', 'TAIL_NUMBER', 'ORIGIN_AIRPORT', 'DESTINATION_AIRPORT', 'SCHEDULED_DEPARTURE', 'DEPARTURE_TIME', 'DEPARTURE_DELAY', 'TAXI_OUT', 'WHEELS_OFF', 'SCHEDULED_TIME', 'ELAPSED_TIME', 'AIR_TIME', 'DISTANCE', 'WHEELS_ON', 'TAXI_IN', 'SCHEDULED_ARRIVAL', 'ARRIVAL_TIME', 'ARRIVAL_DELAY', 'DIVERTED', 'CANCELLED']


                                                                                

In [14]:
# Check the remaining null count in each column
from pyspark.sql import functions as F

exprs = [F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in airlines_df.columns]
airlines_df.select(exprs).show()

+---------+-------+
|IATA_CODE|AIRLINE|
+---------+-------+
|        0|      0|
+---------+-------+



In [15]:
from pyspark.sql import functions as F

exprs = [F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in airports_df.columns]
airports_df.select(exprs).show()

+---------+-------+----+-----+-------+--------+---------+
|IATA_CODE|AIRPORT|CITY|STATE|COUNTRY|LATITUDE|LONGITUDE|
+---------+-------+----+-----+-------+--------+---------+
|        0|      0|   0|    0|      0|       0|        0|
+---------+-------+----+-----+-------+--------+---------+



In [16]:
from pyspark.sql import functions as F

exprs = [F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in flights_df.columns]
flights_df.select(exprs).show()



+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+
|   0|    0|  0|          0|      0|            0|          0|             0|                  0|            

                                                                                

##### 💡 Some observations:-
 * Columns with fewer than 20% nulls were handled by dropping the affected rows rather than imputing values: flight records are time-dependent, and time fields such as departure and arrival times cannot be imputed reliably.
 
 * Columns with more than 20% nulls were dropped entirely, because too little data remains in them to support the analysis. In `flights_df` these are `CANCELLATION_REASON` and the five delay-cause columns.
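
The 20% rule can be sketched in plain Python as follows. This is only an illustration of the decision logic suggested by the printed output, not the actual `clean_nulls` implementation in `src.data_cleaning`. The function name `plan_null_handling`, the sample counts, and the handling of a column at exactly 20% are assumptions.

```python
def plan_null_handling(null_counts, total_rows, threshold=0.20):
    """Split columns into those dropped entirely and those whose null rows are dropped."""
    drop_columns, drop_rows_for = [], []
    for column, nulls in null_counts.items():
        ratio = nulls / total_rows if total_rows else 0.0
        if ratio > threshold:
            drop_columns.append(column)   # too sparse to keep
        else:
            drop_rows_for.append(column)  # keep the column, drop rows where it is null
    return drop_columns, drop_rows_for


# Made-up null counts for a hypothetical 1,000-row table.
null_counts = {"DEPARTURE_TIME": 20, "ARRIVAL_DELAY": 30, "CANCELLATION_REASON": 980}
drop_columns, drop_rows_for = plan_null_handling(null_counts, total_rows=1000)

print(drop_columns)    # ['CANCELLATION_REASON']
print(drop_rows_for)   # ['DEPARTURE_TIME', 'ARRIVAL_DELAY']
```

Columns without any nulls also land in the second list, which matches the output above where every column of `airlines_df` is listed even though no rows are lost.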
 

#

#### 🔄 **Removing duplicates**
---

In [17]:
# Remove duplicate rows from the data

from src.data_cleaning import remove_duplicates

airlines_df = remove_duplicates(airlines_df)



Number of duplicates removed: 0


In [18]:
airports_df = remove_duplicates(airports_df)


Number of duplicates removed: 0


In [19]:
flights_df = remove_duplicates(flights_df)



Number of duplicates removed: 0


                                                                                

##### 🧐 Some observations:-

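 * In the flights DataFrame, the clock-time columns (for example `SCHEDULED_DEPARTURE` and `ARRIVAL_TIME`) are stored as integers in `HHMM` form rather than as proper times, so they need to be reformatted.

 * Other columns (for example `TAXI_OUT` and `AIR_TIME`) hold durations in minutes rather than clock times.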
 

#

#### 🧑‍🔬 **Formatting the time based columns**
---

In [20]:
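# Helper that converts an HHMM value (e.g. 5 -> "00:05", 1430 -> "14:30") to an "HH:mm" string
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

def format_time_spark(hhmm):
    """Return an 'HH:mm' string for an HHMM value, or None if it is missing or invalid."""
    if hhmm is None:
        return None
    try:
        hhmm = int(hhmm)
    except (TypeError, ValueError):
        return None
    if hhmm == 2400:  # treat 24:00 as midnight
        hhmm = 0
    hour, minute = divmod(hhmm, 100)
    # Reject values that are not valid clock times (e.g. 2475 or negative numbers)
    if not (0 <= hour <= 23 and 0 <= minute <= 59):
        return None
    return f"{hour:02d}:{minute:02d}"


# Wrap the function as a Spark UDF that returns a string
format_time_udf = F.udf(format_time_spark, StringType())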

In [21]:
flights_df = flights_df.withColumn(
    "DEPARTURE_TIME_STR",  
    format_time_udf(F.col("DEPARTURE_TIME"))
)


In [22]:
flights_df = flights_df.withColumn(
    "ARRIVAL_TIME",  
    format_time_udf(F.col("ARRIVAL_TIME"))
)

In [23]:
flights_df = flights_df.withColumn(
    "SCHEDULED_DEPARTURE",  
    format_time_udf(F.col("SCHEDULED_DEPARTURE"))
)



In [24]:
flights_df = flights_df.withColumn(
    "SCHEDULED_ARRIVAL",  
    format_time_udf(F.col("SCHEDULED_ARRIVAL"))
)

In [25]:
# Create a DATE column by combining YEAR, MONTH, and DAY (unpadded, e.g. 2015-1-1)
flights_df = flights_df.withColumn(
    "DATE",  
    F.concat_ws("-", F.col("YEAR"), F.col("MONTH"), F.col("DAY"))
)
flights_df.show()




+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+------------------+--------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|DEPARTURE_TIME_STR|    DATE|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+------------------+--------+
|2015|    1|  1|         

                                                                                

In [26]:
from pyspark.sql import functions as F

def create_flight_datetime(df, date_col, time_col, new_col):
    """
    Combines a date column (yyyy-M-d or yyyy-MM-dd) and a time string column (HH:mm) into a timestamp column.
    Pads the date as needed.
    """
    # 1. Pad the date
    df = df.withColumn("PADDED_DATE",
        F.concat_ws("-",
            F.split(F.col(date_col), "-")[0],
            F.lpad(F.split(F.col(date_col), "-")[1], 2, "0"),
            F.lpad(F.split(F.col(date_col), "-")[2], 2, "0")
        )
    )

    # 2. Combine padded date and time
    df = df.withColumn(
        "DATETIME_STR",
        F.when(
            F.col("PADDED_DATE").isNotNull() & F.col(time_col).isNotNull(),
            F.concat_ws(" ", F.col("PADDED_DATE"), F.col(time_col))
        ).otherwise(None)
    )

    # 3. Convert to timestamp
    df = df.withColumn(
        new_col,
        F.to_timestamp(F.col("DATETIME_STR"), "yyyy-MM-dd HH:mm")
    )

    # 4. Drop intermediate columns
    df = df.drop("PADDED_DATE", "DATETIME_STR")
    return df


In [27]:
flights_df = create_flight_datetime(flights_df, "DATE", "DEPARTURE_TIME_STR", "DEPARTURE_DATE_TIME")



In [28]:
flights_df = create_flight_datetime(flights_df, "DATE", "ARRIVAL_TIME", "ARRIVAL_DATE_TIME")
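
To check what values to expect from `create_flight_datetime`, the same per-row steps (pad the date, append the time, parse) can be reproduced in plain Python. This is only a sketch for reasoning about single values. The helper name `combine_date_time` is hypothetical, and it uses `datetime.strptime` instead of Spark's `to_timestamp`.

```python
from datetime import datetime


def combine_date_time(date_str, time_str):
    """Pad a yyyy-M-d date, append an HH:mm time, and parse the result."""
    if date_str is None or time_str is None:
        return None  # mirrors the null check in the Spark version
    year, month, day = date_str.split("-")
    padded_date = f"{year}-{month.zfill(2)}-{day.zfill(2)}"
    return datetime.strptime(f"{padded_date} {time_str}", "%Y-%m-%d %H:%M")


print(combine_date_time("2015-1-1", "23:54"))  # 2015-01-01 23:54:00
print(combine_date_time("2015-1-1", None))     # None
```

One thing this makes visible: the arrival timestamp reuses the flight's `DATE`, so, assuming `DATE` is the departure date, a flight that lands after midnight would be stamped with the previous day.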



##### 🧠 Some observations:-

 * A negative `DEPARTURE_DELAY` means the flight departed earlier than scheduled.
 
 * Similarly, a negative `ARRIVAL_DELAY` means the flight arrived earlier than scheduled.

#

#### ✨ **Data Enrichment**
---




<div align="center">
<div style="width:450px; aspect-ratio:16/9;">
  <img src="../img/brodcast.jpeg" alt="broadcast" style="width:100%; height:auto; border-radius:8px;">
</div>
</div>

In [29]:
# Add a delay category column to the DataFrame
from src.data_enrichment import add_delay_category

flights_df = add_delay_category(flights_df, delay_col="ARRIVAL_DELAY", new_col="DELAY_CATEGORY")
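
The body of `add_delay_category` lives in `src.data_enrichment` and is not shown here. As a rough sketch of what such a bucketing rule could look like, the plain-Python function below maps an arrival delay in minutes to the four labels that appear in the results later in this notebook. The thresholds (0, 15, and 60 minutes) are assumptions made for illustration, not the values used by the project.

```python
def delay_category(arrival_delay_minutes):
    """Map an arrival delay in minutes to a category label.

    The cut-offs below are ASSUMED for illustration; the real ones are
    defined in src.data_enrichment and may differ.
    """
    if arrival_delay_minutes is None:
        return None
    if arrival_delay_minutes <= 0:
        return "On-Time"       # early or exactly on schedule
    if arrival_delay_minutes <= 15:
        return "Short Delay"
    if arrival_delay_minutes <= 60:
        return "Medium Delay"
    return "Long Delay"


print([delay_category(m) for m in (-5, 10, 45, 180)])
# ['On-Time', 'Short Delay', 'Medium Delay', 'Long Delay']
```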


In [30]:
from pyspark.sql.functions import broadcast

# 1. Rename in airlines_df
airlines_df = airlines_df.withColumnRenamed("AIRLINE", "AIRLINE_NAME")

# 2. Join
flights_df_joined = flights_df.join(
    broadcast(airlines_df),
    flights_df.AIRLINE == airlines_df.IATA_CODE,
    how="left"
).drop(airlines_df.IATA_CODE)

flights_df_joined = flights_df_joined.withColumnRenamed("AIRLINE", "AIRLINE_CODE")

flights_df_joined.show(5)



+----+-----+---+-----------+------------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+------------------+--------+-------------------+-------------------+--------------+--------------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE_CODE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|DEPARTURE_TIME_STR|    DATE|DEPARTURE_DATE_TIME|  ARRIVAL_DATE_TIME|DELAY_CATEGORY|        AIRLINE_NAME|
+----+-----+---+-----------+------------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+

                                                                                

In [31]:
# Join with airports for origin airport details
flights_with_origin = flights_df_joined.join(
    broadcast(airports_df),
    flights_df_joined.ORIGIN_AIRPORT == airports_df.IATA_CODE,
    how="left"
).withColumnRenamed("AIRPORT", "ORIGIN_AIRPORT_NAME") \
 .withColumnRenamed("CITY", "ORIGIN_CITY") \
 .withColumnRenamed("STATE", "ORIGIN_STATE") \
 .withColumnRenamed("COUNTRY", "ORIGIN_COUNTRY") \
 .withColumnRenamed("LATITUDE", "ORIGIN_LATITUDE") \
 .withColumnRenamed("LONGITUDE", "ORIGIN_LONGITUDE")


In [32]:
flights_with_origin = flights_with_origin.drop("IATA_CODE")

In [33]:

# Join with airports for destination airport details

flights_enriched_df = flights_with_origin.join(
    broadcast(airports_df),
    flights_with_origin.DESTINATION_AIRPORT == airports_df.IATA_CODE,
    how="left"
).withColumnRenamed("AIRPORT", "DESTINATION_AIRPORT_NAME") \
 .withColumnRenamed("CITY", "DESTINATION_CITY") \
 .withColumnRenamed("STATE", "DESTINATION_STATE") \
 .withColumnRenamed("COUNTRY", "DESTINATION_COUNTRY") \
 .withColumnRenamed("LATITUDE", "DESTINATION_LATITUDE") \
 .withColumnRenamed("LONGITUDE", "DESTINATION_LONGITUDE")



In [34]:
flights_enriched_df.show(5)



+----+-----+---+-----------+------------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+------------------+--------+-------------------+-------------------+--------------+--------------------+--------------------+-------------+------------+--------------+---------------+----------------+---------+------------------------+-----------------+-----------------+-------------------+--------------------+---------------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE_CODE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|DEPARTURE_TIME_STR|    DATE|DEPARTURE_DATE_TIME|  ARRIVAL_DATE_

                                                                                

#

#### 🛠️  **Dropping the columns not needed for our analysis**
---

In [35]:
# Drop the columns that are not needed for the analysis

flights_enriched_df = flights_enriched_df.drop("DAY", "DEPARTURE_TIME", "IATA_CODE", "FLIGHT_NUMBER", "TAIL_NUMBER")



In [36]:
# Some nulls appeared in the data; drop the affected rows
flights_enriched_df = flights_enriched_df.dropna()


In [37]:
exprs = [F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in flights_enriched_df.columns]
flights_enriched_df.select(exprs).show()


+----+-----+-----------+------------+--------------+-------------------+-------------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+------------------+----+-------------------+-----------------+--------------+------------+-------------------+-----------+------------+--------------+---------------+----------------+------------------------+----------------+-----------------+-------------------+--------------------+---------------------+
|YEAR|MONTH|DAY_OF_WEEK|AIRLINE_CODE|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|DEPARTURE_TIME_STR|DATE|DEPARTURE_DATE_TIME|ARRIVAL_DATE_TIME|DELAY_CATEGORY|AIRLINE_NAME|ORIGIN_AIRPORT_NAME|ORIGIN_CITY|ORIGIN_STATE|ORIGIN_COUNTRY|ORIGIN_LATITUDE|ORIGIN_LONGITUD

                                                                                

##### 📈 Some observations:-

 * We joined the flights DataFrame with the airlines and airports data; the resulting DataFrame, `flights_enriched_df`, is ready for analysis.

 * We dropped the columns that are not needed for the analysis.
 
 * Broadcast joins work well when joining a small DataFrame to a large one: the small DataFrame is copied to every executor, which avoids shuffling the large one.

 * After the joins, the data contained some nulls (a left join, for example, leaves nulls for rows without a match), which we dropped.
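
The idea behind a broadcast join can be illustrated without Spark: hold the small table in memory as a lookup and stream the large table past it, so the large side never has to be regrouped by key. The sketch below is a plain-Python analogy, not what Spark executes internally. The function name `broadcast_left_join` and the sample rows are made up for the example.

```python
def broadcast_left_join(large_rows, small_rows, large_key, small_key):
    """Left-join a large row stream against a small in-memory lookup table."""
    # The "broadcast" step: the small table becomes a dictionary keyed by the join column.
    lookup = {row[small_key]: row for row in small_rows}
    extra_columns = [c for c in (small_rows[0] if small_rows else {}) if c != small_key]
    for row in large_rows:
        match = lookup.get(row[large_key])
        joined = dict(row)
        for column in extra_columns:
            # A left join keeps unmatched rows and fills the new columns with None.
            joined[column] = match[column] if match else None
        yield joined


airlines = [
    {"IATA_CODE": "AS", "AIRLINE_NAME": "Alaska Airlines Inc."},
    {"IATA_CODE": "DL", "AIRLINE_NAME": "Delta Air Lines Inc."},
]
flights = [
    {"AIRLINE": "DL", "ARRIVAL_DELAY": -3},
    {"AIRLINE": "ZZ", "ARRIVAL_DELAY": 12},  # no matching airline: name becomes None
]

joined = list(broadcast_left_join(flights, airlines, "AIRLINE", "IATA_CODE"))
for row in joined:
    print(row)
```

The second row shows how a left join can introduce nulls, which is why a `dropna()` follows the joins in this notebook.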

#

#### 📥 **Caching the dataframe**
---




<div align="center">
<div style="width:450px; aspect-ratio:16/9;">
  <img src="../img/caching.jpeg" alt="caching" style="width:100%; height:auto; border-radius:8px;">
</div>
</div>

In [38]:
# This is the final DataFrame and it will be reused often, so cache it
flights_enriched_df.cache()



DataFrame[YEAR: int, MONTH: int, DAY_OF_WEEK: int, AIRLINE_CODE: string, ORIGIN_AIRPORT: string, DESTINATION_AIRPORT: string, SCHEDULED_DEPARTURE: string, DEPARTURE_DELAY: int, TAXI_OUT: int, WHEELS_OFF: int, SCHEDULED_TIME: int, ELAPSED_TIME: int, AIR_TIME: int, DISTANCE: int, WHEELS_ON: int, TAXI_IN: int, SCHEDULED_ARRIVAL: string, ARRIVAL_TIME: string, ARRIVAL_DELAY: int, DIVERTED: int, CANCELLED: int, DEPARTURE_TIME_STR: string, DATE: string, DEPARTURE_DATE_TIME: timestamp, ARRIVAL_DATE_TIME: timestamp, DELAY_CATEGORY: string, AIRLINE_NAME: string, ORIGIN_AIRPORT_NAME: string, ORIGIN_CITY: string, ORIGIN_STATE: string, ORIGIN_COUNTRY: string, ORIGIN_LATITUDE: double, ORIGIN_LONGITUDE: double, DESTINATION_AIRPORT_NAME: string, DESTINATION_CITY: string, DESTINATION_STATE: string, DESTINATION_COUNTRY: string, DESTINATION_LATITUDE: double, DESTINATION_LONGITUDE: double]

In [39]:
flights_enriched_df.show(5)



+----+-----+-----------+------------+--------------+-------------------+-------------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+------------------+--------+-------------------+-------------------+--------------+--------------------+--------------------+-------------+------------+--------------+---------------+----------------+------------------------+-----------------+-----------------+-------------------+--------------------+---------------------+
|YEAR|MONTH|DAY_OF_WEEK|AIRLINE_CODE|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|DEPARTURE_TIME_STR|    DATE|DEPARTURE_DATE_TIME|  ARRIVAL_DATE_TIME|DELAY_CATEGORY|        AIRLINE_NAME| ORIGIN_AIRPORT_NAME|  ORIGIN_CITY|ORIGIN_STATE|ORIGIN_COUN

                                                                                

##### 💡 Some observations:-
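* Caching pays off when the same DataFrame is reused across several actions, as `flights_enriched_df` is in the analysis below.
* `cache()` is lazy: the data is stored when the first action runs after it, and later actions read the cached result instead of recomputing the joins and transformations.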
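
The "compute once, reuse afterwards" behaviour can be pictured with a small plain-Python analogy. It is not Spark code: the class `LazyCached` and the function `expensive_pipeline` are hypothetical stand-ins for a cached DataFrame and the transformations behind it.

```python
class LazyCached:
    """Compute a value on first access and reuse it afterwards."""

    def __init__(self, compute):
        self._compute = compute
        self._value = None
        self._materialized = False
        self.compute_calls = 0  # how many times the pipeline actually ran

    def get(self):
        if not self._materialized:
            self._value = self._compute()
            self.compute_calls += 1
            self._materialized = True
        return self._value

    def unpersist(self):
        """Release the stored value; the next get() recomputes it."""
        self._value = None
        self._materialized = False


def expensive_pipeline():
    # Stand-in for the joins and transformations behind the final DataFrame.
    return [delay for delay in range(-5, 50) if delay > 0]


cached = LazyCached(expensive_pipeline)
print(cached.compute_calls)  # 0: marking something as cached computes nothing

total = sum(cached.get())    # first "action" runs the pipeline
count = len(cached.get())    # second "action" reuses the stored result
print(cached.compute_calls)  # 1
```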



#

#### 📊 **Data Analysis**


<div align="center">
<div style="width:450px; aspect-ratio:16/9;">
  <img src="../img/data_analysis.jpeg" alt="img" style="width:100%; height:auto; border-radius:8px;">
</div>
</div>

##### 🧩 Monthly flight volume and percentage of delayed flights. 
---


In [40]:
flights_enriched_df.groupby('MONTH').count().show()

+-----+------+
|MONTH| count|
+-----+------+
|    1|174523|
+-----+------+



                                                                                

In [41]:
DELAY_DF = flights_enriched_df.groupby('DELAY_CATEGORY').count()
DELAY_DF.show()

+--------------+-----+
|DELAY_CATEGORY|count|
+--------------+-----+
|  Medium Delay|35211|
|    Long Delay|17774|
|   Short Delay|37679|
|       On-Time|83859|
+--------------+-----+



In [42]:
from pyspark.sql import functions as F

# 1. Define which categories are considered "delayed"
delayed_categories = ["Short Delay", "Medium Delay", "Long Delay"]

# 2. Calculate monthly flight volume and delayed flight count
monthly_stats = (
    flights_enriched_df
    .groupBy("MONTH")
    .agg(
        F.count("*").alias("total_flights"),
        F.sum(F.when(F.col("DELAY_CATEGORY").isin(delayed_categories), 1).otherwise(0)).alias("delayed_flights")
    )
    .withColumn(
        "delayed_percentage",
        (F.col("delayed_flights") / F.col("total_flights") * 100).cast("double")
    )
    .orderBy("MONTH")
)

monthly_stats.show()

+-----+-------------+---------------+------------------+
|MONTH|total_flights|delayed_flights|delayed_percentage|
+-----+-------------+---------------+------------------+
|    1|       174523|          90664|51.949599766219926|
+-----+-------------+---------------+------------------+
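
As a quick cross-check, the delayed share reported above can be reproduced from the `DELAY_CATEGORY` counts shown earlier. The sketch below is plain Python with the counts copied by hand from that output; the variable names are illustrative and not part of the notebook.

```python
# Counts copied from the DELAY_CATEGORY output above (month 1 only).
category_counts = {
    "On-Time": 83859,
    "Short Delay": 37679,
    "Medium Delay": 35211,
    "Long Delay": 17774,
}

# Same definition of "delayed" as in the aggregation above.
delayed_categories = ["Short Delay", "Medium Delay", "Long Delay"]

total_flights = sum(category_counts.values())
delayed_flights = sum(category_counts[c] for c in delayed_categories)
delayed_percentage = delayed_flights / total_flights * 100

print(total_flights, delayed_flights, round(delayed_percentage, 2))
# prints: 174523 90664 51.95
```

Both totals match the `monthly_stats` row, which suggests the conditional sum counts exactly the three delay categories.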



##### 🔗 Airline-wise average departure delay and on-time performance.
---

In [43]:

airline_performance = (
    flights_enriched_df
    .groupBy("AIRLINE_NAME")
    .agg(
        F.avg("DEPARTURE_DELAY").alias("avg_departure_delay"),
        F.count("*").alias("total_flights"),
        F.sum(F.when(F.col("DELAY_CATEGORY") == "On-Time", 1).otherwise(0)).alias("on_time_flights")
    )
    .withColumn(
        "on_time_percentage",
        (F.col("on_time_flights") / F.col("total_flights") * 100).cast("double")
    )
    .orderBy(F.asc("avg_departure_delay"))
)

airline_performance.show()

+--------------------+-------------------+-------------+---------------+------------------+
|        AIRLINE_NAME|avg_departure_delay|total_flights|on_time_flights|on_time_percentage|
+--------------------+-------------------+-------------+---------------+------------------+
|Hawaiian Airlines...|  4.897582957804179|         2441|           1170|  47.9311757476444|
|Alaska Airlines Inc.|  5.494282334384858|         5072|           3156| 62.22397476340694|
|     US Airways Inc.|  9.102912005078156|        12603|           6746|53.526938030627626|
|      Virgin America|  9.177145981410607|         1829|           1096| 59.92345544013122|
|Delta Air Lines Inc.|  9.938347307387478|        23973|          15517| 64.72698452425647|
|     JetBlue Airways| 16.128854229154168|         8335|           4188|50.245950809838035|
|Southwest Airline...|  17.60894411947058|        37097|          17181|46.313718090411626|
|Atlantic Southeas...|  19.07573298712539|        18486|           8360|  45.223

                                                                                

##### 🔴 Total number of cancellations and their distribution by cancellation reason.
---


In [45]:
# Total number of cancellations, computed with the SQL API

flights_enriched_df.createOrReplaceTempView("flights_enriched_df")

result = spark.sql("""
    SELECT CANCELLED, COUNT(*) AS cancellation_count
    FROM flights_enriched_df
    WHERE CANCELLED = 1
    GROUP BY CANCELLED
""")

result.show()



+---------+------------------+
|CANCELLED|cancellation_count|
+---------+------------------+
+---------+------------------+

The result is empty: the enriched DataFrame contains no rows with `CANCELLED = 1` in this subset (for example, if cancelled flights were filtered out during cleaning), so there is also nothing to break down by cancellation reason here.



#### 🚦 Which is the busiest airport, i.e. which airport has the most take-offs and landings combined?
---


In [46]:

from pyspark.sql import functions as F

# Combine all take-offs and landings into a single column
all_airports = (
    flights_enriched_df
    .select(F.col("ORIGIN_AIRPORT_NAME").alias("airport_name"))
    .union(
        flights_enriched_df.select(F.col("DESTINATION_AIRPORT_NAME").alias("airport_name"))
    )
)

# Group by airport and count total flights (take-offs + landings)
busiest_airports = (
    all_airports
    .groupBy("airport_name")
    .agg(F.count("*").alias("total_flights"))
    .orderBy(F.desc("total_flights"))
)

busiest_airports.show(10)





+--------------------+-------------+
|        airport_name|total_flights|
+--------------------+-------------+
|Hartsfield-Jackso...|        21923|
|Dallas/Fort Worth...|        16947|
|Chicago O'Hare In...|        16210|
|Los Angeles Inter...|        13206|
|Denver Internatio...|        13166|
|George Bush Inter...|        10287|
|Phoenix Sky Harbo...|        10111|
|San Francisco Int...|         9822|
|McCarran Internat...|         8716|
|Orlando Internati...|         7753|
+--------------------+-------------+
only showing top 10 rows



                                                                                

#### 🌆 Which city has the most flights, i.e. the most take-offs and landings combined?
---


In [47]:
from pyspark.sql import functions as F

# Combine all take-offs and landings into a single column
all_cities = (
    flights_enriched_df
    .select(F.col("ORIGIN_CITY").alias("city_name"))
    .union(
        flights_enriched_df.select(F.col("DESTINATION_CITY").alias("city_name"))
    )
)

# Group by city and count total flights (take-offs + landings)
busiest_cities = (
    all_cities
    .groupBy("city_name")
    .agg(F.count("*").alias("total_flights"))
    .orderBy(F.desc("total_flights"))
)

busiest_cities.show(10)



+-----------------+-------------+
|        city_name|total_flights|
+-----------------+-------------+
|          Atlanta|        21923|
|          Chicago|        21025|
|Dallas-Fort Worth|        16947|
|          Houston|        13747|
|      Los Angeles|        13206|
|           Denver|        13166|
|         New York|        13058|
|          Phoenix|        10111|
|    San Francisco|         9822|
|        Las Vegas|         8716|
+-----------------+-------------+
only showing top 10 rows



                                                                                

#### 🚨 Which airline has the fewest flights?
---


In [48]:

flights_enriched_df.groupBy("AIRLINE_NAME").count().orderBy("count", ascending=True).show()


+--------------------+-----+
|        AIRLINE_NAME|count|
+--------------------+-----+
|      Virgin America| 1829|
|Hawaiian Airlines...| 2441|
|Frontier Airlines...| 2636|
|    Spirit Air Lines| 3253|
|Alaska Airlines Inc.| 5072|
|     JetBlue Airways| 8335|
|American Eagle Ai...| 9779|
|     US Airways Inc.|12603|
|United Air Lines ...|14710|
|American Airlines...|16542|
|Skywest Airlines ...|17767|
|Atlantic Southeas...|18486|
|Delta Air Lines Inc.|23973|
|Southwest Airline...|37097|
+--------------------+-----+



##### 🧠 Some observations:

* Because we are working with a subset of the original data, only month 1 is present. About 51.95% of these flights (90,664 of 174,523) fall into a delay category, which is quite high.


* Hawaiian Airlines has the lowest average departure delay (about 4.9 minutes). Its on-time share is only about 47.9%, however, which is lower than Delta Air Lines at about 64.7%, so a low average delay does not by itself mean the most punctual airline.


* Hartsfield-Jackson Atlanta International Airport is the busiest airport, with 21,923 take-offs and landings.


* Atlanta (21,923) and Chicago (21,025) are the cities with the most flights.


* Virgin America has the fewest flights (1,829), which suggests it is the smallest operator in this subset.
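
Which airline looks best depends on the metric. As an illustration, the sketch below re-sorts the airlines visible in the airline performance output by on-time share instead of average departure delay. It is plain Python with the counts copied by hand from that output (the table is cut off, so only airlines whose counts are visible are included, and names appear as displayed); in the notebook, the equivalent would presumably be ordering `airline_performance` by `on_time_percentage`.

```python
# (airline name as displayed, total_flights, on_time_flights),
# copied from the airline performance output above.
rows = [
    ("Hawaiian Airlines...", 2441, 1170),
    ("Alaska Airlines Inc.", 5072, 3156),
    ("US Airways Inc.", 12603, 6746),
    ("Virgin America", 1829, 1096),
    ("Delta Air Lines Inc.", 23973, 15517),
    ("JetBlue Airways", 8335, 4188),
    ("Southwest Airline...", 37097, 17181),
    ("Atlantic Southeas...", 18486, 8360),
]

# Rank by on-time share, highest first.
ranked = sorted(
    ((name, on_time / total * 100) for name, total, on_time in rows),
    key=lambda item: item[1],
    reverse=True,
)

for name, pct in ranked:
    print(f"{name:<22}{pct:6.2f}%")
```

Among these rows, Delta Air Lines Inc. comes out on top by on-time share, even though Hawaiian Airlines has the lowest average departure delay.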

---

#### 📤 **Saving the busiest cities to S3 as Parquet**

In [49]:
from src.upload_csv_to_s3 import save_df_to_s3_parquet
save_df_to_s3_parquet(busiest_cities)

                                                                                

DataFrame saved directly to s3a://dvflightdata/parquet_export_20250511_184028_7d0f3813/ as Parquet.


#

#### ⚔️ UDF vs native PySpark functions
---



In [50]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def classify_delay_udf(delay):
    if delay is None:
        return "Unknown"
    elif delay <= 0:
        return "On-Time"
    elif delay <= 15:
        return "Short Delay"
    elif delay <= 60:
        return "Medium Delay"
    else:
        return "Long Delay"

# Wrap the Python function as a Spark UDF that returns a string
delay_category_udf = udf(classify_delay_udf, StringType())
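
Because the classifier is an ordinary Python function, its category boundaries can be checked without Spark before it is wrapped as a UDF. The sketch below repeats the same logic under the hypothetical name `classify_delay` so that it runs on its own, and probes the edges of each category.

```python
def classify_delay(delay):
    """Same rules as classify_delay_udf above; the name is illustrative."""
    if delay is None:
        return "Unknown"
    elif delay <= 0:
        return "On-Time"
    elif delay <= 15:
        return "Short Delay"
    elif delay <= 60:
        return "Medium Delay"
    else:
        return "Long Delay"


# Probe the values on either side of each boundary.
for value in [None, -5, 0, 1, 15, 16, 60, 61]:
    print(value, "->", classify_delay(value))
```

The upper bounds are inclusive: a delay of exactly 15 minutes is still a "Short Delay" and exactly 60 minutes is still a "Medium Delay".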


In [51]:
import time

# UDF version
start_udf = time.time()
df_udf = flights_df.withColumn("DELAY_CATEGORY_UDF", delay_category_udf(flights_df.ARRIVAL_DELAY))
df_udf.count()  # count() is an action, so it triggers evaluation
end_udf = time.time()
print(f"UDF version took {end_udf - start_udf:.2f} seconds")

# Native version
start_native = time.time()
df_native = flights_df.withColumn(
    "DELAY_CATEGORY_NATIVE",
    F.when(F.col("ARRIVAL_DELAY").isNull(), "Unknown")
     .when(F.col("ARRIVAL_DELAY") <= 0, "On-Time")
     .when(F.col("ARRIVAL_DELAY") <= 15, "Short Delay")
     .when(F.col("ARRIVAL_DELAY") <= 60, "Medium Delay")
     .otherwise("Long Delay")
)
df_native.count()  # count() is an action, so it triggers evaluation
end_native = time.time()
print(f"Native version took {end_native - start_native:.2f} seconds")

                                                                                

UDF version took 5.03 seconds

Native version took 4.72 seconds


                                                                                

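##### 🗒️ Some observations:

* Native PySpark functions are generally faster and more scalable than Python UDFs, especially on large datasets, because they run inside the JVM and can be optimized by Spark, while a Python UDF has to ship values to a Python worker process and back.

* In this run the native version took 4.72 seconds and the UDF version took 5.03 seconds, a gap of about 0.3 seconds.

* A difference this small, measured once on a small dataset, is not significant. In addition, `count()` does not need the new column, so Spark may skip computing it altogether; the timings above may therefore mostly reflect reading the data rather than the classification itself.

* For production and big data workloads, prefer native functions whenever possible.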
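
Since a single run says little, one option is to repeat each measurement and compare medians. The helper below is a minimal sketch in plain Python; `time_action` is a hypothetical name, and the workload is a stand-in so the block runs without Spark.

```python
import statistics
import time


def time_action(action, repeats=5):
    """Call `action` several times and return each run's wall-clock time in seconds."""
    timings = []
    for _ in range(repeats):
        start = time.perf_counter()
        action()
        timings.append(time.perf_counter() - start)
    return timings


# Stand-in workload; in the notebook this could be `lambda: df_udf.count()`
# or `lambda: df_native.count()` (assuming those DataFrames are defined).
timings = time_action(lambda: sum(range(100_000)), repeats=5)
print(f"median {statistics.median(timings):.4f}s, min {min(timings):.4f}s")
```

Comparing medians over several runs reduces the influence of warm-up effects and background noise, although it does not change what Spark actually computes for each action.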

#


<div align="center">
<div style="width:100%; aspect-ratio:16/9;">
  <img src="../img/safe.png" alt="img" style="width:100%; height:auto; border-radius:8px;">
</div>
</div>

#### 😊💗 **Thank you for visiting my project, I hope you liked it.**
