# AIRLINE ANALYSIS PROJECT

### Fetch the provided data for the analysis

In [0]:
# Assign the DBFS file path of Airlines , Airports and Flights files
airlines_path = "/FileStore/tables/airlines.csv"
airports_path=  "/FileStore/tables/airports.csv"
flights_path = "/FileStore/tables/flights/"

# Append all flights partition files to flights_list
flights_list=[]
files = dbutils.fs.ls(flights_path)
for file in files:
  if(file.name.startswith('partition')):
    flights_list.append(flights_path + file.name)    

### Assign schema and load the files into dataframe

In [0]:
from pyspark.sql.types import *

# Define Airlines Schema
airlines_schema = StructType([
  StructField("IATA_CODE", StringType()),
  StructField("AIRLINE", StringType())
])

# Define Airport Schema
airports_schema = StructType([
  StructField("IATA_CODE", StringType()),
  StructField("AIRPORT", StringType()),
  StructField("CITY", StringType()),
  StructField("STATE", StringType()),
  StructField("COUNTRY", StringType()),
  StructField("LATITUTE", FloatType()),
  StructField("LONGITUDE", FloatType())
])

# Define Flight Schema
flights_schema = StructType([
  StructField("YEAR", IntegerType()),
  StructField("MONTH", IntegerType()),
  StructField("DAY", IntegerType()),
  StructField("DAY_OF_WEEK", IntegerType()),
  StructField("AIRLINE", StringType()),
  StructField("FLIGHT_NUMBER", StringType()),
  StructField("TAIL_NUMBER", StringType()),
  StructField("ORIGIN_AIRPORT", StringType()),
  StructField("DESTINATION_AIRPORT", StringType()),
  StructField("SCHEDULED_DEPARTURE", StringType()),
  StructField("DEPARTURE_TIME", StringType()),
  StructField("DEPARTURE_DELAY", IntegerType()),
  StructField("TAXI_OUT", IntegerType()),
  StructField("WHEELS_OFF", StringType()),
  StructField("SCHEDULED_TIME", IntegerType()),
  StructField("ELAPSED_TIME", IntegerType()),
  StructField("AIR_TIME", IntegerType()),
  StructField("DISTANCE", IntegerType()),
  StructField("WHEELS_ON", IntegerType()),
  StructField("TAXI_IN", IntegerType()),
  StructField("SCHEDULED_ARRIVAL", IntegerType()),
  StructField("ARRIVAL_TIME", StringType()),
  StructField("ARRIVAL_DELAY", StringType()),
  StructField("DIVERTED", IntegerType()),
  StructField("CANCELLED", IntegerType()),
  StructField("CANCELLATION_REASON", StringType()),
  StructField("AIR_SYSTEM_DELAY", IntegerType()),
  StructField("SECURITY_DELAY", IntegerType()),
  StructField("AIRLINE_DELAY", IntegerType()),
  StructField("LATE_AIRCRAFT_DELAY", IntegerType()),
  StructField("WEATHER_DELAY", IntegerType())
])

# Load the Airline, Airport and Flight Dataframe with defined schema
airlinesDF = spark.read.format("csv").option("header","true").schema(airlines_schema).load(airlines_path)

airportsDF = spark.read.format("csv").option("header","true").schema(airports_schema).load(airports_path)

flightsDF = spark.read.format("csv").option("header","true").schema(flights_schema).load(path=flights_list)

### Configure Snowflake connectivity

In [0]:
# Reading the Snowflake username and password from Credentials.txt file in DBFS 
# Community Edition of Databricks does not permit use of secret manager
Credentials_path = dbutils.fs.head("/FileStore/tables/Credentials.txt")
creds = Credentials_path.splitlines()
username = creds[0]
password = creds[1]

# Snowflake Connectivity Details
options = {
  "sfUrl": "https://WO12131.west-us-2.azure.snowflakecomputing.com",
  "sfUser": username,
  "sfPassword": password,
  "sfDatabase": "USER_AKASH",
  "sfSchema": "PUBLIC",
  "sfWarehouse": "INTERVIEW_WH"
}

### Write dataframes into Snowflake tables

In [0]:
airlinesDF.write \
  .format("snowflake") \
  .options(**options).mode("overwrite").options(header=True) \
  .option("dbtable", "AIRLINES") \
  .save()

airportsDF.write \
  .format("snowflake") \
  .options(**options).mode("overwrite").options(header=True) \
  .option("dbtable", "AIRPORTS") \
  .save()

flightsDF.write \
  .format("snowflake") \
  .options(**options).mode("overwrite").options(header=True) \
  .option("dbtable", "FLIGHTS") \
  .save()


### Create temporary Databricks tables

In [0]:
airlinesDF.createOrReplaceTempView("airlines")
airportsDF.createOrReplaceTempView("airports")
flightsDF.createOrReplaceTempView("flights")

## Reports Section

### Total number of flights by airline and airport on a monthly basis

In [0]:
%sql
select
  a.AIRLINE,
  b.AIRPORT,
  f.MONTH,
  COUNT(*) AS NUM_FLIGHTS
FROM
  FLIGHTS f
  JOIN AIRLINES a ON f.AIRLINE = a.IATA_CODE
  JOIN AIRPORTS b ON f.ORIGIN_AIRPORT = b.IATA_CODE
GROUP BY
  a.AIRLINE,
  b.AIRPORT,
  f.MONTH
ORDER BY
  NUM_FLIGHTS DESC;

AIRLINE,AIRPORT,MONTH,NUM_FLIGHTS
Delta Air Lines Inc.,Hartsfield-Jackson Atlanta International Airport,8,21635
Delta Air Lines Inc.,Hartsfield-Jackson Atlanta International Airport,7,21591
Delta Air Lines Inc.,Hartsfield-Jackson Atlanta International Airport,3,20962
Delta Air Lines Inc.,Hartsfield-Jackson Atlanta International Airport,5,20891
Delta Air Lines Inc.,Hartsfield-Jackson Atlanta International Airport,6,20814
Delta Air Lines Inc.,Hartsfield-Jackson Atlanta International Airport,4,20235
Delta Air Lines Inc.,Hartsfield-Jackson Atlanta International Airport,1,18542
Delta Air Lines Inc.,Hartsfield-Jackson Atlanta International Airport,2,17443
American Airlines Inc.,Dallas/Fort Worth International Airport,7,13163
American Airlines Inc.,Dallas/Fort Worth International Airport,8,12862


In [0]:
total_flightsDF  = spark.sql(""" select
  a.AIRLINE,
  b.AIRPORT,
  f.MONTH,
  COUNT(*) AS NUM_FLIGHTS
FROM
  FLIGHTS f
  JOIN AIRLINES a ON f.AIRLINE = a.IATA_CODE
  JOIN AIRPORTS b ON f.ORIGIN_AIRPORT = b.IATA_CODE
GROUP BY
  a.AIRLINE,
  b.AIRPORT,
  f.MONTH
ORDER BY
  NUM_FLIGHTS DESC """)

total_flightsDF.write \
  .format("snowflake") \
  .options(**options).mode("overwrite").options(header=True) \
  .option("dbtable", "TOTAL_FLIGHTS") \
  .save()

### On time percentage of each airline for the year 2015

In [0]:
## Any flight arriving with an ARRIVAL_DELAY of upto 14 min can be considered ON_TIME as all the delay values are NULL for ARRIVAL_DELAY <= 14
onTimeDF = spark.sql(""" select f.YEAR,a.AIRLINE,f.ARRIVAL_DELAY,
CASE
  when f.ARRIVAL_DELAY <= 14 THEN 1
  ELSE 0
  end as ON_TIME from flights  f JOIN AIRLINES a ON f.AIRLINE = a.IATA_CODE
  where  f.ARRIVAL_DELAY is not NULL  """)

# Register Dataframe as temp table
onTimeDF.registerTempTable("onTimeDF")

onTimePercentDF = spark.sql(""" select AIRLINE,(count(CASE WHEN ON_TIME = 1 THEN 1 END)/ (select count(*) from onTimeDF))*100 as ONTIME_PERCENT from onTimeDF where YEAR= 2015  group by AIRLINE order by ONTIME_PERCENT desc""")

onTimePercentDF.write \
  .format("snowflake") \
  .options(**options).mode("overwrite").options(header=True) \
  .option("dbtable", "ONTIME_PERCENT") \
  .save()

In [0]:
%sql
select AIRLINE,(count(CASE WHEN ON_TIME = 1 THEN 1 END)/ (select count(*) from onTimeDF))*100 as ONTIME_PERCENT from onTimeDF where YEAR= 2015  group by AIRLINE order by ONTIME_PERCENT desc

AIRLINE,ONTIME_PERCENT
Southwest Airlines Co.,17.131647924505582
Delta Air Lines Inc.,12.86973600775419
American Airlines Inc.,8.630707189384744
Skywest Airlines Inc.,8.170334695286003
Atlantic Southeast Airlines,7.8481469251292655
United Air Lines Inc.,6.7019069856782565
US Airways Inc.,4.109418740632069
American Eagle Airlines Inc.,3.842119131286695
JetBlue Airways,3.489177310912762
Alaska Airlines Inc.,2.5908202287632207


### Airlines with largest number of delays

In [0]:
%sql
SELECT
  a.AIRLINE,
  COUNT(f.AIRLINE_DELAY) AS DELAY
FROM
  FLIGHTS f
  JOIN AIRLINES AS a ON f.AIRLINE = a.IATA_CODE
WHERE
  f.AIRLINE_DELAY IS NOT NULL and f.AIRLINE_DELAY != 0
GROUP By
  a.AIRLINE
ORDER BY
  DELAY DESC;

AIRLINE,DELAY
Southwest Airlines Co.,105418
United Air Lines Inc.,50181
Delta Air Lines Inc.,46351
American Airlines Inc.,43930
Atlantic Southeast Airlines,40317
Skywest Airlines Inc.,27487
JetBlue Airways,25763
American Eagle Airlines Inc.,22117
US Airways Inc.,19717
Spirit Air Lines,12821


In [0]:
airlineDelayDF = spark.sql (""" SELECT
  a.AIRLINE,
  COUNT(f.AIRLINE_DELAY) AS DELAY
FROM
  FLIGHTS f
  JOIN AIRLINES AS a ON f.AIRLINE = a.IATA_CODE
WHERE
  f.AIRLINE_DELAY IS NOT NULL and f.AIRLINE_DELAY != 0
GROUP By
  a.AIRLINE
ORDER BY
  DELAY DESC """)

airlineDelayDF.write \
  .format("snowflake") \
  .options(**options).mode("overwrite").options(header=True) \
  .option("dbtable", "AIRLINE_DELAY") \
  .save()

### Cancellation reasons by airport

In [0]:
%sql
select distinct  air.AIRPORT, 
case 
    when fli.CANCELLATION_REASON = 'A' then 'Airline/Carrier'
    when fli.CANCELLATION_REASON = 'B' then 'Weather'
    when fli.CANCELLATION_REASON = 'C' then 'National Air System'
    when fli.CANCELLATION_REASON = 'D' then 'Security' 
    else 'Not Cancelled'
  end as CANCEL_REASON
from FLIGHTS fli JOIN AIRPORTS AS air ON fli.ORIGIN_AIRPORT = air.IATA_CODE
where fli.CANCELLATION_REASON IS NOT NULL;

AIRPORT,CANCEL_REASON
Shreveport Regional Airport,Weather
La Crosse Regional Airport,Weather
Savannah/Hilton Head International Airport,Weather
Meadows Field,Weather
Louis Armstrong New Orleans International Airport,Airline/Carrier
Hartsfield-Jackson Atlanta International Airport,National Air System
Bishop International Airport,Weather
University Park Airport,National Air System
Louisville International Airport (Standiford Field),National Air System
Ralph Wien Memorial Airport,Airline/Carrier


In [0]:
cancellationDF = spark.sql(""" select distinct  air.AIRPORT, 
case 
    when fli.CANCELLATION_REASON = 'A' then 'Airline/Carrier'
    when fli.CANCELLATION_REASON = 'B' then 'Weather'
    when fli.CANCELLATION_REASON = 'C' then 'National Air System'
    when fli.CANCELLATION_REASON = 'D' then 'Security' 
    else 'Not Cancelled'
  end as CANCEL_REASON
from FLIGHTS fli JOIN AIRPORTS AS air ON fli.ORIGIN_AIRPORT = air.IATA_CODE
where fli.CANCELLATION_REASON IS NOT NULL""")

cancellationDF.write \
  .format("snowflake") \
  .options(**options).mode("overwrite").options(header=True) \
  .option("dbtable", "CANCELLATION_REASON") \
  .save()

### Delay reasons by airport

In [0]:
%sql
select distinct b.AIRPORT,
case
   when f.AIR_SYSTEM_DELAY > 0 THEN 'AIR_SYSTEM_DELAY'
   when f.SECURITY_DELAY > 0 THEN 'SECURITY_DELAY'
   when f.AIRLINE_DELAY >0 THEN 'AIRLINE_DELAY'
   when f.LATE_AIRCRAFT_DELAY >0 THEN 'LATE_AIRCRAFT_DELAY'
   when f.WEATHER_DELAY >0 THEN 'WEATHER_DELAY'
   else 'NO DELAY'
   end as DELAY_REASON 
  FROM FLIGHTS f JOIN AIRPORTS AS b ON f.ORIGIN_AIRPORT = b.IATA_CODE where ARRIVAL_DELAY > 14;

AIRPORT,DELAY_REASON
Easterwood Airport,AIR_SYSTEM_DELAY
Des Moines International Airport,LATE_AIRCRAFT_DELAY
Savannah/Hilton Head International Airport,WEATHER_DELAY
Ketchikan International Airport,AIRLINE_DELAY
Los Angeles International Airport,SECURITY_DELAY
San Antonio International Airport,AIRLINE_DELAY
Des Moines International Airport,AIR_SYSTEM_DELAY
Trenton Mercer Airport,AIR_SYSTEM_DELAY
Monroe Regional Airport,LATE_AIRCRAFT_DELAY
Fort Smith Regional Airport,LATE_AIRCRAFT_DELAY


In [0]:
delay_reasonDF = spark.sql("""select distinct b.AIRPORT,
case
   when f.AIR_SYSTEM_DELAY > 0 THEN 'AIR_SYSTEM_DELAY'
   when f.SECURITY_DELAY > 0 THEN 'SECURITY_DELAY'
   when f.AIRLINE_DELAY >0 THEN 'AIRLINE_DELAY'
   when f.LATE_AIRCRAFT_DELAY >0 THEN 'LATE_AIRCRAFT_DELAY'
   when f.WEATHER_DELAY >0 THEN 'WEATHER_DELAY'
   else 'NO DELAY'
   end as DELAY_REASON 
  FROM FLIGHTS f JOIN AIRPORTS AS b ON f.ORIGIN_AIRPORT = b.IATA_CODE where ARRIVAL_DELAY > 14 """)

delay_reasonDF.write \
  .format("snowflake") \
  .options(**options).mode("overwrite").options(header=True) \
  .option("dbtable", "DELAY_REASON") \
  .save()


### Airlines with the most unique routes

In [0]:
%sql 
select
  distinct ur.AIRLINE,
  ur.UNIQUE_ROUTE
FROM
  (
    select
      distinct a.AIRLINE,
      f.ORIGIN_AIRPORT,
      f.DESTINATION_AIRPORT,
      COUNT(*) OVER(PARTITION BY a.AIRLINE) AS UNIQUE_ROUTE
    from
      FLIGHTS f
      JOIN AIRLINES AS a ON f.AIRLINE = a.IATA_CODE
  ) ur
order by
  ur.UNIQUE_ROUTE desc;

AIRLINE,UNIQUE_ROUTE
Southwest Airlines Co.,845173
Delta Air Lines Inc.,585399
American Airlines Inc.,424882
Skywest Airlines Inc.,397038
Atlantic Southeast Airlines,395816
United Air Lines Inc.,341961
American Eagle Airlines Inc.,210150
US Airways Inc.,198715
JetBlue Airways,178962
Alaska Airlines Inc.,115374


In [0]:
unique_routesDF= spark.sql(""" select distinct ur.AIRLINE, ur.UNIQUE_ROUTE FROM (select distinct a.AIRLINE, f.ORIGIN_AIRPORT, f.DESTINATION_AIRPORT , COUNT(*) OVER(PARTITION BY a.AIRLINE) AS UNIQUE_ROUTE from FLIGHTS f JOIN AIRLINES AS a ON f.AIRLINE = a.IATA_CODE ) ur order by  ur.UNIQUE_ROUTE desc """)

unique_routesDF.write \
  .format("snowflake") \
  .options(**options).mode("overwrite").options(header=True) \
  .option("dbtable", "UNIQUE_ROUTES") \
  .save()