# Question 1: Data Analysis with Spark

In [11]:
import csv
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from pyspark.sql.window import Window
from pyspark.sql.functions import *

#### Configure file path

In [3]:
# Input locations, relative to the notebook, used when testing locally.
# All CSV extracts live in the same directory.
data_dir = 'assignment_data_files'

aircraft_path = data_dir + '/ontimeperformance_aircrafts.csv'
airline_path = data_dir + '/ontimeperformance_airlines.csv'
airport_path = data_dir + '/ontimeperformance_airports.csv'

## Alter flight_path to assess performance later for Question 2
flight_path = data_dir + '/ontimeperformance_flights_tiny.csv'

#### Start the SparkSession and load the data into Spark DataFrames

In [4]:
# Obtain the SparkSession for this notebook (an existing session is reused).
spark = (
    SparkSession.builder
    .appName("training explore")
    .getOrCreate()
)

# Aircraft lookup: keep only the three columns used later and drop any row
# that is missing one of them. Cached because several tasks read it.
aircraft_df = (
    spark.read.csv(aircraft_path, header=True)
    .select("tailnum", "manufacturer", "model")
    .na.drop()
    .cache()
)

# Airline lookup, read as-is and cached.
airline_df = spark.read.csv(airline_path, header=True).cache()

# Explicit schema for the flights file: flight_date is parsed as a date and
# distance as an integer; every other column, including the time-of-day
# columns, is kept as a string. All fields are nullable.
flight_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("flight_id", StringType()),
        ("carrier_code", StringType()),
        ("flight_number", StringType()),
        ("flight_date", DateType()),
        ("origin", StringType()),
        ("destination", StringType()),
        ("tail_number", StringType()),
        ("scheduled_depature_time", StringType()),
        ("scheduled_arrival_time", StringType()),
        ("actual_departure_time", StringType()),
        ("actual_arrival_time", StringType()),
        ("distance", IntegerType()),
    ]
])

# Flights: only the columns the three tasks need are retained before caching.
flight_df = (
    spark.read.csv(flight_path, header=True, schema=flight_schema)
    .select(
        "carrier_code",
        "flight_date",
        "tail_number",
        "scheduled_depature_time",
        "actual_departure_time",
    )
    .cache()
)

## Task 1: Top-3 Cessna Models

In [5]:
# Keep only the aircraft whose manufacturer is CESSNA
cessna_aircraft_df = aircraft_df.where("manufacturer == 'CESSNA'")

# Inner join the flights with the Cessna aircraft on tail number.
# A broadcast join is used for performance because cessna_aircraft_df is a
# very small set (12 rows in the provided dataset).
flight_cessna_df = flight_df \
                    .join(broadcast(cessna_aircraft_df),
                          cessna_aircraft_df.tailnum == flight_df.tail_number) \
                    .select("model")


# One partition per model, so count(...).over(window) is the number of
# flights flown by that model.
window = Window() \
            .partitionBy("model")

# Alternatively: flight_cessna_df.groupBy("model").count().orderBy(desc("count")).take(3)
# "model" is a secondary sort key so that the three rows returned are
# deterministic when several models tie on the flight count.
top3 = flight_cessna_df \
                .select("model", count("model").over(window).alias("count")) \
                .distinct() \
                .orderBy(desc("count"), "model") \
                .take(3)


# Write the result to a local file, one "Cessna <model>\t<count>" line per model.
# The context manager closes the file even if a write fails.
# NOTE(review): the file name keeps its original "Cassna" spelling so that
# anything already reading this path keeps working; rename it if nothing does.
with open("Cassna_top3.txt", "w") as f:
    for row in top3:
        f.write("Cessna " + row['model'] + "\t" + str(row['count']) + "\n")

## Task 2: Average Departure Delay

#### Transform flight_df to extract the flight year and compute the departure delay in minutes from scheduled_depature_time and actual_departure_time

In [6]:
## Helper (wrapped as a UDF below): departure delay in minutes, actual minus scheduled
def get_minutes(dep_str, act_str):
    """Return the departure delay in whole minutes (actual minus scheduled).

    Parameters
    ----------
    dep_str : str or None
        Scheduled departure time of day as "HH:MM" or "HH:MM:SS".
    act_str : str or None
        Actual departure time of day in the same format.

    Returns
    -------
    int or None
        Positive when the flight left late, negative when it left early.
        None when either time is missing (None or empty string), so a
        missing value becomes a missing delay instead of an exception.

    Raises
    ------
    ValueError
        If a non-empty time string has fewer than two ":"-separated parts
        or its hour/minute parts are not integers.

    Seconds, when present, are ignored.
    NOTE(review): both values are treated as times on the same day; a flight
    scheduled before midnight that leaves after midnight comes out as a large
    negative number. Confirm whether that case needs handling.
    """
    if not dep_str or not act_str:
        return None
    # Only the hour and minute parts are used; [:2] accepts both "HH:MM"
    # and "HH:MM:SS".
    sched_hour, sched_minute = (int(part) for part in dep_str.split(":")[:2])
    act_hour, act_minute = (int(part) for part in act_str.split(":")[:2])
    return (act_hour * 60 + act_minute) - (sched_hour * 60 + sched_minute)

## TODO: find a way to actually get user_specified_year from user input
fake_user_specified_year = 1995

# Expose get_minutes to Spark as a UDF returning an integer column.
get_minutes_udf = udf(get_minutes, IntegerType())

# Flights of the selected year with their departure delay ("lateness", in minutes).
# Rows missing either departure time are removed before the UDF runs, so the
# delay is only computed where both times are present.
# The year is compared as a column expression rather than spliced into a SQL string.
flight_year_df = (
    flight_df
    .select("carrier_code", "flight_date", "scheduled_depature_time", "actual_departure_time")
    .withColumn("year", year("flight_date"))
    .where(col("year") == fake_user_specified_year)
    .where(col("actual_departure_time").isNotNull())
    .where(col("scheduled_depature_time").isNotNull())
    .withColumn("lateness", get_minutes_udf("scheduled_depature_time", "actual_departure_time"))
    .drop("flight_date", "scheduled_depature_time", "actual_departure_time")
)

# Only flights that actually left late count as delayed.
delayed_flight_df = flight_year_df.where("lateness > 0")

In [7]:
# Keep only US airlines; the country column is not needed after the filter
airline_us_df = airline_df.where("country == 'United States'").drop("country")
# Join the delayed flights with the US airlines, broadcasting the airline side
airline_flight_df = delayed_flight_df.join(broadcast(airline_us_df), "carrier_code").cache()

# One partition per airline name. The statement ends here: the original
# trailing backslash continued into the comment line below, which only
# worked by accident.
delay_window = Window() \
                .partitionBy("name")

## Get summary statistics
# count/avg/min/max are the Spark column functions brought in by
# `from pyspark.sql.functions import *` (they shadow the Python builtins).
# Every row of an airline carries the same aggregates, so distinct() leaves
# one row per airline.
lateness_summary = airline_flight_df \
                    .select("name",
                            count("lateness").over(delay_window).alias("count"),
                            avg("lateness").over(delay_window).alias("average"),
                            min("lateness").over(delay_window).alias("minimum"),
                            max("lateness").over(delay_window).alias("maximum")) \
                    .distinct() \
                    .orderBy("name") \
                    .collect()

# Write the result to a local file: name, count, average, minimum, maximum,
# tab-separated, one airline per line. The context manager closes the file
# even if a write fails.
filename = "us_delay_flights_summary_"+ str(fake_user_specified_year)+'.txt'
with open(filename, "w") as f2:
    for row in lateness_summary:
        line = row['name']+'\t'+str(row['count'])+'\t'+ \
                str(row['average'])+'\t'+ str(row['minimum'])+'\t'+str(row['maximum'])+'\n'
        f2.write(line)

## Task 3: Most Popular Aircraft Types

In [8]:
## TODO: find a way to get from user input
# The value carries its own single quotes because it is spliced directly
# into the SQL filter expression below.
user_defined_country = "'United States'"
airline_selected_df = (
    airline_df
    .where("country ==" + user_defined_country)
    .drop("country")
)

# Attach the airline to every flight (by carrier code), then the aircraft
# (by tail number); both lookup tables are broadcast. Only the columns
# needed for the popularity ranking are kept.
flight_airline_aircraft_df = (
    flight_df
    .join(broadcast(airline_selected_df), "carrier_code")
    .join(broadcast(aircraft_df), flight_df.tail_number == aircraft_df.tailnum)
    .select("name", "tailnum", "manufacturer", "model")
)

In [9]:
# One partition per (airline, manufacturer, model): used to count flights per aircraft type
popular_aircraft_window = Window() \
                            .partitionBy("name", "manufacturer", "model" )

# One partition per airline, most-flown aircraft type first: used for ranking
rank_window = Window() \
                .partitionBy("name") \
                .orderBy(desc("count"))
# count the number of flights per partition of aircraft model
# rank the models by flight count within each airline
# keep only the models with rank <= 5 (rank() gives tied models the same rank,
#   so an airline can have more than five rows here)
# order by name and rank, then by manufacturer and model so that tied models
#   come out in a deterministic order
# concatenate the "manufacturer" and "model" columns for convenience later
popular_aircraft_result = flight_airline_aircraft_df \
                            .select("name", "manufacturer", "model",
                                    count("name").over(popular_aircraft_window).alias("count")) \
                            .distinct() \
                            .withColumn("rank", rank().over(rank_window)) \
                            .where("rank <=5") \
                            .orderBy("name", "rank", "manufacturer", "model") \
                            .select("name",concat("manufacturer", lit(' '),"model").alias("aircraft_type")) \
                            .collect()


In [10]:
# Group the ranked rows by airline. Some airlines have fewer than 5 aircraft
# models, while others have more than 5 rows because of ties in the top 5.

airline_list = []   # airline names in first-seen order (the query's name order)
aircraft_dict = {}  # airline name -> aircraft types, in rank order
for row in popular_aircraft_result:
    airline_name = row["name"]
    # Dictionary membership is a constant-time check, unlike scanning the list.
    if airline_name not in aircraft_dict:
        airline_list.append(airline_name)
        aircraft_dict[airline_name] = []
    aircraft_dict[airline_name].append(row["aircraft_type"])

# Write the result to a local file: "<airline>\t<list of at most 5 aircraft types>".
# NOTE(review): user_defined_country is defined with embedded single quotes,
# so they end up in the file name as well — confirm whether that is intended.
filename = "popular_aircraft_in_"+user_defined_country+".txt"
# The context manager closes the file even if a write fails.
with open(filename, "w") as f3:
    for airline_name in airline_list:
        # Slicing caps the list at five entries and leaves shorter lists untouched.
        top_aircraft = aircraft_dict[airline_name][:5]
        f3.write(airline_name + '\t' + str(top_aircraft) + "\n")
