In [None]:
import os
import pyspark
from pyspark.sql import SQLContext
import pyspark.sql.functions as F
import seaborn as sns
import matplotlib.pyplot as plt

In [None]:
KEYSPACE = "sparkassandra"
APP_NAME = KEYSPACE
CASSANDRA_IP = "cassandra_node"

## Init PySpark

In [None]:
SUBMIT_ARGS = "--packages com.databricks:spark-csv_2.11:1.2.0,com.datastax.spark:spark-cassandra-connector_2.10:1.6.0-M1 pyspark-shell"

os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS

In [None]:
# Make sure the driver and workers all use Python2
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python'

In [None]:
#Init Spark Conf
conf = pyspark.SparkConf().setAppName(APP_NAME)
conf.set("spark.cassandra.connection.host",CASSANDRA_IP)

In [None]:
# Init Spark Context
sc = pyspark.SparkContext('spark://10.0.1.2:7077', conf = conf)

In [None]:
# Init Spark SQL Context
sql_ctx = SQLContext(sc)

## Load Flight Data from Cassandra (us_flights table)

In [None]:
flights_df = sql_ctx.read.format("org.apache.spark.sql.cassandra").options(table="us_flights", keyspace=KEYSPACE).load()

In [None]:
flights_df.printSchema()

In [None]:
def convertColumn(df, name, newType) :
    df_1 = df.withColumnRenamed(name, "swap")
    df_1 = df_1.withColumn(name, df_1["swap"].cast(newType)).drop("swap")
    return df_1

In [None]:
flights_df = convertColumn(flights_df, "dayofweek", "int")
flights_df = convertColumn(flights_df, "arrdelay", "int")
flights_df = convertColumn(flights_df, "depdelay", "int")

In [None]:
flights_df.registerTempTable("flight")

## When is the best day of week to fly to minimise delays?

In [None]:
# SQL Request method
#sql_ctx.sql("""SELECT DayOfWeek as weekday, avg(ArrDelay) as avg_delay 
#                                FROM flight
#                                GROUP BY DayOfWeek
#                                ORDER BY avg_delay DESC""").show()

In [None]:
# DataFrame syntax
df_minimise_delays_weekday = (flights_df
                              .select('dayofweek', 'arrdelay')
                              .groupBy('dayofweek')
                              .agg(F.avg('arrdelay').alias('AVG_DELAY_ARR')))

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *

def convert_day_of_week(val):
    week   = ['Monday', 
              'Tuesday', 
              'Wednesday', 
              'Thursday',  
              'Friday', 
              'Saturday',
              'Sunday']
    
    return week[int(val)-1]


In [None]:
# NOTE: it seems that calls to udf() must be after SparkContext() is called
udf_convert_day_of_week = udf(convert_day_of_week, StringType())

In [None]:
df_minimise_delays_weekday = df_minimise_delays_weekday.withColumn("DAY_OF_WEEK", udf_convert_day_of_week(df_minimise_delays_weekday['dayofweek']))

In [None]:
df_minimise_delays_weekday[["DAY_OF_WEEK", "AVG_DELAY_ARR"]].show()

In [None]:
sns.set_style('whitegrid')
df_minimise_delays_weekday_pd = df_minimise_delays_weekday.toPandas()

plt.figure(figsize=(30,8))
ax = sns.barplot(x='DAY_OF_WEEK', y='AVG_DELAY_ARR', data=df_minimise_delays_weekday_pd)
for p in ax.patches:
    height = p.get_height()
    ax.text(p.get_x(), height+ 0.25, '%1.00f'%p.get_height())
plt.title('When is the best time of day of time of year to fly to minimise delays?')
plt.xlabel('Day')
plt.ylabel('Average Delay')
plt.legend

## When is the best day of week of month to fly to minimise delays?

In [None]:
df_minimise_delays_weekday_month = (flights_df
                               .select('month', 'dayofweek', 'arrdelay')
                               .groupBy('dayofweek').pivot('month').avg('arrdelay'))
# Create Column DAY_OF_WEEK : convert int => string value
df_minimise_delays_weekday_month = df_minimise_delays_weekday_month.withColumn("DAY_OF_WEEK", udf_convert_day_of_week(df_minimise_delays_weekday.dayofweek))
# Drop DayOfWeek (int value)
df_minimise_delays_weekday_month = df_minimise_delays_weekday_month.drop("dayofweek")
# Update column name : convert int month to str month
df_minimise_delays_weekday_month = df_minimise_delays_weekday_month.withColumnRenamed("1", "January")
df_minimise_delays_weekday_month = df_minimise_delays_weekday_month.withColumnRenamed("2", "February")
df_minimise_delays_weekday_month = df_minimise_delays_weekday_month.withColumnRenamed("3", "March")
df_minimise_delays_weekday_month = df_minimise_delays_weekday_month.withColumnRenamed("4", "April")
df_minimise_delays_weekday_month = df_minimise_delays_weekday_month.withColumnRenamed("5", "May")
df_minimise_delays_weekday_month = df_minimise_delays_weekday_month.withColumnRenamed("6", "June")
df_minimise_delays_weekday_month = df_minimise_delays_weekday_month.withColumnRenamed("7", "July")
df_minimise_delays_weekday_month = df_minimise_delays_weekday_month.withColumnRenamed("8", "August")
df_minimise_delays_weekday_month = df_minimise_delays_weekday_month.withColumnRenamed("9", "September")
df_minimise_delays_weekday_month = df_minimise_delays_weekday_month.withColumnRenamed("10", "October")
df_minimise_delays_weekday_month = df_minimise_delays_weekday_month.withColumnRenamed("11", "November")
df_minimise_delays_weekday_month = df_minimise_delays_weekday_month.withColumnRenamed("12", "December")
df_minimise_delays_weekday_month.show()

In [None]:
# Convert to Pandas
pd_minimise_delays_weekday_month = df_minimise_delays_weekday_month.toPandas()

# Add Month column name to index columns
pd_minimise_delays_weekday_month.columns.names = ['Month']

# Update index values by DAY_OF_WEEK values & drop DAY_OF_WEEK columns
pd_minimise_delays_weekday_month = pd_minimise_delays_weekday_month.rename(index=pd_minimise_delays_weekday_month.DAY_OF_WEEK).drop('DAY_OF_WEEK', axis=1)
pd_minimise_delays_weekday_month

In [None]:
# Draw a heatmap 
grid_kws = {"height_ratios": (.9, .05), "hspace": 0.9}
f, (ax, cbar_ax) = plt.subplots(2, figsize=(10, 5),gridspec_kw=grid_kws)
sns.heatmap(pd_minimise_delays_weekday_month, ax=ax, linewidths=0.01, cmap="YlGnBu", square=True, cbar_ax=cbar_ax, cbar_kws={"orientation": "horizontal"})

## Load Airports Data

In [None]:
airports_df = sql_ctx.read.format("org.apache.spark.sql.cassandra").options(table="airports", keyspace=KEYSPACE).load()

In [None]:
airports_df.printSchema()

## When is the best airport to fly to minimise delays?

In [None]:
df_minimise_delays_airport_dest = (flights_df
                              .select('dest', 'arrdelay')
                              .groupBy('dest')
                              .agg(F.avg('arrdelay').alias('AVG_DELAY_ARR'))
                              .sort(F.desc('AVG_DELAY_ARR')))

In [None]:
# merge with airport data

df_minimise_delays_airport_dest = (df_minimise_delays_airport_dest
                 .join(airports_df, df_minimise_delays_airport_dest.dest == airports_df.iata, 'inner'))
df_minimise_delays_airport_dest.show(4)

In [None]:
pd_minimise_delays_airport_dest = df_minimise_delays_airport_dest.toPandas()

In [None]:
pd_minimise_delays_airport_dest.to_csv("pd_minimise_delays_airport_dest.csv", sep=";")