In [None]:
PROJECT=!gcloud config get-value project
PROJECT=PROJECT[0]
BUCKET = PROJECT + '-dsongcp'
import os
os.environ['BUCKET'] = PROJECT + '-dsongcp'

In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# Reuse a live SparkContext when one already exists so this cell can be
# re-run in the same kernel. Constructing SparkContext(...) a second time
# raises "ValueError: Cannot run multiple SparkContexts at once".
# On a fresh kernel this creates the same local context named 'logistic'.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster('local').setAppName('logistic')
)

# getOrCreate() returns the existing session if there is one; otherwise it
# builds a session on top of the context obtained above.
spark = (
    SparkSession.builder
    .appName("Logistic regression w/ Spark ML")
    .getOrCreate()
)

In [None]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.regression import LabeledPoint

In [None]:
# Load the train-day table from the project bucket. The first CSV line is
# treated as a header row; no schema inference is requested.
traindays = (
    spark.read
    .option("header", "true")
    .csv('gs://{}/flights/trainday.csv'.format(BUCKET))
)

# Expose the table to Spark SQL under the view name 'traindays'.
traindays.createOrReplaceTempView('traindays')

In [None]:
# Register `traindays` as the Spark SQL temp view 'traindays', replacing any
# view already registered under that name. The cell that loads the CSV issues
# the same call, so this repeats that registration.
traindays.createOrReplaceTempView('traindays')

In [None]:
# Preview: query the 'traindays' view through Spark SQL and print at most
# five rows.
spark.sql("SELECT * from traindays LIMIT 5").show()

In [None]:
# Wildcard path for the all_flights-00000-* file(s) under flights/tzcorr in
# the project bucket.
inputs = 'gs://{}/flights/tzcorr/all_flights-00000-*'.format(BUCKET)

In [None]:
# Read everything matched by `inputs` as JSON into a DataFrame, then expose it
# to Spark SQL under the view name 'flights'.
flights = spark.read.json(inputs)
flights.createOrReplaceTempView('flights')

In [None]:
# First version of the training query: join flights to traindays on FL_DATE
# and keep only rows whose is_train_day equals the string 'True'.
# No filtering of missing or invalid values is applied here.
trainquery = """
SELECT
  DEP_DELAY, TAXI_OUT, ARR_DELAY, DISTANCE
FROM flights f
JOIN traindays t
ON f.FL_DATE == t.FL_DATE
WHERE
  t.is_train_day == 'True'
"""
# Build the training DataFrame from the query above.
traindata = spark.sql(trainquery)

In [None]:
# Print the first two rows returned by the training query.
print(traindata.head(2))

In [None]:
# Print Spark's describe() summary statistics for the selected columns.
traindata.describe().show()

In [None]:
# Training query restricted to train-day flights where both dep_delay and
# arr_delay are non-null. TAXI_OUT and DISTANCE are not null-checked here.
# Reassigns `trainquery` and `traindata`.
trainquery = """
SELECT
DEP_DELAY, TAXI_OUT, ARR_DELAY, DISTANCE
FROM flights f
JOIN traindays t
ON f.FL_DATE == t.FL_DATE
WHERE
t.is_train_day == 'True' AND
f.dep_delay IS NOT NULL AND 
f.arr_delay IS NOT NULL
"""
traindata = spark.sql(trainquery)
# Print summary statistics for the filtered training set.
traindata.describe().show()

In [None]:
# Training query restricted to train-day flights whose CANCELLED and DIVERTED
# fields both equal 'False'. Reassigns `trainquery` and `traindata`.
# NOTE(review): the flags are compared against the string 'False'; this
# assumes the JSON-derived columns compare equal to that string -- confirm
# against the flights schema.
trainquery = """
SELECT
  DEP_DELAY, TAXI_OUT, ARR_DELAY, DISTANCE
FROM flights f
JOIN traindays t
ON f.FL_DATE == t.FL_DATE
WHERE
  t.is_train_day == 'True' AND
  f.CANCELLED == 'False' AND 
  f.DIVERTED == 'False'
"""
traindata = spark.sql(trainquery)
# Print summary statistics for the filtered training set.
traindata.describe().show()

In [None]:
def to_example(fields):
    """Build a LabeledPoint training example from one flight record.

    Args:
        fields: Record indexable by column name; must provide 'ARR_DELAY',
            'DEP_DELAY', 'TAXI_OUT' and 'DISTANCE'.

    Returns:
        A LabeledPoint whose label is 1.0 when ARR_DELAY is below 15 and 0.0
        otherwise, and whose features are [DEP_DELAY, TAXI_OUT, DISTANCE].
    """
    # Label: was the flight on time?
    on_time = float(fields['ARR_DELAY'] < 15)
    features = [
        fields['DEP_DELAY'],
        fields['TAXI_OUT'],
        fields['DISTANCE'],
    ]
    return LabeledPoint(on_time, features)

In [None]:
# Map every row of the training DataFrame's RDD through to_example to obtain
# an RDD of LabeledPoint examples.
examples = traindata.rdd.map(to_example)

In [None]:
# Train a logistic-regression model with the L-BFGS trainer on `examples`;
# intercept=True requests that an intercept term be fitted as well.
lrmodel = LogisticRegressionWithLBFGS.train(examples, intercept=True)

In [None]:
# Print the fitted weight vector and the intercept.
print(lrmodel.weights,lrmodel.intercept)

In [None]:
# Predict for one example. Feature order follows to_example:
# DEP_DELAY=6, TAXI_OUT=12, DISTANCE=594.
print(lrmodel.predict([6.0,12.0,594.0]))

In [None]:
# Predict for one example. Feature order follows to_example:
# DEP_DELAY=36, TAXI_OUT=12, DISTANCE=594.
print(lrmodel.predict([36.0,12.0,594.0]))

In [None]:
# Remove the decision threshold so predict() reports the raw score rather
# than a thresholded class, then score two examples
# (feature order: DEP_DELAY, TAXI_OUT, DISTANCE).
lrmodel.clearThreshold()
for example in ([6.0, 12.0, 594.0], [36.0, 12.0, 594.0]):
    print(lrmodel.predict(example))

In [None]:
# Classify against a decision threshold of 0.7, then predict for two examples
# (feature order: DEP_DELAY, TAXI_OUT, DISTANCE).
lrmodel.setThreshold(0.7)
for example in ([6.0, 12.0, 594.0], [36.0, 12.0, 594.0]):
    print(lrmodel.predict(example))

In [None]:
# Destination for the saved model inside the project bucket.
MODEL_FILE='gs://' + BUCKET + '/flights/sparkmloutput/model'
# Recursively remove anything already stored at that path via gsutil.
# The exit status returned by os.system is not checked, so a failed removal
# does not raise here.
os.system('gsutil -m rm -r ' + MODEL_FILE)

In [None]:
# Persist the trained model to MODEL_FILE through the SparkContext, then
# report the path that was written.
lrmodel.save(sc, MODEL_FILE)
print('{} saved'.format(MODEL_FILE))

In [None]:
# Replace the trained model with the placeholder value 0 and print it; after
# this line `lrmodel` no longer refers to a model object.
lrmodel = 0
print(lrmodel)

In [None]:
# Load a LogisticRegressionModel from MODEL_FILE and rebind `lrmodel` to it.
from pyspark.mllib.classification import LogisticRegressionModel
lrmodel = LogisticRegressionModel.load(sc, MODEL_FILE)
# Set the decision threshold on the loaded model to 0.7.
lrmodel.setThreshold(0.7)