## Using Cross Validation

In this exercise, you will use cross-validation to optimize parameters for a regression model. Cross-validation is an approach
in which, instead of splitting the data just once into two sets (training and test data), we pick a number K and divide the data
into K folds. Each fold takes a turn as the validation set while the model is trained on the remaining folds.
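
To make the fold idea concrete, here is a minimal plain-Python sketch of how row indices could be divided into K folds. The function name `k_fold_indices` is hypothetical, and the folds are contiguous only to keep the sketch short; library implementations generally assign rows to folds at random.

```python
def k_fold_indices(n_rows, k):
    """Yield (train_indices, validation_indices) pairs, one pair per fold."""
    # Spread any remainder over the first folds so sizes differ by at most one.
    fold_sizes = [n_rows // k + (1 if i < n_rows % k else 0) for i in range(k)]
    start = 0
    for size in fold_sizes:
        # The current fold is held out for validation ...
        validation = list(range(start, start + size))
        # ... and every other row is used for training.
        train = [i for i in range(n_rows) if i < start or i >= start + size]
        yield train, validation
        start += size


for train_idx, val_idx in k_fold_indices(n_rows=10, k=5):
    print("train:", train_idx, "validation:", val_idx)
```

Every row lands in exactly one validation fold, so each row is used for validation once and for training K - 1 times.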

### Why cross-validation
With only one training set and one validation set, you can still overfit the model to that particular split, so the result might not be the optimal model with the optimal parameters. Evaluating each candidate on several folds gives a more reliable estimate of how well it generalizes.

### Prepare the Data

First, import the libraries you will need and prepare the training and test data:

In [0]:
# Import Spark SQL and Spark ML libraries
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.ml.regression import RandomForestRegressor

IS_DB = True

### TODO 0: Run the code in PySpark CLI
1. Set the following to True:
```
PYSPARK_CLI = True
```
1. Export the notebook as a Python (.py) file: File > Export > Source File
1. Run it on your Hadoop/Spark cluster:
```
$ spark-submit Python_Regression_Cross_Validation.py
```

In [0]:
PYSPARK_CLI = False
if PYSPARK_CLI:
    sc = SparkContext.getOrCreate()
    spark = SparkSession(sc)

In [0]:
# DataFrame schema for the trips table
tripSchema = StructType([
  StructField('TripID', StringType()),
  StructField('TripStartTimestamp', StringType()),
  StructField('TripEndTimestamp', StringType()),
  StructField('TripSeconds', IntegerType()),
  StructField('TripMiles', FloatType()),
  StructField('PickupCommunityArea', StringType()),
  StructField('DropoffCommunityArea', StringType()),
  StructField('Fare', FloatType()),
  StructField('PickupCentroidLatitude', StringType()),
  StructField('PickupCentroidLongitude', StringType()),
  StructField('DropoffCentroidLatitude', StringType()),
  StructField('DropoffCentroidLongitude', StringType()),
  StructField('AvgCostMile', FloatType()),
  StructField('DayofWeek', FloatType())
  ])

## TODO 1. Locate the trips_sample.csv file

In [0]:
# File location and type
file_location = "/FileStore/tables/trips_sample.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

df = spark.read.format(file_type) \
  .schema(tripSchema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)
  
display(df)

TripID,TripStartTimestamp,TripEndTimestamp,TripSeconds,TripMiles,PickupCommunityArea,DropoffCommunityArea,Fare,PickupCentroidLatitude,PickupCentroidLongitude,DropoffCentroidLatitude,DropoffCentroidLongitude,AvgCostMile,DayofWeek
327085ef22ce3a6dbdd8667e28a8726438dbf9a2,2019-09-21 23:45:00,9/22/2019,1096,6.5,8.0,33.0,7.5,41.9,-87.65,41.86,-87.62,1.15,7.0
32790e128b1be1166fc3f9da68baefa6d292681a,2019-09-21 23:45:00,9/22/2019,640,4.1,8.0,22.0,7.5,41.89,-87.63,41.92,-87.68,1.83,7.0
3281df2831f6a0f78d1634ae0612c9a7f4f9a756,2019-09-21 23:45:00,9/22/2019,493,1.7,1.0,2.0,5.0,42.0,-87.67,41.99,-87.69,2.94,7.0
32942cf8482550b80e0f3e255876662246cd75f6,2019-09-21 23:45:00,9/22/2019,622,2.3,24.0,7.0,10.0,41.91,-87.68,41.92,-87.65,4.35,7.0
329511bbc7f0f368c452f3af13c28948546633df,2019-09-21 23:45:00,9/22/2019,582,2.6,8.0,24.0,7.5,41.89,-87.63,41.91,-87.68,2.88,7.0
32a13fc80c9bf0b5df8202cdc7742854152a08b9,2019-09-21 23:45:00,9/22/2019,905,3.6,3.0,1.0,10.0,41.97,-87.65,42.01,-87.67,2.78,7.0
32b5370376298ada0d956b4f67dcdd94b26b191c,2019-09-21 23:45:00,9/22/2019,1940,10.5,7.0,25.0,15.0,41.92,-87.65,41.89,-87.76,1.43,7.0
32b58bdbb76e133f1776fe7f774b4b7a340de66a,2019-09-21 23:45:00,9/22/2019,2273,24.9,28.0,,30.0,41.87,-87.66,,,1.2,7.0
32c2ed3679584b2cd478e3e494a3f931358f70d2,2019-09-21 23:45:00,9/22/2019,1047,4.9,24.0,4.0,10.0,41.91,-87.68,41.98,-87.68,2.04,7.0
32d1788e93014718dc588ec5855e3ae944885824,2019-09-21 23:45:00,9/22/2019,815,2.8,8.0,32.0,7.5,41.9,-87.65,41.88,-87.62,2.68,7.0


## TODO 2: Create a temporary view of the dataframe df

In [0]:
# Create a view or table
temp_table_name = "trips_csv"
df.createOrReplaceTempView(temp_table_name)

In [0]:
if PYSPARK_CLI:
    csv = spark.read.csv('trips_sample.csv', inferSchema=True, header=True)
else:
    csv = spark.sql("SELECT * FROM trips_csv")


csv.show(5)

In [0]:
# Select features and label
data = csv.select("TripSeconds", "TripMiles", col("Fare").alias("label"))

# Split the data
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")

### Define the Pipeline
Now define a pipeline that creates a feature vector, scales it with min-max normalization, and trains a random forest regression model.
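
The pipeline in the next cell includes a `MinMaxScaler` stage. As a rough illustration of what min-max scaling does to a single feature column, here is a plain-Python sketch. The helper name `min_max_scale` is hypothetical, the target range is assumed to be the default [0, 1], and the sample values are `TripSeconds` entries from the data shown earlier.

```python
def min_max_scale(column):
    """Rescale a list of numbers linearly so the minimum maps to 0 and the maximum to 1."""
    lo, hi = min(column), max(column)
    if hi == lo:
        # A constant column has no spread; this sketch maps it to the midpoint 0.5.
        return [0.5 for _ in column]
    return [(x - lo) / (hi - lo) for x in column]


trip_seconds = [1096, 640, 493, 622, 582]
print(min_max_scale(trip_seconds))
```

The smallest value maps to 0.0, the largest to 1.0, and everything else falls proportionally in between.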

In [0]:
# Define the pipeline
assembler = VectorAssembler(inputCols = ["TripSeconds", "TripMiles"], outputCol="features")
# Min-max scaling: normalize the feature vector (added 04/20/2021)
minMax = MinMaxScaler(inputCol = assembler.getOutputCol(), outputCol="normFeatures")

#lr = LogisticRegression(labelCol="label", featuresCol="features")
#lr = LinearRegression(labelCol="label", featuresCol="normFeatures")

rf = RandomForestRegressor(labelCol="label", featuresCol="normFeatures")

#pipeline = Pipeline(stages=[assembler, lr])
pipeline = Pipeline(stages=[assembler, minMax, rf])

### Tune Parameters
You can tune parameters to find the best model for your data. To do this you can use the **CrossValidator** class to evaluate each combination of parameters in a grid built with **ParamGridBuilder** against multiple *folds* of the data split into training and validation datasets, in order to find the best performing parameters. Note that this can take a long time to run because every parameter combination is tried once per fold.
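
To see why tuning can be slow, the following plain-Python sketch enumerates the combinations in a small grid and counts the model fits that cross-validation would need. The parameter names and values are illustrative assumptions, not the notebook's settings, and the extra final fit reflects the usual practice of retraining the best combination on the full training set.

```python
from itertools import product

# Hypothetical grid: two parameters with two candidate values each.
grid = {"maxDepth": [5, 10], "numTrees": [5, 10]}
num_folds = 2  # K

# Every combination of one value per parameter.
combinations = [dict(zip(grid, values)) for values in product(*grid.values())]

# Each combination is trained once per fold.
cv_fits = len(combinations) * num_folds
# Plus one final fit of the best combination on all of the training data.
total_fits = cv_fits + 1

for combo in combinations:
    print(combo)
print("fits during cross-validation:", cv_fits)
print("fits including the final refit:", total_fits)
```

Adding a third value to either parameter, or raising K, multiplies the work accordingly, which is why small grids are a sensible starting point.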

In [0]:
# RandomForestRegressor has no regParam or maxIter (those are LinearRegression parameters),
# so the grid tunes tree parameters instead; the values here are illustrative.
paramGrid = ParamGridBuilder().addGrid(rf.maxDepth, [5, 10]).addGrid(rf.numTrees, [10, 5]).build()

# TODO: K = 2, you may test it with 5, 10
# K=2, 3, 5, 10: Root Mean Square Error (RMSE): 13.2
K = 2
cv = CrossValidator(estimator=pipeline, evaluator=RegressionEvaluator(), estimatorParamMaps=paramGrid, numFolds=K)

model = cv.fit(train)

### Test the Model
Now you're ready to apply the model to the test data.

In [0]:
prediction = model.transform(test)
#predicted = prediction.select("features", "prediction", "trueLabel")
predicted = prediction.select("normFeatures", "prediction", "trueLabel")
predicted.show()

### Examine the Predicted and Actual Values
You can plot the predicted values against the actual values to see how accurately the model has predicted. In a perfect model, the resulting scatter plot should form a perfect diagonal line with each predicted value being identical to the actual value - in practice, some variance is to be expected.
Run the cells below to create a temporary table from the **predicted** DataFrame and then retrieve the predicted and actual label values using SQL. You can then display the results as a scatter plot, specifying **-** as the function to show the unaggregated values.

In [0]:
predicted.createOrReplaceTempView("regressionPredictions")

In [0]:
# Microsoft Azure for data visualization
'''
%%sql
SELECT trueLabel, prediction FROM regressionPredictions
'''

In [0]:
# Reference: http://standarderror.github.io/notes/Plotting-with-PySpark/
dataPred = spark.sql("SELECT trueLabel, prediction FROM regressionPredictions")
## Need it for Databricks
display(dataPred)

### The following is for IBM Watson not Databricks

In [0]:
# IBM Data Science with matplotlib for data visualization

## Need the following for IBM Watson Studio
#%matplotlib inline
#import pandas as pd
#import matplotlib
#import matplotlib.pyplot as plt
import numpy as np

if IS_DB: 
  ## Need the following for IBM Watson Studio
  #from pandas.tools.plotting import scatter_matrix

  # Reference: http://standarderror.github.io/notes/Plotting-with-PySpark/
  dataPred = spark.sql("SELECT trueLabel, prediction FROM regressionPredictions")
  # convert to pandas and plot
  ## Need the following for IBM Watson Studio
  # regressionPredictionsPanda = dataPred.toPandas()
  # stuff = scatter_matrix(regressionPredictionsPanda, alpha=0.7, figsize=(6, 6), diagonal='kde')

### Retrieve the Root Mean Square Error (RMSE)
There are a number of metrics used to measure the variance between predicted and actual values. Of these, the root mean square error (RMSE) is a commonly used value that is measured in the same units as the predicted and actual values, so in this case the RMSE indicates the typical size of the gap between the predicted and actual fare. You can use the **RegressionEvaluator** class to retrieve the RMSE (about 13.24 with 10 folds).
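
For reference, the RMSE calculation itself is short. Below is a minimal plain-Python sketch; the function name `rmse` is hypothetical, the true fares are taken from the sample rows shown earlier, and the predictions are made-up numbers for illustration only.

```python
import math


def rmse(actual, predicted):
    """Root mean square error between two equal-length lists of numbers."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("inputs must be non-empty and the same length")
    # Square each error, average the squares, then take the square root.
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


true_fares = [7.5, 5.0, 10.0]
predicted_fares = [8.5, 5.0, 8.0]  # illustrative values, not model output
print("RMSE:", rmse(true_fares, predicted_fares))
```

Because the errors are squared before averaging, a few large misses raise the RMSE more than many small ones.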

In [0]:
evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(prediction)
print ("Root Mean Square Error (RMSE):", rmse)
