# **Group 11 - Dataset Creation - Big Data and Cloud Computing (ADSP 31013)**

## Members:
- #### **Andrew Rafael James** 
- #### **Aravind Shreyas Ramesh** 
- #### **Reethesh Venkatraman** 
- #### **Rohit Kumar** 
- #### **Sahil Bharwani**

---

### **Flight Delay Prediction:**
*This project aims to identify the drivers of departure and arrival delays, considering various factors including (but not limited to) origin, destination, distance, weather, taxi time, time, and seasonality, and to quantify the effect of each of these drivers.*

In [36]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, radians, sin, cos, sqrt, atan2, lit, round, to_date, to_timestamp, date_format
from pyspark.sql import Window
import pandas as pd
import numpy as np
from pyspark.sql.types import DoubleType
import math
from pyspark.sql import Window

import matplotlib.pyplot as plt
%matplotlib inline

In [37]:
# Create (or reuse) the Spark session.
# Executor/driver memory and core counts are static settings: they only take effect
# when supplied BEFORE the SparkContext starts. They are therefore passed to the
# builder. Patching `sparkContext._conf` after `getOrCreate()` does not change the
# running context.
# NOTE(review): if the kernel already provides a running session, getOrCreate()
# returns it and these static settings are ignored; only runtime SQL confs apply.
spark = (
    SparkSession.builder
    .appName("NearestAirportFinder")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .config("spark.executor.memory", "4g")
    .config("spark.executor.cores", "4")
    .config("spark.cores.max", "4")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

# Effective configuration of the running context. Kept under the old `conf` name,
# which previously held the SparkConf returned by setAll().
conf = spark.sparkContext.getConf()

# Reduce notebook log noise to errors only.
spark.sparkContext.setLogLevel("ERROR")

In [38]:
%%time
df_flight_2018 = spark.read.csv("gs://msca-bdp-student-gcs/Group11/Combined_Flights_2018.csv", header=True, inferSchema=True)
df_flight_2019 = spark.read.csv("gs://msca-bdp-student-gcs/Group11/Combined_Flights_2019.csv", header=True, inferSchema=True)
df_flight_2020 = spark.read.csv("gs://msca-bdp-student-gcs/Group11/Combined_Flights_2020.csv", header=True, inferSchema=True)
df_flight_2021 = spark.read.csv("gs://msca-bdp-student-gcs/Group11/Combined_Flights_2021.csv", header=True, inferSchema=True)

airports_df = spark.read.csv("gs://msca-bdp-student-gcs/Group11/airports_df.csv", header=True, inferSchema=True)
weather_df = spark.read.csv("gs://msca-bdp-student-gcs/Group11/dataset_weather.csv", header=True, inferSchema=True)



CPU times: user 102 ms, sys: 20.3 ms, total: 123 ms
Wall time: 1min 42s


                                                                                

## **Combining all years**

In [39]:
# Stack the four yearly extracts into one table.
# `unionByName` pairs columns by name. Plain `union` pairs them purely by position,
# which would silently misalign data if any yearly CSV listed its columns in a
# different order. It also fails loudly if the column sets differ.
df_flight = (
    df_flight_2018
    .unionByName(df_flight_2019)
    .unionByName(df_flight_2020)
    .unionByName(df_flight_2021)
)

In [40]:
# Sanity check: total number of rows in the combined 2018-2021 flight table
# (the recorded run displayed 25115464).
df_flight.count()

                                                                                

25115464

## **Filtering Weather Data to the Analysis Period (2018–2021)**

In [41]:
%%time
weather_df = weather_df.withColumn("DATE", to_date(col("DATE"), "MM/dd/yyyy"))
weather_df = weather_df.filter((col("DATE").between("2018-01-01", "2021-12-31")))

CPU times: user 2.82 ms, sys: 541 µs, total: 3.36 ms
Wall time: 21.8 ms


In [42]:
#weather_df.count() #31,723,696 for 4 years; #8,160,221 for 2020

In [43]:
# Every airport code that appears as either an origin or a destination.
# `union` is positional, so the single result column keeps the name "Origin".
unique_origins = (
    df_flight.select("Origin")
    .union(df_flight.select("Dest"))
    .distinct()
)

# Keep only the airports that actually occur in the flight data.
# NOTE(review): if airports_df holds several rows for one iata_code, they all survive
# this join and later multiply flight rows in the weather joins. Verify uniqueness.
filtered_airports_df = airports_df.join(unique_origins, airports_df["iata_code"] == unique_origins["Origin"], "inner")

# Count distinct codes on the cluster instead of collecting them all to the driver.
print("Total Airports:" , filtered_airports_df.select("iata_code").distinct().count())




Total Airports: 387


                                                                                

In [44]:
__base32 = '0123456789bcdefghjkmnpqrstuvwxyz'
# Reverse lookup: geohash symbol -> its 5-bit value.
__decodemap = {}
for i, _symbol in enumerate(__base32):
    __decodemap[_symbol] = i
del i, _symbol

def encode(latitude, longitude, precision=12):
    """
    Encode a (latitude, longitude) position as a geohash string.

    The lat/lon ranges are bisected repeatedly, alternating between longitude
    (first) and latitude. Each bisection contributes one bit (1 = upper half).
    Every 5 bits form one base32 symbol. The alternation carries over across
    symbol boundaries.

    Args:
        latitude: latitude in decimal degrees.
        longitude: longitude in decimal degrees.
        precision: number of geohash characters to produce (default 12).

    Returns:
        The geohash string. It is empty when precision <= 0.
    """
    lat_lo, lat_hi = -90.0, 90.0
    lon_lo, lon_hi = -180.0, 180.0
    symbols = []
    split_longitude = True
    while len(symbols) < precision:
        value = 0
        for _ in range(5):
            value <<= 1
            if split_longitude:
                midpoint = (lon_lo + lon_hi) / 2
                if longitude > midpoint:
                    value |= 1
                    lon_lo = midpoint
                else:
                    lon_hi = midpoint
            else:
                midpoint = (lat_lo + lat_hi) / 2
                if latitude > midpoint:
                    value |= 1
                    lat_lo = midpoint
                else:
                    lat_hi = midpoint
            split_longitude = not split_longitude
        symbols.append(__base32[value])
    return ''.join(symbols)

# Haversine distance function
def haversine(lat1, lon1, lat2, lon2):
    """
    Great-circle distance in kilometres between two points in decimal degrees.

    Uses a mean Earth radius of 6371 km.

    Returns:
        The distance in km, or None when any coordinate is None. When the
        function runs as a Spark UDF, null coordinates then yield a null
        distance instead of failing the task with a TypeError.
    """
    if any(v is None for v in (lat1, lon1, lat2, lon2)):
        return None
    R = 6371  # Earth radius in km
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    delta_phi, delta_lambda = math.radians(lat2 - lat1), math.radians(lon2 - lon1)
    a = (math.sin(delta_phi / 2) ** 2 +
         math.cos(phi1) * math.cos(phi2) * math.sin(delta_lambda / 2) ** 2)
    # Rounding can push `a` a hair above 1 for (near-)antipodal points.
    # sqrt(1 - a) would then raise "math domain error".
    a = min(1.0, a)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return R * c

# Expose the Python `haversine` helper to Spark as a UDF returning DoubleType.
haversine_udf = F.udf(f=haversine, returnType=DoubleType())

# Define a function to generate geohash
def generate_geohash(lat, lon, precision=5):
    """
    Return the `precision`-character geohash of (lat, lon) via `encode`.

    Returns None when either coordinate is None. When this runs as a Spark UDF,
    rows with a missing latitude/longitude get a null geohash (and so cannot
    match in an equality join on geohash). Without this guard, `encode` raises a
    TypeError (None compared with a float) and the whole job fails.
    """
    if lat is None or lon is None:
        return None
    return encode(lat, lon, precision)

# Expose `generate_geohash` to Spark as a UDF that returns a string column.
geohash_udf = F.udf(f=generate_geohash, returnType="string")

# **Generate geohash for airports and weather data**

In [45]:
# Geohash length used for the spatial pre-join between airports and weather points.
# `encode` alternates longitude/latitude bits, 5 bits per character. 4 characters
# = 20 bits = 10 longitude + 10 latitude bits, i.e. cells of 360/1024 ~ 0.35 deg of
# longitude by 180/1024 ~ 0.18 deg of latitude.
# NOTE(review): matching on the exact cell ignores weather points just across a
# cell boundary. An airport's truly nearest station can be missed, and an airport
# with no station in its own cell gets no weather at all. Consider also joining
# on the 8 neighbouring cells.
GEOHASH_PRECISION = 4

filtered_airports_df = filtered_airports_df.withColumn(
    "geohash", geohash_udf(F.col("latitude_deg"), F.col("longitude_deg"), F.lit(GEOHASH_PRECISION))
)
weather_df = weather_df.withColumn(
    "geohash", geohash_udf(F.col("latitude"), F.col("longitude"), F.lit(GEOHASH_PRECISION))
)

# **Broadcast the filtered airports table and join it on geohash to keep only nearby weather locations**

In [46]:
# The filtered airports table is small (387 distinct codes in the recorded run), so
# hint Spark to broadcast it. Then pair each airport with every weather row in the
# same geohash cell.
airports_df_broadcasted = F.broadcast(filtered_airports_df)
combined_df = airports_df_broadcasted.join(
    other=weather_df,
    on="geohash",
    how="inner",
)

# **Calculate the distance between each airport and each weather point in the filtered set**

In [47]:
# Haversine great-circle distance in km between each airport (latitude_deg,
# longitude_deg) and each candidate weather point (latitude, longitude).
# Built from native Spark column functions instead of the Python `haversine_udf`.
# The formula is the same, but every row no longer has to be serialized to a
# Python worker. Null coordinates yield a null distance.
_EARTH_RADIUS_KM = 6371.0
_phi_airport = F.radians(F.col("latitude_deg"))
_phi_weather = F.radians(F.col("latitude"))
_half_dphi = F.radians(F.col("latitude") - F.col("latitude_deg")) / 2
_half_dlambda = F.radians(F.col("longitude") - F.col("longitude_deg")) / 2
# Clamp to 1.0 so rounding can never make sqrt(1 - a) NaN.
_hav_a = F.least(
    F.lit(1.0),
    F.sin(_half_dphi) ** 2 + F.cos(_phi_airport) * F.cos(_phi_weather) * F.sin(_half_dlambda) ** 2,
)
combined_df = combined_df.withColumn(
    "Distance",
    F.lit(2 * _EARTH_RADIUS_KM) * F.atan2(F.sqrt(_hav_a), F.sqrt(F.lit(1.0) - _hav_a)),
)

In [48]:
#combined_df.count()

# **Define a window partitioned by iata_code, municipality, and Date, ordered by Distance, and keep the closest weather data point in each group**

In [49]:
# Within each (airport, municipality, day) group, rank weather points by distance.
window_spec = (
    Window
    .partitionBy("iata_code", "municipality", "Date")
    .orderBy("Distance")
)

combined_df_with_min_dist = combined_df.withColumn(
    "row_num", F.row_number().over(window_spec)
)

# Keep only the nearest weather point per group and give the coordinate/distance
# columns explicit airport_/weather_/hvs_ names.
closest_weather_df_2 = (
    combined_df_with_min_dist
    .where(F.col("row_num") == 1)
    .select(
        "iata_code",
        "municipality",
        "Date",
        "TMAX",
        "TMIN",
        "PRCP",
        F.col("Distance").alias("hvs_distance"),
        F.col("latitude_deg").alias("airport_latitude"),
        F.col("longitude_deg").alias("airport_longitude"),
        F.col("latitude").alias("weather_latitude"),
        F.col("longitude").alias("weather_longitude"),
    )
)

In [50]:

# Attach the nearest-station weather at the ORIGIN airport on the flight date.
# Left join: flights with no matching weather row are kept with nulls.
_origin_key_match = (
    (df_flight["Origin"] == closest_weather_df_2["iata_code"])
    & (df_flight["FlightDate"] == closest_weather_df_2["Date"])
)
final_combined_df = df_flight.join(closest_weather_df_2, on=_origin_key_match, how="left")


In [51]:
# Suffix the weather/airport columns from the origin-side join with "_origin",
# freeing the unsuffixed names for the destination-side join.
columns_to_rename = dict(
    Date="Date_origin",
    TMAX="TMAX_origin",
    TMIN="TMIN_origin",
    PRCP="PRCP_origin",
    hvs_distance="hvs_distance_origin",
    airport_latitude="airport_latitude_origin",
    airport_longitude="airport_longitude_origin",
    weather_latitude="weather_latitude_origin",
    weather_longitude="weather_longitude_origin",
    iata_code="iata_code_origin",
    municipality="municipality_origin",
)

for old_col in columns_to_rename:
    new_col = columns_to_rename[old_col]
    final_combined_df = final_combined_df.withColumnRenamed(old_col, new_col)


In [52]:
# Attach the nearest-station weather at the DESTINATION airport on the flight date.
# `closest_weather_df_2` already sits in `final_combined_df`'s lineage (origin join),
# so joining it again is a self-join. Condition columns written as
# `closest_weather_df_2.iata_code` / `df_flight.Dest` can then be rejected by the
# analyzer as ambiguous or fail to resolve. Aliasing both sides and using qualified
# column names makes the binding explicit. The output column names are unchanged.
_flights_with_origin = final_combined_df.alias("flights")
_dest_weather = closest_weather_df_2.alias("dest_weather")
final_combined_df = _flights_with_origin.join(
    _dest_weather,
    (F.col("flights.Dest") == F.col("dest_weather.iata_code"))
    & (F.col("flights.FlightDate") == F.col("dest_weather.Date")),
    how="left",
)

In [53]:
# Suffix the weather/airport columns from the destination-side join with "_dest",
# so they sit alongside the "_origin" columns without name clashes.
columns_to_rename = dict(
    Date="Date_dest",
    TMAX="TMAX_dest",
    TMIN="TMIN_dest",
    PRCP="PRCP_dest",
    hvs_distance="hvs_distance_dest",
    airport_latitude="airport_latitude_dest",
    airport_longitude="airport_longitude_dest",
    weather_latitude="weather_latitude_dest",
    weather_longitude="weather_longitude_dest",
    iata_code="iata_code_dest",
    municipality="municipality_dest",
)

for old_col in columns_to_rename:
    new_col = columns_to_rename[old_col]
    final_combined_df = final_combined_df.withColumnRenamed(old_col, new_col)


In [54]:
#final_combined_df.count()

## **Save to specified path**

In [55]:
# Set GCS location where the Parquet file will be saved
#gcs_path = "gs://msca-bdp-student-gcs/Group11/flight_weather_joined_2018_2021_v2.parquet"

# Save the DataFrame as a Parquet file to GCS
#final_combined_df.write.mode("overwrite").parquet(gcs_path)