# Task 3

In [0]:
dbfs_fileStore_prefix = "/FileStore/tables"
prefix = "ontimeperformance"
size = "small"

In [0]:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext

import pyspark.sql.functions as f
from pyspark.sql.functions import col, avg, min, max, abs, concat_ws, rank
from pyspark.sql.window import Window
import pandas as pd
import numpy as np
from pyspark.statcounter import StatCounter
from sparkmeasure import StageMetrics

In [0]:
def task_3(spark_session, flights_path, airlines_path, aircrafts_path, country):
  stagemetrics = StageMetrics(spark)
  stagemetrics.begin()
  
  ss=spark_session.builder
  clean_flight_Schema = StructType([StructField('carrier_code', StringType(), True),
                     StructField('flight_number', StringType(), True),
                     StructField('flight_date', StringType(), True),
                     StructField('origin', StringType(), True),
                     StructField('destination', StringType(), True),
                     StructField('tail_number', StringType(), True),
                     StructField('scheduled_departure_time', StringType(), True),
                     StructField('scheduled_arrival_time', StringType(), True),
                     StructField('actual_departure_time', StringType(), True),
                     StructField('actual_arrival_time', StringType(), True),
                     StructField('distance', StringType(), True)])
  clean_aircraft_Schema = StructType([StructField('tail_number', StringType(), True),
                     StructField('manufacture', StringType(), True),
                     StructField('model', StringType(), True),
                     StructField('year', StringType(), True),
                     StructField('aircraft_type', StringType(), True)])
  clean_airline_Schema = StructType([StructField('carrier_code', StringType(), True),
                     StructField('name', StringType(), True),
                     StructField('country', StringType(), True)])
  
  #read data
  DF_flight_schema = spark.read.format("csv").option("header", "true").schema(clean_flight_Schema).load(flights_path)[["carrier_code", "tail_number", "actual_departure_time"]].cache()
  DF_aircraft_schema = spark.read.format("csv").option("header", "true").schema(clean_aircraft_Schema).load(aircrafts_path)[["tail_number", "model", "aircraft_type"]].cache()
  DF_airline_schema = spark.read.format("csv").option("header", "true").schema(clean_airline_Schema).load(airlines_path).cache()
  
  #drop missing
  DF_Flights_clean=DF_flight_schema.na.drop()
  DF_Airlines_clean=DF_airline_schema.na.drop()
  DF_Aircraft_clean=DF_aircraft_schema.na.drop()
  
  #filter country
  country_name = country
  country_c_n = DF_Airlines_clean.select('carrier_code', 'name').filter(f.col("country") == country_name).dropDuplicates()
  
  #Join THREE datasets
  join_DF_country_airline = country_c_n.join(DF_Flights_clean, on=['carrier_code'], how='inner')
  join_DF_country_airline_aircraft = DF_Aircraft_clean.join(join_DF_country_airline, on=['tail_number'], how='inner')
  
  #combine manufacturer and aircraft_type as total_type
  join_DF_country_airline_aircraft = join_DF_country_airline_aircraft.withColumn("total_type" ,f.concat_ws(" ", f.col("model"), f.col("aircraft_type")))
  
  #calculate the count of each total_type and group by airline companies' name
  join_DF_country_airline_aircraft = join_DF_country_airline_aircraft.select('total_type', 'name').groupBy('name', 'total_type').agg(f.count(f.col("total_type")).alias("type_num"))
  
  #limit top5 aircraft for each airline company
  window = Window.partitionBy(join_DF_country_airline_aircraft['name']).orderBy(join_DF_country_airline_aircraft['type_num'].desc())
  join_DF_country_airline_aircraft = join_DF_country_airline_aircraft.select('*', f.rank().over(window).alias('rank')).filter(f.col('rank') <= 5)

  #convert it to list format
  size = 5
  df = join_DF_country_airline_aircraft.groupBy("name").pivot("rank", values=range(1, size+1)).agg(f.first("total_type")).fillna("")
  final_df = df.select("name", f.array([f.col(str(i)) for i in range(1, size+1)]).alias("total_type")).withColumn("total_type", f.array_remove("total_type", ""))
  
  #remove None

  def array_to_string(my_list):
    return '[' + ','.join([str(elem) for elem in my_list]) + ']'
  
  array_to_string_udf = udf(array_to_string, StringType())
  final_df = final_df.withColumn('total_type', array_to_string_udf(final_df["total_type"]))
  # final_df.write.csv('sp1111_task_3.csv')
  display(final_df)
  
  stagemetrics.end()
  stagemetrics.print_report()

In [0]:
task_3(spark, f"{dbfs_fileStore_prefix}/{prefix}_flights_{size}.csv",
       f"{dbfs_fileStore_prefix}/{prefix}_airlines.csv", 
       f"{dbfs_fileStore_prefix}/{prefix}_aircrafts.csv", "United States")

name,total_type
AirTran,"[BOEING 717-200,BOEING 737-7BD,BOEING 737-76N]"
Alaska Airlines Inc.,"[BOEING 737-4Q8,BOEING 737-490,BOEING 737-890,BOEING 737-790,BOEING 737-990]"
American Airlines Inc.,"[MCDONNELL DOUGLAS DC-9-82(MD-82),MCDONNELL DOUGLAS DC-9-83(MD-83),BOEING 757-223,AIRBUS INDUSTRIE A319-112,AIRBUS INDUSTRIE A319-132]"
American Eagle Airlines Inc.,"[EMBRAER EMB-145LR,EMBRAER EMB-135KL,EMBRAER EMB-135LR,BOMBARDIER INC CL-600-2C10,SAAB-SCANIA SAAB 340B]"
Atlantic Southeast Airlines,"[BOMBARDIER INC CL-600-2B19,EMBRAER EMB-145LR,BOMBARDIER INC CL-600-2C10,CANADAIR CL-600-2B19]"
Comair,"[BOMBARDIER INC CL-600-2B19,CANADAIR CL-600-2B19,BOMBARDIER INC CL-600-2C10,BOMBARDIER INC CL600-2D24,PIPER PA-28-180]"
Continental Air Lines Inc.,"[BOEING 737-524,BOEING 737-824,BOEING 737-3TO,BOEING 737-724,BOEING 757-224]"
Delta Air Lines Inc.,"[MCDONNELL DOUGLAS AIRCRAFT CO MD-88,BOEING 757-232,BOEING 737-832,BOEING 717-200,BOEING 767-332]"
Frontier Airlines Inc.,"[AIRBUS A319-111,AIRBUS A318-111,AIRBUS A319-112,AIRBUS INDUSTRIE A319-111,AIRBUS A320-214]"
Hawaiian Airlines Inc.,"[BOEING 717-200,BOEING 767-33A,BOEING 767-3CB,BOEING 767-332,BOEING 767-3G5]"


In [0]:
def task_3_RDD(spark_session, flights_path, airlines_path, aircrafts_path, country):
  stagemetrics = StageMetrics(spark)
  stagemetrics.begin()
  
  ss=spark_session.builder
  clean_flight_Schema = StructType([StructField('carrier_code', StringType(), True),
                     StructField('flight_number', StringType(), True),
                     StructField('flight_date', StringType(), True),
                     StructField('origin', StringType(), True),
                     StructField('destination', StringType(), True),
                     StructField('tail_number', StringType(), True),
                     StructField('scheduled_departure_time', StringType(), True),
                     StructField('scheduled_arrival_time', StringType(), True),
                     StructField('actual_departure_time', StringType(), True),
                     StructField('actual_arrival_time', StringType(), True),
                     StructField('distance', StringType(), True)])
  clean_aircraft_Schema = StructType([StructField('tail_number', StringType(), True),
                     StructField('manufacture', StringType(), True),
                     StructField('model', StringType(), True),
                     StructField('year', StringType(), True),
                     StructField('aircraft_type', StringType(), True)])
  clean_airline_Schema = StructType([StructField('carrier_code', StringType(), True),
                     StructField('name', StringType(), True),
                     StructField('country', StringType(), True)])
  
  #read data
  DF_flight = spark.read.format("csv").option("header", "true").schema(clean_flight_Schema).load(flights_path)[["carrier_code", "tail_number", "actual_departure_time"]].dropna()
  DF_aircraft = spark.read.format("csv").option("header", "true").schema(clean_aircraft_Schema).load(aircrafts_path)[["tail_number", "model", "aircraft_type"]].dropna()
  DF_airline = spark.read.format("csv").option("header", "true").schema(clean_airline_Schema).load(airlines_path).dropna()
  
  #create RDD
  rdd_Flight = DF_flight.rdd
  rdd_Aircraft = DF_aircraft.rdd
  rdd_Airlines = DF_airline.rdd
  
  
  #filter country
  c_airlines = rdd_Airlines.filter(lambda x: x[2] == country)
  
  #create key
  key_c_airlines = c_airlines.map(lambda x: (x[0], x[1:]))
  key_Flight = rdd_Flight.map(lambda x: (x[0], x[1:]))
  
  #join
  newRdd = key_c_airlines.join(key_Flight)
  
  #count number
  group_AL = newRdd.map(lambda x:(x[1][0][0], x[1][1][0]))
  group_AL = group_AL.map(lambda x:(x, 1))
  group_AL = group_AL.reduceByKey(lambda x,y: (x+y))
  
  #sort by numer
  group_AL = group_AL.map(lambda x:(x[0][0], (x[0][1], x[1])))
  group_AL = group_AL.sortBy(lambda x: x[1][1], ascending=False)
  
  #convert to required format (like "Boeing 787")
  tail_num = group_AL.map(lambda x: (x[1][0], x[0]))
  rdd_Aircraft = rdd_Aircraft.map(lambda x: (x[0], x[1] + ' ' + x[2]))
  
  #join 
  final_join_Rdd = tail_num.join(rdd_Aircraft)
  
  #count number for each aircraft_type group by airlines
  aircraft_number = final_join_Rdd.map(lambda x:(x[1], 1))
  aircraft_number = aircraft_number.reduceByKey(lambda x,y: (x+y)).groupByKey().map(lambda x : (x[0], list(x[1])[0])).sortByKey()
  
  #limit to top5
  final_Rdd = aircraft_number.sortBy(lambda x:(x[0][0],-x[1])).map(lambda x: (x[0][0], x[0][1])).groupByKey().map(lambda x : (x[0], list(x[1])[0:5])).sortByKey()

  print(final_Rdd.collect())
  
  stagemetrics.end()
  stagemetrics.print_report()

In [0]:
task_3_RDD(spark, f"{dbfs_fileStore_prefix}/{prefix}_flights_{size}.csv", 
                                f"{dbfs_fileStore_prefix}/{prefix}_airlines.csv", 
                                f"{dbfs_fileStore_prefix}/{prefix}_aircrafts.csv", "United States")