## Reading the raw data files from Databricks FS.

In [0]:
# File locations
airline_file_location = "/FileStore/tables/phdata/airlines.csv"
airport_file_location = "/FileStore/tables/phdata/airports.csv"
flights_file_location = "/FileStore/tables/phdata/flights/partition_0*.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","


def _read_csv(path):
    """Load a CSV file (or glob of files) using the shared reader options above."""
    reader = spark.read.format(file_type)
    reader = reader.option("inferSchema", infer_schema)
    reader = reader.option("header", first_row_is_header)
    reader = reader.option("sep", delimiter)
    return reader.load(path)


# Creating DataFrames for all the raw csv files.
# airlines and airports are cached; flights is not (a cleaned subset of it is
# cached further down).
airlines_df = _read_csv(airline_file_location).cache()
airlines_df.show(truncate=False)

airports_df = _read_csv(airport_file_location).cache()
airports_df.show(10, truncate=False)

flights_df = _read_csv(flights_file_location)
flights_df.show(10, truncate=False)

## Configure the Snowflake connector options and create the schemas if needed

In [0]:
#Snowflake connection options
# Credentials are read from the environment instead of being committed to the
# notebook. On Databricks, set SNOWFLAKE_USER / SNOWFLAKE_PASSWORD as cluster
# environment variables (e.g. backed by a secret reference).
# NOTE(review): the password previously hard-coded here should be rotated.
import os
import warnings

sf_options = {
  "sfUrl": "https://nv36420.ap-south-1.aws.snowflakecomputing.com/",
  "sfUser": os.environ.get("SNOWFLAKE_USER", "Mani"),
  "sfPassword": os.environ.get("SNOWFLAKE_PASSWORD", ""),
  "sfDatabase": "USER_MANI",
  "sfWarehouse": "INTERVIEW_WH"
}

# Warn early rather than failing later inside a Snowflake read/write.
if not sf_options["sfPassword"]:
    warnings.warn("SNOWFLAKE_PASSWORD is not set; Snowflake connections will likely fail to authenticate.")

In [0]:
%scala
import net.snowflake.spark.snowflake.Utils

// Credentials come from the SNOWFLAKE_USER / SNOWFLAKE_PASSWORD environment
// variables instead of being committed to the notebook.
// NOTE(review): the password previously hard-coded here should be rotated.
val sf_options = Map(
  "sfUrl" -> "https://nv36420.ap-south-1.aws.snowflakecomputing.com/",
  "sfUser" -> sys.env.getOrElse("SNOWFLAKE_USER", "Mani"),
  "sfPassword" -> sys.env.getOrElse("SNOWFLAKE_PASSWORD", ""),
  "sfDatabase" -> "USER_MANI",
  "sfWarehouse" -> "INTERVIEW_WH"
)

// Fail fast with a clear message instead of an authentication error from runQuery.
require(sf_options("sfPassword").nonEmpty, "SNOWFLAKE_PASSWORD is not set")

// Create the Snowflake schemas used by this notebook (IF NOT EXISTS makes reruns safe).
Utils.runQuery(sf_options, """CREATE SCHEMA IF NOT EXISTS RAW_DATA""")
Utils.runQuery(sf_options, """CREATE SCHEMA IF NOT EXISTS CURATED_DATA""")
Utils.runQuery(sf_options, """CREATE SCHEMA IF NOT EXISTS CURATED_DATA_NEW""")

## Write the raw data files into Snowflake for better modularization

In [0]:
# Writing raw data files into Snowflake, one overwrite per source DataFrame.
raw_tables = [
    (airlines_df, "USER_MANI.RAW_DATA.airlines_raw"),
    (airports_df, "USER_MANI.RAW_DATA.airports_raw"),
    (flights_df, "USER_MANI.RAW_DATA.flights_raw"),
]
for raw_df, target_table in raw_tables:
    (raw_df.write
        .mode('overwrite')
        .format("snowflake")
        .option("sfSchema", "RAW_DATA")
        .option("dbtable", target_table)
        .options(**sf_options)
        .save())

## Prepare the curated dataset and store it in Snowflake to generate the reports

In [0]:
# Columns kept in the curated flights table.
cols = [
    "YEAR", "MONTH", "DAY",
    "AIRLINE", "FLIGHT_NUMBER",
    "ORIGIN_AIRPORT", "DESTINATION_AIRPORT",
    "CANCELLED", "CANCELLATION_REASON",
    "AIR_SYSTEM_DELAY", "SECURITY_DELAY", "DEPARTURE_DELAY", "ARRIVAL_DELAY",
    "AIRLINE_DELAY", "LATE_AIRCRAFT_DELAY", "WEATHER_DELAY",
]

# Assumption: rows missing an origin airport, destination airport or airline are
# dropped. More data scrubbing can be added here as requirements firm up.
mandatory_cols = ["ORIGIN_AIRPORT", "DESTINATION_AIRPORT", "AIRLINE"]
flights_clean_df = flights_df.dropna(how='any', subset=mandatory_cols)
# Cached because the report cells below reuse this DataFrame.
flights_clean_df = flights_clean_df.select(cols).cache()

# Persist the curated dataset to Snowflake.
curated_writer = (flights_clean_df.write
    .mode('overwrite')
    .format("snowflake")
    .option("sfSchema", "CURATED_DATA_NEW")
    .option("dbtable", "USER_MANI.CURATED_DATA_NEW.flights")
    .options(**sf_options))
curated_writer.save()

## Report 1:
● Total number of flights by airline and airport on a monthly basis

In [0]:
from pyspark.sql import Window
from pyspark.sql import functions as f

# Report 1: total flights per airport, airline and month. Each flight is
# counted once at its origin airport (departure) and once at its destination
# airport (arrival).
report1_keys = ["AIRPORT", "YEAR", "MONTH", "AIRLINE"]

# Picking only the desired columns from the cleaned dataset
input_flights_df = flights_clean_df.select("ORIGIN_AIRPORT", "DESTINATION_AIRPORT", "YEAR", "MONTH", "AIRLINE")

# Departures per origin airport, airline and month (from the cleaned data).
flights_orig_df = (input_flights_df
    .withColumnRenamed("ORIGIN_AIRPORT", "AIRPORT")
    .groupBy(report1_keys)
    .agg(f.count(f.lit(1)).alias("DEPARTURES")))

# Arrivals per destination airport, airline and month.
flights_dest_df = (input_flights_df
    .withColumnRenamed("DESTINATION_AIRPORT", "AIRPORT")
    .groupBy(report1_keys)
    .agg(f.count(f.lit(1)).alias("ARRIVALS")))

# Full outer join so an airport/airline/month with only departures (or only
# arrivals) is still reported; the missing side contributes 0.
final_df = (flights_orig_df
    .join(flights_dest_df, report1_keys, 'full_outer')
    .withColumn("Total_flights",
                f.coalesce(f.col("DEPARTURES"), f.lit(0)) + f.coalesce(f.col("ARRIVALS"), f.lit(0)))
    .select("AIRPORT", "YEAR", "MONTH", "AIRLINE", "Total_flights")
    .orderBy(f.asc("AIRPORT"), f.asc("YEAR"), f.asc("MONTH"), f.asc("AIRLINE")))

#Write the final derived data back to Snowflake database
final_df.write.mode('overwrite').format("snowflake").option("sfSchema", "CURATED_DATA_NEW").option("dbtable", "USER_MANI.CURATED_DATA_NEW.REPORT1").options(**sf_options).save()

In [0]:
# NOTE(review): TOTAL_FLIGHTS_PER_MONTH_BASIS is not created in this notebook
# (the previous cell writes REPORT1); presumably a Snowflake view over it - confirm.
report1_reader = spark.read.format("snowflake").option("dbtable", "USER_MANI.CURATED_DATA_NEW.TOTAL_FLIGHTS_PER_MONTH_BASIS")
view_df = report1_reader.options(**sf_options).load()
display(view_df)

## Report 2:
● On-time percentage of each airline for the year 2015

In [0]:
from pyspark.sql import Window
from pyspark.sql import functions as f

# Year covered by the report, and the largest delay (in minutes) still counted
# as on time. Negative delays are early flights, which are on time.
REPORT_YEAR = 2015
ON_TIME_THRESHOLD_MINUTES = 0

# Picking only the desired columns from the cleaned dataset, restricted to the report year
input_flights_df = (flights_clean_df
    .select("AIRLINE", "DEPARTURE_DELAY", "YEAR", "ARRIVAL_DELAY")
    .filter(f.col("YEAR") == REPORT_YEAR))


def _on_time_percentage(df, delay_col, pct_alias):
    """Return (AIRLINE, pct_alias): percentage of an airline's flights whose
    delay_col is <= ON_TIME_THRESHOLD_MINUTES. Rows with a null delay_col are ignored."""
    on_time_flag = f.when(f.col(delay_col) <= ON_TIME_THRESHOLD_MINUTES, 1).otherwise(0)
    return (df
        .filter(f.col(delay_col).isNotNull())
        .groupBy("AIRLINE")
        .agg(f.count(delay_col).alias("TOTAL_COUNT"), f.sum(on_time_flag).alias("ON_TIME_COUNT"))
        .select("AIRLINE", (f.col("ON_TIME_COUNT") / f.col("TOTAL_COUNT") * 100).alias(pct_alias)))


# On-time departure and arrival percentages per airline
flights_dept_df = _on_time_percentage(input_flights_df, "DEPARTURE_DELAY", "DEPARTURE_ON_TIME_PERCENTAGE")
flights_arr_df = _on_time_percentage(input_flights_df, "ARRIVAL_DELAY", "ARRIVAL_ON_TIME_PERCENTAGE")

# Combine as the average (not the sum) of the two percentages so the result stays within 0-100.
on_time_df = (flights_arr_df
    .join(flights_dept_df, "AIRLINE", 'inner')
    .withColumn("ON_TIME_PERCENTAGE_NEW",
                (f.col("ARRIVAL_ON_TIME_PERCENTAGE") + f.col("DEPARTURE_ON_TIME_PERCENTAGE")) / 2)
    # Rename the code column so it does not clash with airlines_df.AIRLINE (the airline name).
    .withColumnRenamed("AIRLINE", "AIRLINE_CODE"))

final_df = (on_time_df
    .join(airlines_df, on_time_df["AIRLINE_CODE"] == airlines_df["IATA_CODE"], 'inner')
    .select(airlines_df["AIRLINE"], "ON_TIME_PERCENTAGE_NEW")
    .orderBy(f.col("ON_TIME_PERCENTAGE_NEW").desc()))

#Write the final derived data back to Snowflake database
final_df.write.mode('overwrite').format("snowflake").option("sfSchema", "CURATED_DATA_NEW").option("dbtable", "USER_MANI.CURATED_DATA_NEW.REPORT2").options(**sf_options).save()


In [0]:
# NOTE(review): ON_TIME_AIRLINES is not created in this notebook (the previous
# cell writes REPORT2); presumably a Snowflake view over it - confirm.
report2_reader = spark.read.format("snowflake").option("dbtable", "USER_MANI.CURATED_DATA_NEW.ON_TIME_AIRLINES")
on_time_airlines_view_df = report2_reader.options(**sf_options).load()
display(on_time_airlines_view_df)

## Report 3:
● Airlines with largest number of delays

In [0]:
from pyspark.sql import functions as f

# Picking only the desired columns from the cleaned dataset
input_flights_df = flights_clean_df.select("AIRLINE", "DEPARTURE_DELAY", "ARRIVAL_DELAY")

# A departure/arrival counts as delayed only when its delay is positive:
# negative values are early flights, and null delays count as not delayed.
late_departure = f.when(f.col("DEPARTURE_DELAY") > 0, 1).otherwise(0)
late_arrival = f.when(f.col("ARRIVAL_DELAY") > 0, 1).otherwise(0)

# A single aggregation keeps every airline, including those with delays on only one side.
delay_counts_df = (input_flights_df
    .groupBy("AIRLINE")
    .agg(f.sum(late_departure).alias("DEPARTURE_DELAY_COUNT"),
         f.sum(late_arrival).alias("ARRIVAL_DELAY_COUNT"))
    .withColumn("Total_Delay", f.col("DEPARTURE_DELAY_COUNT") + f.col("ARRIVAL_DELAY_COUNT"))
    # Rename the code column so it does not clash with airlines_df.AIRLINE (the airline name).
    .withColumnRenamed("AIRLINE", "AIRLINE_CODE"))

#Derving the largest delay for each airlines
final_df = (delay_counts_df
    .join(airlines_df, delay_counts_df["AIRLINE_CODE"] == airlines_df["IATA_CODE"], 'inner')
    .select(airlines_df["AIRLINE"], "Total_Delay")
    .orderBy(f.col("Total_Delay").desc()))

#Write the final derived data back to Snowflake database
final_df.write.mode('overwrite').format("snowflake").option("sfSchema", "CURATED_DATA_NEW").option("dbtable", "USER_MANI.CURATED_DATA_NEW.REPORT3").options(**sf_options).save()

In [0]:
# NOTE(review): AIRLINES_WITH_LARGEST_DELAYS is not created in this notebook
# (the previous cell writes REPORT3); presumably a Snowflake view over it - confirm.
report3_reader = spark.read.format("snowflake").option("dbtable", "USER_MANI.CURATED_DATA_NEW.AIRLINES_WITH_LARGEST_DELAYS")
largest_delay_airlines_view_df = report3_reader.options(**sf_options).load()
display(largest_delay_airlines_view_df)

## Report 4:
● Cancellation reasons by airport

In [0]:
from pyspark.sql import functions as f

# Cancelled flights with a reason code. Null reasons are excluded because they
# could never match as join keys below and would yield duplicate null rows.
input_flights_df = (flights_clean_df
    .filter(f.col("CANCELLED") == 1)
    .filter(f.col("CANCELLATION_REASON").isNotNull())
    .select("DESTINATION_AIRPORT", "ORIGIN_AIRPORT", "CANCELLATION_REASON", "CANCELLED"))

report4_keys = ["AIRPORT", "CANCELLATION_REASON"]

# Cancellations per destination airport and reason
flights_dest_df = (input_flights_df
    .withColumnRenamed("DESTINATION_AIRPORT", "AIRPORT")
    .groupBy(report4_keys)
    .agg(f.count("CANCELLATION_REASON").alias("DEST_COUNT")))

# Cancellations per origin airport and reason
flights_orig_df = (input_flights_df
    .withColumnRenamed("ORIGIN_AIRPORT", "AIRPORT")
    .groupBy(report4_keys)
    .agg(f.count("CANCELLATION_REASON").alias("ORIG_COUNT")))

# Each cancelled flight is attributed to both its origin and its destination airport.
# NOTE(review): confirm this double attribution is intended (origin-only would count each cancellation once).
# Full outer join so a reason seen at an airport only as origin (or only as
# destination) is not dropped; the missing side contributes 0.
final_df = (flights_orig_df
    .join(flights_dest_df, report4_keys, 'full_outer')
    .withColumn("Total_Count",
                f.coalesce(f.col("ORIG_COUNT"), f.lit(0)) + f.coalesce(f.col("DEST_COUNT"), f.lit(0)))
    .select("AIRPORT", "CANCELLATION_REASON", "Total_Count")
    .orderBy(f.col("AIRPORT").asc(), f.col("CANCELLATION_REASON").asc()))

#Write the final derived data back to Snowflake database
final_df.write.mode('overwrite').format("snowflake").option("sfSchema", "CURATED_DATA_NEW").option("dbtable", "USER_MANI.CURATED_DATA_NEW.REPORT4").options(**sf_options).save()

In [0]:
# NOTE(review): CANCELLATION_REASONS_BY_AIRPORT is not created in this notebook
# (the previous cell writes REPORT4); presumably a Snowflake view over it - confirm.
report4_reader = spark.read.format("snowflake").option("dbtable", "USER_MANI.CURATED_DATA_NEW.CANCELLATION_REASONS_BY_AIRPORT")
cancel_reasons_view_df = report4_reader.options(**sf_options).load()
display(cancel_reasons_view_df)

## Report 5:
● Delay reasons by airport

In [0]:
from pyspark.sql import functions as f

# Delay-cause columns, in the order the TOTAL_*_COUNT output columns are emitted.
DELAY_REASON_COLS = ["AIR_SYSTEM_DELAY", "SECURITY_DELAY", "LATE_AIRCRAFT_DELAY", "AIRLINE_DELAY", "WEATHER_DELAY"]

# Picking only the desired columns from the cleaned dataset
input_flights_df = flights_clean_df.select("DESTINATION_AIRPORT", "ORIGIN_AIRPORT", *DELAY_REASON_COLS)


def _delay_reason_counts(df, airport_col, suffix):
    """Per airport (airport_col renamed to AIRPORT), count the flights on which
    each delay reason contributed (> 0 minutes; null counts as not contributing).
    Output count columns are named <reason>_<suffix>."""
    counts = [f.sum(f.when(f.col(c) > 0, 1).otherwise(0)).alias(c + "_" + suffix)
              for c in DELAY_REASON_COLS]
    return df.withColumnRenamed(airport_col, "AIRPORT").groupBy("AIRPORT").agg(*counts)


# Delay reason counts at destination and at origin airports
flights_dest_df = _delay_reason_counts(input_flights_df, "DESTINATION_AIRPORT", "DEST")
flights_orig_df = _delay_reason_counts(input_flights_df, "ORIGIN_AIRPORT", "ORIG")

# Full outer join so airports seen only as origin (or only as destination) are
# kept; the missing side contributes 0.
totals = [(f.coalesce(f.col(c + "_ORIG"), f.lit(0)) + f.coalesce(f.col(c + "_DEST"), f.lit(0))).alias("TOTAL_" + c + "_COUNT")
          for c in DELAY_REASON_COLS]
final_df = (flights_orig_df
    .join(flights_dest_df, "AIRPORT", 'full_outer')
    .select("AIRPORT", *totals)
    .orderBy(f.col("AIRPORT").asc()))

#Write the final derived data back to Snowflake database
final_df.write.mode('overwrite').format("snowflake").option("sfSchema", "CURATED_DATA_NEW").option("dbtable", "USER_MANI.CURATED_DATA_NEW.REPORT5").options(**sf_options).save()

In [0]:
# NOTE(review): DELAY_REASONS_BY_AIRPORT is not created in this notebook
# (the previous cell writes REPORT5); presumably a Snowflake view over it - confirm.
report5_reader = spark.read.format("snowflake").option("dbtable", "USER_MANI.CURATED_DATA_NEW.DELAY_REASONS_BY_AIRPORT")
delay_reasons_view_df = report5_reader.options(**sf_options).load()
display(delay_reasons_view_df)

## Report 6:
● Airline with most unique routes

In [0]:
from pyspark.sql import functions as f
from pyspark.sql.types import ArrayType
from pyspark.sql import Window

# Routes are treated as undirected (A->B and B->A are the same route): each
# flight's endpoints are normalised to (smaller code, larger code) and distinct
# pairs are counted per airline. Uses the cleaned data so null airports cannot
# create phantom routes, and a groupBy instead of a per-row windowed collect_set.
cols = ["AIRLINE", "ORIGIN_AIRPORT", "DESTINATION_AIRPORT"]
route_counts_df = (flights_clean_df
    .select(cols)
    .groupBy("AIRLINE")
    .agg(f.countDistinct(f.least("ORIGIN_AIRPORT", "DESTINATION_AIRPORT"),
                         f.greatest("ORIGIN_AIRPORT", "DESTINATION_AIRPORT")).alias("unique_route_count"))
    # Rename the code column so it does not clash with airlines_df.AIRLINE (the airline name).
    .withColumnRenamed("AIRLINE", "AIRLINE_CODE"))

unique_route_df = (route_counts_df
    .join(airlines_df, route_counts_df["AIRLINE_CODE"] == airlines_df["IATA_CODE"], 'inner')
    .select(airlines_df["AIRLINE"], "unique_route_count")
    .orderBy(f.col("unique_route_count").desc()))

#Write the final derived data back to Snowflake database
unique_route_df.write.mode('overwrite').format("snowflake").option("sfSchema", "CURATED_DATA_NEW").option("dbtable", "USER_MANI.CURATED_DATA_NEW.REPORT6").options(**sf_options).save()

In [0]:
# NOTE(review): UNIQUE_ROUTES_BY_AIRLINES is not created in this notebook
# (the previous cell writes REPORT6); presumably a Snowflake view over it - confirm.
report6_reader = spark.read.format("snowflake").option("dbtable", "USER_MANI.CURATED_DATA_NEW.UNIQUE_ROUTES_BY_AIRLINES")
airlines_unique_routes_view_df = report6_reader.options(**sf_options).load()
display(airlines_unique_routes_view_df)