In [None]:
from datetime import datetime, timedelta

# pandas and plotting libraries for visualizations
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

# module containing functions for manipulating pyspark dataframes
import pyspark.sql.functions as f
import pyspark.sql.types as t
from pyspark.sql.window import Window
from pyspark.sql import DataFrame

# class which will let us create spark objects
from pyspark.sql import SparkSession

# modeling
from pyspark.ml.feature import Imputer, VectorAssembler, StringIndexer
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

# helper functions for the class
from helpers import display, read_df, write_df, feature_importances

## [PySpark ML docs](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html)

## Create a Spark Session

In [None]:
# Configure the session first, then create it (or reuse one that already exists).
# 'local[2]' runs Spark in-process on this machine with two worker threads.
session_builder = SparkSession.builder.appName('model').master('local[2]')
spark = session_builder.getOrCreate()

## Feature generation function

In [None]:
def build_features(raw_df):
    """Build the model feature set (plus ``label``) from raw trip records.

    Rows whose ``tips`` value is null are discarded, because ``tips`` becomes
    the regression label.

    Args:
        raw_df: Spark DataFrame of trips. It must provide the columns used
            below: ``trip_id``, ``company``, ``trip_miles``, ``fare``,
            ``start_time``, ``end_time``, ``dropoff_census_tract`` and ``tips``.

    Returns:
        Spark DataFrame with the columns ``trip_id``, ``company``,
        ``trip_miles``, ``fare``, ``start_month``, ``start_day_of_week``,
        ``start_hour``, ``trip_minutes``,
        ``avg_trip_miles_by_dropoff_census_tract`` and ``label``.
    """
    # Average trip length per drop-off census tract, computed over every row of
    # raw_df (including rows that have no tip recorded).
    avg_miles_by_census_tract = (
        raw_df
        .groupby('dropoff_census_tract')
        .agg(f.avg(f.col('trip_miles')).alias('avg_trip_miles_by_dropoff_census_tract'))
    )

    # unix_timestamp() returns seconds, so the difference has to be divided by
    # 60 for the column to actually hold minutes.
    trip_minutes = (
        (
            f.unix_timestamp(f.col('end_time'))
            - f.unix_timestamp(f.col('start_time'))
        ) / 60
    ).alias('trip_minutes')

    features = (
        raw_df
        .where(f.col('tips').isNotNull())
        # Left join: null keys never match in an equi-join, so an inner join
        # would silently drop every trip without a drop-off census tract. With
        # a left join those trips are kept and get a null average instead.
        .join(avg_miles_by_census_tract, on='dropoff_census_tract', how='left')
        .select(
            'trip_id',
            'company',
            'trip_miles',
            'fare',
            f.month('start_time').alias('start_month'),
            f.dayofweek('start_time').alias('start_day_of_week'),
            f.hour('start_time').alias('start_hour'),
            trip_minutes,
            'avg_trip_miles_by_dropoff_census_tract',
            f.col('tips').alias('label'),
        )
    )

    return features

## Build Pipeline

In [None]:
# read_df comes from the local `helpers` module; presumably it loads the
# dataset stored at this relative path into a Spark DataFrame — confirm in helpers.
df = read_df(spark, '../taxi_2016')

In [None]:
# Tips are only recorded for credit card payments, so keep just those trips.
is_credit_card = f.col('payment_type') == 'Credit Card'
tips = df.where(is_credit_card)

In [None]:
# number of credit card trips available for modeling (count() runs a Spark job)
tips.count()

In [None]:
# 70/30 train/test split. The seed is fixed so that a Restart & Run All
# reproduces the same split, and therefore comparable metrics, on every run.
train_raw, test_raw = tips.randomSplit([0.7, 0.3], seed=42)

In [None]:
# feature columns and `label` for the training split
train = build_features(train_raw)

In [None]:
# feature columns and `label` for the held-out test split
test = build_features(test_raw)

In [None]:
# Null audit of the training set: f.count(column) counts only non-null values,
# so subtracting it from the row total gives the number of nulls per column.
total_rows = train.count()
null_counts = [
    (total_rows - f.count(column_name)).alias(column_name)
    for column_name in train.columns
]
display(train.agg(*null_counts))

In [None]:
def _with_suffix(names, suffix):
    """Return a new list with `suffix` appended to every name in `names`."""
    return [name + suffix for name in names]


# Columns handled as categories, and the names of their indexed counterparts.
categorical_cols = ['start_month', 'start_day_of_week', 'start_hour', 'company']
indexed_categorical_cols = _with_suffix(categorical_cols, '_ind')

# Numeric columns, and the names of their imputed counterparts.
numeric_cols = [
    'trip_miles',
    'fare',
    'trip_minutes',
    'avg_trip_miles_by_dropoff_census_tract',
]
imputed_numeric_cols = _with_suffix(numeric_cols, '_imp')

In [None]:
# Keep the id, the categorical columns unchanged, the numeric columns cast to
# double (they are fed to the Imputer stage later), and the label.
train = train.select(
    'trip_id',
    *categorical_cols,
    *[f.col(c).cast(t.DoubleType()) for c in numeric_cols],
    'label',
)
# Restrict the placeholder to `company`: without `subset`, a string fill value
# is written into every string column that contains nulls (e.g. trip_id, if it
# is a string column).
train = train.fillna('no_company', subset=['company'])

In [None]:
# Keep the id, the categorical columns unchanged, the numeric columns cast to
# double (they are fed to the Imputer stage later), and the label.
test = test.select(
    'trip_id',
    *categorical_cols,
    *[f.col(c).cast(t.DoubleType()) for c in numeric_cols],
    'label',
)
# Restrict the placeholder to `company`: without `subset`, a string fill value
# is written into every string column that contains nulls (e.g. trip_id, if it
# is a string column).
test = test.fillna('no_company', subset=['company'])

In [None]:
# Stage order: one StringIndexer per categorical column, then the Imputer for
# the numeric columns, then the VectorAssembler, and the regressor last.
pipeline = Pipeline().setStages(
    [
        # handleInvalid='keep' puts labels unseen during fitting into an extra
        # bucket instead of raising an error at transform time.
        *[
            StringIndexer(inputCol=raw_col, outputCol=indexed_col, handleInvalid='keep')
            for raw_col, indexed_col in zip(categorical_cols, indexed_categorical_cols)
        ],
        Imputer(inputCols=numeric_cols, outputCols=imputed_numeric_cols),
        VectorAssembler(inputCols=imputed_numeric_cols + indexed_categorical_cols, outputCol='features'),
        # seed fixed so that repeated fits on the same data give the same forest.
        # NOTE(review): maxBins=60 presumably has to cover the largest number of
        # categories among the indexed columns (likely `company`) — confirm.
        RandomForestRegressor(featuresCol='features', labelCol='label', maxBins=60, seed=42),
    ]
)

In [None]:
# number of training rows
train.count()

In [None]:
%%time

# timed fit before `train` is cached (compare with the timed fit after caching)
model = pipeline.fit(train)

In [None]:
# cache() only marks the DataFrame for caching; the count() action that follows
# is what actually computes it and populates the cache
train.cache()
train.count()

In [None]:
%%time

# same fit again, now with `train` cached; this is the model used from here on
model = pipeline.fit(train)

In [None]:
# run the fitted pipeline on the test set; the evaluator below reads the
# resulting `prediction` column
predictions = model.transform(test)

## Evaluate

In [None]:
# Root-mean-squared error between the `prediction` and `label` columns.
evaluator_params = {
    'labelCol': 'label',
    'predictionCol': 'prediction',
    'metricName': 'rmse',
}
evaluator = RegressionEvaluator(**evaluator_params)
rmse = evaluator.evaluate(predictions)

In [None]:
# report the test-set error computed by the evaluator
print('RMSE on test data = {}'.format(rmse))

In [None]:
print('Most important features:')
# The fitted regressor is the last stage of the pipeline model. Indexing from
# the end keeps this correct if preprocessing stages are added or removed
# (a hard-coded position would then point at the wrong stage).
# feature_importances is a local helper; presumably 20 limits how many
# features are shown — confirm in helpers.
feature_importances(predictions, model.stages[-1], 20)

In [None]:
# Bring a 1% sample of (prediction, label) pairs to the driver as a pandas
# DataFrame for plotting. The seed makes the sample, and so the plots,
# repeatable across runs.
to_plot = (
    predictions
    .select('prediction', 'label')
    .sample(withReplacement=False, fraction=.01, seed=42)
    .toPandas()
)

In [None]:
# Predicted vs. actual tip for the sampled rows, with a fitted regression line.
plt.figure()
# x and y are passed by keyword: current seaborn releases no longer accept them
# positionally, while the keyword form works on older releases too.
sns.regplot(x=to_plot['prediction'], y=to_plot['label'])

In [None]:
# Distribution of the residuals (prediction minus actual tip) for the sampled rows.
plt.figure()
residuals = to_plot['prediction'] - to_plot['label']
# NOTE(review): distplot is deprecated in recent seaborn releases; consider
# histplot/displot once the minimum seaborn version is confirmed.
sns.distplot(residuals)

In [None]:
# shut down the Spark session now that the analysis is finished
spark.stop()