# Part 5 - Machine Learning

Finally we want to apply some machine learning to the taxi data. We will try to build a model for answering the following question:

    As a Taxi driver, what time and location would be best on a specific day to make most money.
    
This is a relevant question in order to help maximizing a Taxi drivers income. We will try to build a model which predicts the total fare amount for each date and hour and each (grid) location. This way the taxi driver can predict the expected total amount of money being made at a specific point in time at a specific location, so he can better plan his shifts.

Of course given the data that we currently have, we cannot precisely give an answer, because even locations with a low total fare amount may be attractive, as long as there are only very few taxi cabs. But we do not know how many taxi cabs were simply waiting or were driving around without passengers.

In [None]:
dwh_basedir = "file:///srv/jupyter/nyc-dwh"
integrated_basedir = dwh_basedir + "/integrated"

# 0 Setup Environment

Before we begin, we create a local Spark session

## 0.1 Spark Session

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

if not 'spark' in locals():
    spark = SparkSession.builder \
        .master("local[*]") \
        .config("spark.driver.memory","64G") \
        .getOrCreate()

spark

## 0.2 Matplotlib

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches

from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()

# 1 Read Taxi Data

Now we can read in the taxi data from the structured zone.

In [None]:
hourly_taxi_trips = spark.read.parquet(integrated_basedir + "/taxi-trips-hourly")
hourly_taxi_trips.limit(10).toPandas()

In [None]:
hourly_taxi_trips.printSchema()

# 2. Simple Model

## 2.1 Split Training and Validation set

As a first step, we split up the whole data set into a training and a validation data set. Typical data sets are split randomly, but for time series data sets a non-random split is preferrable in order to avoid an undesired information creep from future observations. Therefore we create a split filtering by date, such that about 80% of records are used for training and the remaining 20% of all records will be used for validation.

In [None]:
import datetime

training_fraction = 0.8
validation_fraction = 1 - training_fraction
split_date = datetime.date(2013, 1, 1) + datetime.timedelta(days=training_fraction*(365))
print("split_date=\"" + str(split_date) + "\"")

# Pickup all records with a date lower than the split date
training_data = # YOUR CODE HERE
# Pickup all records with a date grater or equal than the split date
validation_data = # YOUR CODE HERE

# Count the number of records for both the training and the validation data set
training_data_count = # YOUR CODE HERE
validation_data_count = # YOUR CODE HERE

print("training_data count = " + str(training_data_count))
print("validation_data count = " + str(validation_data_count))

## 2.2 Features

As a first step we need to create so called *features* from the training data set. Most PySpark ML algorithms expect two specific input columns: A so called *label* column containing the true value and a so called *features* column containing a vector of all variables used for prediction. 

The label column has to be a simple numeric value, in our case it will be the *total amount*.  The features column needs to contain the special data type *vector*, which is constructed from various attributes of our observations. Some of these attributes can be taken directly from our training data set, while other columns also need to be derived from the original values.

### Feature Engineering Building Blocks
PySpark provides lots of different feature engineering algorithms as building blocks. These building blocks are simple (or complex) transformations which typically will add new derived columns to a data frame. We will see how we can chain multiple of these components together into a so called *pipeline* later.

In [None]:
from pyspark.ml.feature import *
from pyspark.ml.regression import *
from pyspark.ml import Pipeline

### SQL Transformer
PySpark provides a very generic building block for transformation which simply executes some SQL. For example for creating a combined geo location inside a single column, we can use the following SQLTransformer

In [None]:
geo_location_transformer = # YOUR CODE HERE

training_data_1 = geo_location_transformer.transform(training_data)
training_data_1.limit(10).toPandas()

### One Hot Encoding
One important case where the original values cannot be used directly is categorial data. For example the geo location cannot be used as a numerical value. Therefore we need a transformation which creates numerical values from this categorial feature. PySpark provides the pair of a *string indexer* followed by *one hot encoding* to create a separate multidimensional vector for each categorial variable. The pattern is always the same:

```
categorial data => StringIndexer => OneHotEncoder => vector
```

Specifically the code for the geo location looks as follows:

In [None]:
# First create an index into all geo locations
geo_indexer = # YOUR CODE HERE
geo_index_model = # YOUR CODE HERE
training_data_2 = # YOUR CODE HERE

# Display some records
training_data_2.limit(10).toPandas()

In [None]:
# Now one-hot encode the generated index value
geo_encoder = # YOUR CODE HERE
geo_encoder_model = # YOUR CODE HERE
training_data_3 = # YOUR CODE HERE

# Display some records
training_data_2.limit(10).toPandas()

### Vector Assembler

As already noted at the beginning of this section, PySpark requires all features to be available in a single column. This can be achieved by using a *vector assembler*, which will glue together all specified numerical and vector columns into a single vector column:

In [None]:
assembler = # YOUR CODE HERE

training_data_4 = # YOUR CODE HERE

training_data_4.limit(10).toPandas()

### Pipeline

Now we have met all relevant building blocks. Instead of manually chaining these transformations together, you always should use a *pipeline*, where you can simply specify all transformations to apply. The pipeline also takes care of performing any `fit` phase of some transformers (like `StringIndexer` or `OneHotEncoderEstimator`).

In [None]:
from pyspark.ml.feature import *
from pyspark.ml.regression import *
from pyspark.ml import Pipeline

feature_pipeline = Pipeline(
    stages = [
        SQLTransformer(
            statement="""
                SELECT
                    total_amount,
                    date,
                    hour,
                    daily_temperature,
                    hourly_temperature,
                    daily_precipitation,
                    hourly_precipitation,
                    daily_wind_speed,
                    hourly_wind_speed,
                    month(`date`) - 1 AS `month_idx`,
                    dayofweek(`date`) - 1 AS `weekday_idx`,
                    CASE
                        WHEN lat_idx IS NULL OR lat_idx < 0 THEN NULL
                        WHEN long_idx IS NULL  OR long_idx < 0 THEN NULL
                        ELSE concat(lat_idx, "/", long_idx) 
                    END AS geo_location,
                    CASE WHEN
                        bank_holiday = true THEN 1
                        ELSE 0
                    END AS bank_holiday
                FROM __THIS__
            """
        ),
        # Create one hot encoded geo location via StringIndexer and OneHotEncoderEstimator
        # YOUR CODE HERE

        # Create one hot encoded hour via OneHotEncoderEstimator
        # YOUR CODE HERE

        # Create one hot encoded weekday via OneHotEncoderEstimator
        # YOUR CODE HERE

        # Assembler the following columns
        #  - one hot encoded weekday
        #  - one hot encoded hour
        #  - bank holiday
        #  - one hot encoded geo location
        #  - daily temperature
        #  - hourly temperature
        #  - daily precipitation
        #  - hourly precipitation
        #  - daily wind speed
        #  - hourly wind speed
        VectorAssembler(
            handleInvalid="skip",
            inputCols=[
                # YOUR CODE HERE
            ],
            outputCol='features'
        )
    ]
)

In [None]:
feature_model = # YOUR CODE HERE

The feature model now contains all stages and can be used as a transformer. Let's transform and display the training data as a quick test.

In [None]:
features_training_data = # YOUR CODE HERE
features_training_data.limit(10).toPandas()

Since we specified `skip` as the strategy to handle invalid records in the vector assembler, let us check how many records where dropped from the training data.

In [None]:
features_data_count = feature_model.transform(training_data).count()

print("feature_data_count = " + str(features_data_count))
print("training_data_count = " + str(training_data_count))
print("skipped records = " + str(training_data_count - features_data_count))

## 2.3 Model

So far we have only prepared the data by adding a feature column containg some information which should be used as independant variables for prediction. Now we finally want to train a model which makes use of these features and predict the total amount for every date, hour and geo location.

We create another pipeline, which contains the feature pipeline as its first entry and a simple linear regression as its second stage.

In [None]:
pred_pipeline = Pipeline(stages=[
    feature_pipeline,
    # YOUR CODE HERE
])

pred_model = # YOUR CODE HERE

## 2.4 Prediction

The `pred_model` now contains all feature transformation steps of the feature pipeline and a linear model. We can directly use this model for performing predictions of the total amount.

Now we use the validation data for prediction, which we set aside at the beginning and which could not influence the training phase in any way.

In [None]:
pred_validation_data = # YOUR CODE HERE

We now want to display some columns of the predicted values

In [None]:
pred_validation_data\
    .orderBy("date", "hour") \
    .select(
        "date", "hour",
        "geo_location",
        "total_amount",
        "pred_total_amount"
    ) \
    .limit(10)\
    .toPandas()

## 2.5 Validation

When looking at the predictions and comparing them with the true total amounts, we already suspect that our model does not perform very well. Now we want to quantify the goodness of fit using an appropriate evaluator. Note that evaluation should always be performed with the validation data, since we are not interested very much into the question how well a model describes the training data, but instead we want to understand how well the model performs with new data, which was not used during the training phase.

In [None]:
from pyspark.ml.evaluation import *

evaluator = # YOUR CODE HERE

rmse = # YOUR CODE HERE

print("rmse = " + str(rmse))
print("real_avg = " + str(validation_data.select(f.avg("total_amount")).first()[0]))

Apparently our model is not very good, since its RMSE is about twice as large as the average value.

## 2.6 Baseline Model

We already saw that the model is not very good. But what can we expect? It is always helpful to come up with a very simple base line model, and every true model should better beat the base line model. In our case, we simply use the average total amount as a constant base line model.

In [None]:
avg_total_amount = # YOUR CODE HERE
baseline_validation_data = # YOUR CODE HERE

Using the base line model, we can now calculate the RMSE of this model as a baseline.

In [None]:
rmse = evaluator.evaluate(baseline_validation_data)

print("rmse = " + str(rmse))
print("real_avg = " + str(validation_data.select(f.avg("total_amount")).first()[0]))

# 3. Improve Model

## 3.1 Integrate information from the past

Depending on the scenario, it can be completely legal to use data from the past as an additional feature. In this example, we assume that we can use the total amount for exactly the same hour of day and for the same location from exactly one week ago as an additional feature. This implies that we assume that this number is available at the time when new predictions are made.

In other scenarios, the minimum amount of time to go back into the past, may be much larger or smaller. The importat question here is always what data is available when a new prediciton is performed.

One important aspect of this approach is that no data may be available for some locations. But since our algorithms always require that all observations contain valid numbers, we fill the overall average of the metric over all locations for a specific date and hour.

In [None]:
# Calculate overall average per date and hour to fill in missing values
hourly_taxi_trips_avg = hourly_taxi_trips \
    .groupBy("date", "hour") \
    .agg(
        f.avg("fare_amount").alias("fare_amount"),
        f.avg("tip_amount").alias("tip_amount"),
        f.avg("total_amount").alias("total_amount")
    )

# Extend the incoming records by the total amount for the same location seven days ago. If no data is available for the specific location,
# we use the overall average instead. This will require three steps:
#  1. Join hourly taxi trips against itself, but delayed by seven days
#  2. Join average values delayed by seven days
#  3. Pick ether the value of the location or otherwise the average
#
hourly_taxi_trips_ext = hourly_taxi_trips \
    .alias("now") \
    .join(
        # Specify the data frame for self join and provide an alias
        hourly_taxi_trips.alias("last_week"), 
        # The join should be performed using date and hour and the geo location
        (f.date_sub(f.col("now.date"), 7) == f.col("last_week.date")) &
        (f.col("now.hour") == f.col("last_week.hour")) &
        (f.col("now.lat_idx") == f.col("last_week.lat_idx")) &
        (f.col("now.long_idx") == f.col("last_week.long_idx")),
        how="leftOuter"
    )\
    .join(
        # Specify the average data frame
        hourly_taxi_trips_avg.alias("avg"),
        # The join should be performed on date and hour only
        (f.date_sub(f.col("now.date"), 7) == f.col("avg.date")) &
        (f.col("now.hour") == f.col("avg.hour")),
        how="leftOuter"
    )\
    .select(
        f.col("now.*"),
        f.coalesce(f.col("last_week.fare_amount"), f.col("avg.fare_amount")).alias("prev_fare_amount"),
        f.coalesce(f.col("last_week.tip_amount"), f.col("avg.tip_amount")).alias("prev_tip_amount"),
        f.coalesce(f.col("last_week.total_amount"), f.col("avg.total_amount")).alias("prev_total_amount")
    )\
    .filter(f.col("prev_fare_amount").isNotNull())

In [None]:
hourly_taxi_trips_ext.limit(10).toPandas()

## 3.2 Split Training and Validation set

As a first step, we split up the whole data set into a training and a validation data set. Typical data sets are split randomly, but for time series data sets a non-random split is preferrable in order to avoid an undesired information creep from future observations. Therefore we create a split filtering by date, such that about 80% of records are used for training and the remaining 20% of all records will be used for validation.

In [None]:
import datetime

training_fraction = 0.8
validation_fraction = 1 - training_fraction
split_date = datetime.date(2013, 1, 7) + datetime.timedelta(days=training_fraction*(365-7))
print("split_date=\"" + str(split_date) + "\"")

training_data = hourly_taxi_trips_ext.filter(f.col("date") < split_date)
validation_data = hourly_taxi_trips_ext.filter(f.col("date") >= split_date)

training_data_count = training_data.count()
validation_data_count = validation_data.count()

print("training_data count = " + str(training_data_count))
print("validation_data count = " + str(validation_data_count))

## 3.3 Create Features and Train Model

Using building blocks of the PySpark ML package, we create a machine learning pipeline with all feature engineering steps and the regression.

### Bucketing

For some numerical features (like temperature and wind speed), it may be more appropriate to model them as categorical features. This can be done by *bucketing* as follows:

In [None]:
# Create Buckets
bucketizer = # YOUR CODE HERE
training_data_1 = # YOUR CODE HERE

training_data_1.limit(10).toPandas()

In [None]:
# One Hot encode buckets
encoder = OneHotEncoderEstimator(
    inputCols=["daily_temperature_bucket"],
    outputCols=["daily_temperature_onehot"]
)
encoder_model = encoder.fit(training_data_1)
training_data_2 = encoder_model.transform(training_data_1)

training_data_2.limit(10).toPandas()

### Pipeline

Now we can create a more extensive pipeline, which makes use of more features and which also performs bucketing of the weather data.

In particulat the pipeline performs the following steps:
* one hot encode geo location
* one hot encode hour
* one hot encode day of week
* bucketize all weather measurements
* perform regression
* truncate predictions to zero from below

In [None]:
from pyspark.ml.feature import *
from pyspark.ml.regression import *
from pyspark.ml import Pipeline

pipeline = Pipeline(
    stages = [
        SQLTransformer(
            statement="""
                SELECT
                    total_amount,
                    prev_total_amount,
                    date,
                    hour,
                    daily_temperature,
                    hourly_temperature,
                    daily_precipitation,
                    hourly_precipitation,
                    daily_wind_speed,
                    hourly_wind_speed,
                    CASE WHEN
                        bank_holiday = true THEN 0
                        ELSE dayofweek(`date`)
                    END AS weekday_idx,
                    CASE
                        WHEN lat_idx IS NULL OR lat_idx < 0 THEN NULL
                        WHEN long_idx IS NULL  OR long_idx < 0 THEN NULL
                        ELSE concat(lat_idx, "/", long_idx) 
                    END AS geo_location
                FROM __THIS__
            """
        ),
        StringIndexer(
            inputCol="geo_location",
            outputCol="geo_location_idx",
            handleInvalid="keep"
        ),
        OneHotEncoderEstimator(
            inputCols=["geo_location_idx"],
            outputCols=["geo_location_onehot"]
        ),
        OneHotEncoderEstimator(
            inputCols=["hour"],
            outputCols=["hour_onehot"]
        ),
        OneHotEncoderEstimator(
            inputCols=["weekday_idx"],
            outputCols=["weekday_onehot"]
        ),
        
        Bucketizer(
            inputCol="daily_temperature",
            outputCol="daily_temperature_bucket",
            handleInvalid="keep",
            splits=[-float("inf"),-10,0,10,20,30,float("inf")]
        ),
        OneHotEncoderEstimator(
            inputCols=["daily_temperature_bucket"],
            outputCols=["daily_temperature_onehot"]
        ),
        Bucketizer(
            inputCol="hourly_temperature",
            outputCol="hourly_temperature_bucket",
            handleInvalid="keep",
            splits=[-float("inf"),-10,0,10,20,30,float("inf")]
        ),
        OneHotEncoderEstimator(
            inputCols=["hourly_temperature_bucket"],
            outputCols=["hourly_temperature_onehot"]
        ),
        Bucketizer(
            inputCol="daily_precipitation",
            outputCol="daily_precipitation_bucket",
            handleInvalid="keep",
            splits=[-float("inf"),0,100,200,300,400,500,float("inf")]
        ),
        OneHotEncoderEstimator(
            inputCols=["daily_precipitation_bucket"],
            outputCols=["daily_precipitation_onehot"]
        ),
        Bucketizer(
            inputCol="hourly_precipitation",
            outputCol="hourly_precipitation_bucket",
            handleInvalid="keep",
            splits=[-float("inf"),0,50,100,150,200,250,float("inf")]
        ),
        OneHotEncoderEstimator(
            inputCols=["hourly_precipitation_bucket"],
            outputCols=["hourly_precipitation_onehot"]
        ),
        Bucketizer(
            inputCol="daily_wind_speed",
            outputCol="daily_wind_speed_bucket",
            handleInvalid="keep",
            splits=[-float("inf"),0,1,2,3,4,5,float("inf")]
        ),
        OneHotEncoderEstimator(
            inputCols=["daily_wind_speed_bucket"],
            outputCols=["daily_wind_speed_onehot"]
        ),
        Bucketizer(
            inputCol="hourly_wind_speed",
            outputCol="hourly_wind_speed_bucket",
            handleInvalid="keep",
            splits=[-float("inf"),0,1,2,3,4,5,float("inf")]
        ),
        OneHotEncoderEstimator(
            inputCols=["hourly_wind_speed_bucket"],
            outputCols=["hourly_wind_speed_onehot"]
        ),
        
        # Linear Prediction
        VectorAssembler(
            handleInvalid="skip",
            inputCols=[
                'prev_total_amount',
                'weekday_onehot',
                'hour_onehot',
                'geo_location_onehot',
                'daily_temperature_onehot',
                'hourly_temperature_onehot',
                'daily_precipitation_onehot',
                'hourly_precipitation_onehot',
                'daily_wind_speed_onehot',
                'hourly_wind_speed_onehot'
            ],
            outputCol='features'
        ),
        LinearRegression(
            featuresCol="features",
            labelCol="total_amount",
            predictionCol="pred_total_amount"
        ),
        SQLTransformer(
            statement="""
                SELECT
                    date,
                    hour,
                    geo_location,
                    total_amount,
                    CASE 
                        WHEN pred_total_amount > 0 THEN pred_total_amount
                        ELSE 0
                    END AS pred_total_amount
                FROM __THIS__
            """
        )
    ]
)

pred_model = pipeline.fit(training_data)

Again let us check how many training records have been dropped during the pipeline.

In [None]:
features_data_count = pred_model.transform(training_data).count()

print("training_data_count = " + str(training_data_count))
print("feature_data_count = " + str(features_data_count))
print("skipped records = " + str(training_data_count - features_data_count))

## 3.4 Prediction

In [None]:
pred_validation_data = pred_model.transform(validation_data)
pred_validation_data\
    .orderBy("date", "hour") \
    .select(
        "date", "hour",
        "geo_location",
        "total_amount",
        "pred_total_amount"
    ) \
    .limit(10)\
    .toPandas()

## 3.6 Validation

In [None]:
from pyspark.ml.evaluation import *

evaluator = RegressionEvaluator(
    labelCol = "total_amount",
    predictionCol = "pred_total_amount",
    metricName = "rmse"
)

rmse = # YOUR CODE HERE

print("rmse = " + str(rmse))
print("real_avg = " + str(validation_data.select(f.avg("total_amount")).first()[0]))

## 3.7 Baseline Model

In order to make sense of the number, we use a simple base line model as comparison again. This time we simply predict the total amount by using the previous total amount from seven days ago.

In [None]:
baseline_validation_data = # YOUR CODE HERE

rmse = # YOUR CODE HERE

print("rmse = " + str(rmse))
print("real_avg = " + str(validation_data.select(f.avg("total_amount")).first()[0]))

# 4. Best Time and Location

Although the predicted values are not really satisfying so far, they are good enough for deciding when to make most money. In order to underline this claim, let us compare the top ten hours and locations.

In [None]:
pred_validation_data.filter("date='2013-11-01'") \
    .select("date", "hour", "geo_location", "total_amount", "pred_total_amount") \
    .orderBy(f.desc("total_amount")) \
    .limit(10).toPandas()

In [None]:
pred_validation_data.filter("date='2013-11-01'") \
    .select("date", "hour", "geo_location", "total_amount", "pred_total_amount") \
    .orderBy(f.desc("pred_total_amount")) \
    .limit(10).toPandas()

## 4.1 Top 10 Recommendations

Let us find out how good the recommendatations of our algorithm would be. We pick the ten best hour-locations for each day from the real data, the predicted data and the baseline model. For each selection, we also compute the *real total revenue* for all these location-hours.

### Real 10 best location-hours

In [None]:
from pyspark.sql import Window

real_best_locations = pred_validation_data \
    .select(
        f.col("date"),
        f.col("hour"),
        f.col("total_amount"),
        f.col("geo_location"),
        f.row_number().over(Window.partitionBy("date").orderBy(f.col("total_amount").desc())).alias("row_number")
    ) \
    .filter(f.col("row_number") < 10)

real_best_locations.limit(10).toPandas()

In [None]:
real_top10_totals = # YOUR CODE HERE

print("real_top10_totals = " + str(real_top10_totals))

### Predicted 10 best location-hours

In [None]:
pred_best_locations = pred_validation_data \
    .select(
        f.col("date"),
        f.col("hour"),
        f.col("total_amount"),
        f.col("geo_location"),
        f.row_number().over(Window.partitionBy("date").orderBy(f.col("pred_total_amount").desc())).alias("row_number")
    ) \
    .filter(f.col("row_number") < 10)

pred_best_locations.limit(10).toPandas()

In [None]:
pred_top10_totals = pred_best_locations.select(
    f.sum(f.col("total_amount"))
).first()[0] 

print("pred_top10_totals = " + str(pred_top10_totals))

### Baseline 10 best location-hours

In [None]:
baseline_best_locations = validation_data \
    .select(
        f.col("date"),
        f.col("hour"),
        f.col("total_amount"),
        f.concat(f.col("lat_idx"), f.lit("/"), f.col("long_idx")).alias("geo_location"),
        f.row_number().over(Window.partitionBy("date").orderBy(f.col("prev_total_amount").desc())).alias("row_number")
    ) \
    .filter(f.col("row_number") < 10)

baseline_best_locations.limit(10).toPandas()

In [None]:
baseline_top10_totals = baseline_best_locations.select(
    f.sum(f.col("total_amount"))
).first()[0] 

print("baseline_top10_totals = " + str(baseline_top10_totals))