# Task 3

In [0]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from sparkmeasure import StageMetrics
from pyspark.sql import functions as F
from operator import add
import operator
from pyspark.sql.functions import broadcast
from pyspark.sql.types import StringType
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType
from operator import add

# Location of the uploaded CSV files and the shared file-name stem.
dbfs_fileStore_prefix = "/FileStore/tables"
prefix = "ontimeperformance"

# Shared Spark session used by every cell below.
# NOTE(review): the application name says "task1" although this notebook is
# Task 3 - confirm whether that is intentional.
spark = (
    SparkSession.builder
    .appName("task1")
    .getOrCreate()
)

# sparkmeasure collector bound to the session above; the benchmark cells call
# begin()/end() on it around each run.
stagemetrics = StageMetrics(spark)

In [0]:
def clean_column_names(df):
    """Return ``df`` with whitespace and dots removed from every column name.

    Each name in ``df.columns`` has all whitespace (leading, trailing and
    interior) dropped and every ``.`` deleted; the result is applied through
    ``df.toDF`` in the original column order.
    """
    # split()/join discards every whitespace run, including the ends.
    cleaned = ["".join(name.split()).replace(".", "") for name in df.columns]
    return df.toDF(*cleaned)

In [0]:
def print_models(airlines):
    """Print one line per airline listing its aircraft as ``manufacturer model``.

    Args:
        airlines: sequence of rows shaped ``(airline_name, details)`` where
            ``details[0]`` is the manufacturer and ``details[1]`` the model.
            Rows belonging to the same airline must be adjacent; any further
            items in ``details`` are ignored.

    Output format, one line per run of identical airline names:
        ``<airline_name> \\t [<manufacturer> <model>, <manufacturer> <model>]``

    Prints nothing when ``airlines`` is empty.
    """
    if not airlines:
        return

    # Collect consecutive rows with the same airline name into one group.
    groups = []
    for row in airlines:
        name, details = row[0], row[1]
        label = details[0] + " " + details[1]
        if groups and groups[-1][0] == name:
            groups[-1][1].append(label)
        else:
            groups.append((name, [label]))

    # Every group is printed, including the last one (previously the final
    # airline was dropped whenever more than one airline was present).
    for name, labels in groups:
        print(name + " \t " + "[" + ", ".join(labels) + "]")
  
def task_3_rdd(spark_session, flights_path, airlines_path, aircrafts_path, country, top_n=5):
    """Print the most-flown aircraft models for each airline of ``country``.

    The three CSV files are read as text and split on commas. Columns are
    addressed by position:

    * flights:   0 = carrier code, 5 = tail number
    * airlines:  0 = carrier code, 1 = airline name, 2 = country
    * aircrafts: 0 = tail number, 2 = manufacturer, 4 = model

    Args:
        spark_session: session whose ``sparkContext`` reads the files.
        flights_path: path of the flights CSV.
        airlines_path: path of the airlines CSV.
        aircrafts_path: path of the aircrafts CSV (first line is the header).
        country: only airlines whose country column equals this value are kept.
        top_n: number of (manufacturer, model) entries kept per airline.

    Returns:
        None. The ranking is written to stdout through ``print_models``.
    """
    # Use the session that was passed in rather than a module-level global.
    sc = spark_session.sparkContext

    def load_rows(path):
        # NOTE(review): a plain comma split mis-parses quoted fields that
        # contain commas - confirm the inputs never contain such fields.
        return sc.textFile(path).map(lambda line: line.split(","))

    # (carrier_code, tailnum); rows that are too short or have an empty value
    # in any of the first three columns are dropped.
    flights = (
        load_rows(flights_path)
        .filter(
            lambda row: len(row) > 5
            and row[0] != "" and row[1] != "" and row[2] != ""
        )
        .map(lambda row: (row[0], row[5]))
    )

    # (carrier_code, airline_name) for airlines of the requested country.
    airlines = (
        load_rows(airlines_path)
        .filter(
            lambda row: len(row) > 2
            and row[0] != "" and row[1] != "" and row[2] != ""
            and row[2] == country
        )
        .map(lambda row: (row[0], row[1]))
    )

    # (tailnum, (manufacturer, model)); the header line and rows whose field
    # count differs from the header's are dropped.
    aircraft_rows = load_rows(aircrafts_path)
    header = aircraft_rows.first()
    width = len(header)
    aircraft = (
        aircraft_rows
        .filter(lambda row: len(row) == width and row != header)
        .map(lambda row: (row[0], (row[2], row[4])))
    )

    # Flights per (tailnum, airline name), re-keyed by tailnum for the join.
    flights_per_tail = (
        airlines.join(flights)                           # (carrier, (name, tailnum))
        .map(lambda kv: ((kv[1][1], kv[1][0]), 1))       # ((tailnum, name), 1)
        .reduceByKey(lambda a, b: a + b)
        .map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))   # (tailnum, (name, count))
    )

    # Flights per (airline name, manufacturer, model).
    model_counts = (
        flights_per_tail.join(aircraft)  # (tailnum, ((name, count), (manufacturer, model)))
        .map(lambda kv: ((kv[1][0][0], kv[1][1][0], kv[1][1][1]), kv[1][0][1]))
        .reduceByKey(lambda a, b: a + b)
    )

    # One sort on a composite key: airline name ascending, then count
    # descending, then manufacturer/model to break ties deterministically.
    # Two chained sorts are avoided because the second one reshuffles the data
    # and is not guaranteed to keep the ordering produced by the first.
    ranked = model_counts.sortBy(
        lambda kv: (kv[0][0], -kv[1], kv[0][1], kv[0][2])
    ).collect()

    # Keep the first ``top_n`` rows of every airline; an empty result simply
    # yields an empty list instead of raising IndexError.
    taken_per_airline = {}
    top_rows = []
    for (name, manufacturer, model), count in ranked:
        taken = taken_per_airline.get(name, 0)
        if taken < top_n:
            top_rows.append((name, (manufacturer, model, count)))
            taken_per_airline[name] = taken + 1

    # Output the top aircraft models per airline.
    print_models(top_rows)

In [0]:
# Run Task 3 on the small flights file between stagemetrics.begin()/end().
stagemetrics.begin()
task_3_rdd(
    spark_session=spark,
    flights_path=f"{dbfs_fileStore_prefix}/{prefix}_flights_small.csv",
    airlines_path=f"{dbfs_fileStore_prefix}/{prefix}_airlines.csv",
    aircrafts_path=f"{dbfs_fileStore_prefix}/{prefix}_aircrafts.csv",
    country="United States",
)
stagemetrics.end()

In [0]:
# Run Task 3 on the medium flights file between stagemetrics.begin()/end().
stagemetrics.begin()
task_3_rdd(
    spark_session=spark,
    flights_path=f"{dbfs_fileStore_prefix}/{prefix}_flights_medium.csv",
    airlines_path=f"{dbfs_fileStore_prefix}/{prefix}_airlines.csv",
    aircrafts_path=f"{dbfs_fileStore_prefix}/{prefix}_aircrafts.csv",
    country="United States",
)
stagemetrics.end()

In [0]:
# Run Task 3 on the large flights file between stagemetrics.begin()/end().
stagemetrics.begin()
task_3_rdd(
    spark_session=spark,
    flights_path=f"{dbfs_fileStore_prefix}/{prefix}_flights_large.csv",
    airlines_path=f"{dbfs_fileStore_prefix}/{prefix}_airlines.csv",
    aircrafts_path=f"{dbfs_fileStore_prefix}/{prefix}_aircrafts.csv",
    country="United States",
)
stagemetrics.end()

In [0]:
# Run Task 3 on the massive flights file between stagemetrics.begin()/end().
stagemetrics.begin()
task_3_rdd(
    spark_session=spark,
    flights_path=f"{dbfs_fileStore_prefix}/{prefix}_flights_massive.csv",
    airlines_path=f"{dbfs_fileStore_prefix}/{prefix}_airlines.csv",
    aircrafts_path=f"{dbfs_fileStore_prefix}/{prefix}_aircrafts.csv",
    country="United States",
)
stagemetrics.end()