<a href="https://colab.research.google.com/github/Javad-Manashti/Airport-Traffic-Analysis-and-Flight-Sequence-Tracking-Using-PySpark/blob/main/Airport_Traffic_Analysis_and_Flight_Sequence_Tracking_Using_PySpark_(Advanced).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Airport Traffic Analysis and Flight Sequence Tracking Using PySpark (Advanced)

**By: Javad Manashty (5146323306, mjmanashti@gmail.com)**

#### Description:
This PySpark notebook analyzes airport traffic and tracks flight sequences from a flight-level dataset, giving insight into flight scheduling and airport operations. The code has two main parts:

1. **Flight Sequence Tracking**: For each aircraft, the flights are ordered chronologically and every flight is linked to the next flight flown by the same aircraft. This shows aircraft utilization and turnaround, which matter for scheduling and operational efficiency.

2. **Airport Traffic Analysis**: Counts the inbound and outbound flights at each airport in 15-minute intervals. Data at this granularity exposes traffic patterns and supports resource allocation and planning at airports.

The script expects a dataset with each flight's origin, destination, and timestamps, and relies on PySpark DataFrame operations for the processing.
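
To make the first part concrete, here is a minimal plain-Python sketch of the "next flight" idea, independent of Spark. The helper name `next_flight_ids` and the sample records are hypothetical; only the field names (`id`, `acft_regs_cde`, `actl_arr_lcl_tms`) are taken from the dataset columns used in the notebook.

```python
from collections import defaultdict
from datetime import datetime


def next_flight_ids(flights):
    """Map each flight id to the id of the same aircraft's next flight (or None).

    `flights` is a list of dicts with keys "id", "acft_regs_cde" and
    "actl_arr_lcl_tms" (a datetime). Illustrative helper, not part of the notebook.
    """
    by_aircraft = defaultdict(list)
    for flight in flights:
        by_aircraft[flight["acft_regs_cde"]].append(flight)

    result = {}
    for legs in by_aircraft.values():
        # Order each aircraft's flights chronologically by arrival time.
        legs.sort(key=lambda f: f["actl_arr_lcl_tms"])
        # Pair every flight with its successor; the last flight has none.
        for current, following in zip(legs, legs[1:] + [None]):
            result[current["id"]] = following["id"] if following else None
    return result


# Hypothetical sample data for two aircraft.
sample = [
    {"id": 1, "acft_regs_cde": "A", "actl_arr_lcl_tms": datetime(2022, 12, 31, 8, 0)},
    {"id": 2, "acft_regs_cde": "A", "actl_arr_lcl_tms": datetime(2022, 12, 31, 12, 0)},
    {"id": 3, "acft_regs_cde": "B", "actl_arr_lcl_tms": datetime(2022, 12, 31, 9, 0)},
]
sequence = next_flight_ids(sample)
print(sequence)
```

A small reference implementation like this can also serve as an oracle when testing the Spark version on a handful of rows.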

#### Key Components of the Code:
- **Initialization of Spark Session**: Establishes the Spark environment for data processing.
- **Data Ingestion and Preparation**: Reads flight data from a CSV file into a Spark DataFrame and casts the timestamp columns to `TimestampType`.
- **Construction of Flight Sequences**: Uses a window function partitioned by aircraft registration code and ordered by actual arrival time, so that `lead` returns the ID of each aircraft's next flight.
- **Airport Traffic Dataset Creation**: Aggregates flights into 15-minute windows to compute inbound and outbound counts per airport.
- **Structured Output Presentation**: Displays the first rows of the flight sequence table and the airport traffic table, ordered by airport code and timestamp.
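
The 15-minute aggregation can be illustrated without Spark. The sketch below assumes that an arrival counts as inbound at the destination airport and a departure counts as outbound at the origin airport. The helper names `floor_to_15_minutes` and `traffic_counts` and the short keys `dep` and `arr` are hypothetical. The two sample flights reuse the departure and arrival times of the first two rows in the notebook's output table.

```python
from collections import Counter
from datetime import datetime


def floor_to_15_minutes(ts):
    """Round a datetime down to the start of its 15-minute interval."""
    return ts.replace(minute=ts.minute - ts.minute % 15, second=0, microsecond=0)


def traffic_counts(flights):
    """Return {(airport, interval_start): {"in": n, "out": m}}.

    Assumption: departures are outbound at the origin and
    arrivals are inbound at the destination.
    """
    inbound, outbound = Counter(), Counter()
    for f in flights:
        outbound[(f["orig"], floor_to_15_minutes(f["dep"]))] += 1
        inbound[(f["dest"], floor_to_15_minutes(f["arr"]))] += 1
    # Union of keys plays the role of an outer join; missing counts become 0.
    keys = set(inbound) | set(outbound)
    return {k: {"in": inbound[k], "out": outbound[k]} for k in keys}


flights = [
    {"orig": "YVR", "dest": "YYZ",
     "dep": datetime(2022, 12, 31, 0, 29), "arr": datetime(2022, 12, 31, 8, 27)},
    {"orig": "YYZ", "dest": "YVR",
     "dep": datetime(2022, 12, 31, 8, 50), "arr": datetime(2022, 12, 31, 10, 38)},
]
counts = traffic_counts(flights)
for (airport, start), c in sorted(counts.items()):
    print(airport, start.isoformat(), c)
```

Taking the union of the inbound and outbound keys mirrors the outer join plus zero-fill in the Spark code: an interval with only arrivals still appears, with an outbound count of 0.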

#### Output Verification:
The output is shown alongside the code so that the results can be checked. It includes:
- A sequence table for each aircraft, detailing flight ID, registration code, arrival time, and the subsequent flight ID.
- A table with the count of inbound and outbound flights in each 15-minute interval at each airport.

Together, the code and its output show that the solution runs end to end on the sample flight data.

#### Considerations for Further Improvement:
The script works as is, but several areas could be improved:
- **Unit Testing**: Unit tests for each function, written with a framework such as PyTest, would increase code reliability.
- **Advanced Error Handling**: Replacing the `print` calls with structured logging and more specific exception handling would make debugging and maintenance easier.
- **Configurability**: A configuration file for parameters such as file paths and database connections would make the script easier to adapt to different environments or datasets.
- **Performance Optimization**: Depending on dataset size, strategies such as broadcast joins or caching could improve performance.
- **Documentation and Commenting**: More documentation and inline comments on the complex logic and optimizations would help future maintenance and enhancements.

These improvements would make the script better suited to complex, real-world data engineering scenarios.
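
As one possible take on the configurability point, the sketch below loads job settings from a JSON document into a dataclass. Everything here is an assumption for illustration: the `JobConfig` class, the `load_config` helper, the field names, and the example path are hypothetical and do not appear in the notebook.

```python
import json
from dataclasses import dataclass, field


@dataclass
class JobConfig:
    """Hypothetical settings for the analysis job."""
    csv_file_path: str
    app_name: str = "FlightDataAnalysis"
    window_duration: str = "15 minutes"
    timestamp_cols: list = field(
        default_factory=lambda: ["actl_dep_lcl_tms", "actl_arr_lcl_tms"]
    )


def load_config(text):
    """Parse a JSON document into a JobConfig.

    Unknown keys raise TypeError, which catches typos in the config early.
    """
    return JobConfig(**json.loads(text))


# Example configuration; the path is a placeholder.
config = load_config('{"csv_file_path": "/data/flights.csv", "window_duration": "30 minutes"}')
print(config)
```

Keeping the settings in one typed object means the analysis functions can take their parameters from `config` rather than from values hard-coded in the main block.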

1. **Setting Up PySpark for Data Processing**

   This cell installs PySpark and creates the Spark session that the rest of the notebook uses for data manipulation and analysis.



In [4]:
# Install PySpark and create a Spark session for distributed data processing.

# Install PySpark using pip
!pip install pyspark

# Import the necessary libraries
from pyspark.sql import SparkSession

# Initialize a Spark session with a meaningful application name
spark = SparkSession.builder.appName("SPARKProcessing").getOrCreate()



2. **Flight Data Analysis and Airport Traffic Evaluation**

   This cell contains the core logic: it reads the flight data, links each flight to the same aircraft's next flight, and counts inbound and outbound flights per airport in 15-minute windows.

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, lead, date_format
from pyspark.sql.window import Window
from pyspark.sql.types import TimestampType

def initialize_spark_session(app_name):
    """Initialize and return a Spark session."""
    return SparkSession.builder.appName(app_name).getOrCreate()

def read_csv_to_df(spark, file_path, timestamp_cols):
    """Read a CSV file into a Spark DataFrame and process timestamp columns."""
    try:
        df = spark.read.csv(file_path, header=True, inferSchema=True)
        for col_name in timestamp_cols:
            df = df.withColumn(col_name, col(col_name).cast(TimestampType()))
        return df
    except Exception as e:
        print(f"Error reading the CSV file: {e}")
        return None

def build_flight_sequence(df):
    """Build and return a DataFrame with flight sequence information."""
    window_spec = Window.partitionBy("acft_regs_cde").orderBy("actl_arr_lcl_tms")
    return df.withColumn("next_flight_id", lead("id", 1).over(window_spec))

def analyze_airport_traffic(df):
    """Count inbound and outbound flights per airport in 15-minute windows."""
    try:
        # Inbound flights are counted at the destination airport, by arrival time.
        inbound_traffic = (df.groupBy("dest", window(col("actl_arr_lcl_tms"), "15 minutes")).count()
                           .withColumnRenamed("count", "in").withColumnRenamed("dest", "airport_code"))
        # Outbound flights are counted at the origin airport, by departure time.
        outbound_traffic = (df.groupBy("orig", window(col("actl_dep_lcl_tms"), "15 minutes")).count()
                            .withColumnRenamed("count", "out").withColumnRenamed("orig", "airport_code"))
        # Outer join keeps intervals that have only arrivals or only departures.
        airport_traffic = inbound_traffic.join(outbound_traffic, ["window", "airport_code"], "outer")
        return (airport_traffic
                .select(col("airport_code"),
                        date_format(col("window").start, "yyyy-MM-dd'T'HH:mm:ss").alias("tms"),
                        "out", "in")
                .na.fill(value=0, subset=["in", "out"]))
    except Exception as e:
        print(f"Error analyzing airport traffic: {e}")
        return None

# Main execution block
if __name__ == "__main__":
    app_name = "FlightDataAnalysis"
    csv_file_path = '/content/Data Engineer exercise.csv'  # This can be parameterized as needed
    timestamp_cols = ['actl_dep_lcl_tms', 'actl_arr_lcl_tms', 'airborne_lcl_tms', 'landing_lcl_tms']

    # Initialize Spark session
    spark = initialize_spark_session(app_name)

    # Read data and prepare DataFrame
    spark_df = read_csv_to_df(spark, csv_file_path, timestamp_cols)
    if spark_df is not None:
        # Build a String of Flights
        flight_sequence_df = build_flight_sequence(spark_df)

        # Build a Dataset for Airport Traffic
        airport_traffic_df = analyze_airport_traffic(spark_df)

        if airport_traffic_df is not None:
            # Show results
            flight_sequence_df.show(5)
            airport_traffic_df.orderBy("airport_code", "tms").show(50)

    # Stopping the Spark session
    spark.stop()


+----+----+---+-------------------+-------------------+----------+-------+-------------+-------------------+-------------------+--------------+
|orig|dest| id|   actl_dep_lcl_tms|   actl_arr_lcl_tms|flight_num|flights|acft_regs_cde|   airborne_lcl_tms|    landing_lcl_tms|next_flight_id|
+----+----+---+-------------------+-------------------+----------+-------+-------------+-------------------+-------------------+--------------+
| YVR| YYZ| 21|2022-12-31 00:29:00|2022-12-31 08:27:00|       128|      1|          451|2022-12-31 01:01:00|2022-12-31 08:11:00|            10|
| YYZ| YVR| 10|2022-12-31 08:50:00|2022-12-31 10:38:00|       103|      1|          451|2022-12-31 09:13:00|2022-12-31 10:28:00|            16|
| YVR| YYZ| 16|2022-12-31 11:55:00|2022-12-31 19:10:00|       106|      1|          451|2022-12-31 12:10:00|2022-12-31 18:55:00|             2|
| YYZ| YVR|  2|2022-12-31 19:39:00|2022-12-31 21:22:00|       185|      1|          451|2022-12-31 20:05:00|2022-12-31 21:14:00|        