In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [None]:
# Start (or reuse) the Spark session that backs every query in this notebook.
session_builder = SparkSession.builder.appName("Airline Performance")
spark = session_builder.getOrCreate()

In [3]:
# Load the raw flight records (the first CSV row holds the column names) and
# expose them to Spark SQL under the view name `flights`.
csv_reader = spark.read.option("header", True)
flights = csv_reader.csv('hdfs:///airline_performance/airline_2m.csv')
flights.createOrReplaceTempView("flights")

2022-04-12 15:50:16,191 WARN util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [4]:
# Graph vertices: every airport code that appears as an origin or a destination.
# UNION (rather than UNION ALL) drops duplicates, leaving one `id` row per airport.
airports_query = "select Origin as id from flights union select Dest from flights"
airports = spark.sql(airports_query)

In [5]:
# Graph edges: one row per flight, keyed by (origin airport, dest airport, airline),
# carrying the calendar, location, delay, cancellation, duration and distance
# columns used by the analyses further down.
#
# The CSV is read without a schema, so every source column is a string. Distance is
# cast to a number here because it is later aggregated with max() and passed to
# stat.corr(), both of which need numeric values to give meaningful results.
stats = spark.sql("""
    select
        Origin as src,
        Dest as dst,
        Reporting_Airline as airline,
        Year, Quarter, Month, DayofMonth, DayOfWeek,
        Origin, OriginStateName, OriginCityName,
        Dest, DestStateName, DestCityName,
        cast(DepDelay as int) as Departure_delay,
        cast(ArrDelay as int) as Arrival_Delay,
        DepTimeBlk,
        cast(Cancelled as int) as Cancelled,
        cast(CRSElapsedTime as int) as CRSElapsed_Time,
        cast(ActualElapsedTime as int) as Actual_Elapsed_Time,
        cast(Distance as double) as Distance
    from flights
""")

# Register the frame under the name the later `spark.sql("... from stats")`
# queries use; without this the view cannot be resolved.
stats.createOrReplaceTempView("stats")

In [6]:
# Import only the class that is used, instead of a wildcard that adds unknown
# names to the notebook namespace.
from graphframes import GraphFrame

# Airports are the vertices (column `id`); flights are the edges (columns `src`, `dst`).
graph = GraphFrame(airports, stats)

In [None]:
# Number of unique airports, obtained two ways that should agree.
# 1. From the graph's vertex set. `count` must be called: the bare attribute is
#    just a bound method and triggers no Spark job.
vertex_count = graph.vertices.count()
# 2. Directly from the airports DataFrame.
airport_count = airports.count()

# Display both so they can be compared.
vertex_count, airport_count

### Trivial Analysis

In [None]:
# The queries below read the edge table by name, so make sure it is registered
# as a temporary view (createOrReplaceTempView is safe to repeat).
stats.createOrReplaceTempView("stats")

# NOTE: `stats` exposes the arrival delay as `Arrival_Delay` (the raw `ArrDelay`
# column only exists on `flights`), so every aggregate uses that name.

# which Year?
yearly_delay = spark.sql("""
    select Year,
           avg(Arrival_Delay) as Arrival_Delay,
           sum(Cancelled) as Total_Cancelled
    from stats
    group by Year
    order by Arrival_Delay desc, Total_Cancelled desc
""")

# which month of each year?
monthly_delay = spark.sql("""
    select Year, Month,
           avg(Arrival_Delay) as Arrival_Delay,
           sum(Cancelled) as Total_Cancelled
    from stats
    group by Year, Month
    order by Arrival_Delay desc, Total_Cancelled desc
""")

# which day of month of each year?
daily_delay = spark.sql("""
    select Year, Month, DayofMonth as Day,
           avg(Arrival_Delay) as Arrival_Delay,
           sum(Cancelled) as Total_Cancelled
    from stats
    group by Year, Month, DayofMonth
    order by Arrival_Delay desc, Total_Cancelled desc
""")

# which day of week of each month of each year?
# Stored under its own name so it no longer overwrites `daily_delay`.
weekday_delay = spark.sql("""
    select Year, Month, DayOfWeek,
           avg(Arrival_Delay) as Arrival_Delay,
           sum(Cancelled) as Total_Cancelled
    from stats
    group by Year, Month, DayOfWeek
    order by Arrival_Delay desc, Total_Cancelled desc
""")

# which scheduled departure time block?
hourly_delay = spark.sql("""
    select DepTimeBlk, avg(Arrival_Delay) as Arrival_Delay
    from stats
    group by DepTimeBlk
    order by Arrival_Delay desc
""")

# airline ranking, worst to best?
airline_delay = spark.sql("""
    select airline,
           avg(Arrival_Delay) as Arrival_Delay,
           sum(Cancelled) as Total_Cancelled
    from stats
    group by airline
    order by Arrival_Delay desc, Total_Cancelled desc
""")

# per-year, per-airline summary; registered as a view because the four
# best/worst queries below select from it by name.
airline_yearly = spark.sql("""
    select Year, airline,
           avg(Arrival_Delay) as Arrival_Delay,
           sum(Cancelled) as Total_Cancelled
    from stats
    group by Year, airline
""")
airline_yearly.createOrReplaceTempView("airline_yearly")


def _extreme_airline_per_year(metric, direction):
    """Return one row per Year: the airline with the highest or lowest `metric`.

    `metric` is a column of the `airline_yearly` view and `direction` is the SQL
    sort keyword ("desc" for the largest value, "asc" for the smallest).
    A plain `select Year, airline, max(...) group by Year` is not valid SQL
    because `airline` is neither grouped nor aggregated, so the rows are ranked
    inside each year and the first one is kept. Rows whose metric is null are
    ignored, and ties are broken by airline code to keep the result stable.
    """
    return spark.sql(f"""
        select Year, airline, {metric}
        from (
            select Year, airline, {metric},
                   row_number() over (
                       partition by Year
                       order by {metric} {direction}, airline
                   ) as rn
            from airline_yearly
            where {metric} is not null
        ) ranked
        where rn = 1
    """)


# worst airline of the year (delay)?
worst_airline_delay = _extreme_airline_per_year("Arrival_Delay", "desc")

# best airline of the year (delay)?
best_airline_delay = _extreme_airline_per_year("Arrival_Delay", "asc")

# worst airline of the year (cancelled)?
worst_airline_cancelled = _extreme_airline_per_year("Total_Cancelled", "desc")

# best airline of the year (cancelled)?
best_airline_cancelled = _extreme_airline_per_year("Total_Cancelled", "asc")

# airline performance year, month (delay, cancelled)
# `airline` is part of the grouping so that it can legally appear in the select list.
airline_performance = spark.sql("""
    select Year, Month, airline,
           avg(Arrival_Delay) as Arrival_Delay,
           sum(Cancelled) as Total_Cancelled
    from stats
    group by Year, Month, airline
    order by Arrival_Delay desc, Total_Cancelled desc
""")

# longest distance flights
# The cast guards against Distance being stored as text (string max() compares
# character by character); descending order puts the longest routes first.
distance_longest = spark.sql("""
    select src, dst, max(cast(Distance as double)) as Distance
    from stats
    group by src, dst
    order by Distance desc
""")

# routes (origin, destination) with longest average arrival delays
airports_delay = spark.sql("""
    select src, dst, avg(Arrival_Delay) as Arrival_Delay
    from stats
    group by src, dst
    order by Arrival_Delay desc
""")

# top 10 routes with the most flights (collected to the driver as a list of Rows)
most_flights = spark.sql("""
    select src, dst, count(*) as NumberOfFlights
    from stats
    group by src, dst
    order by NumberOfFlights desc
""").take(10)

### Graph Analysis

In [None]:
# `desc` / `asc` are taken from pyspark.sql.functions (imported as F at the top of
# the notebook); no bare `desc` or `asc` name is imported anywhere in this notebook.
# Each result is bound to a name: only a cell's last expression is displayed, so
# un-assigned intermediate `.take(10)` calls would be computed and then lost.

# top 10 busiest airports (highest total degree = inbound + outbound flights)
busiest_airports = graph.degrees.orderBy(F.desc("degree")).take(10)

# bottom 10 busiest airports
quietest_airports = graph.degrees.orderBy(F.asc("degree")).take(10)

# Page rank algorithm
result = graph.pageRank(resetProbability=0.15, tol=0.01)
top_pagerank_airports = result.vertices.orderBy(F.desc("pagerank")).take(10)

# find airports which don't have direct flight
# `most_flights` is a list of Rows (it was produced with .take(10)), but GraphFrame
# needs its edges as a DataFrame with `src` and `dst` columns, so convert it first.
most_flights_edges = spark.createDataFrame(most_flights)
subGraph = GraphFrame(graph.vertices, most_flights_edges)
# Motif: src reaches dst through one intermediate stop, with no direct src -> dst edge.
no_directs = subGraph.find("(src)-[]->(middle); (middle)-[]->(dst); !(src)-[]->(dst)").filter("src.id !=dst.id")

# Strongly Connected Components
# find airports that are in a Component of graph
scc = graph.stronglyConnectedComponents(maxIter=10)

### Correlation
- Between Delay and Distance
- Between Delay and categorical columns
  - Month
  - DayofMonth
  - DayOfWeek
  - DepTimeBlk (Computer Reservation System (scheduled) time block)

In [None]:
# Correlate a numeric column with the 0/1 dummy variables of a categorical column.
def corr(source_column, target_column):
    """Correlate `source_column` with each category of `target_column`.

    Every distinct non-null value of `target_column` in the notebook-level
    `stats` DataFrame becomes a 0/1 indicator (dummy) column, and the
    correlation between `source_column` and each indicator is computed with
    `DataFrame.stat.corr`.

    Parameters:
        source_column: name of the numeric column in `stats`.
        target_column: name of the categorical column in `stats`.

    Returns:
        A list of correlation coefficients, one per category. The list follows
        a fixed category order so it can be matched back to the categories:
        values that parse as integers come first in numeric order, then all
        other values in alphabetical order.
    """
    def _category_order(value):
        # Integer-like labels sort numerically ("2" before "10"); the rest sort as text.
        text = str(value)
        try:
            return (0, int(text), "")
        except ValueError:
            return (1, 0, text)

    # Null is skipped: it cannot be used as a column alias and `== null` never matches.
    distinct_rows = (
        stats.select(target_column)
        .where(F.col(target_column).isNotNull())
        .distinct()
        .collect()
    )
    # distinct() returns rows in no guaranteed order, so sort for a stable result.
    categories = sorted((row[0] for row in distinct_rows), key=_category_order)

    indicators = [
        F.when(F.col(target_column) == category, 1).otherwise(0).alias(str(category))
        for category in categories
    ]
    corr_DF = stats.select(source_column, *indicators)

    return [
        corr_DF.stat.corr(source_column, str(category))
        for category in categories
    ]

In [11]:
# Pearson correlation between flight distance and each delay measure.
# stat.corr only accepts numeric columns, so Distance is cast explicitly; the cast
# is a no-op when the column is already a double.
numeric_stats = stats.withColumn("Distance", F.col("Distance").cast("double"))

# Keep both results: only the last expression of a cell is displayed, so an
# un-assigned first call would run a Spark job and then discard its answer.
distance_departure_corr = numeric_stats.stat.corr("Distance", "Departure_delay")
distance_arrival_corr = numeric_stats.stat.corr("Distance", "Arrival_Delay")

distance_departure_corr, distance_arrival_corr

In [None]:
# Correlation of the departure delay with the dummy variables of each categorical
# column (month, day of month, day of week, scheduled departure time block).
# This is a per-category correlation, not an ANOVA (Analysis of Variance) test.
corr_list_month, corr_list_day_month, corr_list_day_week, corr_list_hour = (
    corr("Departure_delay", categorical_column)
    for categorical_column in ("Month", "DayofMonth", "DayOfWeek", "DepTimeBlk")
)

In [None]:
# Persist the airport list to HDFS as CSV; coalescing to one partition first
# means the output directory holds a single part file.
single_partition_airports = airports.coalesce(1)
single_partition_airports.write.csv("hdfs:///airline_performance/dest")