# Training and Validating a Machine Learning Model

Linear regression is one of the most commonly used machine learning models because it is highly interpretable and well studied. It is often the first pass for data scientists modeling continuous variables. This notebook trains a multivariate regression model and interprets the results. It is organized in two sections:

- Exercise 1: Training a Model
- Exercise 2: Validating a Model

Run the following cell to load common libraries.

In [0]:
import urllib.request
import os
import numpy as np
from pyspark.sql.types import * 
from pyspark.sql.functions import col, lit
from pyspark.sql.functions import udf
import matplotlib
import matplotlib.pyplot as plt
print("Imported common libraries.")

## Load the training data

In this notebook, we will use a subset of the NYC Taxi & Limousine Commission green taxi trip records available from [Azure Open Datasets](https://azure.microsoft.com/en-us/services/open-datasets/). The data is enriched with holiday and weather data. Each row of the table represents a taxi ride and includes columns such as number of passengers, trip distance, datetime information, holiday and weather information, and the taxi fare for the trip.

Run the following cell to load the table into a Spark dataframe and review the dataframe.

In [0]:
dataset = spark.sql("select * from nyc_taxi")
display(dataset)

## Exercise 1: Training a Model

In this section we will use Spark's machine learning library, `MLlib`, to train a `NYC Taxi Fare Predictor` machine learning model. We will train a multivariate regression model to predict taxi fares in New York City based on input features such as number of passengers, trip distance, datetime, holiday information, and weather information. Before we start, let's review the three main abstractions provided in `MLlib`:

1. A **transformer** takes a DataFrame as input and returns a new DataFrame with one or more columns appended to it.  
a. Transformers implement a **`.transform()`** method.
2. An **estimator** takes a DataFrame as input and returns a model, which is itself a transformer.  
a. Estimators implement a **`.fit()`** method.
3. A **pipeline** chains transformers and estimators together so that multiple algorithms can be run as a single workflow.  
a. Pipelines implement a **`.fit()`** method.
  
These basic building blocks form the machine learning process in Spark from featurization through model training and deployment.
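
The relationship between the three abstractions can be sketched in plain Python. This is an illustration of the pattern only, not Spark's implementation: the class names `MedianImputer`, `MedianImputerModel`, and `ToyPipeline` are hypothetical, and rows are modeled as dictionaries rather than a DataFrame.

```python
from statistics import median


class MedianImputerModel:
    """Transformer (hypothetical): applies an already-computed median."""

    def __init__(self, column, fill_value):
        self.column = column
        self.fill_value = fill_value

    def transform(self, rows):
        filled = []
        for row in rows:
            new_row = dict(row)  # copy so the input rows stay unchanged
            if new_row[self.column] is None:
                new_row[self.column] = self.fill_value
            filled.append(new_row)
        return filled


class MedianImputer:
    """Estimator (hypothetical): learns a statistic and returns a transformer."""

    def __init__(self, column):
        self.column = column

    def fit(self, rows):
        observed = [r[self.column] for r in rows if r[self.column] is not None]
        return MedianImputerModel(self.column, median(observed))


class ToyPipeline:
    """Pipeline (hypothetical): fits estimator stages in order."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # Estimators are fit first; plain transformers are used as they are.
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            rows = model.transform(rows)  # later stages see the transformed rows
            fitted.append(model)
        return fitted


rows = [
    {"passengerCount": 1},
    {"passengerCount": None},
    {"passengerCount": 3},
    {"passengerCount": 2},
]
fitted_stages = ToyPipeline([MedianImputer("passengerCount")]).fit(rows)
filled = fitted_stages[0].transform(rows)
print(filled)
```

The point of the sketch is that `fit` only learns from the data and `transform` only applies what was learned, which is why a fitted model can later be applied to data it was not fit on.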

### Featurization of the training data

Machine learning models are only as strong as the data they see and can only work on numerical data.  
**Featurization is the process of creating this input data for a model.**  
In this section we will build derived features and create a pipeline of featurization steps.

Run the following cell to engineer cyclical features that represent `hour_of_day`. The cell also drops rows with null values in the `totalAmount` column and casts the `isPaidTimeOff` column to integer type.

In [0]:
def get_sin_cosine(value, max_value):
  sine =  np.sin(value * (2.*np.pi/max_value))
  cosine = np.cos(value * (2.*np.pi/max_value))
  return (sine.tolist(), cosine.tolist())

# Third parameter in StructField specifies if the field can be nullable.
schema = StructType([
    StructField("sine", DoubleType(), False),
    StructField("cosine", DoubleType(), False)
])

get_sin_cosineUDF = udf(get_sin_cosine, schema)

# Create dataset with columns hour_sine and hour_cosine and drop the column hour_of_day
dataset = (
    dataset
    .withColumn("udfResult", get_sin_cosineUDF(col("hour_of_day"), lit(24)))
    .withColumn("hour_sine", col("udfResult.sine"))
    .withColumn("hour_cosine", col("udfResult.cosine"))
    .drop("udfResult")
    .drop("hour_of_day")
)

dataset = dataset.filter(dataset.totalAmount.isNotNull())

#Cast field isPaidTimeOff to integer
dataset = dataset.withColumn("isPaidTimeOff", col("isPaidTimeOff").cast("integer"))

display(dataset)
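
To see why the hour is encoded as a sine and cosine pair, the same formula as in the cell above can be checked in plain Python. This is a small sketch; the helper names `encode_cyclical` and `distance` are made up for the illustration.

```python
import math


def encode_cyclical(value, max_value):
    """Map a value on a cycle of length max_value to a point on the unit circle."""
    angle = value * (2.0 * math.pi / max_value)
    return math.sin(angle), math.cos(angle)


def distance(a, b):
    """Euclidean distance between two (sine, cosine) points."""
    return math.hypot(a[0] - b[0], a[1] - b[1])


late_evening = encode_cyclical(23, 24)
midnight = encode_cyclical(0, 24)
noon = encode_cyclical(12, 24)

# As raw numbers, 23 and 0 are 23 units apart even though they are one hour apart.
print("23:00 vs 00:00:", distance(late_evening, midnight))
print("12:00 vs 00:00:", distance(noon, midnight))
```

In the encoded space, 23:00 and 00:00 are close together and 12:00 is the farthest point from 00:00, which matches how the hours relate on a clock.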

Run the following cell to create stages in our featurization pipeline to scale the numerical features and to encode the categorical features.

### Important points to note
•	**Imputer:** The imputer is an estimator: it is fit on the dataset to calculate a statistic for each input column (here the median, because the cell below sets `strategy="median"`). The fitted imputer is then applied to the dataset and returns a copy in which the missing values of each input column are replaced with that statistic.

•	**VectorAssembler:** A transformer that combines a given list of columns into a single vector column. It is useful for combining raw features and features generated by different feature transformers into a single feature vector, in order to train ML models such as linear regression, logistic regression, and decision trees.

•	**MinMaxScaler:** Rescales each feature to a given range, which is [0, 1] by default. After scaling, the minimum and maximum values of each feature in the fitted data are 0 and 1, respectively, so features measured in different units end up on a comparable scale.

•	**StringIndexer:** Encodes a string column of labels to a column of label indices. If the input column is numeric, it is cast to string and the string values are indexed.

•	**Vector:** A vector is similar to an array: it holds multiple numeric values. In this notebook, the feature values of each row are packed into one vector column, which is the input format that MLlib models expect.

•	**OneHotEncoder:** One-hot encoding transforms the values in a category index column into a binary vector in which at most one value is 1. This lets a model such as linear regression use categorical features without treating the index values as ordered quantities. In this notebook, one-hot encoding is applied to the categorical input features, not to the label.

•	**stages:** The `stages` list holds the pipeline stages in the order in which they run.  
**stages[0]:** The imputer, which replaces missing values in `passengerCount` with the column median.  
**stages[1] and stages[2]:** The assembler and the scaler. The assembler combines all numerical columns into the `numerical_features` vector column, and the scaler rescales `numerical_features` into the `scaled_numerical_features` column.  
**Remaining stages:** For each categorical column, a `StringIndexer` creates the corresponding `<column>_index` column and a `OneHotEncoder` creates the corresponding `<column>_classVector` column.
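
What the indexing and encoding stages do to a categorical column can be sketched in plain Python. The helpers `fit_string_index` and `one_hot` are hypothetical stand-ins, not Spark APIs. The sketch assumes Spark's defaults: labels are indexed by descending frequency, and the encoder drops the last category, so that category is represented by an all-zero vector.

```python
from collections import Counter


def fit_string_index(values):
    """Assign index 0 to the most frequent label, 1 to the next, and so on.

    Ties are broken alphabetically in this sketch.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Return a binary vector with at most one entry set to 1.0."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:  # the dropped last category stays all zeros
        vector[index] = 1.0
    return vector


days = ["Fri", "Sat", "Fri", "Mon", "Sat", "Fri"]
mapping = fit_string_index(days)
print(mapping)

for label in ["Fri", "Sat", "Mon"]:
    print(label, one_hot(mapping[label], len(mapping)))
```

Dropping one category avoids a redundant column: when every other entry is 0, the row must belong to the dropped category.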

In [0]:
from pyspark.ml.feature import Imputer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml import Pipeline

numerical_cols = ["passengerCount", "tripDistance", "snowDepth", "precipTime", "precipDepth", "temperature", "hour_sine", "hour_cosine"]
categorical_cols = ["day_of_week", "month_num", "normalizeHolidayName", "isPaidTimeOff"]
label_column = "totalAmount"

stages = []

inputCols = ["passengerCount"]
outputCols = ["passengerCount"]
imputer = Imputer(strategy="median", inputCols=inputCols, outputCols=outputCols)
stages += [imputer]

assembler = VectorAssembler().setInputCols(numerical_cols).setOutputCol('numerical_features')
scaler = MinMaxScaler(inputCol=assembler.getOutputCol(), outputCol="scaled_numerical_features")
stages += [assembler, scaler]

for categorical_col in categorical_cols:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categorical_col, outputCol=categorical_col + "_index", handleInvalid="skip")
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categorical_col + "_classVector"])
    # Add stages.  These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]
    
print("Created stages in our featurization pipeline to scale the numerical features and to encode the categorical features.")
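
The scaling stage defined above rescales each numerical feature with the min-max formula `(x - min) / (max - min)`. A minimal plain-Python sketch of that arithmetic follows. The helper names and sample distances are made up, and mapping a constant column to 0.5 is a choice made for this sketch.

```python
def fit_min_max(values):
    """Learn the minimum and maximum of a feature from the fitted data."""
    return min(values), max(values)


def scale(value, lo, hi):
    """Rescale a value to [0, 1] using a previously learned min and max."""
    if hi == lo:
        return 0.5  # sketch choice for a constant feature: midpoint of the range
    return (value - lo) / (hi - lo)


trip_distances = [0.5, 2.0, 3.5, 8.0]  # made-up sample values
lo, hi = fit_min_max(trip_distances)
scaled = [scale(v, lo, hi) for v in trip_distances]
print(scaled)

# A value outside the fitted range falls outside [0, 1].
print(scale(10.0, lo, hi))
```

Because the minimum and maximum are learned during `fit`, data seen later (such as a longer trip than any in the fitted data) can produce scaled values greater than 1.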

Use a `VectorAssembler` to combine all the feature columns into a single vector column named **features**.

In [0]:
assemblerInputs = [c + "_classVector" for c in categorical_cols] + ["scaled_numerical_features"]
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
print("Used a VectorAssembler to combine all the feature columns into a single vector column named features.")

**Run the stages as a Pipeline**

The pipeline is itself now an `estimator`. Call the pipeline's `fit` method and then `transform` the original dataset. This puts the data through all of the feature transformations we described. Observe the new columns, especially the **features** column.

**Pipeline:** A Pipeline is specified as a sequence of stages. Each stage is either a Transformer or an Estimator. These stages are run in order, and the input DataFrame is transformed as it passes through each stage.

**fit():** Calling `fit` on the pipeline fits each estimator stage on the data in order (for example, the imputer computes the median, the scaler computes the minimum and maximum of each feature, and each `StringIndexer` computes its label-to-index mapping) and returns a `PipelineModel`.

**transform():** Calling `transform` on the fitted `PipelineModel` applies the fitted stages to a DataFrame and returns a new DataFrame with the derived columns appended. It does not fit anything. Unlike scikit-learn, Spark ML has no combined `fit_transform()` method, so `fit` and `transform` are called separately.

* numerical_cols = ["passengerCount", "tripDistance", "snowDepth", "precipTime", "precipDepth", "temperature", "hour_sine", "hour_cosine"]  
* categorical_cols = ["day_of_week", "month_num", "normalizeHolidayName", "isPaidTimeOff"]

This will generate the following columns:

1. `numerical_features` and `scaled_numerical_features`, each a vector built from all of the numerical columns
2. `<column>_index` and `<column>_classVector` for each categorical column
3. `features`, a vector that combines every `<column>_classVector` column with `scaled_numerical_features`

**Dense vs sparse vectors**  
A sparse vector is one in which most of the values are zero, while a dense vector is one in which most of the values are non-zero. One-hot encoded columns are mostly zeros, and Spark's `OneHotEncoder` outputs them as sparse vectors.  
MLlib supports two types of local vectors: dense and sparse. A dense vector is backed by a double array representing its entry values, while a sparse vector is backed by two parallel arrays: indices and values.  
For example, a vector (1.0, 0.0, 3.0) can be represented  
* in dense format as [1.0, 0.0, 3.0] or  
* in sparse format as (3, [0, 2], [1.0, 3.0]), where 3 is the size of the vector.
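
The two representations carry the same information, which a short plain-Python sketch can show. The helpers `to_sparse` and `to_dense` are hypothetical and use ordinary lists and tuples rather than MLlib's vector classes.

```python
def to_sparse(dense):
    """Convert a dense list to (size, indices, values), keeping non-zero entries."""
    indices = [i for i, value in enumerate(dense) if value != 0.0]
    values = [dense[i] for i in indices]
    return len(dense), indices, values


def to_dense(size, indices, values):
    """Rebuild the dense list from the sparse (size, indices, values) form."""
    dense = [0.0] * size
    for i, value in zip(indices, values):
        dense[i] = value
    return dense


sparse = to_sparse([1.0, 0.0, 3.0])
print(sparse)
print(to_dense(*sparse))
```

When the output of the next cell is displayed, vector columns may appear in either form, so it helps to be able to read the sparse triple as size, indices of the non-zero entries, and their values.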

In [0]:
partialPipeline = Pipeline().setStages(stages)
pipelineModel = partialPipeline.fit(dataset)
preppedDataDF = pipelineModel.transform(dataset)

display(preppedDataDF)

### Train a multivariate regression model

A multivariate regression takes an arbitrary number of input features. The equation for a multivariate regression with `p` features looks like the following, where each feature has its own coefficient:

&nbsp;&nbsp;&nbsp;&nbsp;Y ≈ β<sub>0</sub> + β<sub>1</sub>X<sub>1</sub> + β<sub>2</sub>X<sub>2</sub> + ... + β<sub>p</sub>X<sub>p</sub>
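
As a worked example of the equation, the prediction is the intercept plus the sum of each coefficient multiplied by its feature value. The numbers below are made up for illustration and are not coefficients from the trained model.

```python
def predict(intercept, coefficients, features):
    """Compute β0 + β1*X1 + ... + βp*Xp for one row."""
    return intercept + sum(b * x for b, x in zip(coefficients, features))


intercept = 2.5            # β0 (made-up value)
coefficients = [3.0, 0.5]  # β1, β2 (made-up values)
features = [4.0, 2.0]      # X1, X2 for a single row

# 2.5 + 3.0 * 4.0 + 0.5 * 2.0
fare = predict(intercept, coefficients, features)
print(fare)
```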

Split the featurized training data for training and validating the model.

In [0]:
(trainingData, testData) = preppedDataDF.randomSplit([0.7, 0.3], seed=97)
print("The training data is split for training and validating the model: 70-30 split.")
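
The idea behind a seeded random split can be sketched in plain Python. This is a simplified illustration rather than Spark's implementation, and `random_split` is a hypothetical helper. It assumes each row is assigned to a split by an independent random draw, so the weights act as probabilities and the resulting sizes are only approximately 70% and 30%.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to the first or second split by a seeded random draw."""
    rng = random.Random(seed)
    first_fraction = weights[0] / sum(weights)
    first, second = [], []
    for row in rows:
        (first if rng.random() < first_fraction else second).append(row)
    return first, second


rows = list(range(1000))
train, test = random_split(rows, [0.7, 0.3], seed=97)
print(len(train), len(test))

# The same seed reproduces the same split.
train_again, test_again = random_split(rows, [0.7, 0.3], seed=97)
print(train == train_again and test == test_again)
```

Fixing the seed makes the split repeatable, which keeps the training and hold-back metrics comparable between runs.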

Create the estimator `LinearRegression` and call its `fit` method to get back the trained ML model (`lrModel`). You can read more about [Linear Regression] from the [classification and regression] section of MLlib Programming Guide.

[classification and regression]: https://spark.apache.org/docs/latest/ml-classification-regression.html
[Linear Regression]: https://spark.apache.org/docs/3.1.1/ml-classification-regression.html#linear-regression

In [0]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol="features", labelCol=label_column)

lrModel = lr.fit(trainingData)

print(lrModel)

## Exercise 2: Validating a Model

From the trained model summary, let’s review some of the model performance metrics, such as Root Mean Squared Error (RMSE), Mean Absolute Error (MAE), and R<sup>2</sup> score. We will also look at the multivariate model’s coefficients.

### Important terms

**Root Mean Squared Error (RMSE):** RMSE is the square root of the mean of the squared residuals (prediction errors). A residual is the difference between an observed value and the value the model predicts for it. RMSE is expressed in the same units as the label and penalizes large errors more heavily than small ones.

**Mean Absolute Error (MAE):** MAE is the mean of the absolute values of the residuals. It is also expressed in the units of the label and is less sensitive to outliers than RMSE.

**R-squared (R<sup>2</sup>):** R<sup>2</sup> measures how well the regression model fits the observed data, as the proportion of the variance in the label that the model explains. For example, an R<sup>2</sup> of 60% means that the model explains 60% of the variance in the observed values. Generally, a higher R<sup>2</sup> indicates a better fit.
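
The three metrics can be computed by hand on a few made-up values to make the definitions concrete. This is a plain-Python sketch; `regression_metrics` is a hypothetical helper, not part of MLlib.

```python
import math


def regression_metrics(actual, predicted):
    """Return (RMSE, MAE, R2) for paired actual and predicted values."""
    n = len(actual)
    residuals = [a - p for a, p in zip(actual, predicted)]
    ss_res = sum(r * r for r in residuals)            # residual sum of squares
    mean_actual = sum(actual) / n
    ss_tot = sum((a - mean_actual) ** 2 for a in actual)  # total sum of squares
    rmse = math.sqrt(ss_res / n)
    mae = sum(abs(r) for r in residuals) / n
    r2 = 1.0 - ss_res / ss_tot
    return rmse, mae, r2


actual = [10.0, 20.0, 30.0, 40.0]     # made-up fares
predicted = [12.0, 18.0, 33.0, 37.0]  # made-up predictions
rmse, mae, r2 = regression_metrics(actual, predicted)
print("RMSE:", rmse)
print("MAE:", mae)
print("R2:", r2)
```

Here the residuals are -2, 2, -3, and 3, so MAE is 2.5 while RMSE is slightly larger, because squaring gives the two larger errors more weight.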

In [0]:
summary = lrModel.summary
print("RMSE score: {} \nMAE score: {} \nR2 score: {}".format(summary.rootMeanSquaredError, summary.meanAbsoluteError, summary.r2))
print("")
print("β0 (intercept): {}".format(lrModel.intercept))
# Number the coefficients from 1 so they match β1..βp in the regression equation
for i, coef in enumerate(lrModel.coefficients, start=1):
  print("β{} (coefficient): {}".format(i, coef))

Evaluate the model performance using the hold-back dataset. Observe that the RMSE and R<sup>2</sup> scores on the hold-back dataset are slightly degraded compared to the training summary. A large disparity in performance metrics between the training and hold-back datasets can indicate that the model is overfitting the training data.

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

predictions = lrModel.transform(testData)
evaluator = RegressionEvaluator(
    labelCol=label_column, predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
evaluator = RegressionEvaluator(
    labelCol=label_column, predictionCol="prediction", metricName="mae")
mae = evaluator.evaluate(predictions)
print("MAE on test data = %g" % mae)
evaluator = RegressionEvaluator(
    labelCol=label_column, predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print("R2 on test data = %g" % r2)

**Compare the summary statistics between the true values and the model predictions**

In [0]:
display(predictions.select(["totalAmount",  "prediction"]).describe())

**Visualize the plot between true values and the model predictions**

In [0]:
p_df = predictions.select(["totalAmount",  "prediction"]).toPandas()
true_value = p_df.totalAmount
predicted_value = p_df.prediction

plt.figure(figsize=(10,10))
plt.scatter(true_value, predicted_value, c='crimson')
plt.yscale('log')
plt.xscale('log')

p1 = max(max(predicted_value), max(true_value))
p2 = min(min(predicted_value), min(true_value))
plt.plot([p1, p2], [p1, p2], 'b-')
plt.xlabel('True Values', fontsize=15)
plt.ylabel('Predictions', fontsize=15)
plt.axis('equal')
plt.show()