In [1]:
# Define dataset azure path
flightPerfFilePath="wasb://sparklabdata@sparkclusterlab.blob.core.windows.net/Flight/*/*.csv"

# Obtain dataframe
flightPerf=spark.read.format("com.databricks.spark.csv").options(header='true').load(flightPerfFilePath)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1522764435524_0004,pyspark3,idle,Link,Link,✔


SparkSession available as 'spark'.


In [13]:
flightPerf.registerTempTable("flightPerfTable")

In [25]:
flightPerf.printSchema()

root
 |-- YEAR: string (nullable = true)
 |-- QUARTER: string (nullable = true)
 |-- MONTH: string (nullable = true)
 |-- DAY_OF_MONTH: string (nullable = true)
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- FL_DATE: string (nullable = true)
 |-- UNIQUE_CARRIER: string (nullable = true)
 |-- AIRLINE_ID: string (nullable = true)
 |-- CARRIER: string (nullable = true)
 |-- TAIL_NUM: string (nullable = true)
 |-- FL_NUM: string (nullable = true)
 |-- ORIGIN_AIRPORT_ID: string (nullable = true)
 |-- ORIGIN_AIRPORT_SEQ_ID: string (nullable = true)
 |-- ORIGIN_CITY_MARKET_ID: string (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_CITY_NAME: string (nullable = true)
 |-- ORIGIN_STATE_ABR: string (nullable = true)
 |-- ORIGIN_STATE_FIPS: string (nullable = true)
 |-- ORIGIN_STATE_NM: string (nullable = true)
 |-- ORIGIN_WAC: string (nullable = true)
 |-- DEST_AIRPORT_ID: string (nullable = true)
 |-- DEST_AIRPORT_SEQ_ID: string (nullable = true)
 |-- DEST_CITY_MARKET_ID: s

In [24]:
from pyspark.sql.window import Window
import pyspark.sql.functions as func

In [48]:
top5_df = flightPerf.filter(flightPerf.YEAR == 2016) \
    .groupBy("DEST_STATE_NM") \
    .count() \
    .sort("count", ascending=False) \
    .limit(5)
count_df = flightPerf \
            .join(top5_df, 'DEST_STATE_NM', 'inner') \
            .groupBy('YEAR', 'DEST_STATE_NM') \
            .count()
lag_df = count_df.withColumn('prev_year_count', func.lag(count_df['count']) \
                             .over(Window.partitionBy("DEST_STATE_NM").orderBy("YEAR")))
yoy_df = lag_df.withColumn('prev_year_var', (lag_df['count']/lag_df['prev_year_count']-1)*100) \
            .filter('YEAR >= 2015 and YEAR <= 2016') \
            .sort('prev_year_var', ascending=False) \
            .select('YEAR', 'DEST_STATE_NM', 'count', 'prev_year_var')
yoy_df.show()

+----+-------------+------+-------------------+
|YEAR|DEST_STATE_NM| count|      prev_year_var|
+----+-------------+------+-------------------+
|2015|     Illinois|418264|  6.745475751149077|
|2015|      Florida|449248|  4.950754106939281|
|2016|   California|727407| 2.7756505718023794|
|2015|      Georgia|394412|  2.165511381886387|
|2016|      Georgia|398518| 1.0410433759622029|
|2016|      Florida|447168|-0.4629959398817607|
|2015|        Texas|688031| -4.142848584568526|
|2015|   California|707762|-4.2490901957601075|
|2016|        Texas|578440|-15.928206723243576|
|2016|     Illinois|340426| -18.60977755675841|
+----+-------------+------+-------------------+

In [49]:
# Show top 5 origin cities having the most flight cancellation
flightPerf.filter(flightPerf.CANCELLED == 1)\
.groupBy("ORIGIN_CITY_NAME")\
.count()\
.orderBy("count",ascending=False)\
.show(5)

+--------------------+-----+
|    ORIGIN_CITY_NAME|count|
+--------------------+-----+
|         Chicago, IL|35390|
|        New York, NY|20779|
|Dallas/Fort Worth...|17050|
|         Atlanta, GA|13440|
|          Newark, NJ|12912|
+--------------------+-----+
only showing top 5 rows

In [57]:
# Find out the main reasons of the cancellations
flightCancel = flightPerf.filter(flightPerf.CANCELLED == 1) \
    .groupBy("CANCELLATION_CODE") \
    .count()
    
# Define dataset azure path
annulationsFilePath="wasb://sparklabdata@sparkclusterlab.blob.core.windows.net/References/RefAnnulations.csv"

# Obtain dataframe
annulations=spark.read.format("com.databricks.spark.csv").options(header='true').load(annulationsFilePath)

join_df = flightCancel \
            .join(annulations, flightCancel.CANCELLATION_CODE == annulations.Code)

join_df.show()

+-----------------+------+----+-------------------+
|CANCELLATION_CODE| count|Code|        Description|
+-----------------+------+----+-------------------+
|                B|168794|   B|            Weather|
|                D|   348|   D|           Security|
|                C| 64893|   C|National Air System|
|                A| 94992|   A|            Carrier|
+-----------------+------+----+-------------------+

In [58]:
DF = flightPerf.select(flightPerf.AIR_TIME.cast('float'),flightPerf.DISTANCE.cast('float'))
print(DF.stat.corr("AIR_TIME","DISTANCE"))

0.9617094450170394

In [68]:
%%sql
DROP TABLE IF EXISTS flightMonthTable

In [69]:
%%sql
CREATE TEMPORARY VIEW flightMonthTable
AS
SELECT
    YEAR,
    MONTH,
    DEST_STATE_NM,
    COUNT(*) AS count
FROM flightPerfTable
GROUP BY
    YEAR,
    MONTH,
    DEST_STATE_NM

In [70]:
%%sql
SELECT
    CAST(MONTH as INTEGER) AS Month,
    count
FROM flightMonthTable
WHERE YEAR = 2016 AND DEST_STATE_NM = 'Wyoming'
ORDER BY 1

AttributeError: 'module' object has no attribute 'api'

Unnamed: 0,Month,count
0,1,772
1,2,696
2,3,774
3,4,526
4,5,635
5,6,825
6,7,1025
7,8,964
8,9,763
9,10,592


In [71]:
%%sql
SHOW TABLES

AttributeError: 'module' object has no attribute 'api'

Unnamed: 0,database,tableName,isTemporary
0,default,hivesampletable,False
1,,flightmonthtable,True
2,,flightperftable,True
