<a href="https://colab.research.google.com/github/Alberto-San/Nguyen-test/blob/main/Test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
#Setting up environment
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz
!tar xf spark-3.0.1-bin-hadoop3.2.tgz
!pip install -q findspark
!git clone https://github.com/Alberto-San/Nguyen-test

# Env Variables
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop3.2"

# SparkContext
import findspark

findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkContext

spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = SparkContext.getOrCreate()


Cloning into 'Nguyen-test'...
remote: Enumerating objects: 6, done.[K
remote: Counting objects: 100% (6/6), done.[K
remote: Compressing objects: 100% (5/5), done.[K
remote: Total 6 (delta 0), reused 0 (delta 0), pack-reused 0[K
Receiving objects: 100% (6/6), 6.58 KiB | 6.58 MiB/s, done.


# Question 1

Build a string of flights .i.e. next flights based on a turn of the aircraft

In [2]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

# Define the flight info function
def get_flight_info(
    actual_dep_local_time, actual_arr_local_time, flight_number, origin, destination
):
    return f"Flight number: {flight_number} to {destination}, Departure: {actual_dep_local_time}, Arrival: {actual_arr_local_time}"


# Register UDF for the flight info function
get_flight_info_udf = udf(get_flight_info, StringType())

# Read data from CSV with timestamp columns properly inferred
csv_path = "./Nguyen-test/source.csv"
df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(csv_path)
)

# Create a new DataFrame with flight info strings
df_flight_info = df.withColumn(
    "flight_info",
    get_flight_info_udf(
        col("actl_dep_lcl_tms"),
        col("actl_arr_lcl_tms"),
        col("flight_num"),
        col("orig"),
        col("dest"),
    ),
).select("flight_info")

# Order the DataFrame by departure timestamp
df_flight_info = df_flight_info.orderBy(col("actl_dep_lcl_tms"))

# Show the results
df_flight_info.show(100, truncate=False)


+---------------------------------------------------------------------------------------+
|flight_info                                                                            |
+---------------------------------------------------------------------------------------+
|Flight number: 128 to YYZ, Departure: 2022-12-31 00:29:00, Arrival: 2022-12-31 08:27:00|
|Flight number: 100 to YYZ, Departure: 2022-12-31 06:04:00, Arrival: 2022-12-31 13:24:00|
|Flight number: 104 to YYZ, Departure: 2022-12-31 08:07:00, Arrival: 2022-12-31 15:19:00|
|Flight number: 103 to YVR, Departure: 2022-12-31 08:50:00, Arrival: 2022-12-31 10:38:00|
|Flight number: 108 to YYZ, Departure: 2022-12-31 09:48:00, Arrival: 2022-12-31 16:58:00|
|Flight number: 110 to YYZ, Departure: 2022-12-31 10:08:00, Arrival: 2022-12-31 17:27:00|
|Flight number: 107 to YVR, Departure: 2022-12-31 10:18:00, Arrival: 2022-12-31 12:27:00|
|Flight number: 105 to YVR, Departure: 2022-12-31 11:23:00, Arrival: 2022-12-31 13:03:00|
|Flight nu

# Question 2

## Step 1: Find the closest quarter

We need to put a water mark regarding to the closest quarter minute to each of the time records. In this way, when we generates the sequence time data, we can perform a join to cross the information

In [3]:
from pyspark.sql.functions import col, expr, from_unixtime, unix_timestamp


def get_closest_quarter(df, input_column, output_column, quarter_minute_interval):
    # Transform timestamp to unix_timestamp
    df_flights_summary = df.withColumn(
        output_column,
        from_unixtime(unix_timestamp(input_column, "yyyy-MM-dd'T'HH:mm:ss")),
    )

    # Saturate to the closest quarter using unix timestamp representation
    df_flights_summary = df_flights_summary.withColumn(
        output_column,
        expr(
            f"from_unixtime(floor(unix_timestamp({output_column}) / {quarter_minute_interval}) * {quarter_minute_interval})"
        ),
    )

    # Convert to timestamp
    df_flights_summary = df_flights_summary.withColumn(
        output_column,
        expr(
            f"from_unixtime(unix_timestamp({output_column}) + {quarter_minute_interval})"
        ),
    )

    return df_flights_summary


# Define the quarter minute interval (in seconds)
quarter_minute_interval = 15 * 60  # 15 minutes * 60 seconds

# Select specific columns
df_flights_summary = df.select(
    "orig", "dest", "actl_dep_lcl_tms", "actl_arr_lcl_tms", "acft_regs_cde"
)

# Finding the closest quarter for actl_dep_lcl_tms and actl_arr_lcl_tms
df_flights_summary_format = get_closest_quarter(
    df_flights_summary,
    "actl_dep_lcl_tms",
    "actl_dep_lcl_tms_quarter",
    quarter_minute_interval,
)
df_flights_summary_format = get_closest_quarter(
    df_flights_summary_format,
    "actl_arr_lcl_tms",
    "actl_arr_lcl_tms_quarter",
    quarter_minute_interval,
)

df_flights_summary_format.show(100)


+----+----+-------------------+-------------------+-------------+------------------------+------------------------+
|orig|dest|   actl_dep_lcl_tms|   actl_arr_lcl_tms|acft_regs_cde|actl_dep_lcl_tms_quarter|actl_arr_lcl_tms_quarter|
+----+----+-------------------+-------------------+-------------+------------------------+------------------------+
| YYZ| YVR|2022-12-31 20:36:00|2022-12-31 22:28:00|          737|     2022-12-31 20:45:00|     2022-12-31 22:30:00|
| YYZ| YVR|2022-12-31 19:39:00|2022-12-31 21:22:00|          451|     2022-12-31 19:45:00|     2022-12-31 21:30:00|
| YYZ| YVR|2022-12-31 18:53:00|2022-12-31 20:33:00|          843|     2022-12-31 19:00:00|     2022-12-31 20:45:00|
| YYZ| YVR|2022-12-31 17:27:00|2022-12-31 19:00:00|          747|     2022-12-31 17:30:00|     2022-12-31 19:15:00|
| YYZ| YVR|2022-12-31 16:44:00|2022-12-31 18:31:00|          464|     2022-12-31 16:45:00|     2022-12-31 18:45:00|
| YYZ| YVR|2022-12-31 14:35:00|2022-12-31 16:43:00|          743|     20

## Step 2: Generates a sequence spaced by 15 minutes between the minimum and maximum dates.

Time intervals are created for each origin location, considering a 2-hour window before each 15-minute interval. This is because the question is asking
that for every 15-minute interval it must be analyzed data within a 2-hour time frame prior to that interval for each specific origin location and destiniyy location

In [4]:
from pyspark.sql.functions import (
    col,
    to_timestamp,
    to_date,
    min,
    max,
    explode,
    sequence,
)
from pyspark.sql import Window
import datetime

# Define the function to add days to a date
def add_days_to_date(dt: datetime.date, num_days: int) -> datetime.date:
    return dt + datetime.timedelta(days=num_days)


# Function to get a DataFrame with timestamp sequence for a specific airport code
def get_sequence(min_date: str, max_date: str, airport_code: str):
    query = f"""
    SELECT
        '{airport_code}' AS airport_code,
        sequence(to_timestamp('{min_date}'), to_timestamp('{max_date}'), interval 15 minutes) AS sequence_date
    """
    timestamp_df = spark.sql(query)
    return timestamp_df.withColumn("sequence_date", explode(col("sequence_date")))


# Function to get the DataFrame sequence for all airport codes
def get_dataframe_sequence(df, value_orig):
    # Filter
    df_flights_summary = df.filter(f"orig='{value_orig}'")

    # Convert the 'actl_dep_lcl_tms' from string to timestamp type
    df_flights_summary = df_flights_summary.withColumn(
        "actl_dep_lcl_tms", to_timestamp("actl_dep_lcl_tms", "yyyy-MM-dd'T'HH:mm:ss")
    )

    # Extract date from the timestamp
    df_flights_summary = df_flights_summary.withColumn(
        "actl_dep_lcl_date", to_date("actl_dep_lcl_tms")
    )

    # Finding the max and min date
    min_date = df_flights_summary.select(
        min("actl_dep_lcl_date").alias("min_date")
    ).collect()[0]["min_date"]
    min_date = add_days_to_date(min_date, 0)
    max_date = df_flights_summary.select(
        max("actl_dep_lcl_date").alias("max_date")
    ).collect()[0]["max_date"]
    max_date = add_days_to_date(max_date, 1)

    return get_sequence(min_date, max_date, value_orig)


# Getting orig
airport_codes_orig = df_flights_summary_format.select("orig").distinct()
airport_codes_list = [row["orig"] for row in airport_codes_orig.collect()]

# Getting sequence
df_seq = None
for airport_code in airport_codes_list:
    if df_seq is None:
        df_seq = get_dataframe_sequence(df_flights_summary_format, airport_code)
    else:
        df_seq = df_seq.union(
            get_dataframe_sequence(df_flights_summary_format, airport_code)
        )

df_seq = df_seq.orderBy("airport_code", "sequence_date")
df_seq.show(10)


+------------+-------------------+
|airport_code|      sequence_date|
+------------+-------------------+
|         YVR|2022-12-31 00:00:00|
|         YVR|2022-12-31 00:15:00|
|         YVR|2022-12-31 00:30:00|
|         YVR|2022-12-31 00:45:00|
|         YVR|2022-12-31 01:00:00|
|         YVR|2022-12-31 01:15:00|
|         YVR|2022-12-31 01:30:00|
|         YVR|2022-12-31 01:45:00|
|         YVR|2022-12-31 02:00:00|
|         YVR|2022-12-31 02:15:00|
+------------+-------------------+
only showing top 10 rows



## Step 3: Get flights out

For this item, we perform an analysis to get the columns:
* tms: this are the simulated sequence_date
* out: this columns representes for the current sequence_date, how many flights (how many distinct acft_regs_cde) are being lading from the current quarter to the last 2 hours (for this, we took into account the original actl_dep_lcl_tms values)
* in: this columns representes for the current sequence_date, how many flights (how many distinct acft_regs_cde) are being arriving from the current quarter to the last 2 hours (for this, we took into account the original actl_arr_lcl_tms values)

In [5]:
from pyspark.sql.functions import col, expr, when, count
from pyspark.sql import Window

flight_information = df_flights_summary_format

# Simulate outbound flights
out_window_spec = (
    Window.partitionBy("airport_code")
    .orderBy(col("actl_dep_lcl_tms").cast("long"))
    .rangeBetween(-2 * 60 * 60, 0)
)
df_out_flights = (
    df_seq.alias("seq")
    .join(
        flight_information.alias("info"),
        expr("sequence_date = actl_dep_lcl_tms_quarter and airport_code = orig"),
        "left",
    )
    .withColumn(
        "actl_dep_lcl_tms",
        when(col("actl_dep_lcl_tms").isNull(), col("sequence_date")).otherwise(
            col("actl_dep_lcl_tms")
        ),
    )
    .withColumn(
        "out",
        when(
            col("acft_regs_cde") > 0, count("acft_regs_cde").over(out_window_spec)
        ).otherwise(0),
    )
    .select("airport_code", "sequence_date", "out")
)

# Simulate inbound flights
in_window_spec = (
    Window.partitionBy("airport_code")
    .orderBy(col("actl_arr_lcl_tms").cast("long"))
    .rangeBetween(-2 * 60 * 60, 0)
)
df_in_flights = (
    df_seq.alias("seq")
    .join(
        flight_information.alias("info"),
        expr("sequence_date = actl_arr_lcl_tms_quarter and airport_code = dest"),
        "left",
    )
    .withColumn(
        "actl_arr_lcl_tms",
        when(col("actl_arr_lcl_tms").isNull(), col("sequence_date")).otherwise(
            col("actl_arr_lcl_tms")
        ),
    )
    .withColumn("in", count("acft_regs_cde").over(in_window_spec))
    .select("airport_code", "sequence_date", "in")
)

# Join outbound and inbound dataframes to get the flight summary
df_flights_summary = (
    df_out_flights.alias("out")
    .join(
        df_in_flights.alias("in"),
        expr(
            "in.airport_code = out.airport_code and in.sequence_date = out.sequence_date"
        ),
        "inner",
    )
    .orderBy("in.airport_code", "in.sequence_date")
)


df_flights_summary.show(100)


+------------+-------------------+---+------------+-------------------+---+
|airport_code|      sequence_date|out|airport_code|      sequence_date| in|
+------------+-------------------+---+------------+-------------------+---+
|         YVR|2022-12-31 00:00:00|  0|         YVR|2022-12-31 00:00:00|  0|
|         YVR|2022-12-31 00:15:00|  0|         YVR|2022-12-31 00:15:00|  0|
|         YVR|2022-12-31 00:30:00|  1|         YVR|2022-12-31 00:30:00|  0|
|         YVR|2022-12-31 00:45:00|  0|         YVR|2022-12-31 00:45:00|  0|
|         YVR|2022-12-31 01:00:00|  0|         YVR|2022-12-31 01:00:00|  0|
|         YVR|2022-12-31 01:15:00|  0|         YVR|2022-12-31 01:15:00|  0|
|         YVR|2022-12-31 01:30:00|  0|         YVR|2022-12-31 01:30:00|  0|
|         YVR|2022-12-31 01:45:00|  0|         YVR|2022-12-31 01:45:00|  0|
|         YVR|2022-12-31 02:00:00|  0|         YVR|2022-12-31 02:00:00|  0|
|         YVR|2022-12-31 02:15:00|  0|         YVR|2022-12-31 02:15:00|  0|
|         YV