# Spark Group D Assignment
### An analysis of U.S. states in relation to departure and arrival times at their airports (using the 'Flights Dataset' discussed in class).

### @Authors
 - Sandra Alemayehu
 - Frederico Andrade
 - Fernando Llopis
 - Amritesh Palani
 - Carmen Roldan
 - Bibake Uppal

## Index

## 1. Introduction to our Dataset ("flights_jan08.csv")

According to a 2010 report by the US Federal Aviation Administration, domestic flight delays cost passengers, airlines and the rest of the economy 32.9 billion dollars a year. More than half of that amount comes out of passengers' pockets: they not only waste time waiting for their planes to leave, but also miss connecting flights, spend money on food and have to sleep in hotel rooms while they are stranded.

The report, which focuses on data from 2007, estimated that air transportation delays put a 4 billion dollar dent in the country's gross domestic product that year. The full report can be found 
<a href="http://www.isr.umd.edu/NEXTOR/pubs/TDI_Report_Final_10_18_10_V3.pdf">here</a>.

But what causes these delays?

To answer this question, we analyze the provided dataset, which contains up to 1,936,758 domestic US flights from 2008 together with the causes of their delays, diversions and cancellations, if any. The sample used in this notebook (`flights_jan08.csv`) covers January 2008 and has 100,000 rows.

The data comes from the U.S. Department of Transportation's (DOT) Bureau of Transportation Statistics (BTS).

The data used below contains the following variables (1-29 come from the flights dataset, 30-41 from the state economic data):

1. **Year** 2008
2. **Month** 1
3. **DayofMonth** 1-31
4. **DayOfWeek** 1 (Monday) - 7 (Sunday)
5. **DepTime** actual departure time (local, hhmm)
6. **CRSDepTime** scheduled departure time (local, hhmm)
7. **ArrTime** actual arrival time (local, hhmm)
8. **CRSArrTime** scheduled arrival time (local, hhmm)
9. **UniqueCarrier** unique carrier code
10. **FlightNum** flight number
11. **TailNum** plane tail number: aircraft registration, unique aircraft identifier
12. **ActualElapsedTime** in minutes
13. **CRSElapsedTime** in minutes
14. **AirTime** in minutes
15. **ArrDelay** arrival delay, in minutes: a flight is counted as "on time" if it operated less than 15 minutes later than the scheduled time shown in the carriers' Computerized Reservations Systems (CRS).
16. **DepDelay** departure delay, in minutes
17. **Origin** origin IATA airport code
18. **Dest** destination IATA airport code
19. **Distance** in miles
20. **TaxiIn** taxi in time, in minutes
21. **TaxiOut** taxi out time in minutes
22. **Cancelled** was the flight cancelled: 1 = yes, 0 = no
23. **CancellationCode** reason for cancellation (A = carrier, B = weather, C = NAS, D = security)
24. **Diverted** 1 = yes, 0 = no
25. **CarrierDelay** in minutes: Carrier delay is within the control of the air carrier. Examples of occurrences that may determine carrier delay are: aircraft cleaning, aircraft damage, awaiting the arrival of connecting passengers or crew, baggage, bird strike, cargo loading, catering, computer, outage-carrier equipment, crew legality (pilot or attendant rest), damage by hazardous goods, engineering inspection, fueling, handling disabled passengers, late crew, lavatory servicing, maintenance, oversales, potable water servicing, removal of unruly passenger, slow boarding or seating, stowing carry-on baggage, weight and balance delays.
26. **WeatherDelay** in minutes: Weather delay is caused by extreme or hazardous weather conditions that are forecast or manifest themselves at the point of departure, en route, or at the point of arrival.
27. **NASDelay** in minutes: Delay that is within the control of the National Airspace System (NAS) may include: non-extreme weather conditions, airport operations, heavy traffic volume, air traffic control, etc.
28. **SecurityDelay** in minutes: Security delay is caused by evacuation of a terminal or concourse, re-boarding of aircraft because of security breach, inoperative screening equipment and/or long lines in excess of 29 minutes at screening areas.
29. **LateAircraftDelay** in minutes: Arrival delay at an airport due to the late arrival of the same aircraft at a previous airport. The ripple effect of an earlier delay at downstream airports is referred to as delay propagation.
30. **State** all 50 states of the United States.
31. **Population** integer: total population of each state.
32. **GDP per capita** integer: GDP per capita of each state.
33. **Unemployment Rate** float: unemployment rate (%) in each state.
34. **Poverty** integer: poverty rates for each state.
35. **Agriculture** 
36. **Services**
37. **Construction**
38. **Financial activities**
39. **Technology**
40. **Industry**
41. **Travelling and Tourism** (column `Travelling and turism`)
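The time fields above are local clock times in `hhmm` form (e.g. `1435` for 14:35), and several delay fields are stored as text because missing values are written as `NA`. A minimal plain-Python sketch of how such values can be normalised, assuming that `NA` or an empty value means missing and that `2400` stands for midnight; `hhmm_to_minutes`, `parse_delay` and `is_on_time` are hypothetical helpers, not part of the notebook:

```python
MISSING = ("", "NA")  # assumption: how missing values appear in the CSV


def hhmm_to_minutes(value):
    """Convert an hhmm clock value (e.g. 1435 or "1435") to minutes after midnight.

    Returns None for missing values; 2400 is treated as midnight (assumption).
    """
    if value is None or str(value).strip() in MISSING:
        return None
    hours, minutes = divmod(int(value) % 2400, 100)
    return hours * 60 + minutes


def parse_delay(value):
    """Parse a delay field such as ArrDelay ("-3", "27", "NA") into an int, or None if missing."""
    if value is None or str(value).strip() in MISSING:
        return None
    return int(value)


def is_on_time(arr_delay):
    """BTS rule quoted above: on time if less than 15 minutes later than scheduled."""
    delay = parse_delay(arr_delay)
    return None if delay is None else delay < 15


print(hhmm_to_minutes(1435), parse_delay("NA"), is_on_time("14"))  # 875 None True
```

On the Spark side, casting a string such as `"NA"` to `int` returns `null` in Spark 2.4 instead of raising an error, so `F.col("ArrDelay").cast("int")` plays the role of `parse_delay`.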

## 2. PySpark Environment Setup

In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
findspark.find()  # path of the Spark installation located by findspark

'/opt/spark-2.4.4-bin-hadoop2.7'

In [3]:
from pyspark.sql import SparkSession

# Run Spark locally with 4 worker threads; the SparkContext is exposed as spark.sparkContext
spark = SparkSession.builder\
                    .appName('appName')\
                    .master('local[4]')\
                    .getOrCreate()
sc = spark.sparkContext

## 3. DataFrame Setup & Dataset Overview

Reading the Data:

In [72]:
import pyspark.sql.functions as F
import pandas as pd
from IPython.display import display, Markdown  # renders the Markdown summaries printed below

### Flights

In [120]:
Flights08 = spark.read\
                 .option("header", "true")\
                 .option("inferSchema", "true")\
                 .csv("flights_jan08.csv")

In [122]:
airports = spark.read\
                .option("header", "true")\
                .option("inferSchema", "true")\
                .csv("airports.csv")

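# Keep US airports that have an IATA code; iso_region looks like 'US-CA',
# so the characters from position 4 onwards are the two-letter state code
airports_clean = airports.where((F.col("iso_country") == 'US') & (F.col("iata_code").isNotNull()))\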
                   .withColumn('state', F.col('iso_region').substr(4,10))\
                   .select("iata_code", 'state')
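`Column.substr(startPos, length)` counts positions from 1, so `substr(4, 10)` skips the three-character `US-` prefix of an `iso_region` value and keeps the state code. A plain-Python sketch of the same rule and of what the `left` joins below do with it; the sample airport rows and the helper name are hypothetical:

```python
def state_from_iso_region(iso_region):
    """Mimic F.col("iso_region").substr(4, 10): start at 1-based position 4, keep at most 10 chars."""
    start = 4 - 1  # Spark positions are 1-based, Python indices are 0-based
    return iso_region[start:start + 10]


# Hypothetical (iata_code, iso_region) rows, in the style of airports.csv
airports_sample = [("JFK", "US-NY"), ("LAX", "US-CA"), ("ORD", "US-IL")]
state_by_iata = {code: state_from_iso_region(region) for code, region in airports_sample}

# A left join keeps every flight; an airport missing from the lookup gets a null (None) state
print(state_by_iata.get("LAX"), state_by_iata.get("XYZ"))  # CA None
```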

In [136]:
# Attach the origin and destination states (left joins keep flights whose airport has no match)
flightsDF = Flights08.join(airports_clean, Flights08.Origin == airports_clean.iata_code, how = 'left')\
                     .withColumnRenamed("state", 'Origin_state')\
                     .drop("iata_code")\
                     .join(airports_clean, Flights08.Dest == airports_clean.iata_code, how = 'left')\
                     .withColumnRenamed("state", 'Dest_state')\
                     .drop("iata_code")

flightsDF.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- Ca

### State Economics

In [137]:
states08 = spark.read\
                .option("header", "true")\
                .option("sep", ";")\
                .option("inferSchema", "true")\
                .csv("economic_data.csv")

In [138]:
# Source: https://data.world/uscensusbureau/fips-state-codes

state_codes = spark.createDataFrame(pd.read_csv('https://query.data.world/s/jwcnjqlganbi5nbnmn7pnqegyjzhfl'))

state_codes_clean = state_codes.select('STUSAB', 'STATE_NAME')\
                               .withColumnRenamed("STUSAB", 'STATE_CODE')

In [142]:
statesDF = states08.join(state_codes_clean, states08.State == state_codes_clean.STATE_NAME, how = 'left')\
                   .drop('STATE_NAME')

statesDF.printSchema()

root
 |-- State: string (nullable = true)
 |-- Population: integer (nullable = true)
 |-- GDP per capita: integer (nullable = true)
 |-- Unemployment Rate: double (nullable = true)
 |-- Poverty: integer (nullable = true)
 |-- Agriculture: integer (nullable = true)
 |-- Services: integer (nullable = true)
 |-- Construction: integer (nullable = true)
 |-- Financial activities: integer (nullable = true)
 |-- Technology: integer (nullable = true)
 |-- Industry: integer (nullable = true)
 |-- Travelling and turism: integer (nullable = true)
 |-- STATE_CODE: string (nullable = true)



In [140]:
print("The January Flights 2008 DataFrame has {} rows.".format(flightsDF.count()))
print("The United States 2008 DataFrame has {} rows.".format(statesDF.count()))

The January Flights 2008 DataFrame has 100000 rows.
The United States 2008 DataFrame has 51 rows.


## 4. Joining the two Datasets

In [None]:
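# Attach the economic indicators of each flight's destination state (the analyses below focus on
# arrivals); flights whose destination state has no match keep null values
FlightsStatesDF = flightsDF.join(statesDF, flightsDF.Dest_state == statesDF.STATE_CODE, how = 'left')

FlightsStatesDF.printSchema()
FlightsStatesDF.show(5)

display(Markdown("The joined flights DataFrame has **%d** rows." % FlightsStatesDF.count()))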

## 5. Analysis (answering the proposed questions)

### A. Relationship between the economic prosperity of a city and the proportion of flight arrivals during the weekdays and weekends:

First of all, let's rank the states according to their economic prosperity.
This ranking relies on the economic indicators described in section 1.
Although several indicators could be used, we have chosen to rank the states by "GDP per capita":

In [None]:
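# GDP per capita has a single value per state, so F.first() simply carries it through the grouping;
# Flights counts the rows of FlightsStatesDF that belong to each state
TotalGDPDF = \
   FlightsStatesDF.groupBy("State")\
            .agg(F.first("GDP per capita").alias("GDPPerCapita"),\
                 F.count(F.lit(1)).alias("Flights"))\
            .orderBy(F.col("GDPPerCapita").desc())

TotalGDPDF.cache() # optimization to make the processing faster

display(Markdown("**States ranked by GDP per capita:**"))
TotalGDPDF.show(100) # enough rows to list every state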

In [None]:
# Label each flight as a weekday (DayOfWeek 1-5, Monday-Friday) or weekend (6-7) flight
FlightsWeekDF = FlightsStatesDF.withColumn("DayType", F.when(F.col("DayOfWeek") <= 5, "Weekdays")\
                                                       .otherwise("Weekend"))

# One row per destination, one column per day type; destinations missing a day type get 0
weekCountsDF = FlightsWeekDF.groupBy("Dest")\
                            .pivot("DayType", ["Weekdays", "Weekend"])\
                            .count()\
                            .na.fill(0)

print("Top 20 destinations by weekday arrivals (weekdays vs weekend):")
weekCountsDF.orderBy(F.col("Weekdays").desc()).show()

print("Bottom 20 destinations by weekday arrivals (weekdays vs weekend):")
weekCountsDF.orderBy(F.col("Weekdays").asc()).show()
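Question A is about the *proportion* of weekday vs weekend arrivals, whereas the pivot above reports absolute counts. A minimal plain-Python sketch of the proportion, using the same rule (`DayOfWeek` 6 and 7 are the weekend); `weekend_share` and the sample records are hypothetical:

```python
from collections import defaultdict


def weekend_share(records):
    """records: iterable of (dest, day_of_week) pairs -> {dest: share of weekend arrivals}."""
    totals = defaultdict(int)
    weekend = defaultdict(int)
    for dest, day_of_week in records:
        totals[dest] += 1
        if day_of_week >= 6:  # 6 = Saturday, 7 = Sunday
            weekend[dest] += 1
    return {dest: weekend[dest] / totals[dest] for dest in totals}


sample = [("ORD", 1), ("ORD", 6), ("ORD", 3), ("ORD", 7), ("LAS", 6), ("LAS", 2), ("LAS", 7)]
print(weekend_share(sample))
```

In Spark, the same ratio can be computed per destination (or per state) with `F.avg(F.when(F.col("DayOfWeek") >= 6, 1).otherwise(0))` inside `agg`, and then compared with the GDP ranking.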

In [None]:
print ("Most and least frequent occurrences for FlightNum, TailNum, Origin and Dest columns:")
FlightNumDF = flightsDF.groupBy("FlightNum").agg(F.count(F.lit(1)).alias("Total"))
TailNumDF   = flightsDF.groupBy("TailNum").agg(F.count(F.lit(1)).alias("Total"))
OriginDF    = flightsDF.groupBy("Origin").agg(F.count(F.lit(1)).alias("Total"))
DestDF      = flightsDF.groupBy("Dest").agg(F.count(F.lit(1)).alias("Total"))

leastFreqFlightNum    = FlightNumDF.orderBy(F.col("Total").asc()).first()
mostFreqFlightNum     = FlightNumDF.orderBy(F.col("Total").desc()).first()
leastFreqTailNum      = TailNumDF.orderBy(F.col("Total").asc()).first()
mostFreqTailNum       = TailNumDF.orderBy(F.col("Total").desc()).first()
leastFreqOrigin       = OriginDF.orderBy(F.col("Total").asc()).first()
mostFreqOrigin        = OriginDF.orderBy(F.col("Total").desc()).first()
leastFreqDest         = DestDF.orderBy(F.col("Total").asc()).first()
mostFreqDest          = DestDF.orderBy(F.col("Total").desc()).first()

display(Markdown("""
| %s | %s | %s | %s |
|----|----|----|----|
| %s | %s | %s | %s |
""" % ("leastFreqFlightNum", "mostFreqFlightNum", "leastFreqTailNum", "mostFreqTailNum", \
       "%d (%d occurrences)" % (leastFreqFlightNum["FlightNum"], leastFreqFlightNum["Total"]), \
       "%d (%d occurrences)" % (mostFreqFlightNum["FlightNum"], mostFreqFlightNum["Total"]), \
       "%s (%d occurrences)" % (leastFreqTailNum["TailNum"], leastFreqTailNum["Total"]), \
       "%s (%d occurrences)" % (mostFreqTailNum["TailNum"], mostFreqTailNum["Total"]))))
display(Markdown("""
| %s | %s | %s | %s |
|----|----|----|----|
| %s | %s | %s | %s |
""" % ("leastFreqOrigin", "mostFreqOrigin", "leastFreqDest", "mostFreqDest", \
       "%s (%d occurrences)" % (leastFreqOrigin["Origin"], leastFreqOrigin["Total"]), \
       "%s (%d occurrences)" % (mostFreqOrigin["Origin"], mostFreqOrigin["Total"]), \
       "%s (%d occurrences)" % (leastFreqDest["Dest"], leastFreqDest["Total"]), \
       "%s (%d occurrences)" % (mostFreqDest["Dest"], mostFreqDest["Total"]))))

### B. Relationship between the business nature of a city and the proportion of flights that arrive early in the morning (e.g. passengers on business trips) compared with the number of flights arriving during the rest of the day:
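One way to quantify "early morning" arrivals is the share of a destination's arrivals that fall in a fixed time window. A plain-Python sketch, assuming a hypothetical 05:00-10:00 window and scheduled arrival times (`CRSArrTime`, read as integers in `hhmm` form); `morning_share` and the sample values are hypothetical:

```python
def morning_share(arrival_times, start_hhmm=500, end_hhmm=1000):
    """Share of arrivals with start_hhmm <= time < end_hhmm among all non-missing arrivals.

    arrival_times: hhmm integers (e.g. 745 for 07:45). The 05:00-10:00 window is a
    hypothetical definition of "early morning"; 2400 is folded back to 0 (midnight).
    """
    times = [t % 2400 for t in arrival_times if t is not None]
    if not times:
        return None
    morning = sum(1 for t in times if start_hhmm <= t < end_hhmm)
    return morning / len(times)


# Hypothetical scheduled arrival times for one destination state
print(morning_share([545, 830, 1200, 1745]))  # 0.5
```

Comparing `hhmm` integers directly works because they sort in clock order. Computed per destination state, the ratio could then be set against sector columns of `statesDF` such as `Financial activities`.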

### C. Proportion of flights arriving in the morning and in the afternoon, comparing weekdays vs weekends in the aforementioned cities:
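A 2x2 breakdown (day type by half of the day) answers this question once each flight is labelled twice. A plain-Python sketch, assuming that "morning" means before 12:00 and "afternoon" means 12:00 onwards, and that `DayOfWeek` 6-7 is the weekend; `arrival_mix` and the sample rows are hypothetical:

```python
from collections import Counter


def arrival_mix(records):
    """records: (day_of_week, arr_hhmm) pairs -> {(day type, half of day): share}.

    Shares are computed within each day type, so the two cells of a day type sum to 1.
    """
    counts = Counter()
    for day_of_week, arr_hhmm in records:
        day_type = "Weekend" if day_of_week >= 6 else "Weekdays"
        half = "Morning" if (arr_hhmm % 2400) < 1200 else "Afternoon"  # hypothetical noon split
        counts[(day_type, half)] += 1
    totals = Counter()
    for (day_type, _), n in counts.items():
        totals[day_type] += n
    return {key: n / totals[key[0]] for key, n in counts.items()}


sample = [(1, 830), (2, 1415), (3, 1900), (6, 945), (7, 1030), (7, 1600)]
mix = arrival_mix(sample)
```

Normalising within each day type keeps weekdays and weekends comparable even though weekdays have more flights.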

### D. Cities that receive the most flights at weekends, and their connection to the vacation flow of customers:
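Ranking destinations by weekend arrivals only needs a filter and a count. A plain-Python sketch with `collections.Counter`, assuming `DayOfWeek` 6-7 is the weekend; `top_weekend_destinations` and the sample records are hypothetical:

```python
from collections import Counter


def top_weekend_destinations(records, n=3):
    """records: (dest, day_of_week) pairs -> the n destinations with the most weekend arrivals."""
    weekend_counts = Counter(dest for dest, day_of_week in records if day_of_week >= 6)
    return weekend_counts.most_common(n)


sample = [("LAS", 6), ("LAS", 7), ("MCO", 6), ("ORD", 2), ("MCO", 7), ("LAS", 6), ("ORD", 6)]
print(top_weekend_destinations(sample, n=2))  # [('LAS', 3), ('MCO', 2)]
```

Raw weekend counts may mostly reflect airport size; dividing by each destination's total arrivals gives a share that is easier to relate to leisure traffic. In Spark, the ranking corresponds to keeping rows with `DayOfWeek >= 6`, then `groupBy("Dest").count().orderBy(F.desc("count"))`.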

### E. Do more developed cities suffer smaller arrival delays on average?
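A simple first check is the Pearson correlation between each state's GDP per capita and its average arrival delay. A plain-Python sketch of the coefficient; the three (GDP, delay) pairs are made-up values chosen so the result is exactly -1:

```python
from math import sqrt


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length numeric sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)


gdp_per_capita = [40000, 50000, 60000]   # hypothetical states
avg_arr_delay = [12.0, 10.0, 8.0]        # hypothetical average delays, in minutes
print(pearson(gdp_per_capita, avg_arr_delay))  # -1.0
```

With about 50 states and a single month of flights, the coefficient is descriptive only. In Spark, after averaging the arrival delay per state and joining `statesDF`, `DataFrame.stat.corr(col1, col2)` computes the same Pearson coefficient between two numeric columns.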

### F. Relationship between arrival time and arrival delay, categorizing the day into discrete parts by arrival time (is it the same for all categories of cities?):

In [None]:
#CATEGORIZATION CODE#

totalFlights = flightsDF.count()

# ArrDelay is a string column ("NA" for missing values); casting turns "NA" into null
delayCategorizationDF = flightsDF\
   .withColumn("IntArrDelay", F.col("ArrDelay").cast("int"))\
   .where(F.col("IntArrDelay").isNotNull())\
   .withColumn("DelaySeverity", F.when(F.col("IntArrDelay") <= 0, "1.nodelay")\
                               .when(F.col("IntArrDelay") <= 15, "2.acceptable")\
                               .when(F.col("IntArrDelay") <= 30, "3.annoying")\
                               .when(F.col("IntArrDelay") <= 60, "4.impactful")\
                               .otherwise("5.unacceptable"))

#Severe arrival delays, broken down by cause#

# Delay column -> label used in the printed titles
delayColumns = {"ArrDelay": "Arrival", "CarrierDelay": "Carrier", "WeatherDelay": "Weather",
                "NASDelay": "NAS", "SecurityDelay": "Security", "LateAircraftDelay": "LateAircraft"}

# Keep non-cancelled flights with severe delays (more than 15 minutes late) and cast every delay to int
severeDelaysDF = \
  delayCategorizationDF.where(F.col("Cancelled") == 0)\
                       .where(~F.col("DelaySeverity").isin("1.nodelay", "2.acceptable"))\
                       .select("DelaySeverity",
                               *[F.col(c).cast("int").alias("Int" + c) for c in delayColumns])
severeDelaysDF.cache() # optimization to make the processing faster

# Basic stats of each delay component, per severity category
for column, label in delayColumns.items():
    intColumn = "Int" + column
    display(Markdown("**'%s' severe delays basic stats** (in mins):" % label))
    severeDelaysDF.groupBy("DelaySeverity")\
                  .agg(F.avg(intColumn).alias("AverageDelay"),\
                       F.min(intColumn).alias("LowestDelay"),\
                       F.max(intColumn).alias("HighestDelay"),\
                       F.stddev(intColumn).alias("StdDevDelay"))\
                  .orderBy("DelaySeverity").show()
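The cell above groups delays by severity only; the arrival-time dimension that question F asks about can be added by bucketing the arrival time into parts of the day. A plain-Python sketch of such a cross-tabulation, assuming hypothetical 6-hour day parts (night 00-06, morning 06-12, afternoon 12-18, evening 18-24) and reusing the notebook's severity thresholds; the helper names and sample rows are hypothetical:

```python
from collections import Counter


def day_part(hhmm):
    """Bucket an hhmm time into a part of the day (hypothetical 6-hour boundaries)."""
    hour = (hhmm % 2400) // 100
    if hour < 6:
        return "1.night"
    if hour < 12:
        return "2.morning"
    if hour < 18:
        return "3.afternoon"
    return "4.evening"


def delay_severity(arr_delay):
    """Same thresholds as the DelaySeverity column (minutes late)."""
    if arr_delay <= 0:
        return "1.nodelay"
    if arr_delay <= 15:
        return "2.acceptable"
    if arr_delay <= 30:
        return "3.annoying"
    if arr_delay <= 60:
        return "4.impactful"
    return "5.unacceptable"


def crosstab(rows):
    """rows: (arr_hhmm, arr_delay) pairs -> Counter keyed by (day part, severity)."""
    return Counter((day_part(t), delay_severity(d)) for t, d in rows)


sample = [(730, -5), (815, 20), (1330, 45), (1910, 90), (2230, 75), (130, 10)]
table = crosstab(sample)
print(table.most_common(1))  # [(('4.evening', '5.unacceptable'), 2)]
```

In Spark, the arrival hour can be derived with `F.floor((F.col("ArrTime").cast("int") % 2400) / 100)`, and once a day-part column exists (e.g. a hypothetical `DayPart`), `DataFrame.crosstab("DayPart", "DelaySeverity")` produces the same kind of table.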

### G. What about the departure time?

In [None]:
##DEPARTURE TIME##

# Same severity buckets as for arrivals, applied to the departure delay
# (DepDelay is a string column; casting turns "NA" into null)
depDelayDF = flightsDF\
   .where(F.col("Cancelled") == 0)\
   .withColumn("IntDepDelay", F.col("DepDelay").cast("int"))\
   .where(F.col("IntDepDelay").isNotNull())\
   .withColumn("DepDelaySeverity", F.when(F.col("IntDepDelay") <= 0, "1.nodelay")\
                                   .when(F.col("IntDepDelay") <= 15, "2.acceptable")\
                                   .when(F.col("IntDepDelay") <= 30, "3.annoying")\
                                   .when(F.col("IntDepDelay") <= 60, "4.impactful")\
                                   .otherwise("5.unacceptable"))
depDelayDF.cache() # optimization to make the processing faster

display(Markdown("**'Departure' delays basic stats** (in mins):"))
depDelayDF.groupBy("DepDelaySeverity")\
          .agg(F.count(F.lit(1)).alias("Flights"),\
               F.avg("IntDepDelay").alias("AverageDelay"),\
               F.min("IntDepDelay").alias("LowestDelay"),\
               F.max("IntDepDelay").alias("HighestDelay"),\
               F.stddev("IntDepDelay").alias("StdDevDelay"))\
          .orderBy("DepDelaySeverity").show()
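To relate departure *time* to departure delay, one option is to average the delay per scheduled departure hour (`CRSDepTime`, which the schema reads as an integer). Because of the delay propagation described for `LateAircraftDelay`, averages might grow later in the day; the sketch below only measures this, it does not assume it. `avg_delay_by_hour` and the sample rows are hypothetical:

```python
from collections import defaultdict


def avg_delay_by_hour(rows):
    """rows: (crs_dep_hhmm, dep_delay) pairs; missing delays (None) are skipped.

    Returns {scheduled departure hour: average departure delay in minutes}, sorted by hour.
    """
    sums = defaultdict(float)
    counts = defaultdict(int)
    for crs_dep, dep_delay in rows:
        if dep_delay is None:
            continue
        hour = (crs_dep % 2400) // 100
        sums[hour] += dep_delay
        counts[hour] += 1
    return {hour: sums[hour] / counts[hour] for hour in sorted(sums)}


sample = [(605, 0), (640, 4), (1715, 30), (1750, 10), (1755, None)]
print(avg_delay_by_hour(sample))  # {6: 2.0, 17: 20.0}
```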