<h2> 6. Bayes Classification </h2>

This notebook has the code for the charts in Chapter 6


### Setup

In [None]:
PROJECT=!gcloud config get-value project
PROJECT=PROJECT[0]

In [None]:
import os
BUCKET = 'dsongcp_data_lake_de-zoomcamp-prj-375800' # REPLACE WITH YOUR BUCKET NAME if this won't work
REGION = 'europe-west6' # REPLACE WITH YOUR BUCKET REGION e.g. us-central1

os.environ['BUCKET'] = BUCKET

<h3> Exploration using BigQuery </h3>

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
import google.cloud.bigquery as bigquery

bq = bigquery.Client()

<h3> Set up views in Spark SQL </h3>

Start a Spark Session if necessary and get a handle to it.

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Bayes classification using Spark") \
    .getOrCreate()
print(spark)

Create a table definition (this is done lazily; the files won't be read until we issue a query):

In [None]:
inputs = 'gs://{}/flights/raw/20150*'.format(BUCKET) # SAMPLE
#inputs = 'gs://{}/flights/raw/201*'.format(BUCKET)  # FULL
flights = spark.read.json(inputs)

# this view can now be queried ...
flights.createOrReplaceTempView('flights')

In [None]:
# Note that distance is STRING
print(flights.schema)

Example query over the view:

In [None]:
results = spark.sql('SELECT COUNT(*) FROM flights WHERE dep_delay > -20 AND CAST(distance AS FLOAT) < 2000')
results.show()

<h2> Restrict to train days </h2>

Let's create a CSV file of the training days

Create dataframe of traindays.

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, BooleanType
schema = StructType([
    StructField('FL_DATE', StringType(), True),
    StructField('is_train_day', BooleanType(), True)
])
traindays = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv('gs://{}/flights/trainday.csv'.format(BUCKET))

traindays.createOrReplaceTempView('traindays')

In [None]:
results = spark.sql('SELECT * FROM traindays')
results.head(5)

In [None]:
statement = """
SELECT
  f.FL_DATE AS date,
  CAST(distance AS FLOAT) AS distance,
  dep_delay,
  IF(arr_delay < 15, 1, 0) AS ontime
FROM flights f
JOIN traindays t
ON f.FL_DATE == t.FL_DATE
WHERE
  t.is_train_day AND
  f.dep_delay IS NOT NULL
ORDER BY
  f.dep_delay DESC
"""
flights = spark.sql(statement)

Create dataframe of testdaystemp.

In [None]:
schema = StructType([
    StructField('FL_DATE', StringType(), True),
    StructField('is_test_day', BooleanType(), True)
])
testdaystemp = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv('gs://{}/flights/testdaytemp.csv'.format(BUCKET))

testdaystemp.createOrReplaceTempView('testdaystemp')

In [None]:
results = spark.sql('SELECT * FROM testdaystemp')
results.head(5)

<h3> Hexbin plot </h3>

Create a hexbin plot using Spark (repeat of what we did in BigQuery, except that we are now restricting to train days only).

In [None]:
df = flights[(flights['distance'] < 2000) & (flights['dep_delay'] > -20) & (flights['dep_delay'] < 30)]
df.describe().show()

In [None]:
df.head(5)

Sample the dataframe so that it fits into memory (not a problem in development, but will be on full dataset); then plot it.

In [None]:
pdf = df.sample(False, 0.02, 20).toPandas()  # to 100,000 rows approx on complete dataset
g = sns.jointplot(x=pdf['distance'], y=pdf['dep_delay'], kind="hex", height=10, joint_kws={'gridsize':20})

<h3> Quantization </h3>

Now find the quantiles

In [None]:
distthresh = flights.approxQuantile('distance', list(np.arange(0, 1.0, 0.2)), 0.02)
distthresh[-1] = float('inf')
distthresh

In [None]:
delaythresh = flights.approxQuantile('dep_delay', list(np.arange(0, 1.0, 0.2)), 0.05)
delaythresh[-1] = float('inf')
delaythresh

## Do Bayes in each bin

In [None]:
import pyspark.sql.functions as F
import pandas as pd

df = pd.DataFrame(columns=['dist_thresh', 'delay_thresh', 'frac_ontime'])
for m in range(0, 2): # len(distthresh)-1):
    for n in range(0, len(delaythresh)-1):
        bdf = flights[(flights['distance'] >= distthresh[m])
             & (flights['distance'] < distthresh[m+1])
             & (flights['dep_delay'] >= delaythresh[n])
             & (flights['dep_delay'] < delaythresh[n+1])]
        ontime_frac = bdf.agg(F.sum('ontime')).collect()[0][0] / bdf.agg(F.count('ontime')).collect()[0][0]
        print (m, n, ontime_frac)
        df = df.append({
            'dist_thresh': distthresh[m], 
            'delay_thresh': delaythresh[n],
            'frac_ontime': ontime_frac
        }, ignore_index=True)

We get close to the 70% threshold only on the last bin.
Let's fine tune the delay thresh around the decision boundary.
Which we know is on the order of 15 minutes.

In [None]:
delaythresh = range(10, 20)

df = pd.DataFrame(columns=['dist_thresh', 'delay_thresh', 'frac_ontime'])
for m in range(0, len(distthresh)-1):
    for n in range(0, len(delaythresh)-1):
        bdf = flights[(flights['distance'] >= distthresh[m])
             & (flights['distance'] < distthresh[m+1])
             & (flights['dep_delay'] >= delaythresh[n])
             & (flights['dep_delay'] < delaythresh[n+1])]
        ontime_frac = bdf.agg(F.sum('ontime')).collect()[0][0] / bdf.agg(F.count('ontime')).collect()[0][0]
        print (m, n, ontime_frac)
        df = df.append({
            'dist_thresh': distthresh[m], 
            'delay_thresh': delaythresh[n],
            'frac_ontime': ontime_frac
        }, ignore_index=True)

In [None]:
df['score'] = abs(df['frac_ontime'] - 0.7)
bayes = df.sort_values(['score']).groupby('dist_thresh').head(1).sort_values('dist_thresh')
bayes

In [None]:
bayes.to_csv('gs://${BUCKET}/flights/bayes.csv'.format(BUCKET), index=False)
!gsutil cat gs://{BUCKET}/flights/bayes.csv

In [None]:
bayes[ bayes['dist_thresh'] == distthresh[1] ]['delay_thresh'].values[0]

## Evaluate model

Use the decision threshold to see how well model does on rule-based out-of-sample dataset flights

In [None]:
m = 1
statement = """
SELECT
  '{0:.0f}-{1:.0f} miles' AS bin,
  ROUND(SUM(IF(dep_delay < {2:f} AND arr_delay < 15, 1, 0))/COUNT(*), 2) AS correct_nocancel,
  ROUND(SUM(IF(dep_delay >= {2:f} AND arr_delay < 15, 1, 0))/COUNT(*), 2) AS false_positive,
  ROUND(SUM(IF(dep_delay < {2:f} AND arr_delay >= 15, 1, 0))/COUNT(*), 2) AS false_negative,
  ROUND(SUM(IF(dep_delay >= {2:f} AND arr_delay >= 15, 1, 0))/COUNT(*), 2) AS correct_cancel,
  COUNT(*) AS total_flights
FROM flights f
JOIN traindays t
ON f.FL_DATE == t.FL_DATE
WHERE
  t.is_train_day == 'False' AND
  f.distance >= {0:f} AND f.distance < {1:f}
""".format( distthresh[m], distthresh[m+1], bayes[ bayes['dist_thresh'] == distthresh[m] ]['delay_thresh'].values[0] )
print(statement)
eval_flights = spark.sql(statement)
eval_flights.show()

In [None]:
distthresh[-1] = 100000 # bug in Spark SQL, which treats 'inf' as a column name
for m in range(0, len(distthresh)-1):
    statement = """
SELECT
  '{0:.0f}-{1:.0f} miles' AS bin,
  ROUND(SUM(IF(dep_delay < {2:f} AND arr_delay < 15, 1, 0))/COUNT(*), 2) AS correct_nocancel,
  ROUND(SUM(IF(dep_delay >= {2:f} AND arr_delay < 15, 1, 0))/COUNT(*), 2) AS false_positive,
  ROUND(SUM(IF(dep_delay < {2:f} AND arr_delay >= 15, 1, 0))/COUNT(*), 2) AS false_negative,
  ROUND(SUM(IF(dep_delay >= {2:f} AND arr_delay >= 15, 1, 0))/COUNT(*), 2) AS correct_cancel,
  COUNT(*) AS total_flights
FROM flights f
JOIN traindays t
ON f.FL_DATE == t.FL_DATE
WHERE
  t.is_train_day == 'False' AND
  f.distance >= {0:f} AND f.distance < {1:f}
""".format( distthresh[m], distthresh[m+1], bayes[ bayes['dist_thresh'] == distthresh[m] ]['delay_thresh'].values[0] )
eval_flights = spark.sql(statement)
eval_flights.show()

Use the decision threshold to see how well model does on rule-based temporal test dataset flights

In [None]:
m = 1
statement = """
SELECT
  '{0:.0f}-{1:.0f} miles' AS bin,
  ROUND(SUM(IF(dep_delay < {2:f} AND arr_delay < 15, 1, 0))/COUNT(*), 2) AS correct_nocancel,
  ROUND(SUM(IF(dep_delay >= {2:f} AND arr_delay < 15, 1, 0))/COUNT(*), 2) AS false_positive,
  ROUND(SUM(IF(dep_delay < {2:f} AND arr_delay >= 15, 1, 0))/COUNT(*), 2) AS false_negative,
  ROUND(SUM(IF(dep_delay >= {2:f} AND arr_delay >= 15, 1, 0))/COUNT(*), 2) AS correct_cancel,
  COUNT(*) AS total_flights
FROM flights f
JOIN testdaystemp t
ON f.FL_DATE == t.FL_DATE
WHERE
  t.is_test_day == 'True' AND
  f.distance >= {0:f} AND f.distance < {1:f}
""".format( distthresh[m], distthresh[m+1], bayes[ bayes['dist_thresh'] == distthresh[m] ]['delay_thresh'].values[0] )
print(statement)
temp_eval_flights = spark.sql(statement)
temp_eval_flights.show()

In [None]:
distthresh[-1] = 100000 # bug in Spark SQL, which treats 'inf' as a column name
for m in range(0, len(distthresh)-1):
    statement = """
SELECT
  '{0:.0f}-{1:.0f} miles' AS bin,
  ROUND(SUM(IF(dep_delay < {2:f} AND arr_delay < 15, 1, 0))/COUNT(*), 2) AS correct_nocancel,
  ROUND(SUM(IF(dep_delay >= {2:f} AND arr_delay < 15, 1, 0))/COUNT(*), 2) AS false_positive,
  ROUND(SUM(IF(dep_delay < {2:f} AND arr_delay >= 15, 1, 0))/COUNT(*), 2) AS false_negative,
  ROUND(SUM(IF(dep_delay >= {2:f} AND arr_delay >= 15, 1, 0))/COUNT(*), 2) AS correct_cancel,
  COUNT(*) AS total_flights
FROM flights f
JOIN testdaystemp t
ON f.FL_DATE == t.FL_DATE
WHERE
  t.is_test_day == 'True' AND
  f.distance >= {0:f} AND f.distance < {1:f}
""".format( distthresh[m], distthresh[m+1], bayes[ bayes['dist_thresh'] == distthresh[m] ]['delay_thresh'].values[0] )
temp_eval_flights = spark.sql(statement)
temp_eval_flights.show()

<h2> Repeat, but on full dataset </h2>

You can launch the above processing on the full dataset from within JupyterLab if you want the statistics and graphs updated. I didn't, though, because this is not what I would have really done. Instead, 
I would have created a standalone Python script and submitted it to the cluster -- there is no need to put JupyterLab in the middle of a production process. We'll submit a standalone Pig program to the cluster in the next section.

Steps:
<ol>
<li> Change the input variable to process all-flights-* </li>
<li> Increase cluster size (bash increase_cluster.sh from CloudShell) </li>
<li> Clear all Outputs from this notebook </li>
<li> Restart Kernel and Run all cells </li>
<li> Decrease cluster size (bash decrease_cluster.sh from CloudShell) </li>
</ol>


Copyright 2019 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.