<h1> Experimenting with different models </h1>

In this notebook, we try out different ideas. The first thing we have to do is create a validation set, so that we do not experiment against our independent test dataset.

In [1]:
import os  # needed for os.environ below; without it this cell fails on a fresh kernel

# Name of the Cloud Storage bucket that the gs:// paths in later cells are built from.
BUCKET='cloud-training-demos-ml'

# Mirror the bucket name into the process environment under the same key.
os.environ['BUCKET'] = BUCKET

In [2]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.regression import LabeledPoint

<h2> Read dataset </h2>

In [3]:
# Load the per-date CSV from the bucket; its first line is treated as the header.
traindays = (
    spark.read
         .option("header", "true")
         .csv('gs://{}/flights/trainday.csv'.format(BUCKET))
)
# Make the DataFrame queryable from Spark SQL as `traindays`.
traindays.createOrReplaceTempView('traindays')

In [4]:
from pyspark.sql.types import StringType, FloatType, StructType, StructField

# Comma-separated column names, in order, used below to build the schema
# that is applied when reading the flights CSV files.
header = 'FL_DATE,UNIQUE_CARRIER,AIRLINE_ID,CARRIER,FL_NUM,ORIGIN_AIRPORT_ID,ORIGIN_AIRPORT_SEQ_ID,ORIGIN_CITY_MARKET_ID,ORIGIN,DEST_AIRPORT_ID,DEST_AIRPORT_SEQ_ID,DEST_CITY_MARKET_ID,DEST,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,CANCELLED,CANCELLATION_CODE,DIVERTED,DISTANCE,DEP_AIRPORT_LAT,DEP_AIRPORT_LON,DEP_AIRPORT_TZOFFSET,ARR_AIRPORT_LAT,ARR_AIRPORT_LON,ARR_AIRPORT_TZOFFSET,EVENT,NOTIFY_TIME'

def get_structfield(colname):
   """Build the nullable StructField for one flights column.

   ARR_DELAY, DEP_DELAY, DISTANCE and TAXI_OUT are typed as floats;
   every other column is kept as a string.
   """
   float_columns = ('ARR_DELAY', 'DEP_DELAY', 'DISTANCE', 'TAXI_OUT')
   field_type = FloatType() if colname in float_columns else StringType()
   return StructField(colname, field_type, True)

# One StructField per column name in `header`, in the same order.
schema = StructType(list(map(get_structfield, header.split(','))))

In [5]:
# File pattern to read; `{}` is filled in with the bucket name.
# Swap the two assignments below to switch between the sample and the full dataset.
inputs = 'gs://{}/flights/tzcorr/all_flights-00000-*' # 1/30th
#inputs = 'gs://{}/flights/tzcorr/all_flights-*'  # FULL
flights = (
    spark.read
         .schema(schema)
         .csv(inputs.format(BUCKET))
)

# Register the DataFrame so Spark SQL can query it as `flights`.
flights.createOrReplaceTempView('flights')

<h2> Create separate training and validation data </h2>

In [8]:
from pyspark.sql.functions import rand
SEED = 13  # seed passed to rand() for the split below
# Add a boolean `holdout` column, one value per row of traindays (i.e. per date):
# True when the random draw exceeds 0.8, so about 80% of days stay available for training.
traindays = traindays.withColumn("holdout", rand(seed=SEED) > 0.8)
# Re-register the view so SQL queries see the new column.
traindays.createOrReplaceTempView('traindays')

In [9]:
# Spot-check the first ten rows, including the newly added holdout column.
traindays.head(10)

[Row(FL_DATE=u'2015-01-01', is_train_day=u'True', holdout=False),
 Row(FL_DATE=u'2015-01-02', is_train_day=u'False', holdout=True),
 Row(FL_DATE=u'2015-01-03', is_train_day=u'False', holdout=False),
 Row(FL_DATE=u'2015-01-04', is_train_day=u'True', holdout=False),
 Row(FL_DATE=u'2015-01-05', is_train_day=u'True', holdout=True),
 Row(FL_DATE=u'2015-01-06', is_train_day=u'False', holdout=False),
 Row(FL_DATE=u'2015-01-07', is_train_day=u'True', holdout=False),
 Row(FL_DATE=u'2015-01-08', is_train_day=u'True', holdout=False),
 Row(FL_DATE=u'2015-01-09', is_train_day=u'True', holdout=False),
 Row(FL_DATE=u'2015-01-10', is_train_day=u'True', holdout=False)]

<h2> Logistic regression </h2>

In [10]:
# Training rows: flights on dates flagged is_train_day that are not held out,
# excluding flights marked cancelled or diverted.
# NOTE: `SELECT *` over the join returns FL_DATE twice (once from each table).
# NOTE: the evaluation query is derived from this text by replacing the exact
# substring "t.holdout == False", so keep that spelling intact.
trainquery = """
SELECT
  *
FROM flights f
JOIN traindays t
ON f.FL_DATE == t.FL_DATE
WHERE
  t.is_train_day == 'True' AND
  t.holdout == False AND
  f.CANCELLED == '0.00' AND 
  f.DIVERTED == '0.00'
"""
traindata = spark.sql(trainquery)

In [11]:
# Inspect a single joined row to see which columns are available and how they are typed.
traindata.head()

Row(FL_DATE=u'2015-02-03', UNIQUE_CARRIER=u'AA', AIRLINE_ID=u'19805', CARRIER=u'AA', FL_NUM=u'1', ORIGIN_AIRPORT_ID=u'12478', ORIGIN_AIRPORT_SEQ_ID=u'1247802', ORIGIN_CITY_MARKET_ID=u'31703', ORIGIN=u'JFK', DEST_AIRPORT_ID=u'12892', DEST_AIRPORT_SEQ_ID=u'1289203', DEST_CITY_MARKET_ID=u'32575', DEST=u'LAX', CRS_DEP_TIME=u'2015-02-03T14:00:00', DEP_TIME=u'2015-02-03T13:56:00', DEP_DELAY=-4.0, TAXI_OUT=24.0, WHEELS_OFF=u'2015-02-03T14:20:00', WHEELS_ON=u'2015-02-03T20:20:00', TAXI_IN=u'7.00', CRS_ARR_TIME=u'2015-02-03T20:35:00', ARR_TIME=u'2015-02-03T20:27:00', ARR_DELAY=-8.0, CANCELLED=u'0.00', CANCELLATION_CODE=None, DIVERTED=u'0.00', DISTANCE=2475.0, DEP_AIRPORT_LAT=u'40.63972222', DEP_AIRPORT_LON=u'-73.77888889', DEP_AIRPORT_TZOFFSET=u'-18000.0', ARR_AIRPORT_LAT=u'33.94250000', ARR_AIRPORT_LON=u'-118.40805556', ARR_AIRPORT_TZOFFSET=u'-28800.0', EVENT=None, NOTIFY_TIME=None, FL_DATE=u'2015-02-03', is_train_day=u'True', holdout=False)

In [12]:
def to_example(fields):
  """Turn one flight row into a LabeledPoint.

  The label is 1.0 when ARR_DELAY is below 15 (treated as on time) and 0.0
  otherwise. The features are, in order: DEP_DELAY, TAXI_OUT, DISTANCE.
  """
  ontime = float(fields['ARR_DELAY'] < 15)
  features = [fields[name] for name in ('DEP_DELAY', 'TAXI_OUT', 'DISTANCE')]
  return LabeledPoint(ontime, features)

In [13]:
# Convert every training row into a LabeledPoint via to_example.
examples = traindata.rdd.map(to_example)

In [14]:
# Fit a logistic regression with an intercept term on the current `examples`.
lrmodel = LogisticRegressionWithLBFGS.train(examples, intercept=True)
# Weights should line up with the feature order in to_example
# (DEP_DELAY, TAXI_OUT, DISTANCE), followed by the intercept.
print lrmodel.weights,lrmodel.intercept

[-0.164881634485,-0.133161902516,0.000326021867615] 5.16532271486


In [15]:
# Label 1 means "on time" (see to_example). With this threshold the model should
# predict 1 only when its on-time probability is above 0.7, and 0 otherwise.
lrmodel.setThreshold(0.7) # cancel if prob-of-ontime < 0.7

<h2> Evaluate model on the held-out data </h2>


In [16]:
# Same query as the training set, but selecting the held-out days instead.
evalquery = trainquery.replace("t.holdout == False","t.holdout == True")
# str.replace changes nothing when the substring is absent; fail loudly in that case
# rather than silently evaluating on the training rows.
assert evalquery != trainquery, \
    "holdout filter not found in trainquery; evalquery would select the training data"
print(evalquery)


SELECT
  *
FROM flights f
JOIN traindays t
ON f.FL_DATE == t.FL_DATE
WHERE
  t.is_train_day == 'True' AND
  t.holdout == True AND
  f.CANCELLED == '0.00' AND 
  f.DIVERTED == '0.00'



In [17]:
# Run the evaluation query and convert its rows to LabeledPoints.
evaldata = spark.sql(evalquery)
# NOTE(review): this rebinds `examples`, which previously held the training set.
# Re-running the training cell after this one would fit the model on the
# held-out rows; re-run the cell that builds the training examples first.
examples = evaldata.rdd.map(to_example)

In [18]:
def eval(labelpred):
    """Summarize how often the predicted decisions agree with the true labels.

    NOTE: this name shadows Python's builtin ``eval``; it is kept because
    other cells call the function by this name.

    Args:
        labelpred: RDD-like collection of ``(label, pred)`` pairs that
            supports ``filter`` and ``count``. ``pred == 0`` is counted as a
            "cancel" decision and ``pred == 1`` as a "don't cancel" decision.

    Returns:
        dict with four entries:
          * ``total_cancel`` / ``total_noncancel``: number of pairs with
            ``pred == 0`` / ``pred == 1``.
          * ``correct_cancel`` / ``correct_noncancel``: fraction of those
            pairs whose label equals the prediction (0.0 when the group is
            empty).
    """
    # Index into the pair instead of using tuple-parameter lambdas, which are
    # Python-2-only syntax; behaviour is the same.
    cancel = labelpred.filter(lambda lp: lp[1] == 0)
    nocancel = labelpred.filter(lambda lp: lp[1] == 1)

    # Count each group exactly once and reuse the totals, so the same
    # collection is not counted again just to fill in the result.
    total_cancel = cancel.count()
    total_nocancel = nocancel.count()
    corr_cancel = cancel.filter(lambda lp: lp[0] == lp[1]).count()
    corr_nocancel = nocancel.filter(lambda lp: lp[0] == lp[1]).count()

    # max(..., 1) avoids dividing by zero when a group is empty.
    return {'total_cancel': total_cancel,
            'correct_cancel': float(corr_cancel) / max(total_cancel, 1),
            'total_noncancel': total_nocancel,
            'correct_noncancel': float(corr_nocancel) / max(total_nocancel, 1)
           }

In [19]:
# Pair each example's true label with the model's prediction for its features,
# then print the summary computed by eval().
labelpred = examples.map(
    lambda example: (example.label, lrmodel.predict(example.features)))
print(eval(labelpred))

{'correct_cancel': 0.8430138990490125, 'total_noncancel': 19276, 'correct_noncancel': 0.9511309400290516, 'total_cancel': 6835}
