Machine Learning using SparkML

In [2]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.regression import LabeledPoint

In [15]:
BUCKET = 'my_first_project_0242'
os.environ['BUCKET'] = BUCKET


We will be considering
The flights whose arrival delay < 15 mins as 1
and flights whose arrival delay >=15 mins as 0
So our logistic regresison model will predict the likelihood of y=1

Loading the data from storage bucket

In [4]:
traindays = spark.read.option("header","true").csv('gs://my_first_project_0242/flights/trainday.csv')

In [6]:
traindays.createOrReplaceTempView('traindays')

Printing first 5 lines of code

In [7]:
spark.sql("SELECT * from traindays LIMIT 5").show()

+----------+------------+
|   FL_DATE|is_train_day|
+----------+------------+
|2015-01-01|        True|
|2015-01-02|       False|
|2015-01-03|       False|
|2015-01-04|        True|
|2015-01-05|        True|
+----------+------------+



In [8]:
from pyspark.sql.types import StringType,FloatType,StructType,StructField

Marking the headers, Creating a framework over the CSV file 

In [12]:
header = '''FL_DATE,UNIQUE_CARRIER,AIRLINE_ID,CARRIER,FL_NUM,ORIGIN_AIRPORT_ID,
ORIGIN_AIRPORT_SEQ_ID,ORIGIN_CITY_MARKET_ID,ORIGIN,DEST_AIRPORT_ID,
DEST_AIRPORT_SEQ_ID,DEST_CITY_MARKET_ID,DEST,CRS_DEP_TIME,DEP_TIME,
DEP_DELAY,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,
CRS_ARR_TIME,ARR_TIME,ARR_DELAY,CANCELLED,
CANCELLATION_CODE,DIVERTED,DISTANCE,DEP_AIRPORT_LAT,
DEP_AIRPORT_LON,DEP_AIRPORT_TZOFFSET,ARR_AIRPORT_LAT,ARR_AIRPORT_LON,
ARR_AIRPORT_TZOFFSET,EVENT,NOTIFY_TIME'''


In [13]:
header.format

<function str.format>

In [14]:
def get_structfield(colname):
  if colname in ['ARR_DELAY','DEP_DELAY','DISTANCE','TAXI_OUT']:
    return StructField(colname,FloatType(),True)
  else:
    return StructField(colname,StringType(),True)
  
schema = StructType([get_structfield(colname) for colname in header.split(',')])  

We have less amount to resouces. Our hadoop cluster is small. So for demonstation, we will be reading only a small portion of file


In [16]:
  inputs = 'gs://{}/flights/tzcorr/all_flights-00000-*'.format(BUCKET)

Creating a flight view from the schema

In [17]:
flights = spark.read.schema(schema).csv(inputs)
flights.createOrReplaceTempView('flights')

We will train the flights whose is_train_day is True

In [18]:
trainquery = """
SELECT f.* FROM flights f 
JOIN traindays t
ON f.FL_DATE == t.FL_DATE
WHERE t.is_train_day == 'True'
"""
traindata = spark.sql(trainquery)

In [19]:
traindata.head(2)

[Row(FL_DATE='2015-08-01', UNIQUE_CARRIER='OO', AIRLINE_ID='20304', CARRIER='OO', FL_NUM='2928', ORIGIN_AIRPORT_ID='13930', 
 ORIGIN_AIRPORT_SEQ_ID='1393003', ORIGIN_CITY_MARKET_ID='30977', ORIGIN='ORD', DEST_AIRPORT_ID='10990', 
 DEST_AIRPORT_SEQ_ID='1099002', DEST_CITY_MARKET_ID='30990', DEST='CHO', CRS_DEP_TIME='2015-08-01T13:20:00', DEP_TIME='2015-08-01T14:11:00', 
 DEP_DELAY='51.00', TAXI_OUT=26.0, WHEELS_OFF='2015-08-01T14:37:00', WHEELS_ON='2015-08-01T15:56:00', TAXI_IN='5.00', 
 CRS_ARR_TIME='2015-08-01T15:15:00', ARR_TIME='2015-08-01T16:01:00', ARR_DELAY=46.0, CANCELLED='0.00', 
 CANCELLATION_CODE=None, DIVERTED='0.00', DISTANCE=567.0, DEP_AIRPORT_LAT='41.97944444', 
 DEP_AIRPORT_LON='-87.90750000', DEP_AIRPORT_TZOFFSET='-18000.0', ARR_AIRPORT_LAT='38.13861111', ARR_AIRPORT_LON='-78.45277778', 
 ARR_AIRPORT_TZOFFSET='-14400.0', EVENT=None, NOTIFY_TIME=None),
 Row(FL_DATE='2015-08-01', UNIQUE_CARRIER='OO', AIRLINE_ID='20304', CARRIER='OO', FL_NUM='2930', ORIGIN_AIRPORT_ID='1393

Looking at the basic statistics of the data set

In [20]:
traindata.describe().show()

+-------+----------+--------------+------------------+-------+------------------+------------------+----------------------+---------------------+------+------------------+--------------------+-------------------+------+-------------------+-------------------+-----------------+-----------------+-------------------+-------------------+------------------+-------------------+-------------------+-----------------+--------------------+------------------+--------------------+-----------------+------------------+------------------+--------------------+------------------+-----------------+---------------------+-----+-----------+
|summary|   FL_DATE|UNIQUE_CARRIER|        AIRLINE_ID|CARRIER|            FL_NUM| ORIGIN_AIRPORT_ID|
ORIGIN_AIRPORT_SEQ_ID|ORIGIN_CITY_MARKET_ID|ORIGIN|   DEST_AIRPORT_ID|
DEST_AIRPORT_SEQ_ID|DEST_CITY_MARKET_ID|  DEST|       CRS_DEP_TIME|           DEP_TIME|       
DEP_DELAY|         TAXI_OUT|         WHEELS_OFF|          WHEELS_ON|           TAXI_IN|      
CRS_ARR_TIM

There is mismatch in the statistics of count of distance, dep_delay, taxi_out time
There are some flights that are scheduled but they never take off
There are some flights that take off but never arrive at the prescibed destination airport
Spark describe method never counts NULL values


Removing the null values of ARR_DELAY and DEP_DELAY

In [27]:
trainquery = """
SELECT f.* FROM flights f 
JOIN traindays t
ON f.FL_DATE == t.FL_DATE
WHERE t.is_train_day == 'True' and 'f.DEP_DELAY' IS NOT NULL

"""
traindata = spark.sql(trainquery)
traindata

DataFrame[FL_DATE: string, UNIQUE_CARRIER: string, AIRLINE_ID: string, CARRIER: string, FL_NUM: string, ORIGIN_AIRPORT_ID: string, 
ORIGIN_AIRPORT_SEQ_ID: string, ORIGIN_CITY_MARKET_ID: string, ORIGIN: string, DEST_AIRPORT_ID: string, 
DEST_AIRPORT_SEQ_ID: string, DEST_CITY_MARKET_ID: string, DEST: string, CRS_DEP_TIME: string, DEP_TIME: string, 
DEP_DELAY: string, TAXI_OUT: float, WHEELS_OFF: string, WHEELS_ON: string, TAXI_IN: string, 
CRS_ARR_TIME: string, ARR_TIME: string, ARR_DELAY: float, CANCELLED: string, 
CANCELLATION_CODE: string, DIVERTED: string, DISTANCE: float, DEP_AIRPORT_LAT: string, 
DEP_AIRPORT_LON: string, DEP_AIRPORT_TZOFFSET: string, ARR_AIRPORT_LAT: string, ARR_AIRPORT_LON: string, 
ARR_AIRPORT_TZOFFSET: string, EVENT: string, NOTIFY_TIME: string]