# <center>Exploring US Flights Data</center>

# Preliminaries

In [None]:
import findspark
findspark.init()
# findspark.init('C:/.../spark-2.4.5-bin-hadoop2.7')
findspark.find()    ## SPARK_HOME path

* Before Spark 2.0 we had to create a `SparkConf` and a `SparkContext` to interact with Spark.
* Now we don't need to create a `SparkConf`, `SparkContext` or `SQLContext` explicitly, as they are encapsulated within the `SparkSession` object.
* Only one `SparkContext` may be active per JVM, and `getOrCreate()` returns the existing session if there is one. To start with a different configuration, we first need to stop the existing session.

In [2]:
import pyspark 
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').config("spark.driver.memory", "16g").appName("Spark_1st").getOrCreate()

# spark = SparkSession.builder.getOrCreate()
# spark_session = SparkSession.builder.appName('APP').config(conf=SparkConf()).getOrCreate()

# spark.newSession()

## Explanation of arguments used in SparkSession
* `'local'`: The master component of Spark will run locally within our single JVM running this code.

* `'local[*]'`: Run Spark locally with as many worker threads as logical cores on our machine.

    * For more information check the following link: http://spark.apache.org/docs/latest/submitting-applications.html#master-urls

* To avoid exceeding the GC (garbage collection) overhead limit, we can adjust Spark's memory limits.

* Since we are running Spark in local mode, setting `spark.executor.memory` won't have any effect. The reason is that the worker "lives" within the driver JVM process, which by default gets a modest amount of memory (512M in older Spark releases, 1g in more recent ones).

* We increased that by setting `spark.driver.memory` to 16 gigabytes.


* Of course, all of these settings are optional; a session can also be created without them, as shown below:

    `spark =  SparkSession.builder.appName("Spark_1st").getOrCreate()`
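* As a rough illustration of how a local master URL maps to worker threads, the sketch below parses the simple `local` forms. The helper `local_worker_threads` is hypothetical and not part of PySpark; it only mirrors the rule described above (one thread for `'local'`, one per logical core for `'local[*]'`).

```python
import os
import re

def local_worker_threads(master: str) -> int:
    """Return the number of worker threads implied by a local master URL.

    Hypothetical helper for illustration only; it is not part of PySpark.
    Handles 'local', 'local[K]' and 'local[*]'.
    """
    if master == "local":
        return 1                      # a single worker thread
    match = re.fullmatch(r"local\[(\*|\d+)\]", master)
    if match is None:
        raise ValueError(f"not a simple local master URL: {master!r}")
    value = match.group(1)
    if value == "*":
        return os.cpu_count() or 1    # one thread per logical core
    return int(value)

for url in ("local", "local[4]", "local[*]"):
    print(url, "->", local_worker_threads(url))
```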

* Below we check that Spark is correctly [set up](http://www.future-perfect.co.uk/grammar-tip/is-it-setup-set-up-or-set-up/) and running.

In [3]:
df = spark.sql("select 'spark' as hello")
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



# Obtaining the Data
* We will investigate the [Airline On-Time Performance Data](https://www.transtats.bts.gov/Tables.asp?DB_ID=120&DB_Name=Airline%20On-Time%20Performance%20Data&DB_Short_Name=On-Time#). We will use the 2018 data, which are available at https://auebgr-my.sharepoint.com/:u:/g/personal/louridas_aueb_gr/EbeTzde1LddFmqNWBFWWxNcBj5tDR_zvzDHvqqcmnndeEQ?e=Tb1orW.

  
* Before reading the file, we place it in the same directory as this notebook.


* Now we [*read*](https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html) the data. We use the 'inferSchema' option to infer the input schema automatically from the data, and the 'header' option to use the first line as the column names.

In [4]:
flight_data = spark.read.option("inferSchema", "true").option("header", "true").csv("flights.csv")

In [5]:
flight_data.limit(5).toPandas()

Unnamed: 0,FL_DATE,TAIL_NUM,CARRIER,ORIGIN,ORIGIN_CITY_NAME,DEST,DEST_CITY_NAME,DEP_TIME,DEP_DELAY,ARR_TIME,ARR_DELAY,CANCELLED,CANCELLATION_CODE,DIVERTED,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,_c19
0,2018-01-01,N8891A,9E,GFK,"Grand Forks, ND",MSP,"Minneapolis, MN",1302,-8.0,1417,-12.0,0.0,,0.0,,,,,,
1,2018-01-01,N8891A,9E,MSP,"Minneapolis, MN",GFK,"Grand Forks, ND",1124,9.0,1234,-11.0,0.0,,0.0,,,,,,
2,2018-01-01,N293PQ,9E,DTW,"Detroit, MI",SAT,"San Antonio, TX",2022,4.0,2232,-50.0,0.0,,0.0,,,,,,
3,2018-01-01,N295PQ,9E,BOS,"Boston, MA",CVG,"Cincinnati, OH",1349,104.0,1613,94.0,0.0,,0.0,79.0,0.0,0.0,0.0,15.0,
4,2018-01-01,N605LR,9E,MSP,"Minneapolis, MN",CLT,"Charlotte, NC",846,1.0,1214,-14.0,0.0,,0.0,,,,,,


In [6]:
print(flight_data.limit(5).toPandas().shape)

(5, 20)


In [7]:
## Count the number of rows in the DataFrame:
total_rows = flight_data.count()
total_rows

7213446

* We observe that our dataset has 7213446 rows and 20 columns.

In [8]:
# Display the first row of the DataFrame:
flight_data.first()

Row(FL_DATE=datetime.datetime(2018, 1, 1, 0, 0), TAIL_NUM='N8891A', CARRIER='9E', ORIGIN='GFK', ORIGIN_CITY_NAME='Grand Forks, ND', DEST='MSP', DEST_CITY_NAME='Minneapolis, MN', DEP_TIME=1302, DEP_DELAY=-8.0, ARR_TIME=1417, ARR_DELAY=-12.0, CANCELLED=0.0, CANCELLATION_CODE=None, DIVERTED=0.0, CARRIER_DELAY=None, WEATHER_DELAY=None, NAS_DELAY=None, SECURITY_DELAY=None, LATE_AIRCRAFT_DELAY=None, _c19=None)

* Below we see the data types inferred for each column.

In [9]:
# flight_data.dtypes
flight_data.printSchema()

root
 |-- FL_DATE: timestamp (nullable = true)
 |-- TAIL_NUM: string (nullable = true)
 |-- CARRIER: string (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_CITY_NAME: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DEST_CITY_NAME: string (nullable = true)
 |-- DEP_TIME: integer (nullable = true)
 |-- DEP_DELAY: double (nullable = true)
 |-- ARR_TIME: integer (nullable = true)
 |-- ARR_DELAY: double (nullable = true)
 |-- CANCELLED: double (nullable = true)
 |-- CANCELLATION_CODE: string (nullable = true)
 |-- DIVERTED: double (nullable = true)
 |-- CARRIER_DELAY: double (nullable = true)
 |-- WEATHER_DELAY: double (nullable = true)
 |-- NAS_DELAY: double (nullable = true)
 |-- SECURITY_DELAY: double (nullable = true)
 |-- LATE_AIRCRAFT_DELAY: double (nullable = true)
 |-- _c19: string (nullable = true)
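
* To give some intuition for what schema inference does, here is a toy sketch that picks the narrowest type fitting every non-empty value of a column. It is not Spark's actual implementation; the function `infer_type` and the sample rows are assumptions made purely for illustration.

```python
import csv
import io

def infer_type(values):
    """Pick the narrowest type that fits every non-empty value."""
    non_empty = [v for v in values if v != ""]
    if not non_empty:
        return "string"               # nothing to go on, e.g. an all-null column
    for name, caster in (("integer", int), ("double", float)):
        try:
            for v in non_empty:
                caster(v)
            return name
        except ValueError:
            continue                  # try the next, wider type
    return "string"

# Made-up sample in the shape of the flights file (not the real data)
sample = io.StringIO(
    "CARRIER,DEP_TIME,DEP_DELAY,_c19\n"
    "9E,1302,-8.0,\n"
    "9E,1124,9.0,\n"
)
rows = list(csv.DictReader(sample))
schema = {col: infer_type([r[col] for r in rows]) for col in rows[0]}
print(schema)
```

In this sketch an all-empty column falls back to `string`, which is consistent with the type shown for `_c19` above.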



* We observe that there is an extra column named '_c19' whose values, as we see below, are all null.
So we remove this column from the dataset.

In [10]:
flight_data.select('_c19').distinct().show()

+----+
|_c19|
+----+
|null|
+----+



In [11]:
flight_data = flight_data.drop('_c19')
print(flight_data.limit(5).toPandas().shape)

(5, 19)


## <ins>Comment on partitions</ins>

* Spark breaks the data into *partitions*.

* A partition is a collection of rows that sits on a single computer in your cluster.

In [12]:
flight_data.count()

7213446

In [13]:
print(flight_data.rdd.getNumPartitions())

8


* The data contain over 7,200,000 rows, split into 8 partitions, so each partition holds approximately 900,000 rows, or over 100 MB of data. We therefore consider 8 partitions a small number for our dataset, one that may not make effective use of all the resources in a cluster.

* We could further repartition the DataFrame.
The `repartition` method does a full shuffle of the data and creates roughly equal-sized partitions. It can be used to either increase or decrease the number of partitions in a DataFrame.

* For example, we have a dataset with over 7,200,000 rows and a size of about 850 MB.
    * <ins>with 1000 partitions</ins>: each partition will have approximately 7,200 rows and a size of about 850 kB. 
    * <ins>with 4000 partitions</ins>: each partition will have approximately 1,800 rows and a size of about 210 kB. 

* Repartitioning can improve performance by increasing parallelism, although the full shuffle has a cost of its own and very many small partitions add scheduling overhead.
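
* The per-partition figures can be checked with a few lines of arithmetic. This is only a back-of-the-envelope sketch: it assumes an even split and takes the approximate 850 MB dataset size mentioned in the text as given; `per_partition` is a helper name introduced here.

```python
total_rows = 7_213_446      # row count reported above
total_size_mb = 850         # approximate dataset size assumed in the text

def per_partition(num_partitions):
    """Average rows and size (in MB) per partition, assuming an even split."""
    rows = round(total_rows / num_partitions)
    size_mb = total_size_mb / num_partitions
    return rows, size_mb

for n in (8, 1000, 4000):
    rows, size_mb = per_partition(n)
    print(f"{n:>5} partitions: ~{rows:,} rows, ~{size_mb:.2f} MB each")
```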

In [14]:
partitioned_data = flight_data.repartition(1000)
print(partitioned_data.rdd.getNumPartitions())
# partitioned_data.write.mode("overwrite").csv("data/flights_part.csv", header=True)

1000


In [15]:
# partitioned_data.write.mode("overwrite").csv("data/flights_part.csv", header=True)
# flight_data21 = spark.read.option("inferSchema", "true").option("header", "true").csv("data/flights_part.csv")

In [16]:
print(partitioned_data.count())
# flight_data = partitioned_data

7213446


We will only keep the subset of columns that are of interest to us.

In [17]:
# temp1 = flight_data[['CARRIER','ORIGIN','ORIGIN_CITY_NAME','DEP_TIME','DEP_DELAY']]
temp1 = flight_data.select('CARRIER','ORIGIN','ORIGIN_CITY_NAME','DEP_TIME','DEP_DELAY','CANCELLED')
temp1.limit(5).toPandas()

Unnamed: 0,CARRIER,ORIGIN,ORIGIN_CITY_NAME,DEP_TIME,DEP_DELAY,CANCELLED
0,9E,GFK,"Grand Forks, ND",1302,-8.0,0.0
1,9E,MSP,"Minneapolis, MN",1124,9.0,0.0
2,9E,DTW,"Detroit, MI",2022,4.0,0.0
3,9E,BOS,"Boston, MA",1349,104.0,0.0
4,9E,MSP,"Minneapolis, MN",846,1.0,0.0


In [18]:
temp1.count()

7213446

* Before continuing with our analysis, we replace all the missing values in the data (NAs) with zero.

In [19]:
temp82 = temp1.na.fill(0)
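
* Note that filling with a numeric value such as `0` affects only the numeric columns; nulls in string columns are left as they are. The plain-Python sketch below mimics that behaviour on a list of dictionaries. The function `fill_numeric_nulls` and the sample rows are hypothetical, and, unlike Spark, the sketch guesses which columns are numeric from the values rather than from a schema.

```python
def fill_numeric_nulls(rows, value=0):
    """Replace None with `value`, but only in columns that hold numbers."""
    numeric_cols = {
        col
        for row in rows
        for col, v in row.items()
        if isinstance(v, (int, float)) and not isinstance(v, bool)
    }
    return [
        {col: (value if v is None and col in numeric_cols else v)
         for col, v in row.items()}
        for row in rows
    ]

# Made-up rows for illustration
rows = [
    {"ORIGIN": "GFK", "DEP_DELAY": -8.0, "CANCELLATION_CODE": None},
    {"ORIGIN": "BOS", "DEP_DELAY": None, "CANCELLATION_CODE": None},
]
filled = fill_numeric_nulls(rows)
print(filled)
```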

# <center>`"Misery Index" for airports`</center>
## <ins>Delayed flight</ins>
* A flight delay is when an airline flight takes off and/or lands later than its scheduled time. 

* The Federal Aviation Administration (FAA) considers a flight to be delayed when it is <b>15 minutes later</b> than its scheduled time. This will be the criterion we will also use in our analysis.

* To indicate whether a flight was delayed according to the aforementioned criterion, we create a new column. As always in Spark, the result is saved in a new DataFrame.

In [20]:
temp24 = temp82.withColumn("Delayed_flights",temp82['DEP_DELAY']>15.0)
temp24.limit(6).toPandas()

# temp3 = temp82.where(temp82['DEP_DELAY'] >=0)    # 2nd way: filter the rows

Unnamed: 0,CARRIER,ORIGIN,ORIGIN_CITY_NAME,DEP_TIME,DEP_DELAY,CANCELLED,Delayed_flights
0,9E,GFK,"Grand Forks, ND",1302,-8.0,0.0,False
1,9E,MSP,"Minneapolis, MN",1124,9.0,0.0,False
2,9E,DTW,"Detroit, MI",2022,4.0,0.0,False
3,9E,BOS,"Boston, MA",1349,104.0,0.0,True
4,9E,MSP,"Minneapolis, MN",846,1.0,0.0,False
5,9E,SYR,"Syracuse, NY",1820,5.0,0.0,False


* We can also encode the previously created boolean column as an integer (0 or 1).

In [21]:
from pyspark.sql.types import IntegerType
temp24_coded = temp24.withColumn("Delayed_flights_coded", temp24["Delayed_flights"].cast(IntegerType()))
temp24_coded.limit(6).toPandas()

Unnamed: 0,CARRIER,ORIGIN,ORIGIN_CITY_NAME,DEP_TIME,DEP_DELAY,CANCELLED,Delayed_flights,Delayed_flights_coded
0,9E,GFK,"Grand Forks, ND",1302,-8.0,0.0,False,0
1,9E,MSP,"Minneapolis, MN",1124,9.0,0.0,False,0
2,9E,DTW,"Detroit, MI",2022,4.0,0.0,False,0
3,9E,BOS,"Boston, MA",1349,104.0,0.0,True,1
4,9E,MSP,"Minneapolis, MN",846,1.0,0.0,False,0
5,9E,SYR,"Syracuse, NY",1820,5.0,0.0,False,0
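
* The flag and its integer encoding boil down to a comparison followed by a cast. The sketch below shows the same logic in plain Python; `flag_delay` is a name introduced here for illustration. Because the comparison is strict, a delay of exactly 15 minutes is not flagged.

```python
DELAY_THRESHOLD_MIN = 15.0   # threshold used in the comparison above

def flag_delay(dep_delay):
    """Return (boolean flag, 0/1 code) for one departure delay in minutes."""
    delayed = dep_delay > DELAY_THRESHOLD_MIN
    return delayed, int(delayed)

for dep_delay in (-8.0, 9.0, 15.0, 104.0):
    print(dep_delay, flag_delay(dep_delay))
```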


## <ins>Cancelled flight</ins>

* We also observed that some flights that were eventually cancelled show a departure delay. Since a cancelled flight never completes its journey, such a delay is not meaningful, so we will exclude cancelled flights from the analysis.

* The results which justify our reasoning are presented below:

In [22]:
temp76 = temp82.where(temp82['CANCELLED'] ==1)
print('Number of flights that have been cancelled:',temp76.count())

Number of flights that have been cancelled: 116584


In [23]:
temp76 = temp82.where(temp82['CANCELLED'] ==1)
# temp76.select('DEP_DELAY').distinct().toPandas()
print(temp76.where(temp76['DEP_DELAY']!=0).count(),'flights have presented a delay in their departure')

3889 flights have presented a delay in their departure


* Keep only the data of the flights that have not been cancelled:

In [24]:
temp24_coded = temp24_coded.where(temp24_coded['CANCELLED'] ==0) 
print(temp24_coded.count())
temp24_coded.limit(3).toPandas()

7096862


Unnamed: 0,CARRIER,ORIGIN,ORIGIN_CITY_NAME,DEP_TIME,DEP_DELAY,CANCELLED,Delayed_flights,Delayed_flights_coded
0,9E,GFK,"Grand Forks, ND",1302,-8.0,0.0,False,0
1,9E,MSP,"Minneapolis, MN",1124,9.0,0.0,False,0
2,9E,DTW,"Detroit, MI",2022,4.0,0.0,False,0


* [*createOrReplaceTempView*](https://spark.apache.org/docs/2.2.0/api/R/createOrReplaceTempView.html) is used when we want to store a table for a particular Spark session. \
It creates (or replaces, if that view name already exists) a temporary view from a DataFrame in the Spark session. \
We can then use it like a Hive table in Spark SQL and run SQL queries on top of it.

In [25]:
temp24_coded.createOrReplaceTempView("temp24_coded")

Our criterion for outliers will be the airports in the lowest 1% in the number of flights.

* Outlier criterion: we rank the airports by their number of flights and exclude those whose percent rank is at most 0.01, i.e. the lowest 1% of airports. All other airports are kept.

* Process:
    * 1) Use an SQL query to find the total flights for each airport
    * 2) We will find the percentile in the number of flights
    * 3) Find which airports correspond to the lowest 1% percentile and exclude them

In [26]:
temp861 = spark.sql("""
    SELECT ORIGIN,count(ORIGIN) as total_flights
    FROM temp24_coded
    GROUP BY ORIGIN
    """)

print(temp861.count(),'rows')
temp861.limit(5).toPandas()

358 rows


Unnamed: 0,ORIGIN,total_flights
0,BGM,909
1,PSE,779
2,INL,639
3,DLG,81
4,MSY,55221


* Use the <b>percent_rank</b> [window function](https://spark.apache.org/docs/2.1.0/api/R/percent_rank.html), which returns, for each airport, the relative rank of its number of flights as a fraction between 0 and 1.

* Find the airports whose percent rank is at most 0.01.

In [27]:
from pyspark.sql.window import Window
import pyspark.sql.functions as fun

pin14 = temp861.select(
    'ORIGIN', 'total_flights',
    fun.percent_rank().over(Window.partitionBy().orderBy(temp861['total_flights'])).alias("percentile"))

print(pin14.count(),'rows')
pin14b2 = pin14.where(pin14['percentile'] <= 0.01)
pin14b2.select('ORIGIN').toPandas()

358 rows


Unnamed: 0,ORIGIN
0,YNG
1,ART
2,IFP
3,CYS


* We will use an SQL query that gives, for each airport, the probability that a flight departing from it is delayed.


* We will exclude the airports identified above as belonging to the lowest 1%.


* The probability is computed by dividing the number of flights from each airport that we have flagged as delayed by the total number of flights from that airport. The result is rounded to three decimal places for better presentation.


* Finally, we sort the results in descending order of the computed probability.

In [28]:
sql_way = spark.sql("""
    SELECT ORIGIN, ROUND(sum(Delayed_flights_coded)/count(ORIGIN),3) as prob
    FROM temp24_coded
    WHERE ORIGIN NOT IN ('YNG','ART','IFP','CYS')
    GROUP BY ORIGIN
    ORDER BY prob DESC
    """)

print(sql_way.count(),"rows")
sql_way.limit(6).toPandas()

354 rows


Unnamed: 0,ORIGIN,prob
0,SCK,0.382
1,HGR,0.348
2,OWB,0.346
3,OTH,0.345
4,OGD,0.328
5,MMH,0.326
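
* The same aggregation can be written out step by step in plain Python, which may make the logic of the query easier to follow. This is only a sketch on made-up toy rows; `delay_probability` and the airport codes `AAA`, `BBB`, `CCC` are hypothetical.

```python
from collections import defaultdict

def delay_probability(flights, excluded=()):
    """Share of delayed flights per origin, rounded to 3 decimals, highest first."""
    totals = defaultdict(int)
    delayed = defaultdict(int)
    for origin, delayed_code in flights:
        if origin in excluded:
            continue                      # same role as the NOT IN filter
        totals[origin] += 1
        delayed[origin] += delayed_code   # the 0/1 code sums to a count
    probs = {o: round(delayed[o] / totals[o], 3) for o in totals}
    return sorted(probs.items(), key=lambda item: item[1], reverse=True)

# (ORIGIN, Delayed_flights_coded) pairs; made-up toy rows, not the real data
toy = [("AAA", 1), ("AAA", 0), ("AAA", 0), ("BBB", 1), ("BBB", 1), ("CCC", 0)]
print(delay_probability(toy, excluded={"CCC"}))
```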


* <ins>Some optimization</ins>: Spark evaluates lazily, so in principle all transformations are recomputed every time an action is run.

* To reuse the results of the transformations we have already performed, we can keep them in memory using `cache()`.

* Caching is an optimization technique for interactive and iterative Spark computations. It saves intermediate results so we can reuse them in subsequent stages. Note that `cache()` itself is lazy: the data are stored the first time an action computes them.

In [29]:
sql_way.cache()

DataFrame[ORIGIN: string, prob: double]
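
* As a loose analogy for why caching helps, the toy class below recomputes its result on every call unless it has been marked as cached. It is not how Spark is implemented; `LazyResult` is a hypothetical stand-in that only illustrates the idea of marking a result for reuse and storing it on first use.

```python
class LazyResult:
    """Toy stand-in for a lazily evaluated result (not a Spark class)."""

    def __init__(self, compute):
        self._compute = compute      # the "transformations" to run
        self._use_cache = False
        self._has_value = False
        self._value = None
        self.computations = 0        # how many times the work was done

    def cache(self):
        self._use_cache = True       # only a marker; nothing runs yet
        return self

    def collect(self):
        if self._use_cache and self._has_value:
            return self._value       # reuse the stored result
        self.computations += 1
        self._value = self._compute()
        self._has_value = True
        return self._value

uncached = LazyResult(lambda: sum(range(1_000)))
uncached.collect()
uncached.collect()

cached = LazyResult(lambda: sum(range(1_000))).cache()
cached.collect()
cached.collect()

print(uncached.computations, cached.computations)
```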

* Now, we will use another SQL query which will provide us the average and median delay for each airport. 

* The [*percentile_approx*](https://spark.apache.org/docs/2.3.0/api/sql/index.html) function returns the approximate percentile value of a numeric column at the given percentage. 

* To compute the median we use the 0.5 (50%) percentile of the 'DEP_DELAY' column (the time by which the flight was delayed).

* Our results do not refer to the whole dataset, but only to the flights that we flagged as delayed. We also sort the results in descending order of the average delay for each airport.

In [30]:
sql8= spark.sql("""
    SELECT ORIGIN, ROUND(AVG(DEP_DELAY),2) as avg_delay, percentile_approx(DEP_DELAY, 0.5) as median_delay
    FROM temp24_coded
    WHERE Delayed_flights_coded = 1
    GROUP BY ORIGIN
    ORDER BY avg_delay DESC
    """)

In [31]:
sql8.cache()

DataFrame[ORIGIN: string, avg_delay: double, median_delay: double]

In [32]:
sql8.limit(6).toPandas()

Unnamed: 0,ORIGIN,avg_delay,median_delay
0,DVL,201.11,83.0
1,PPG,181.85,35.0
2,JMS,150.76,50.0
3,RHI,141.66,79.0
4,APN,141.49,74.0
5,CMX,140.3,74.0
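
* The averages above are much larger than the medians, which is what we would expect when a few very long delays pull the mean upwards. The sketch below reproduces the calculation for a single airport in plain Python, using an exact median rather than an approximate percentile, so the two may differ slightly on real data. The function `delay_summary` and the sample delays are made up for illustration.

```python
from statistics import mean, median_low

def delay_summary(delays, threshold=15.0):
    """Average and median delay, computed over delayed flights only."""
    delayed = [d for d in delays if d > threshold]
    if not delayed:
        return None                       # no delayed flights at this airport
    return round(mean(delayed), 2), median_low(delayed)

# Made-up departure delays in minutes
print(delay_summary([-8.0, 9.0, 20.0, 30.0, 104.0]))
```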


In [33]:
print(sql_way.toPandas().shape)
print(sql8.toPandas().shape)

(354, 2)
(358, 3)


* The airport "misery index" is the first table we created, which shows the probability that a flight departing from each airport is delayed. This table has 354 rows and 2 columns.

* We will use an [inner join](https://www.w3schools.com/sql/sql_join_inner.asp) on the "ORIGIN" column between the two previously created tables in order to show, for each airport:
    * the probability that a flight departing from the airport is delayed
    * the average and median delay of its delayed flights

* We chose an inner join because we want to enrich the "misery index" with the average and median delay. Recall that an inner join selects only the records that have matching values in both tables. 

* The results are then sorted in descending order of the aforementioned probability.

In [34]:
# inner_join = sql_way.join(sql8, sql8.ORIGIN == sql_way.ORIGIN, how='inner')  # ORIGIN column appears twice
inner_join = sql_way.join(sql8, on=['ORIGIN'], how='inner')   # => this way is correct
inner_join.orderBy("prob", ascending=False).limit(6).toPandas()

Unnamed: 0,ORIGIN,prob,avg_delay,median_delay
0,SCK,0.382,61.58,37.0
1,HGR,0.348,78.23,36.0
2,OWB,0.346,85.68,35.0
3,OTH,0.345,94.08,60.0
4,OGD,0.328,57.05,30.0
5,MMH,0.326,112.95,50.0


* As we see, the "misery index" has the same rows as before; we have just added the average and median delay.

In [35]:
print(inner_join.toPandas().shape)

(354, 4)
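
* The row counts (354 rather than 358) follow from how an inner join works: the airports we excluded from the probability table have no match, so they drop out. A minimal plain-Python sketch of that behaviour is given below; `inner_join` is a helper written for this illustration, and the values for `YNG` are made up.

```python
def inner_join(left, right):
    """Join two {key: value} tables, keeping only keys present in both."""
    return {key: (left[key], right[key]) for key in left if key in right}

# Values in the shape of the two tables above: prob and (avg_delay, median_delay)
prob = {"SCK": 0.382, "HGR": 0.348}
delay_stats = {
    "SCK": (61.58, 37.0),
    "HGR": (78.23, 36.0),
    "YNG": (40.0, 30.0),     # made-up numbers; YNG is absent from `prob`
}

joined = inner_join(prob, delay_stats)
print(joined)
```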


* In the data the airlines are only indicated by their code.

* In order to include airline names in our results, we downloaded the carrier lookup table from https://www.transtats.bts.gov/Download_Lookup.asp?Lookup=L_UNIQUE_CARRIERS.

In [36]:
flight_data.limit(3).toPandas()

Unnamed: 0,FL_DATE,TAIL_NUM,CARRIER,ORIGIN,ORIGIN_CITY_NAME,DEST,DEST_CITY_NAME,DEP_TIME,DEP_DELAY,ARR_TIME,ARR_DELAY,CANCELLED,CANCELLATION_CODE,DIVERTED,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY
0,2018-01-01,N8891A,9E,GFK,"Grand Forks, ND",MSP,"Minneapolis, MN",1302,-8.0,1417,-12.0,0.0,,0.0,,,,,
1,2018-01-01,N8891A,9E,MSP,"Minneapolis, MN",GFK,"Grand Forks, ND",1124,9.0,1234,-11.0,0.0,,0.0,,,,,
2,2018-01-01,N293PQ,9E,DTW,"Detroit, MI",SAT,"San Antonio, TX",2022,4.0,2232,-50.0,0.0,,0.0,,,,,


In [37]:
airlines = spark.read.option("header", "true").csv("L_UNIQUE_CARRIERS.csv_")
airlines.limit(5).toPandas()

Unnamed: 0,Code,Description
0,02Q,Titan Airways
1,04Q,Tradewind Aviation
2,05Q,"Comlux Aviation, AG"
3,06Q,Master Top Linhas Aereas Ltd.
4,07Q,Flair Airlines Ltd.


In [38]:
print(airlines.toPandas().shape)

(1672, 2)


* We see that the lookup table contains names for 1672 different carrier codes.

* In order to add the airline names in our data, we will use an inner join based on the "carrier".

* Just for easier implementation, we rename the "Code" column in the carrier lookup table to "CARRIER" (same as in the other table).

In [39]:
# Rename column Code for easier join
airlines2b = airlines.withColumnRenamed("Code","CARRIER")
airlines2b.limit(5).toPandas()

Unnamed: 0,CARRIER,Description
0,02Q,Titan Airways
1,04Q,Tradewind Aviation
2,05Q,"Comlux Aviation, AG"
3,06Q,Master Top Linhas Aereas Ltd.
4,07Q,Flair Airlines Ltd.


In [40]:
temp24_coded.limit(5).toPandas()

Unnamed: 0,CARRIER,ORIGIN,ORIGIN_CITY_NAME,DEP_TIME,DEP_DELAY,CANCELLED,Delayed_flights,Delayed_flights_coded
0,9E,GFK,"Grand Forks, ND",1302,-8.0,0.0,False,0
1,9E,MSP,"Minneapolis, MN",1124,9.0,0.0,True,1
2,9E,DTW,"Detroit, MI",2022,4.0,0.0,True,1
3,9E,BOS,"Boston, MA",1349,104.0,0.0,True,1
4,9E,MSP,"Minneapolis, MN",846,1.0,0.0,True,1


In [41]:
join21 = temp24_coded.join(airlines2b, on=['CARRIER'], how='inner') 
# out_join22 = temp24_coded.join(airlines, airlines.Code == temp24_coded.CARRIER, how='inner')   ## 2nd way (without rename)
join21.limit(4).toPandas()

Unnamed: 0,CARRIER,ORIGIN,ORIGIN_CITY_NAME,DEP_TIME,DEP_DELAY,CANCELLED,Delayed_flights,Delayed_flights_coded,Description
0,9E,GFK,"Grand Forks, ND",1302,-8.0,0.0,False,0,Endeavor Air Inc.
1,9E,MSP,"Minneapolis, MN",1124,9.0,0.0,True,1,Endeavor Air Inc.
2,9E,DTW,"Detroit, MI",2022,4.0,0.0,True,1,Endeavor Air Inc.
3,9E,BOS,"Boston, MA",1349,104.0,0.0,True,1,Endeavor Air Inc.


In [42]:
total_flights2 = join21.count()
total_flights2

7096862
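
* The count is the same as the number of non-cancelled flights, which suggests that every carrier code in our data matched exactly one entry of the lookup table. The sketch below shows this kind of lookup join in plain Python; `add_carrier_names` and the sample flight rows are hypothetical.

```python
def add_carrier_names(flights, carrier_names):
    """Inner join of flight rows with a {code: name} lookup on CARRIER."""
    return [
        {**flight, "Description": carrier_names[flight["CARRIER"]]}
        for flight in flights
        if flight["CARRIER"] in carrier_names   # unmatched codes are dropped
    ]

carrier_names = {"9E": "Endeavor Air Inc.", "02Q": "Titan Airways"}
flights = [
    {"CARRIER": "9E", "DEP_DELAY": -8.0},
    {"CARRIER": "9E", "DEP_DELAY": 104.0},
]
joined = add_carrier_names(flights, carrier_names)
print(len(flights), len(joined))
```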

* For clearer understanding, we will only keep the columns that are necessary for our analysis. 

In [43]:
data38 = join21.select('Description','DEP_DELAY','Delayed_flights_coded')
data38.limit(4).toPandas()

Unnamed: 0,Description,DEP_DELAY,Delayed_flights_coded
0,Endeavor Air Inc.,-8.0,0
1,Endeavor Air Inc.,9.0,1
2,Endeavor Air Inc.,4.0,1
3,Endeavor Air Inc.,104.0,1


In [44]:
### Sql
data38.createOrReplaceTempView("data38")

# <center>`"Misery Index" for airlines`</center>

* We work similarly to before and present, for each airline, the probability that a flight it operates is delayed, sorted in descending order.

In [45]:
pin21 = spark.sql("""
    SELECT Description, ROUND(sum(Delayed_flights_coded)/count(Description),2) as prob
    FROM data38
    GROUP BY Description
    ORDER BY prob DESC
    """)

pin21.limit(6).toPandas()

Unnamed: 0,Description,prob
0,Southwest Airlines Co.,0.5
1,Frontier Airlines Inc.,0.45
2,JetBlue Airways,0.42
3,PSA Airlines Inc.,0.38
4,American Airlines Inc.,0.35
5,Allegiant Air,0.34


In [46]:
pin21.cache()

DataFrame[Description: string, prob: double]

* Now, we will use another SQL query and similarly to what we did for the airports, we compute the average and median delay you may expect to have with an airline. 

In [47]:
pin22= spark.sql("""
    SELECT Description, ROUND(AVG(DEP_DELAY),2) as avg_delay, percentile_approx(DEP_DELAY, 0.5) as median_delay
    FROM data38
    WHERE Delayed_flights_coded = 1
    GROUP BY Description
    ORDER BY avg_delay DESC
    """)

pin22.limit(6).toPandas()

Unnamed: 0,Description,avg_delay,median_delay
0,ExpressJet Airlines LLC,61.81,31.0
1,SkyWest Airlines Inc.,54.62,24.0
2,Endeavor Air Inc.,53.84,25.0
3,Mesa Airlines Inc.,51.52,23.0
4,Frontier Airlines Inc.,50.82,25.0
5,Allegiant Air,49.59,22.0


In [48]:
pin22.cache()

DataFrame[Description: string, avg_delay: double, median_delay: double]

* Finally, we use an [inner join](https://www.w3schools.com/sql/sql_join_inner.asp) on the name of each airline between the two previously created tables, in order to enrich our airline misery index by showing, for each airline:
    * the probability that you will experience a delay flying with that airline 
    * the average and median delay that you are likely to experience

* The results are then sorted in descending order of the aforementioned probability.

In [49]:
# pin_join = pin21.join(pin22, pin22.Description == pin21.Description, how='inner')  # Description column appears twice
pin_join = pin21.join(pin22, on=['Description'], how='inner')   # => this way is correct
pin_join.orderBy("prob", ascending=False).limit(6).toPandas()

Unnamed: 0,Description,prob,avg_delay,median_delay
0,Southwest Airlines Co.,0.5,25.37,12.0
1,Frontier Airlines Inc.,0.45,50.82,25.0
2,JetBlue Airways,0.42,46.95,24.0
3,PSA Airlines Inc.,0.38,39.98,17.0
4,American Airlines Inc.,0.35,37.03,16.0
5,Allegiant Air,0.34,49.59,22.0


* After completing our analysis, we stop the Spark Session.

In [50]:
# In the end, stop the session
spark.stop()