In [1]:
import os
import atexit
import sys

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
import findspark
from sparkhpc import sparkjob

#Exit handler to clean up the Spark cluster if the script exits or crashes
def exitHandler(sj,sc):
    """Best-effort shutdown of the Spark context and the Spark job.

    Parameters:
        sj: object exposing stop(); here the sparkhpc job created in this cell.
        sc: object exposing stop(); here the Spark context started from sj.

    Ordinary errors raised by either stop() call are reported on stdout
    instead of being silently discarded. The job stop is placed in a
    ``finally`` so it is still attempted when stopping the context fails or is
    interrupted; KeyboardInterrupt/SystemExit are not swallowed.
    """
    try:
        try:
            print('Trapped Exit cleaning up Spark Context')
            sc.stop()
        except Exception as err:
            print('Could not stop Spark Context: {!r}'.format(err))
    finally:
        # Always try to release the cluster allocation, whatever happened above.
        try:
            print('Trapped Exit cleaning up Spark Job')
            sj.stop()
        except Exception as err:
            print('Could not stop Spark Job: {!r}'.format(err))

findspark.init()

#Parameters for the Spark cluster
nodes=15
tasks_per_node=8
memory_per_task=1024 #1 gig per process, adjust accordingly
# Please estimate walltime carefully to keep unused Spark clusters from sitting
# idle so that others may use the resources.
# NOTE(review): this line was annotated "1 hour", but the value is "8:00";
# sparkhpc documents walltime as HH:MM, which would make this 8 hours - confirm
# which one is intended.
walltime="8:00"
os.environ['SBATCH_PARTITION']='lattice' #Set the appropriate ARC partition

# Request nodes*tasks_per_node cores in total, with tasks_per_node cores per executor
sj = sparkjob.sparkjob(
     ncores=nodes*tasks_per_node,
     cores_per_executor=tasks_per_node,
     memory_per_core=memory_per_task,
     walltime=walltime
    )

# presumably blocks until the scheduler has started the job - see sparkhpc docs
sj.wait_to_start()
sc = sj.start_spark()

#Register the exit handler so sc and sj are stopped when the interpreter exits
atexit.register(exitHandler,sj,sc)

#You need this line if you want to use SparkSQL
sqlCtx=SQLContext(sc)

INFO:sparkhpc.sparkjob:Submitted batch job 666761

INFO:sparkhpc.sparkjob:Submitted cluster 0


# Load data

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when
from pyspark.sql import functions as F
# Get (or create) the Spark session registered under the app name 'dfTest'
session_builder = SparkSession.builder.appName('dfTest')
spark = session_builder.getOrCreate()

# Read every CSV matching the glob; the first row is the header and column
# types are inferred from the data.
df = spark.read.csv(
    'Data/AirOnTimeCSV/airOT20*',
    header=True,
    inferSchema=True,
)

In [3]:
# remove empty last column (keep the first 44 column names) and print schema
kept_columns = df.columns[:44]
df = df.select(kept_columns)
df.printSchema()

root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY_OF_MONTH: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- FL_DATE: timestamp (nullable = true)
 |-- UNIQUE_CARRIER: string (nullable = true)
 |-- TAIL_NUM: string (nullable = true)
 |-- FL_NUM: integer (nullable = true)
 |-- ORIGIN_AIRPORT_ID: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_STATE_ABR: string (nullable = true)
 |-- DEST_AIRPORT_ID: integer (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DEST_STATE_ABR: string (nullable = true)
 |-- CRS_DEP_TIME: integer (nullable = true)
 |-- DEP_TIME: integer (nullable = true)
 |-- DEP_DELAY: double (nullable = true)
 |-- DEP_DELAY_NEW: double (nullable = true)
 |-- DEP_DEL15: double (nullable = true)
 |-- DEP_DELAY_GROUP: integer (nullable = true)
 |-- TAXI_OUT: double (nullable = true)
 |-- WHEELS_OFF: string (nullable = true)
 |-- WHEELS_ON: string (nullable = true)
 |-- TAXI_IN: 

In [4]:
# limit to just flights out of Houston (IAH)
departs_from_iah = df["ORIGIN"] == "IAH"
houstondf = df.filter(departs_from_iah)
houstondf.show(5)

+----+-----+------------+-----------+-------------------+--------------+--------+------+-----------------+------+----------------+---------------+----+--------------+------------+--------+---------+-------------+---------+---------------+--------+----------+---------+-------+------------+--------+---------+-------------+---------+---------------+---------+-----------------+--------+----------------+-------------------+--------+-------+--------+--------------+-------------+-------------+---------+--------------+-------------------+
|YEAR|MONTH|DAY_OF_MONTH|DAY_OF_WEEK|            FL_DATE|UNIQUE_CARRIER|TAIL_NUM|FL_NUM|ORIGIN_AIRPORT_ID|ORIGIN|ORIGIN_STATE_ABR|DEST_AIRPORT_ID|DEST|DEST_STATE_ABR|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|DEP_DELAY_NEW|DEP_DEL15|DEP_DELAY_GROUP|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|ARR_DELAY_NEW|ARR_DEL15|ARR_DELAY_GROUP|CANCELLED|CANCELLATION_CODE|DIVERTED|CRS_ELAPSED_TIME|ACTUAL_ELAPSED_TIME|AIR_TIME|FLIGHTS|DISTANCE|DISTANCE_GROUP|

In [5]:
# row count of the full loaded data, then of the IAH-origin subset
print(df.count())
print(houstondf.count())

84370143
2306830


# Clean Data

In [None]:
# Remove the single row whose CRS_DEP_TIME is null.
# Left commented out: it has no effect on this subset, because that row is a 2012 flight from AUS (TX), not from IAH (TX).
#cleandf = houstondf.where(~F.isnull(df.CRS_DEP_TIME))

In [6]:
# For now no cleaning is applied (the null-CRS_DEP_TIME filter is commented out),
# so cleandf simply refers to the same DataFrame as houstondf.
cleandf = houstondf

In [None]:
#print(cleandf.count())

# Get Labels and Features

In [7]:
# create label column; the first matching rule wins:
#  3 - CANCELLED
#  2 - DIVERTED
#  1 - ARR_DEL15 (arrival delayed)
#  0 - ON_TIME (to 15 mins late)
# -1 - OTHER (shouldn't be any others)
label_expr = (
    when(cleandf.CANCELLED > 0.5, 3)
    .when(cleandf.DIVERTED > 0.5, 2)
    .when(cleandf.ARR_DEL15 > 0.5, 1)
    .when(cleandf.ARR_DEL15 < 0.5, 0)
    .otherwise(-1)
)

lfdf = cleandf.withColumn("LABEL", label_expr)

In [8]:
# Select features and label.
# ORIGIN_STATE_ABR is left out because every row is a Houston (IAH) departure.
# NOTE(review): the original remark also listed ORIGIN as removed, but ORIGIN is
# still selected here - confirm which is intended before changing the columns.
feature_label_columns = [
    "YEAR", "MONTH", "DAY_OF_WEEK", "UNIQUE_CARRIER", "ORIGIN",
    "DEST", "DEST_STATE_ABR", "CRS_DEP_TIME", "DISTANCE", "LABEL",
]
lfdf = lfdf.select(feature_label_columns)
lfdf.show(5)

+----+-----+-----------+--------------+------+----+--------------+------------+--------+-----+
|YEAR|MONTH|DAY_OF_WEEK|UNIQUE_CARRIER|ORIGIN|DEST|DEST_STATE_ABR|CRS_DEP_TIME|DISTANCE|LABEL|
+----+-----+-----------+--------------+------+----+--------------+------------+--------+-----+
|2006|    8|          2|            AA|   IAH| ORD|            IL|        1546|   925.0|    1|
|2006|    8|          3|            AA|   IAH| ORD|            IL|        1546|   925.0|    1|
|2006|    8|          4|            AA|   IAH| ORD|            IL|        1546|   925.0|    1|
|2006|    8|          5|            AA|   IAH| ORD|            IL|        1546|   925.0|    0|
|2006|    8|          6|            AA|   IAH| ORD|            IL|        1554|   925.0|    1|
+----+-----+-----------+--------------+------+----+--------------+------------+--------+-----+
only showing top 5 rows



In [9]:
# check number of data points in the feature/label frame
print(lfdf.count())

2306830


In [14]:
# take only the last 3 years of data (YEAR after 2009) so it can be processed
# on just one machine
after_2009 = lfdf["YEAR"] > 2009
lfdf = lfdf.filter(after_2009)
lfdf.show(10)

+----+-----+-----------+--------------+------+----+--------------+------------+--------+-----+
|YEAR|MONTH|DAY_OF_WEEK|UNIQUE_CARRIER|ORIGIN|DEST|DEST_STATE_ABR|CRS_DEP_TIME|DISTANCE|LABEL|
+----+-----+-----------+--------------+------+----+--------------+------------+--------+-----+
|2010|    7|          4|            9E|   IAH| MEM|            TN|        1155|   469.0|    0|
|2010|    7|          5|            9E|   IAH| MEM|            TN|        1155|   469.0|    1|
|2010|    7|          6|            9E|   IAH| MEM|            TN|        1155|   469.0|    0|
|2010|    7|          7|            9E|   IAH| MEM|            TN|        1155|   469.0|    0|
|2010|    7|          1|            9E|   IAH| MEM|            TN|        1155|   469.0|    3|
|2010|    7|          2|            9E|   IAH| MEM|            TN|        1155|   469.0|    0|
|2010|    7|          3|            9E|   IAH| MEM|            TN|        1155|   469.0|    1|
|2010|    7|          4|            9E|   IAH| MEM

In [None]:
# check number of data points left in lfdf at this point
print(lfdf.count())

In [13]:
# save features and label in files: one CSV per (year, month) pair
years = [2010]
#years = [2012-x for x in range (25)] # list of years in reverse.
months = list(range(6, 13))  # June through December

# Output folder for the monthly files. It is created when missing so that the
# cell also runs on a fresh checkout instead of failing inside to_csv.
out_dir = "./Data/preprocR1-1/arrDel_cancelled_diverted_onTime_Houston"
if not os.path.isdir(out_dir):
    os.makedirs(out_dir)

for year in years:
    for month in months:
        filename = "FeaturesLabels" + str(year) + "{:02d}".format(month) + ".csv"
        print(filename)
        # Each month is collected to the driver as a pandas frame before it is
        # written, so a single month has to fit in driver memory.
        monthly = lfdf.where((lfdf.YEAR == year) & (lfdf.MONTH == month))
        # to_csv is called without index=False, so pandas also writes its row
        # index as the first column (kept as in the original output files).
        monthly.toPandas().to_csv(out_dir + "/" + filename, header=True)

FeaturesLabels201006.csv
FeaturesLabels201007.csv
FeaturesLabels201008.csv
FeaturesLabels201009.csv
FeaturesLabels201010.csv
FeaturesLabels201011.csv
FeaturesLabels201012.csv


# Data Cleaning Checks

In [None]:
# check for NULL values and Nan values in original data
# (NaN variant kept for reference)
#df.select([F.count(when(F.isnan(c), c)).alias(c) for c in df.columns]).show()
null_count_exprs = []
for column_name in df.columns:
    # count() only counts rows where the when() matched, i.e. null entries
    is_missing = F.isnull(column_name)
    null_count_exprs.append(F.count(when(is_missing, column_name)).alias(column_name))
df.select(null_count_exprs).show()

In [None]:
# check for NULL values and Nan values in feature and label data
# (NaN variant kept for reference; no nans in this dataset)
#lfdf.select([F.count(when(F.isnan(c), c)).alias(c) for c in lfdf.columns]).show()
null_count_exprs = []
for column_name in lfdf.columns:
    # count() only counts rows where the when() matched, i.e. null entries
    is_missing = F.isnull(column_name)
    null_count_exprs.append(F.count(when(is_missing, column_name)).alias(column_name))
lfdf.select(null_count_exprs).show()

## investigate ORIGIN_STATE_ABR
Since we are reducing the data to just the flights leaving from Houston (IAH), this column will be removed, so there is no need to investigate it.

## investigate CRS_ELAPSED_TIME null values
This one is tricky...

It is not just in early records or just for diverted flights...

Options considered: recompute it from CRS_DEP_TIME and CRS_ARR_TIME, or remove the column.

Decision: remove the column.

In [None]:
# rows of the original data where CRS_ELAPSED_TIME is null; show the first 10
test3 = df.where(F.isnull(df.CRS_ELAPSED_TIME))
test3.show(10)

In [None]:
# distinct YEAR values among the rows with a null CRS_ELAPSED_TIME
test4 = test3.select("YEAR").distinct()
test4.show()
# distinct DIVERTED values among the same rows
test5 = test3.select("DIVERTED").distinct()
test5.show()

## investigate DEST_STATE_ABR null values
They only exist in the very early data (YEAR = 1987-1990), so we do not need to worry about them.

In [None]:
# rows where DEST_STATE_ABR is null, then the distinct years in which they occur
test1 = df.where(F.isnull(df.DEST_STATE_ABR))
test2 = test1.select("YEAR").distinct()
test2.show()

## investigate CRS_DEP_TIME null value
There is only one such row, so it can simply be removed.

In [None]:
# show up to 10 rows of the original data where CRS_DEP_TIME is null
df.where(F.isnull(df.CRS_DEP_TIME)).show(10)