In [217]:
import pyspark
import pandas as pd
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import split, regexp_extract, to_date,when, col, lower,expr, hour, minute,concat,lit,to_timestamp,concat_ws, date_format,collect_list
from datetime import datetime
import re

In [218]:
spark = SparkSession.builder.appName('bovo').getOrCreate()
spark

In [219]:

schemaArr = StructType([
    StructField("PredictedArrival", StringType(), False),
    StructField("Flight_ID", StringType(), False),
    StructField("Destination", StringType(), False),
    StructField("Airline", StringType(), False),
    StructField("Aircraft_ID", StringType(), False),
    StructField("NUll", StringType(), False),
    StructField("TimeOfArrival", StringType(), False),
    StructField("Airport", StringType(), False),
    StructField("Date", StringType(), False),
    StructField("TimeDeff", StringType(), False)
    
])
schemaDep = StructType([
    StructField("PredictedDeparture", StringType(), False),
    StructField("Flight_ID", StringType(), False),
    StructField("Destination", StringType(), False),
    StructField("Airline", StringType(), False),
    StructField("Aircraft_ID", StringType(), False),
    StructField("NUll", StringType(), False),
    StructField("TimeOfDeparture", StringType(), False),
    StructField("Airport", StringType(), False),
    StructField("Date", StringType(), False),
    StructField("TimeDeff", StringType(), False)
    
    
])

schemaWea = StructType([
    
    StructField("Wind speed", StringType(), False),
    StructField("Temperature", StringType(), False),
    StructField("Date", StringType(), False),
    StructField("TimeInUTC", StringType(), False),
    StructField("Day", StringType(), False),
    StructField("Wind direction", StringType(), False),
    StructField("Pressure", StringType(), False),
    StructField("Airport", StringType(), False),
    StructField("index", StringType(), False),
    StructField("Dew point", StringType(), False),
    StructField("WindDirectionInDegrees", StringType(), False),
    StructField("VisibilityInSM", StringType(), False),
   
    
    
])

In [220]:
df_Dep=spark.read.option('header','false').csv(r'C:\Users\httyd\Desktop\capstone\airports\Data\2024-03-21\Departures*.csv',schema=schemaDep)
df_Arr=spark.read.option('header','false').csv(r'C:\Users\httyd\Desktop\capstone\airports\Data\2024-03-21\Arrivals*.csv',schema=schemaArr)
df_Wea=spark.read.option('header','false').csv(r'C:\Users\httyd\Desktop\capstone\airports\Data\2024-03-21\Weather*.csv',schema=schemaWea)

In [221]:
df_Arr=df_Arr.na.drop(how="any", thresh=5)

df_Arr = df_Arr.drop('NULL')

df_Arr = df_Arr.withColumn("Day", split(df_Arr["Date"], ",").getItem(0)) \
                       .withColumn("NumericalDate", split(df_Arr["Date"], ",").getItem(1))
df_Arr = df_Arr.drop("Date")
# Extract AirportName and AirportCode using regexp_extract
df_Arr = df_Arr.withColumn("AirportName", regexp_extract(df_Arr["Airport"], r'^(.*?)\(', 1)) \
                       .withColumn("Airport_Code", regexp_extract(df_Arr["Airport"], r'\((.*?)\)', 1))
df_Arr = df_Arr.drop("Airport")
# Extract AirportName and AirportCode using regexp_extract
df_Arr = df_Arr.withColumn("DestinationName", regexp_extract(df_Arr["Destination"], r'^(.*?)\(', 1)) \
                       .withColumn("Destination_Code", regexp_extract(df_Arr["Destination"], r'\((.*?)\)', 1))
df_Arr = df_Arr.drop("Destination")
# Extract AirportName and AirportCode using regexp_extract
df_Arr = df_Arr.withColumn("Aircraft_Type", regexp_extract(df_Arr["Aircraft_ID"], r'^(.*?)\(', 1)) \
                       .withColumn("Aircraft_Code", regexp_extract(df_Arr["Aircraft_ID"], r'\((.*?)\)', 1))
df_Arr = df_Arr.drop("Aircraft_ID")

# Split TimeOfDeparture based on whether it contains "Departed", "Estimated", or "Canceled"
df_Arr = df_Arr.withColumn("ArrivalStatus", 
                                   when(col("TimeOfArrival").contains("Landed"), "Landed")
                                   .when(col("TimeOfArrival").contains("Canceled"), "Canceled")
                                   .otherwise("Unknown"))

# Split TimeOfDeparture into two columns based on the DepartureStatus
df_Arr = df_Arr.withColumn("ActualArrivalTime", when(col("ArrivalStatus") == "Landed",
                                                               split(col("TimeOfArrival"), " ")[1])
                                   .otherwise(None))

df_Arr = df_Arr.drop("TimeOfArrival")

df_Arr = df_Arr.withColumn("Airline", split(df_Arr["Airline"], " -").getItem(0))



In [222]:
df_Arr = df_Arr.dropDuplicates()
row_count = df_Arr.count()
print("Number of rows in DataFrame Dep:", row_count)
# Assuming df_Arr is your DataFrame containing the sample data
df_Arr = df_Arr.withColumn("NumericalDate", to_date("NumericalDate", " MMM dd yyyy"))


Number of rows in DataFrame Dep: 180


In [223]:
df_Arr = df_Arr.withColumn("Flight_ID", lower(df_Arr["Flight_ID"]))
df_Arr = df_Arr.withColumn("Day", lower(df_Arr["Day"]))
df_Arr = df_Arr.withColumn("Airline", lower(df_Arr["Airline"]))
df_Arr = df_Arr.withColumn("AirportName", lower(df_Arr["AirportName"]))
df_Arr = df_Arr.withColumn("Airport_Code", lower(df_Arr["Airport_Code"]))
df_Arr = df_Arr.withColumn("DestinationName", lower(df_Arr["DestinationName"]))
df_Arr = df_Arr.withColumn("Destination_Code", lower(df_Arr["Destination_Code"]))
df_Arr = df_Arr.withColumn("Aircraft_Type", lower(df_Arr["Aircraft_Type"]))
df_Arr = df_Arr.withColumn("Aircraft_Code", lower(df_Arr["Aircraft_Code"]))
df_Arr = df_Arr.withColumn("ArrivalStatus", lower(df_Arr["ArrivalStatus"]))

In [224]:
df_Arr.show(100)

+----------------+---------+--------------------+--------+---------+-------------+--------------------+------------+--------------------+----------------+-------------+-------------+-------------+-----------------+
|PredictedArrival|Flight_ID|             Airline|TimeDeff|      Day|NumericalDate|         AirportName|Airport_Code|     DestinationName|Destination_Code|Aircraft_Type|Aircraft_Code|ArrivalStatus|ActualArrivalTime|
+----------------+---------+--------------------+--------+---------+-------------+--------------------+------------+--------------------+----------------+-------------+-------------+-------------+-----------------+
|           21:21|   eja767|             netjets| 4:00:00|wednesday|   2024-03-20|akron canton airport|    cak/kcak|          morganton |             mrn|        cl35 |       n767qs|       landed|            20:38|
|           13:18|   aa5218|      american eagle| 4:00:00|wednesday|   2024-03-20|akron canton airport|    cak/kcak|          charlotte |   

In [225]:
# # Assuming your DataFrame is named 'df'
# df_Arr = df_Arr.withColumn('PredictedArrival_Date', concat(col('PredictedArrival'), lit(' '), col('NumericalDate')))


In [226]:
df_Arr.printSchema()

root
 |-- PredictedArrival: string (nullable = true)
 |-- Flight_ID: string (nullable = true)
 |-- Airline: string (nullable = true)
 |-- TimeDeff: string (nullable = true)
 |-- Day: string (nullable = true)
 |-- NumericalDate: date (nullable = true)
 |-- AirportName: string (nullable = true)
 |-- Airport_Code: string (nullable = true)
 |-- DestinationName: string (nullable = true)
 |-- Destination_Code: string (nullable = true)
 |-- Aircraft_Type: string (nullable = true)
 |-- Aircraft_Code: string (nullable = true)
 |-- ArrivalStatus: string (nullable = false)
 |-- ActualArrivalTime: string (nullable = true)



In [227]:
# Calculate total seconds for TimeDeff and PredArrival
df_Arr = df_Arr.withColumn('TimeDeff_seconds', expr("hour(TimeDeff) * 3600 + minute(TimeDeff) * 60 + second(TimeDeff)"))
df_Arr = df_Arr.withColumn('PredArrival_seconds', expr("substring_index(PredictedArrival, ':', 1) * 3600 + substring_index(PredictedArrival, ':', -1) * 60"))

# Add TimeDeff to PredArrival (assuming PredArrival is in seconds since midnight)
df_Arr = df_Arr.withColumn('PredictedArri', col('TimeDeff_seconds') + col('PredArrival_seconds'))

# Calculate hours, minutes, and seconds
df_Arr = df_Arr.withColumn("hours", (col("PredictedArri") / 3600).cast("int"))
df_Arr = df_Arr.withColumn("remaining_seconds", col("PredictedArri") % 3600)
df_Arr = df_Arr.withColumn("minutes", (col("remaining_seconds") / 60).cast("int"))
df_Arr = df_Arr.withColumn("seconds", (col("remaining_seconds") % 60).cast("int"))

# Calculate extra days if hours exceed 24
df_Arr = df_Arr.withColumn("extraDay", (col("hours") / 24).cast("int"))

# Subtract extra days from hours
df_Arr = df_Arr.withColumn("hours", col("hours") % 24)

# Add extra day to the NumericalDate column
df_Arr = df_Arr.withColumn("NewNumericalDate", expr("date_add(to_date(NumericalDate, 'yy-MM-dd'), extraDay)"))

# Check if day exceeds month's maximum
df_Arr = df_Arr.withColumn("NewNumericalDate", 
                           expr("CASE WHEN day(NewNumericalDate) > day(last_day(NewNumericalDate)) \
                                 THEN add_months(NewNumericalDate, 1) \
                                 ELSE NewNumericalDate END"))

# Check if month exceeds 12
df_Arr = df_Arr.withColumn("NewNumericalDate", 
                           expr("CASE WHEN month(NewNumericalDate) > 12 \
                                 THEN add_months(NewNumericalDate, -12) \
                                 ELSE NewNumericalDate END"))

# Format the time
df_Arr = df_Arr.withColumn(
    "PredictedArrivalInUTC",
    expr("format_string('%02d:%02d:%02d', hours, minutes, seconds)")
)
df_Arr = df_Arr.withColumn('AccArrival_seconds', expr("substring_index(ActualArrivalTime, ':', 1) * 3600 + substring_index(ActualArrivalTime, ':', -1) * 60"))

# Add TimeDeff to PredArrival (assuming PredArrival is in seconds since midnight)
df_Arr = df_Arr.withColumn('AccdictedArri', col('TimeDeff_seconds') + col('AccArrival_seconds'))

# Calculate hours, minutes, and seconds
df_Arr = df_Arr.withColumn("hours", (col("AccdictedArri") / 3600).cast("int"))
df_Arr = df_Arr.withColumn("remaining_seconds", col("AccdictedArri") % 3600)
df_Arr = df_Arr.withColumn("minutes", (col("remaining_seconds") / 60).cast("int"))
df_Arr = df_Arr.withColumn("seconds", (col("remaining_seconds") % 60).cast("int"))
# Subtract extra days from hours
df_Arr = df_Arr.withColumn("hours", col("hours") % 24)
df_Arr = df_Arr.withColumn(
    "ActualArrivalInUTC",
    expr("format_string('%02d:%02d:%02d', hours, minutes, seconds)")
)
# Drop intermediate columns if not needed
df_Arr = df_Arr.drop("hours", "minutes", "seconds", "remaining_seconds", "extraDay","AccdictedArri","AccArrival_seconds","PredictedArri","PredArrival_seconds","TimeDeff_seconds")


In [228]:
df_Arr =df_Arr.withColumn("Pred_date",to_timestamp(concat_ws(" ", date_format(col("NewNumericalDate"), "yyyy-MM-dd"), col("PredictedArrivalInUTC")), "yyyy-MM-dd HH:mm:ss"))
df_Arr =df_Arr.withColumn("Act_date",to_timestamp(concat_ws(" ", date_format(col("NewNumericalDate"), "yyyy-MM-dd"), col("ActualArrivalInUTC")), "yyyy-MM-dd HH:mm:ss"))

In [229]:
# Add a new column 'rounded_hour' that represents the closest hour
df_Arr = df_Arr.withColumn("Newdate", to_date("Act_date")) \
    .withColumn("hour", hour("Act_date")) \
    .withColumn("minute", minute("Act_date")) \
    .withColumn("rounded_hour",
                when(col("minute") >= 30, expr("hour + 1")).otherwise(col("hour"))
                ) \
    .withColumn("rounded_hour", when(col("rounded_hour") == 24, 0).otherwise(col("rounded_hour"))) \
    .withColumn("rounded_hour", concat(
        when(col("rounded_hour") < 10, concat(lit("0"), col("rounded_hour"))).otherwise(col("rounded_hour")),
        lit(":00:00"))) \
    .drop("hour", "minute")




In [230]:
df_Arr.show(100)

+----------------+---------+--------------------+--------+---------+-------------+--------------------+------------+--------------------+----------------+-------------+-------------+-------------+-----------------+----------------+---------------------+------------------+-------------------+-------------------+----------+------------+
|PredictedArrival|Flight_ID|             Airline|TimeDeff|      Day|NumericalDate|         AirportName|Airport_Code|     DestinationName|Destination_Code|Aircraft_Type|Aircraft_Code|ArrivalStatus|ActualArrivalTime|NewNumericalDate|PredictedArrivalInUTC|ActualArrivalInUTC|          Pred_date|           Act_date|   Newdate|rounded_hour|
+----------------+---------+--------------------+--------+---------+-------------+--------------------+------------+--------------------+----------------+-------------+-------------+-------------+-----------------+----------------+---------------------+------------------+-------------------+-------------------+----------+---

**Dep**

In [231]:
df_Dep=df_Dep.na.drop(how="any", thresh=5)
df_Dep = df_Dep.drop('NULL')

In [232]:
df_Dep = df_Dep.dropDuplicates()
row_count = df_Dep.count()
print("Number of rows in DataFrame:", row_count)

Number of rows in DataFrame: 1982


In [233]:
df_Dep = df_Dep.withColumn("Day", split(df_Dep["Date"], ",").getItem(0)) \
                       .withColumn("NumericalDate", split(df_Dep["Date"], ",").getItem(1))
df_Dep = df_Dep.drop("Date")
# Extract AirportName and AirportCode using regexp_extract
df_Dep = df_Dep.withColumn("AirportName", regexp_extract(df_Dep["Airport"], r'^(.*?)\(', 1)) \
                       .withColumn("Airport_Code", regexp_extract(df_Dep["Airport"], r'\((.*?)\)', 1))
df_Dep = df_Dep.drop("Airport")
# Extract AirportName and AirportCode using regexp_extract
df_Dep = df_Dep.withColumn("DestinationName", regexp_extract(df_Dep["Destination"], r'^(.*?)\(', 1)) \
                       .withColumn("Destination_Code", regexp_extract(df_Dep["Destination"], r'\((.*?)\)', 1))
df_Dep = df_Dep.drop("Destination")
# Extract AirportName and AirportCode using regexp_extract
df_Dep = df_Dep.withColumn("Aircraft_Type", regexp_extract(df_Dep["Aircraft_ID"], r'^(.*?)\(', 1)) \
                       .withColumn("Aircraft_Code", regexp_extract(df_Dep["Aircraft_ID"], r'\((.*?)\)', 1))
df_Dep = df_Dep.drop("Aircraft_ID")

# Split TimeOfDeparture based on whether it contains "Departed", "Estimated", or "Canceled"
df_Dep = df_Dep.withColumn("DepartureStatus", 
                                   when(col("TimeOfDeparture").contains("Departed"), "Departed")
                                   .when(col("TimeOfDeparture").contains("Estimated"), "Estimated")
                                   .when(col("TimeOfDeparture").contains("Canceled"), "Canceled")
                                   .otherwise("Unknown"))

# Split TimeOfDeparture into two columns based on the DepartureStatus
df_Dep = df_Dep.withColumn("ActualDepartureTime", when(col("DepartureStatus") == "Departed",
                                                               split(col("TimeOfDeparture"), " ")[1])
                                   .otherwise(None))

df_Dep = df_Dep.withColumn("EstimatedDepartureTime", when(col("DepartureStatus") == "Estimated",
                                                                  split(col("TimeOfDeparture"), "  ")[1])
                                   .otherwise(None))

# Define the specific words you want to filter out
words_to_drop = ['unknown', 'canceled']

# Filter out rows containing the specific words in a particular column (e.g., 'columnName')
df_Dep = df_Dep.filter(~col('DepartureStatus').isin(words_to_drop))

df_Dep = df_Dep.withColumn("Airline", split(df_Dep["Airline"], " -").getItem(0))
# Assuming df_Dep is your DataFrame containing the sample data
df_Dep = df_Dep.withColumn("NumericalDate", to_date("NumericalDate", " MMM dd yyyy"))



In [234]:
df_Dep = df_Dep.withColumn("Flight_ID", lower(df_Dep["Flight_ID"]))
df_Dep = df_Dep.withColumn("Day", lower(df_Dep["Day"]))
df_Dep = df_Dep.withColumn("Airline", lower(df_Dep["Airline"]))
df_Dep = df_Dep.withColumn("AirportName", lower(df_Dep["AirportName"]))
df_Dep = df_Dep.withColumn("Airport_Code", lower(df_Dep["Airport_Code"]))
df_Dep = df_Dep.withColumn("DestinationName", lower(df_Dep["DestinationName"]))
df_Dep = df_Dep.withColumn("Destination_Code", lower(df_Dep["Destination_Code"]))
df_Dep = df_Dep.withColumn("Aircraft_Type", lower(df_Dep["Aircraft_Type"]))
df_Dep = df_Dep.withColumn("Aircraft_Code", lower(df_Dep["Aircraft_Code"]))
df_Dep = df_Dep.withColumn("DepartureStatus", lower(df_Dep["DepartureStatus"]))

In [235]:
# Assume you want to combine rows where the value in 'DepartureStatus' is 'Departed'
specific_value = 'Departed'

# Filter the DataFrame to include only the rows where the value in 'DepartureStatus' is 'Departed'
filtered_df = df_Dep.filter(df_Dep['DepartureStatus'] == specific_value)

# Collect the filtered rows into a list
combined_rows = filtered_df.collect()


In [236]:
df_Dep.show(100)

+------------------+---------+--------------------+--------------------+--------+---------+-------------+--------------------+------------+---------------+----------------+-------------+-------------+---------------+-------------------+----------------------+
|PredictedDeparture|Flight_ID|             Airline|     TimeOfDeparture|TimeDeff|      Day|NumericalDate|         AirportName|Airport_Code|DestinationName|Destination_Code|Aircraft_Type|Aircraft_Code|DepartureStatus|ActualDepartureTime|EstimatedDepartureTime|
+------------------+---------+--------------------+--------------------+--------+---------+-------------+--------------------+------------+---------------+----------------+-------------+-------------+---------------+-------------------+----------------------+
|             00:01|   5y8349|           atlas air|      Departed 01:46| 8:00:00| thursday|   2024-03-21|anchorage ted ste...|    anc/panc|   mexico city |             nlu|        b744 |       n473mc|       departed|    

In [237]:
from pyspark.sql.functions import expr, col, hour, minute, second, substring_index, date_add, last_day, add_months, when

# Calculate TimeDeff_seconds only for non-null ActualDepartureTime
df_Dep = df_Dep.withColumn('TimeDeff_seconds', 
                           when(col('ActualDepartureTime').isNull(), None)
                           .otherwise(hour('TimeDeff') * 3600 + minute('TimeDeff') * 60 + second('TimeDeff')))

# Calculate ActualDepartureTime_seconds only for non-null ActualDepartureTime
df_Dep = df_Dep.withColumn('ActualDepartureTime_seconds', 
                           when(col('ActualDepartureTime').isNull(), None)
                           .otherwise(substring_index('ActualDepartureTime', ':', 1).cast('int') * 3600 + 
                                      substring_index('ActualDepartureTime', ':', -1).cast('int') * 60))

# Add TimeDeff to PredArrival only for non-null ActualDepartureTime
df_Dep = df_Dep.withColumn('ActDeparture', 
                           when(col('ActualDepartureTime').isNull(), None)
                           .otherwise(col('TimeDeff_seconds') + col('ActualDepartureTime_seconds')))

# Calculate hours, minutes, and seconds only for non-null ActualDepartureTime
df_Dep = df_Dep.withColumn("hours", when(col('ActualDepartureTime').isNull(), None)
                           .otherwise((col("ActDeparture") / 3600).cast("int")))
df_Dep = df_Dep.withColumn("remaining_seconds", when(col('ActualDepartureTime').isNull(), None)
                           .otherwise(col("Actdeparture") % 3600))
df_Dep = df_Dep.withColumn("minutes", when(col('ActualDepartureTime').isNull(), None)
                           .otherwise((col("remaining_seconds") / 60).cast("int")))
df_Dep = df_Dep.withColumn("seconds", when(col('ActualDepartureTime').isNull(), None)
                           .otherwise((col("remaining_seconds") % 60).cast("int")))

# Calculate extra days if hours exceed 24 only for non-null ActualDepartureTime
df_Dep = df_Dep.withColumn("extraDay", when(col('ActualDepartureTime').isNull(), None)
                           .otherwise((col("hours") / 24).cast("int")))

# Subtract extra days from hours only for non-null ActualDepartureTime
df_Dep = df_Dep.withColumn("hours", when(col('ActualDepartureTime').isNull(), None)
                           .otherwise(col("hours") % 24))

# Add extra day to the NumericalDate column only for non-null ActualDepartureTime
df_Dep = df_Dep.withColumn("NewNumericalDate", 
                           when(col('ActualDepartureTime').isNull(), None)
                           .otherwise(expr("date_add(to_date(NumericalDate, 'yy-MM-dd'), extraDay)")))

# Check if day exceeds month's maximum only for non-null ActualDepartureTime
df_Dep = df_Dep.withColumn("NewNumericalDate", 
                           when(col('ActualDepartureTime').isNull(), None)
                           .otherwise(expr("CASE WHEN day(NewNumericalDate) > day(last_day(NewNumericalDate)) \
                                             THEN add_months(NewNumericalDate, 1) \
                                             ELSE NewNumericalDate END")))

# Check if month exceeds 12 only for non-null ActualDepartureTime
df_Dep = df_Dep.withColumn("NewNumericalDate", 
                           when(col('ActualDepartureTime').isNull(), None)
                           .otherwise(expr("CASE WHEN month(NewNumericalDate) > 12 \
                                             THEN add_months(NewNumericalDate, -12) \
                                             ELSE NewNumericalDate END")))

# Format the time only for non-null ActualDepartureTime
df_Dep = df_Dep.withColumn("ActualDepINUTC",
                           when(col('ActualDepartureTime').isNull(), None)
                           .otherwise(expr("format_string('%02d:%02d:%02d', hours, minutes, seconds)")))

# df_Dep = df_Dep.withColumn('PredDep_seconds', expr("substring_index(EstimatedDepartureTime, ':', 1) * 3600 + substring_index(EstimatedDepartureTime, ':', -1) * 60"))

# # Add TimeDeff to PredArrival (assuming PredArrival is in seconds since midnight)
# df_Dep = df_Dep.withColumn('PredDep', col('TimeDeff_seconds') + col('AccArrival_seconds'))

# # Calculate hours, minutes, and seconds
# df_Dep = df_Dep.withColumn("hours", (col("PredDep") / 3600).cast("int"))
# df_Dep = df_Dep.withColumn("remaining_seconds", col("PredDep") % 3600)
# df_Dep = df_Dep.withColumn("minutes", (col("remaining_seconds") / 60).cast("int"))
# df_Dep = df_Dep.withColumn("seconds", (col("remaining_seconds") % 60).cast("int"))
# # Subtract extra days from hours
# df_Dep = df_Dep.withColumn("hours", col("hours") % 24)
# df_Dep = df_Dep.withColumn(
#     "PredictedDepartureInUTC",
#     expr("format_string('%02d:%02d:%02d', hours, minutes, seconds)")
# )
# Drop intermediate columns if not needed
df_Dep = df_Dep.drop("hours", "minutes", "seconds", "remaining_seconds", "extraDay","ActDeparture","ActualDepartureTime_seconds","PredictedArri","PredArrival_seconds","TimeDeff_seconds")


In [238]:
# Calculate rounded_date only for non-null ActualDepINUTC
df_Dep = df_Dep.withColumn("rounded_date",
                           when(col('ActualDepINUTC').isNotNull(),
                                to_timestamp(concat_ws(" ", date_format(col("NewNumericalDate"), "yyyy-MM-dd"), col("ActualDepINUTC")), "yyyy-MM-dd HH:mm:ss"))
                          )

In [239]:
# Add a new column 'rounded_hour' that represents the closest hour
df_Dep = df_Dep.withColumn("Newdate", to_date("rounded_date")) \
    .withColumn("hour", hour("rounded_date")) \
    .withColumn("minute", minute("rounded_date")) \
    .withColumn("rounded_hour",
                when(col("minute") >= 30, expr("hour + 1")).otherwise(col("hour"))
                ) \
    .withColumn("rounded_hour", when(col("rounded_hour") == 24, 0).otherwise(col("rounded_hour"))) \
    .withColumn("rounded_hour", concat(
        when(col("rounded_hour") < 10, concat(lit("0"), col("rounded_hour"))).otherwise(col("rounded_hour")),
        lit(":00:00"))) \
    .drop("hour", "minute")


In [253]:
# Drop rows with null values in a specific column
df_Dep = df_Dep.na.drop(subset=["ActualDepartureTime"])

In [254]:
df_Dep.show(100)

+------------------+---------+--------------------+---------------+--------+---------+-------------+--------------------+------------+----------------+----------------+-------------+-------------+---------------+-------------------+----------------------+----------------+--------------+-------------------+----------+------------+
|PredictedDeparture|Flight_ID|             Airline|TimeOfDeparture|TimeDeff|      Day|NumericalDate|         AirportName|Airport_Code| DestinationName|Destination_Code|Aircraft_Type|Aircraft_Code|DepartureStatus|ActualDepartureTime|EstimatedDepartureTime|NewNumericalDate|ActualDepINUTC|       rounded_date|   Newdate|rounded_hour|
+------------------+---------+--------------------+---------------+--------+---------+-------------+--------------------+------------+----------------+----------------+-------------+-------------+---------------+-------------------+----------------------+----------------+--------------+-------------------+----------+------------+
|   

**weath**

In [242]:
df_Wea = df_Wea.dropDuplicates()
row_count = df_Wea.count()
print("Number of rows in DataFrame:", row_count)

Number of rows in DataFrame: 7471


In [243]:
df_Wea = df_Wea.drop("Day")
df_Wea = df_Wea.withColumn("Wind speed", split(df_Wea["Wind speed"], "k").getItem(0)) 
df_Wea = df_Wea.withColumn("Temperature", split(df_Wea["Temperature"], "°").getItem(0)) 
df_Wea = df_Wea.withColumn("Date", split(df_Wea["Date"], " ").getItem(0)) 
df_Wea = df_Wea.withColumn("TimeInUTC", split(df_Wea["TimeInUTC"], " ").getItem(0)) 

df_Wea = df_Wea.withColumn("Wind direction", split(df_Wea["Wind direction"], "°").getItem(0)) 
df_Wea = df_Wea.withColumn("Pressure", split(df_Wea["Pressure"], " ").getItem(0)) 
df_Wea = df_Wea.withColumn("AirportName", regexp_extract(df_Wea["Airport"], r'^(.*?)\(', 1)) \
                       .withColumn("Airport_Code", regexp_extract(df_Wea["Airport"], r'\((.*?)\)', 1))
df_Wea = df_Wea.drop("Airport")
df_Wea = df_Wea.withColumn("Dew point", split(df_Wea["Dew point"], "°").getItem(0)) 
df_Wea = df_Wea.withColumn("WindDirectionInDegrees", split(df_Wea["WindDirectionInDegrees"], "k").getItem(0)) 
df_Wea = df_Wea.withColumn("VisibilityInSM", split(df_Wea["VisibilityInSM"], " ").getItem(0)) 
df_Wea = df_Wea.withColumn("Day", split(df_Wea["Date"], "-")[2])


In [244]:
df_Wea = df_Wea.withColumn("AirportName", lower(df_Wea["AirportName"]))
df_Wea = df_Wea.withColumn("Airport_Code", lower(df_Wea["Airport_Code"]))


In [245]:
df_Wea =df_Wea.withColumn("rounded_date",to_timestamp(concat_ws(" ", date_format(col("Date"), "yyyy-MM-dd"), col("TimeInUTC")), "yyyy-MM-dd HH:mm"))

In [246]:
# Add a new column 'rounded_hour' that represents the closest hour
df_Wea = df_Wea.withColumn("Newdate", to_date("rounded_date")) \
    .withColumn("hour", hour("rounded_date")) \
    .withColumn("minute", minute("rounded_date")) \
    .withColumn("rounded_hour",
                when(col("minute") >= 30, expr("hour + 1")).otherwise(col("hour"))
                ) \
    .withColumn("rounded_hour", when(col("rounded_hour") == 24, 0).otherwise(col("rounded_hour"))) \
    .withColumn("rounded_hour", concat(
        when(col("rounded_hour") < 10, concat(lit("0"), col("rounded_hour"))).otherwise(col("rounded_hour")),
        lit(":00:00"))) \
    .drop("hour", "minute")

# Show the result
df_Wea.show()


+----------+-----------+----------+---------+--------------+--------+-----+---------+----------------------+--------------+--------------------+------------+---+-------------------+----------+------------+
|Wind speed|Temperature|      Date|TimeInUTC|Wind direction|Pressure|index|Dew point|WindDirectionInDegrees|VisibilityInSM|         AirportName|Airport_Code|Day|       rounded_date|   Newdate|rounded_hour|
+----------+-----------+----------+---------+--------------+--------+-----+---------+----------------------+--------------+--------------------+------------+---+-------------------+----------+------------+
|         9|          6|2024-03-18|    19:15|           350|   29.84|  N/A|      -11|                    18|            10|abingdon virginia...|    vji/kvji| 18|2024-03-18 19:15:00|2024-03-18|    19:00:00|
|        11|          1|2024-03-19|    14:10|           240|   30.01|  N/A|       -7|                    16|            10|athens gordon k. ...|    ato/kuni| 19|2024-03-19 14:1

In [247]:
df_Wea.show(100)

+----------+-----------+----------+---------+--------------+--------+-----+---------+----------------------+--------------+--------------------+------------+---+-------------------+----------+------------+
|Wind speed|Temperature|      Date|TimeInUTC|Wind direction|Pressure|index|Dew point|WindDirectionInDegrees|VisibilityInSM|         AirportName|Airport_Code|Day|       rounded_date|   Newdate|rounded_hour|
+----------+-----------+----------+---------+--------------+--------+-----+---------+----------------------+--------------+--------------------+------------+---+-------------------+----------+------------+
|         9|          6|2024-03-18|    19:15|           350|   29.84|  N/A|      -11|                    18|            10|abingdon virginia...|    vji/kvji| 18|2024-03-18 19:15:00|2024-03-18|    19:00:00|
|        11|          1|2024-03-19|    14:10|           240|   30.01|  N/A|       -7|                    16|            10|athens gordon k. ...|    ato/kuni| 19|2024-03-19 14:1

In [248]:
# Group by NewDate, rounded_hour, AirportName, and Airport_Code and compute statistics
grouped_df = df_Wea.groupBy("Newdate", "rounded_hour", "AirportName", "Airport_Code").agg(
    {"Wind speed": "avg",  # Compute average wind speed
     "Temperature": "avg",  # Compute average temperature
     "Wind direction": "avg",  # Compute average wind direction
     "Pressure": "avg",  # Compute average pressure
     "Dew point": "avg",  # Compute average dew point
     "VisibilityInSM": "avg"  # Compute average visibility
    }
)

# Show the result
grouped_df.show(100)

+----------+------------+--------------------+------------+-------------------+------------------+------------------+------------------+------------------+-------------------+
|   Newdate|rounded_hour|         AirportName|Airport_Code|avg(Wind direction)|   avg(Wind speed)|    avg(Dew point)|  avg(Temperature)|     avg(Pressure)|avg(VisibilityInSM)|
+----------+------------+--------------------+------------+-------------------+------------------+------------------+------------------+------------------+-------------------+
|2024-03-20|    21:00:00|auburn municipal ...|    aun/kaun|              260.0|               6.0|               0.0|              21.0|             30.05|               10.0|
|2024-03-19|    17:00:00|altoona blair cou...|    aoo/kaoo|              260.0|              14.0|              -8.0|               2.0|              29.8|               10.0|
|2024-03-18|    20:00:00|altoona blair cou...|    aoo/kaoo|              290.0|              16.0|              -8.0|   

In [268]:
# Join df_Arr and df_Wea based on Newdate and rounded_date
joined_df = df_Arr.join(df_Wea, (df_Arr.Newdate == df_Wea.Newdate) & (df_Arr.rounded_hour == df_Wea.rounded_hour)& (df_Arr.AirportName == df_Wea.AirportName), "inner")


In [250]:
num_rows_arr = df_Arr.count()
num_row_wea = grouped_df.count()

# Print the number of rows
print("Number of rows:", num_rows_arr)
# Print the number of rows
print("Number of rows:", num_row_wea)

Number of rows: 180
Number of rows: 5663


In [269]:
num_row_join = joined_df.count()

# Print the number of rows
print("Number of rows:", num_row_join)

Number of rows: 56


In [270]:
# Show the result
joined_df.show(100)

+----------------+---------+--------------------+--------+---------+-------------+--------------------+------------+--------------------+----------------+-------------+-------------+-------------+-----------------+----------------+---------------------+------------------+-------------------+-------------------+----------+------------+----------+-----------+----------+---------+--------------+--------+-----+---------+----------------------+--------------+--------------------+------------+---+-------------------+----------+------------+
|PredictedArrival|Flight_ID|             Airline|TimeDeff|      Day|NumericalDate|         AirportName|Airport_Code|     DestinationName|Destination_Code|Aircraft_Type|Aircraft_Code|ArrivalStatus|ActualArrivalTime|NewNumericalDate|PredictedArrivalInUTC|ActualArrivalInUTC|          Pred_date|           Act_date|   Newdate|rounded_hour|Wind speed|Temperature|      Date|TimeInUTC|Wind direction|Pressure|index|Dew point|WindDirectionInDegrees|VisibilityInSM| 

In [265]:
 # Join df_Arr and df_Wea based on Newdate and rounded_date
joined_df_dep = df_Dep.join(df_Wea, (df_Dep.Newdate == df_Wea.Newdate) & (df_Dep.rounded_hour == df_Wea.rounded_hour)& (df_Dep.AirportName == df_Wea.AirportName), "inner")


In [266]:
num_row_join = joined_df_dep.count()
num_row = df_Dep.count()

# Print the number of rows
print("Number of rows:", num_row_join)
print("Number of rows:", num_row)

Number of rows: 796
Number of rows: 813


In [267]:
joined_df_dep.show(100)

+------------------+---------+--------------------+---------------+--------+---------+-------------+--------------------+------------+----------------+----------------+-------------+-------------+---------------+-------------------+----------------------+----------------+--------------+-------------------+----------+------------+----------+-----------+----------+---------+--------------+--------+-----+---------+----------------------+--------------+--------------------+------------+---+-------------------+----------+------------+
|PredictedDeparture|Flight_ID|             Airline|TimeOfDeparture|TimeDeff|      Day|NumericalDate|         AirportName|Airport_Code| DestinationName|Destination_Code|Aircraft_Type|Aircraft_Code|DepartureStatus|ActualDepartureTime|EstimatedDepartureTime|NewNumericalDate|ActualDepINUTC|       rounded_date|   Newdate|rounded_hour|Wind speed|Temperature|      Date|TimeInUTC|Wind direction|Pressure|index|Dew point|WindDirectionInDegrees|VisibilityInSM|         Ai