## CIS5560: IOWA - Linear Regression + Cross Validation + Parameter Tuning using TrainValidationSplit

### Project 5560

## Evaluating a Regression Model

### Create a Test Cluster

Create a cluster with Databricks runtime version 6.5 (Scala 2.11, Spark 2.4.5). Attach it to the notebook.

## Import the Libraries

First, we import the Spark SQL and Spark ML libraries needed to load the data, build the regression pipeline, and tune its parameters.

In [4]:
# Import Spark SQL and Spark ML libraries

from pyspark.sql.types import *              # StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.functions import col

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession


### TODO 0: Run the code in PySpark CLI
1. Set the following to True:
```
PYSPARK_CLI = True
```
1. Export the notebook as a Python (.py) file: File > Export > Source File
1. Run it on your Hadoop/Spark cluster:
```
$ spark-submit Python_Creating_Project_Sample_Data.py
```

In [6]:
PYSPARK_CLI = False
if PYSPARK_CLI:
    sc = SparkContext.getOrCreate()
    spark = SparkSession(sc)

In [7]:
# DataFrame Schema
liquorsalesSchema = StructType([
  StructField("Invoice/Item Number", StringType(), False),
  StructField("Date", StringType(), False),
  StructField("StoreNumber", IntegerType(), False),
  StructField("StoreName", StringType(), False),
  StructField("Address", StringType(), False),
  StructField("City", StringType(), False),
  StructField("ZipCode", IntegerType(), False),
  StructField("StoreLocation", StringType(), False),
  StructField("CountyNumber", IntegerType(), False),
  StructField("County", StringType(), False),
  StructField("Category", IntegerType(), False),
  StructField("CategoryName", StringType(), False),
  StructField("VendorNumber", IntegerType(), False),
  StructField("VendorName", StringType(), False),
  StructField("ItemNumber", IntegerType(), False),
  StructField("ItemDescription", StringType(), False),
  StructField("Pack", IntegerType(), False),
  StructField("BottleVolumeInMl", IntegerType(), False),
  StructField("StateBottleCost", DoubleType(), False),
  StructField("StateBottleRetail", DoubleType(), False),
  StructField("BottlesSold", IntegerType(), False),
  StructField("SaleInDollars", DoubleType(), False),
  StructField("VolumeSoldInLitres",DoubleType(), False),
  StructField("VolumeSoldInGallons", DoubleType(), False),
])

## Load the Data into a Table

### TODO 1: Follow these directions to read your table after uploading it to Data in the left frame
1. After adding the _iowaliquorsalessample.csv_ file to Data in the left frame, create a table using the UI, specifically the "Upload File" option.
1. Click "Preview Table" to view the table, and select the "First line is header" option, because iowaliquorsalessample.csv has a header in its first row.
1. Change the data types of the table columns to match liquorsalesSchema in the cell above.
1. When you click the "Create Table" button, note the table name, for example, _iowaliquorsalessample_csv_.

In [9]:
if PYSPARK_CLI:
    csv = spark.read.csv('iowa_Liquor_Sales.csv', inferSchema=True, header=True)
else:
    csv = spark.sql("SELECT * FROM iowaliquorsalessample_csv")

# Alternative source (Azure Blob Storage):
# csv = spark.read.csv('wasb:///data/iowa_Liquor_Sales.csv', inferSchema=True, header=True)

csv.show(truncate=False)

## Select Features and Label

#### Select the relevant columns in a new DataFrame, then define the features and label.

In [11]:
# Select relevant columns.
csv1 = csv.select("Pack", "BottleVolumeInMl", "StateBottleCost", "StateBottleRetail",
                  "BottlesSold", "SaleInDollars", "VolumeSoldInLitres")

# Drop every row that has a null in any of the selected columns
# (same result as chaining an isNotNull() filter for each column).
df_clean = csv1.dropna()

In [12]:
# Select features and label; cast the integer columns to double
data = df_clean.select(col("Pack").cast(DoubleType()),
                       col("BottleVolumeInMl").cast(DoubleType()),
                       "StateBottleCost",
                       "StateBottleRetail",
                       col("BottlesSold").cast(DoubleType()),
                       "VolumeSoldInLitres",
                       col("SaleInDollars").alias("label"))

data.show(5)

## Split the Data
#### Split the data into training and test sets in a 70:30 ratio.

In [14]:
# Split the data
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")
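`randomSplit` assigns rows to the splits randomly, so the 70/30 proportions are approximate rather than exact, and the split changes from run to run unless you pass a `seed` (for example `data.randomSplit([0.7, 0.3], seed=42)`). The plain-Python sketch below illustrates the idea under a simplifying assumption: each row gets one uniform draw that is compared against the cumulative normalized weights. It is not Spark's actual implementation, and `random_split` is a hypothetical helper.

```python
import random
from itertools import accumulate

def random_split(rows, weights, seed=None):
    """Assign each row to exactly one split with a single uniform draw
    against the cumulative normalized weights (an illustration of the idea
    behind DataFrame.randomSplit, not Spark's implementation)."""
    total = sum(weights)
    bounds = list(accumulate(w / total for w in weights))  # e.g. [0.7, 1.0]
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        # First split whose upper bound exceeds u; the last split catches rounding
        index = next((i for i, upper in enumerate(bounds) if u < upper), len(bounds) - 1)
        splits[index].append(row)
    return splits

train_rows, test_rows = random_split(range(10_000), [0.7, 0.3], seed=42)
print(len(train_rows), len(test_rows))  # near 7000 and 3000; exact counts depend on the seed
```

Because every row is placed independently, small datasets can end up noticeably away from 70/30; fixing the seed makes the split, and therefore the reported RMSE, reproducible.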

### Define the Pipeline
Now define a pipeline that assembles the feature columns into a single feature vector and fits a linear regression model. The pipeline is trained later, during parameter tuning.

In [16]:
# Define the pipeline
assembler = VectorAssembler(inputCols = ["Pack", "BottleVolumeInMl","StateBottleCost", "StateBottleRetail", "BottlesSold", "VolumeSoldInLitres"], outputCol="features")
lr = LinearRegression(labelCol="label",featuresCol="features", maxIter=10, regParam=0.3)
pipeline = Pipeline(stages=[assembler, lr])
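`VectorAssembler` concatenates the listed input columns, in the order given by `inputCols`, into one vector column named `features`, which is the column `LinearRegression` reads; the `label` column is not part of the vector. The plain-Python sketch below shows that per-row step. The `assemble` helper and the example row (made-up values) are hypothetical. Refusing missing values mirrors VectorAssembler's default behaviour in Spark 2.4 (its `handleInvalid` parameter defaults to `"error"`), which is one reason the notebook drops null rows before this step.

```python
INPUT_COLS = ["Pack", "BottleVolumeInMl", "StateBottleCost",
              "StateBottleRetail", "BottlesSold", "VolumeSoldInLitres"]

def assemble(row, input_cols=INPUT_COLS):
    """Hypothetical helper: build a dense feature vector from one row (a dict),
    taking values in input_cols order and rejecting missing values."""
    features = []
    for name in input_cols:
        value = row.get(name)
        if value is None:
            raise ValueError(f"null value in column {name!r}")
        features.append(float(value))
    return features

# Illustrative row with made-up values; "label" is ignored by assemble()
row = {"Pack": 12, "BottleVolumeInMl": 750, "StateBottleCost": 10.0,
       "StateBottleRetail": 15.0, "BottlesSold": 3, "VolumeSoldInLitres": 2.25,
       "label": 45.0}
print(assemble(row))  # [12.0, 750.0, 10.0, 15.0, 3.0, 2.25]
```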


### Tune Parameters
You can tune parameters to find the best model for your data. A simple way to do this is to use **TrainValidationSplit**, which evaluates each combination of parameters in a parameter grid (built with **ParamGridBuilder**) against a single held-out subset of the training data to find the best-performing parameters. This notebook also uses the **CrossValidator** class, which evaluates each combination against multiple *folds* of the training data, each split into training and validation sets. Note that cross-validation can take much longer to run, because every parameter combination is trained once per fold.
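To see why cross-validation is slower, count the model fits. With the grid used in this notebook (3 `regParam` values × 2 `maxIter` values), CrossValidator with `numFolds=10` trains every combination once per fold, while TrainValidationSplit trains each combination once; both then refit the best combination on the full training set. A short calculation:

```python
reg_params = [0.3, 0.1, 0.01]   # values passed to lr.regParam in the grid
max_iters = [10, 5]             # values passed to lr.maxIter in the grid
num_folds = 10                  # CrossValidator numFolds

num_combinations = len(reg_params) * len(max_iters)

# One fit per combination per fold, plus a final refit of the best combination
cv_fits = num_combinations * num_folds + 1
# One fit per combination on the 80% split, plus a final refit of the best combination
tvs_fits = num_combinations + 1

print(num_combinations, cv_fits, tvs_fits)  # 6 61 7
```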

#### Regularization
Regularization helps prevent the model from overfitting the training data, that is, fitting the training data very closely but generalizing poorly to new data. The **regularization parameter** (`regParam`) controls how strongly the model's coefficients are penalized, and therefore how the model balances fitting the training data against generalizing to unseen data.
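For intuition about `regParam`, here is a one-feature sketch under simplifying assumptions: no intercept and no feature standardization (Spark's LinearRegression fits an intercept and standardizes features by default), and a pure L2 (ridge) penalty, which is what Spark uses when `elasticNetParam` is left at its default of 0. Minimizing (1/(2n))·Σ(w·x_i − y_i)² + (λ/2)·w² gives w = Σ x_i·y_i / (Σ x_i² + n·λ), so a larger λ (`regParam`) shrinks the coefficient toward zero. The `ridge_coefficient` helper and the toy data are hypothetical.

```python
def ridge_coefficient(xs, ys, reg_param):
    """Closed-form minimizer of (1/(2n)) * sum((w*x - y)**2) + (reg_param/2) * w**2
    for a single feature with no intercept: w = sum(x*y) / (sum(x*x) + n*reg_param)."""
    n = len(xs)
    sxy = sum(x * y for x, y in zip(xs, ys))
    sxx = sum(x * x for x in xs)
    return sxy / (sxx + n * reg_param)

xs = [1.0, 2.0, 3.0, 4.0]
ys = [2.0, 4.0, 6.0, 8.0]   # y = 2x exactly, so the unregularized fit is w = 2

# 0.01, 0.1 and 0.3 are the regParam values used in this notebook's grids
for reg_param in [0.0, 0.01, 0.1, 0.3]:
    print(reg_param, ridge_coefficient(xs, ys, reg_param))
```

The coefficient falls from 2.0 (no penalty) to 60 / 31.2 ≈ 1.92 at `regParam=0.3`; on noisy data that shrinkage is what trades a little training accuracy for better generalization.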

#### Training ratio of 0.8
With `trainRatio=0.8`, TrainValidationSplit uses 80% of the training set to train each candidate model and the remaining 20% to validate it.

**ParamGridBuilder** generates every combination of the listed `regParam` and `maxIter` values (3 × 2 = 6 combinations in this notebook). TrainValidationSplit trains each combination on 80% of the training data, validates it on the remaining 20%, and selects the combination with the best validation score.
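The plain-Python sketch below mimics what the grid and `trainRatio=0.8` set up: the same set of combinations as the notebook's ParamGridBuilder grid (built here with `itertools.product`), and one 80/20 partition of the training rows that every combination shares. It is an illustration only: the `train_validation_split` helper is hypothetical, it cuts exactly at 80% after a seeded shuffle for simplicity, and Spark's own split is random and therefore only approximately 80/20.

```python
import random
from itertools import product

# Same values as the notebook's grid
grid = {"regParam": [0.3, 0.1, 0.01], "maxIter": [10, 5]}

# Every combination of the listed values (Cartesian product)
param_maps = [dict(zip(grid, values)) for values in product(*grid.values())]

def train_validation_split(rows, train_ratio=0.8, seed=None):
    """Shuffle once and cut at train_ratio; every parameter combination
    is then trained on the first part and validated on the second."""
    rows = list(rows)
    random.Random(seed).shuffle(rows)
    cut = int(len(rows) * train_ratio)
    return rows[:cut], rows[cut:]

fit_rows, validation_rows = train_validation_split(range(1000), train_ratio=0.8, seed=7)

print(len(param_maps))                       # 6
print(param_maps[0])                         # {'regParam': 0.3, 'maxIter': 10}
print(len(fit_rows), len(validation_rows))   # 800 200
```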

## Cross Validation

In [19]:
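# 3 regParam values x 2 maxIter values = 6 parameter combinations
paramGrid_cv = ParamGridBuilder().addGrid(lr.regParam, [0.3, 0.1, 0.01]).addGrid(lr.maxIter, [10, 5]).build()

# 10-fold cross-validation; RegressionEvaluator() scores each model by RMSE (its default metric)
cv = CrossValidator(estimator=pipeline, evaluator=RegressionEvaluator(), estimatorParamMaps=paramGrid_cv, numFolds=10)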


piplineModel_cv = cv.fit(train)
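With `numFolds=10`, CrossValidator divides the training data into 10 folds; for each parameter combination it trains on 9 folds, computes RMSE on the held-out fold, repeats this for every fold, and averages the 10 scores. The plain-Python sketch below shows only the fold bookkeeping. The `k_fold_indices` helper is hypothetical, and assigning rows round-robin after a seeded shuffle is an assumption for illustration; PySpark assigns rows to folds using a random column, so its fold sizes are only approximately equal.

```python
import random

def k_fold_indices(n_rows, num_folds=10, seed=None):
    """Yield (train_indices, validation_indices) once per fold.
    Each row lands in exactly one validation fold."""
    indices = list(range(n_rows))
    random.Random(seed).shuffle(indices)
    folds = [indices[i::num_folds] for i in range(num_folds)]
    for k in range(num_folds):
        validation = folds[k]
        train = [i for j, fold in enumerate(folds) if j != k for i in fold]
        yield train, validation

splits = list(k_fold_indices(100, num_folds=10, seed=0))
print(len(splits))                           # 10
print(len(splits[0][0]), len(splits[0][1]))  # 90 10
```

Every row is used for validation exactly once, which is why cross-validation gives a steadier estimate than a single train/validation split, at the cost of ten times as many fits per combination.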

### Test the Model with Cross Validation
Now we are ready to apply the model to the test data.

In [21]:
prediction = piplineModel_cv.transform(test)
predicted_cv = prediction.select("features", "prediction", "trueLabel")
predicted_cv.show(10)

### Examine the Predicted and Actual Values

In [23]:
predicted_cv.createOrReplaceTempView("regressionPredictionsCV")

### Data visualization using SQL in Databricks.

In [25]:
%sql
SELECT trueLabel, prediction FROM regressionPredictionsCV


| trueLabel | prediction |
|---|---|
| 45.0 | 249.59001723231333 |
| 45.0 | 249.59001723231333 |
| 45.0 | 249.59001723231333 |
| 90.0 | 253.7169884849059 |
| 90.0 | 253.7169884849059 |
| 135.0 | 257.8439597374985 |
| 135.0 | 257.8439597374985 |
| 180.0 | 261.9709309900911 |
| 270.0 | 270.2248734952764 |
| 675.0 | 307.36761476860977 |


### Retrieve the Root Mean Square Error (RMSE)
There are a number of metrics used to measure the difference between predicted and actual values. Of these, the root mean square error (RMSE) is commonly used because it is measured in the same units as the predicted and actual values. In this case, the RMSE indicates roughly how many dollars the predicted sales values typically deviate from the actual values, with large errors weighted more heavily because the differences are squared before averaging. We can use the **RegressionEvaluator** class to retrieve the RMSE.
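RMSE is the square root of the mean of the squared differences between label and prediction. A minimal plain-Python version of the quantity that `RegressionEvaluator(metricName="rmse")` reports is sketched below; the `root_mean_square_error` helper and its inputs are hypothetical, made-up numbers for illustration.

```python
import math

def root_mean_square_error(labels, predictions):
    """sqrt(mean((label - prediction) ** 2))."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("labels and predictions must be non-empty and the same length")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

print(root_mean_square_error([10.0, 20.0, 30.0], [12.0, 18.0, 30.0]))  # sqrt(8 / 3), about 1.633
```

Squaring before averaging means a single large miss raises RMSE more than several small ones with the same total absolute error.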

In [27]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator_cv = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
rmse = evaluator_cv.evaluate(prediction)
print("Root Mean Square Error (RMSE_CV):", rmse)

## TrainValidationSplit

In [29]:
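# Same 6 parameter combinations as the cross-validation grid
paramGrid_tvs = ParamGridBuilder().addGrid(lr.regParam, [0.3, 0.1, 0.01]).addGrid(lr.maxIter, [10, 5]).build()

# Each combination is trained on 80% of train and scored by RMSE on the remaining 20%
tvs = TrainValidationSplit(estimator=pipeline, evaluator=RegressionEvaluator(), estimatorParamMaps=paramGrid_tvs, trainRatio=0.8)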

piplineModel_tvs = tvs.fit(train)

### Test the Model with TrainValidationSplit
Now you're ready to apply the model to the test data.

In [31]:
prediction = piplineModel_tvs.transform(test)
predicted_tvs = prediction.select("features", "prediction", "trueLabel")
predicted_tvs.show(10)

### Examine the Predicted and Actual Values

In [33]:
predicted_tvs.createOrReplaceTempView("regressionPredictionsTVS")

### Data visualization using SQL in Databricks.

In [35]:
%sql
SELECT trueLabel, prediction FROM regressionPredictionsTVS


| trueLabel | prediction |
|---|---|
| 45.0 | 249.59001723231333 |
| 45.0 | 249.59001723231333 |
| 45.0 | 249.59001723231333 |
| 90.0 | 253.7169884849059 |
| 90.0 | 253.7169884849059 |
| 135.0 | 257.8439597374985 |
| 135.0 | 257.8439597374985 |
| 180.0 | 261.9709309900911 |
| 270.0 | 270.2248734952764 |
| 675.0 | 307.36761476860977 |


### Retrieve the Root Mean Square Error (RMSE)
As with cross-validation, use the **RegressionEvaluator** class to retrieve the RMSE of the TrainValidationSplit model on the test data.

In [37]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator_tvs = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
rmse = evaluator_tvs.evaluate(prediction)
print("Root Mean Square Error (RMSE_tvs):", rmse)