#![Spark Logo](http://spark-mooc.github.io/web-assets/images/ta_Spark-logo-small.png) + ![Python Logo](http://spark-mooc.github.io/web-assets/images/python-logo-master-v3-TM-flattened_small.png)
# Implied Volatility Machine Learning Pipeline Application
This notebook is an end-to-end exercise of performing Extract-Transform-Load and Exploratory Data Analysis on a real-world dataset, and then applying several different machine learning algorithms to solve a supervised regression problem on the dataset.

**This notebook covers:**
* *Part 1: Load Your Data*
* *Part 2: Explore Your Data*
* *Part 3: Visualize Your Data*
* *Part 4: Data Preparation*
* *Part 5: Data Modeling*
* *Part 6: Tuning and Evaluation*

## Part 1: Extract-Transform-Load (ETL) Data

Our data is stored on Amazon S3 and is available through the Databricks File System (DBFS) at the following path:

```
/FileStore/tables/gtd0kb9j1481295433792/cisc5352_project_1_option_data-f2b26.csv
```

**To Do:** Let's start by printing a sample of the data.

We'll use the built-in Databricks functions for exploring the Databricks File System (DBFS).

In [3]:
display(dbutils.fs.ls("/FileStore/tables/gtd0kb9j1481295433792/cisc5352_project_1_option_data-f2b26.csv"))

Next, use the `dbutils.fs.head` command to look at the first 65,536 bytes of the first file in the directory.

`dbutils.fs` has its own help facility, which we can use to see the various available functions.

In [6]:
dbutils.fs.help()

Now, let's use PySpark instead to print the first 5 lines of the data.

In [8]:
# TODO: Load the data and print the first five lines.
rawRdd = sc.textFile("/FileStore/tables/gtd0kb9j1481295433792/cisc5352_project_1_option_data-f2b26.csv") 
rawRdd.take(5)

From our initial exploration of a sample of the data, we can make several observations for the ETL process:
  - The data is a .csv (Comma-Separated Values) file (i.e., the fields in each row are separated by commas)
  - There is a header row, which is the name of the columns
  - It looks like the type of the data in each column is consistent (i.e., each column is of type double or int)
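
The observations above can be checked on a small sample before handing the file to Spark. The following is a minimal sketch using only the Python standard library; the sample rows and their values are hypothetical, and it assumes a comma delimiter (as the `.csv` extension suggests).

```python
import csv
import io

# Hypothetical sample; the column names and values are illustrative only
sample = io.StringIO(
    "Stock_Price,Strike_Price,Option_Price\n"
    "100.0,95.0,7.25\n"
    "100.0,105.0,2.10\n"
)

reader = csv.reader(sample, delimiter=",")
header = next(reader)  # the first row holds the column names

# float() raises ValueError on a non-numeric cell, so this doubles as a type check
rows = [[float(value) for value in record] for record in reader]

# Every data row should have exactly one value per header column
consistent = all(len(record) == len(header) for record in rows)

print(header)
print(rows[0])
print(consistent)
```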

Our schema appears below:
- Stock Price
- Strike Price
- Time to Maturity
- Interest Rate
- Option Price
- Volatility
- Implied Volatility

(**Note**: In Spark 2.0, the CSV package is built into the DataFrame API.)

In [10]:
stockDF=sqlContext.read.format('com.databricks.spark.csv').options(delimiter=',',header='true',inferschema='true').load("/FileStore/tables/gtd0kb9j1481295433792/cisc5352_project_1_option_data-f2b26.csv")

In [11]:
print(stockDF.dtypes)

We can examine the data using the display() method.

In [13]:
display(stockDF)

## Part 2: Explore Data

First, let's register our DataFrame as an SQL table named `stock_table`.  Because you may run this lab multiple times, we'll take the precaution of removing any existing tables first.

We can delete any existing `stock_table` SQL table using the SQL command: `DROP TABLE IF EXISTS stock_table` (we also need to delete any Hive data associated with the table, which we can do with a Databricks file system operation).

Once any prior table is removed, we can register our DataFrame as a SQL table using [sqlContext.registerDataFrameAsTable()](https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.SQLContext.registerDataFrameAsTable).

In [16]:
sqlContext.sql("DROP TABLE IF EXISTS stock_table")
dbutils.fs.rm("dbfs:/user/hive/warehouse/stock_table", True)
sqlContext.registerDataFrameAsTable(stockDF, "stock_table")

Now that our DataFrame exists as a SQL table, we can explore it using SQL commands.

To execute SQL in a cell, we use the `%sql` operator. The following cell is an example of using SQL to query the rows of the SQL table.

In [18]:
%sql
-- We can use %sql to query the rows
SELECT * FROM stock_table

Use the SQL `desc` command to describe the schema, by executing the following cell.

In [20]:
%sql
desc stock_table

Let's perform some basic statistical analyses of all the columns.

In [22]:
df = sqlContext.table("stock_table")
display(df.describe())
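
To make the summary concrete, here is a rough standard-library sketch of the statistics that `describe()` reports for one numeric column (count, mean, standard deviation, min, max). The values are hypothetical, and the use of the sample (n - 1) standard deviation is an assumption about how the summary is computed.

```python
import statistics

# Hypothetical option prices, for illustration only
values = [7.25, 2.10, 4.80, 3.35]

summary = {
    "count": len(values),
    "mean": statistics.mean(values),
    "stddev": statistics.stdev(values),  # sample standard deviation (n - 1), an assumption
    "min": min(values),
    "max": max(values),
}

for name in ["count", "mean", "stddev", "min", "max"]:
    print("{0}: {1}".format(name, summary[name]))
```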

In [23]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import *

df = df.select("Stock_Price", "Strike_Price", "Option_Price",df['Option_Type'].cast(DoubleType()),'Volatility',col('Implied Volatility').alias('IV'),df["Time_to_Maturity"].cast(DoubleType()))

In [24]:
df.dtypes

## Part 4: Data Preparation

The next step is to prepare the data for machine learning. Since all of this data is numeric and consistent this is a simple and straightforward task.

The goal is to use machine learning to determine a function that yields the implied volatility as a function of a set of predictor features. The first step in building our ML pipeline is to convert the predictor features from DataFrame columns to Feature Vectors using the [pyspark.ml.feature.VectorAssembler()](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.feature.VectorAssembler) method.

The VectorAssembler is a transformer that combines a given list of columns into a single vector column. It is useful for combining raw features and features generated by different feature transformers into a single feature vector, in order to train ML models like logistic regression and decision trees. VectorAssembler takes a list of input column names (each is a string) and the name of the output column (as a string).

In [26]:
from pyspark.ml.feature import VectorAssembler

datasetDF = df

vectorizer = VectorAssembler()
vectorizer.setInputCols(["Stock_Price", "Strike_Price", "Time_to_Maturity","Option_Price",'Option_Type','Volatility']) 
vectorizer.setOutputCol("features")
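
Conceptually, the assembler packs the listed columns, in the listed order, into one vector per row. The plain-Python sketch below illustrates that idea only; `assemble` is a hypothetical helper and the row values are made up, so it is not how Spark implements the transformer.

```python
# Hypothetical row; the values are illustrative only
row = {"Stock_Price": 100.0, "Strike_Price": 95.0, "Time_to_Maturity": 0.5,
       "Option_Price": 7.25, "Option_Type": 1.0, "Volatility": 0.2, "IV": 0.21}

# Same column order as the vectorizer's input columns
input_cols = ["Stock_Price", "Strike_Price", "Time_to_Maturity",
              "Option_Price", "Option_Type", "Volatility"]

def assemble(row, input_cols, output_col="features"):
    """Return a copy of the row with the input columns packed into one list."""
    out = dict(row)
    out[output_col] = [float(row[name]) for name in input_cols]
    return out

assembled = assemble(row, input_cols)
print(assembled["features"])
```

Note that the order of the input columns determines the position of each value in the vector, which matters later when model coefficients are matched back to column names.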

## Part 5: Data Modeling

Our first model will be based on simple linear regression since we saw some linear patterns in our data based on the scatter plots during the exploration stage.

We need a way of evaluating how well our linear regression model predicts implied volatility as a function of the input features. We can do this by splitting our initial dataset into a _Training Set_ used to train our model and a _Test Set_ used to evaluate the model's predictions. We can use a DataFrame's [randomSplit()](https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.DataFrame.randomSplit) method to split our dataset. The method takes a list of weights and an optional random seed. The seed initializes the random number generator used by the splitting function, so the same seed reproduces the same split.

In [28]:
# We'll hold out 20% of our data for testing and leave 80% for training
seed = 1800009193
(split20DF, split80DF) = datasetDF.randomSplit([0.2,0.8],seed)

# Let's cache these datasets for performance
testSetDF = split20DF.cache()
trainingSetDF = split80DF.cache()
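
The idea behind a weighted, seeded split can be sketched in plain Python. This is a conceptual illustration only, not Spark's implementation: `random_split` is a hypothetical helper that draws one random number per row and assigns the row to a split according to the cumulative weights.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split by drawing from a seeded generator."""
    rng = random.Random(seed)
    total = float(sum(weights))

    # Cumulative upper bounds, e.g. [0.2, 0.8] -> [0.2, 1.0]
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)

    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            # The last split catches any rounding slack in the final bound
            if u < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(1000))
test_rows, training_rows = random_split(rows, [0.2, 0.8], seed=42)
print("{0} test rows, {1} training rows".format(len(test_rows), len(training_rows)))
```

Because each row is assigned independently, the split sizes are only approximately 20% and 80%; re-running with the same seed gives the same assignment.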

Next we'll create a Linear Regression model and use the built-in help to identify how to train it. See API details for [Linear Regression](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.regression.LinearRegression) in the ML guide.

In [30]:
# ***** LINEAR REGRESSION MODEL ****

from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import LinearRegressionModel
from pyspark.ml import Pipeline

# Let's initialize our linear regression learner
lr = LinearRegression()

# We use explain params to dump the parameters we can use
print(lr.explainParams())

The cell below is based on the [Spark ML Pipeline API for Linear Regression](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.regression.LinearRegression).

In [32]:
# Now we set the parameters for the method
lr.setLabelCol('IV')\
  .setMaxIter(100)\
  .setRegParam(0.1)


# We will use the new spark.ml pipeline API.
lrPipeline = Pipeline()

lrPipeline.setStages([vectorizer, lr])

# Let's first train on the training set to see what we get
lrModel = lrPipeline.fit(trainingSetDF)

In [33]:
# The intercept is as follows:
intercept = lrModel.stages[1].intercept

# The coefficients (i.e., weights) are as follows:
weights = lrModel.stages[1].coefficients

# Use the feature names in the order the VectorAssembler packed them,
# so that each weight is paired with the right column
featureNames = vectorizer.getInputCols()

# Merge the weights and names, sorted from the greatest absolute weight to the least
coefficients = sorted(zip(weights, featureNames), key=lambda tup: abs(tup[0]), reverse=True)

equation = "y = {intercept}".format(intercept=intercept)
for weight, name in coefficients:
    # The sign is printed separately, so use the magnitude inside the parentheses
    symbol = "+" if weight > 0 else "-"
    equation += " {} ({} * {})".format(symbol, abs(weight), name)

# Finally here is our equation
print("Linear Regression Equation: " + equation)

In [34]:
# Apply our LR model to the test data and predict implied volatility
predictionsAndLabelsDF = lrModel.transform(testSetDF).select("Stock_Price", "Strike_Price", "Time_to_Maturity","Option_Price",'Option_Type','Volatility','IV',"prediction")

display(predictionsAndLabelsDF)

In [35]:
# Now let's compute an evaluation metric for our test dataset
from pyspark.ml.evaluation import RegressionEvaluator

# Create an RMSE evaluator using the label and predicted columns
regEval = RegressionEvaluator(predictionCol="prediction", labelCol="IV", metricName="rmse")

# Run the evaluator on the DataFrame
rmse = regEval.evaluate(predictionsAndLabelsDF)

print("Root Mean Squared Error: %.2f" % rmse)
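
For reference, the RMSE metric itself is simple to compute by hand. The sketch below uses hypothetical labels and predictions; `root_mean_squared_error` is an illustrative helper, not part of the Spark API.

```python
import math

def root_mean_squared_error(labels, predictions):
    """Square root of the mean squared difference between labels and predictions."""
    squared = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared) / len(squared))

# Hypothetical implied volatilities and model predictions
labels = [0.20, 0.25, 0.30, 0.35]
predictions = [0.22, 0.24, 0.33, 0.31]

print("Root Mean Squared Error: %.4f" % root_mean_squared_error(labels, predictions))
```

RMSE is expressed in the same units as the label, so here it can be read directly as a typical error in implied volatility.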

Another useful statistical evaluation metric is the coefficient of determination, denoted \\(R^2\\) or \\(r^2\\) and pronounced "R squared". It indicates the proportion of the variance in the dependent variable that is predictable from the independent variables, and so measures how well the model reproduces the observed outcomes. A value of 1 means a perfect fit and a value of 0 means the model does no better than always predicting the mean; on held-out data the value can even be negative. The closer the value is to 1, the better our model.

In [37]:
# Now let's compute another evaluation metric for our test dataset
r2 = regEval.evaluate(predictionsAndLabelsDF, {regEval.metricName: "r2"})

print("r2: {0:.2f}".format(r2))
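
As a worked illustration of the definition \\(R^2 = 1 - SS_{res} / SS_{tot}\\), the sketch below computes it on hypothetical values. `r_squared` is an illustrative helper; the second set of predictions is deliberately poor to show that the value can drop below 0.

```python
def r_squared(labels, predictions):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot

# Hypothetical implied volatilities
labels = [0.20, 0.25, 0.30, 0.35]
good = [0.22, 0.24, 0.33, 0.31]
bad = [0.35, 0.30, 0.25, 0.20]  # worse than always predicting the mean

print("r2: {0:.2f}".format(r_squared(labels, good)))
print("r2: {0:.2f}".format(r_squared(labels, bad)))
```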

Generally, assuming a Gaussian distribution of errors, a good model will have 68% of predictions within 1 RMSE and 95% within 2 RMSE of the actual value (see http://statweb.stanford.edu/~susan/courses/s60/split/node60.html).

Let's examine the predictions and see if an RMSE of 0.15 meets these criteria.

In [39]:
# First we remove the table if it already exists
sqlContext.sql("DROP TABLE IF EXISTS stock_table_RMSE_Evaluation")
dbutils.fs.rm("dbfs:/user/hive/warehouse/stock_table_RMSE_Evaluation", True)

# Next we calculate the residual error and divide it by the RMSE
predictionsAndLabelsDF.selectExpr("IV", "prediction", "IV - prediction Residual_Error","(IV - prediction) /{} Within_RSME".format(rmse)).registerTempTable("stock_table_RMSE_Evaluation")

We can use SQL to explore the `stock_table_RMSE_Evaluation` table. First let's look at the table using a SQL SELECT statement.

In [41]:
%sql
SELECT * from stock_table_RMSE_Evaluation

Now we can display the residual errors, expressed as multiples of the RMSE, as a histogram.

Notice that the histogram shows the scaled residuals centered around 0.

In [43]:
%sql
-- Display the residuals, in multiples of the RMSE, as a histogram
SELECT Within_RSME  from stock_table_RMSE_Evaluation

Using a SQL SELECT statement with a CASE expression, we can count the number of predictions whose residual lies within ±1.0 RMSE, within ±2.0 RMSE, or beyond, and then display the results as a pie chart.

In [45]:
%sql
SELECT case when Within_RSME <= 1.0 AND Within_RSME >= -1.0 then 1
            when  Within_RSME <= 2.0 AND Within_RSME >= -2.0 then 2 else 3
       end RSME_Multiple, COUNT(*) AS count
FROM stock_table_RMSE_Evaluation
GROUP BY case when Within_RSME <= 1.0 AND Within_RSME >= -1.0 then 1  when  Within_RSME <= 2.0 AND Within_RSME >= -2.0 then 2 else 3 end
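
The same bucketing can be sketched in plain Python to make the logic of the CASE expression explicit. The labels and predictions below are hypothetical, and `rmse_multiple` is an illustrative helper that mirrors the three branches of the query.

```python
import math
from collections import Counter

# Hypothetical labels and predictions
labels = [0.20, 0.25, 0.30, 0.35, 0.40]
predictions = [0.21, 0.24, 0.30, 0.36, 0.50]

residuals = [y - p for y, p in zip(labels, predictions)]
rmse = math.sqrt(sum(r * r for r in residuals) / len(residuals))

def rmse_multiple(residual, rmse):
    """Return 1, 2 or 3, mirroring the SQL CASE expression."""
    scaled = abs(residual / rmse)
    if scaled <= 1.0:
        return 1
    if scaled <= 2.0:
        return 2
    return 3

counts = Counter(rmse_multiple(r, rmse) for r in residuals)
for bucket in sorted(counts):
    share = counts[bucket] / float(len(residuals))
    print("{0}: {1} ({2:.0%})".format(bucket, counts[bucket], share))
```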

From the pie chart, we can see that 68% of our test data predictions are within 1 RMSE of the actual values, and 97% (68% + 29%) of our test data predictions are within 2 RMSE. So the model is pretty decent. Let's see if we can tune the model to improve it further.

## Part 6: Tuning and Evaluation

We perform the following steps:
  - Create a [CrossValidator](https://spark.apache.org/docs/1.6.2/ml-tuning.html#cross-validation) using the Pipeline and [RegressionEvaluator](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.evaluation.RegressionEvaluator) that we created earlier, and set the number of folds to 3
  - Create a list of 10 regularization parameters
  - Use [ParamGridBuilder](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.tuning.ParamGridBuilder) to build a parameter grid with the regularization parameters and add the grid to the [CrossValidator](https://spark.apache.org/docs/1.6.2/ml-tuning.html#cross-validation)
  - Run the [CrossValidator](https://spark.apache.org/docs/1.6.2/ml-tuning.html#cross-validation) to find the parameters that yield the best model (i.e., lowest RMSE) and return the best model.
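
The steps above can be sketched without Spark to show what a cross-validated grid search does. Everything here is a simplification under stated assumptions: the model is a hypothetical one-feature ridge regression through the origin, the data is made up, and folds are assigned by striding over row indices rather than randomly.

```python
import math

def fit_ridge(xs, ys, reg_param):
    """One-feature ridge regression through the origin: w = sum(xy) / (sum(x^2) + lambda)."""
    return sum(x * y for x, y in zip(xs, ys)) / (sum(x * x for x in xs) + reg_param)

def rmse(xs, ys, w):
    return math.sqrt(sum((y - w * x) ** 2 for x, y in zip(xs, ys)) / len(xs))

def cross_validate(xs, ys, reg_params, num_folds=3):
    """Return (best_reg_param, average validation RMSE per candidate)."""
    n = len(xs)
    avg_rmse = {}
    for reg_param in reg_params:
        fold_errors = []
        for fold in range(num_folds):
            # Every num_folds-th row, offset by `fold`, is held out for validation
            val_idx = set(range(fold, n, num_folds))
            train_x = [xs[i] for i in range(n) if i not in val_idx]
            train_y = [ys[i] for i in range(n) if i not in val_idx]
            val_x = [xs[i] for i in sorted(val_idx)]
            val_y = [ys[i] for i in sorted(val_idx)]
            w = fit_ridge(train_x, train_y, reg_param)
            fold_errors.append(rmse(val_x, val_y, w))
        avg_rmse[reg_param] = sum(fold_errors) / num_folds
    # The best candidate is the one with the lowest average validation RMSE
    best = min(avg_rmse, key=avg_rmse.get)
    return best, avg_rmse

# Hypothetical data: y is roughly 2 * x
xs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
ys = [2.1, 3.9, 6.2, 7.8, 10.1, 12.0]
reg_params = [x / 100.0 for x in range(1, 11)]  # 0.01 .. 0.10, the same grid as the notebook

best, scores = cross_validate(xs, ys, reg_params)
print("best regParam: {0}".format(best))
```

Each candidate is scored only on rows it was not trained on, which is what makes the comparison between regularization values fair.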

In [48]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# We can reuse the RegressionEvaluator, regEval, to judge the model based on the best Root Mean Squared Error
# Let's create our CrossValidator with 3 fold cross validation
crossval = CrossValidator(estimator=lrPipeline, evaluator=regEval, numFolds=3)

# Let's tune over our regularization parameter from 0.01 to 0.10
regParam = [x / 100.0 for x in range(1, 11)]

# We'll create a parameter grid using the ParamGridBuilder, and add the grid to the CrossValidator
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, regParam)
             .build())
crossval.setEstimatorParamMaps(paramGrid)

# Now let's find and return the best model
cvModel = crossval.fit(trainingSetDF).bestModel

Now that we have tuned our Linear Regression model, let's see what the new RMSE and \\(r^2\\) values are versus our initial model.

Complete and run the next cell.

In [50]:
# Now let's use cvModel to compute an evaluation metric for our test dataset: testSetDF
predictionsAndLabelsDF = cvModel.transform(testSetDF).select("Stock_Price", "Strike_Price", "Time_to_Maturity","Option_Price",'Option_Type','Volatility','IV',"prediction")

# Run the previously created RMSE evaluator, regEval, on the predictionsAndLabelsDF DataFrame
rmseNew = regEval.evaluate(predictionsAndLabelsDF)

# Now let's compute the r2 evaluation metric for our test dataset
r2New = regEval.evaluate(predictionsAndLabelsDF, {regEval.metricName: "r2"})

print("Original Root Mean Squared Error: {0:2.2f}".format(rmse))
print("New Root Mean Squared Error: {0:2.2f}".format(rmseNew))
print("Old r2: {0:2.2f}".format(r2))
print("New r2: {0:2.2f}".format(r2New))

[Decision Tree Learning](https://en.wikipedia.org/wiki/Decision_tree_learning) uses a [Decision Tree](https://en.wikipedia.org/wiki/Decision_tree) as a predictive model which maps observations about an item to conclusions about the item's target value. It is one of the predictive modelling approaches used in statistics, data mining and machine learning. Decision trees where the target variable can take continuous values (typically real numbers) are called regression trees.

Spark ML Pipeline provides [DecisionTreeRegressor()](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.regression.DecisionTreeRegressor) as an implementation of [Decision Tree Learning](https://en.wikipedia.org/wiki/Decision_tree_learning).

The cell below is based on the [Spark ML Pipeline API for Decision Tree Regressor](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.regression.DecisionTreeRegressor).

In [52]:
from pyspark.ml.regression import DecisionTreeRegressor

# Create a DecisionTreeRegressor
dt = DecisionTreeRegressor()

dt.setLabelCol("IV")\
  .setPredictionCol("prediction")\
  .setFeaturesCol("features")\
  .setMaxBins(100)

# Create a Pipeline
dtPipeline = Pipeline()


# Set the stages of the Pipeline
dtPipeline.setStages([vectorizer, dt])
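
To illustrate how a regression tree chooses a split, here is a minimal sketch of a depth-1 tree (a "stump") on a single feature. The data and the helpers `fit_stump` and `predict_stump` are hypothetical, and the criterion used (minimizing the summed squared error of the two sides) is a common choice assumed here for illustration.

```python
def mean(values):
    return sum(values) / float(len(values))

def sse(values):
    """Sum of squared errors around the mean."""
    m = mean(values)
    return sum((v - m) ** 2 for v in values)

def fit_stump(xs, ys):
    """Find the threshold on x that minimizes the total squared error of both sides."""
    best = None
    for threshold in sorted(set(xs))[1:]:
        left = [y for x, y in zip(xs, ys) if x < threshold]
        right = [y for x, y in zip(xs, ys) if x >= threshold]
        error = sse(left) + sse(right)
        if best is None or error < best[0]:
            best = (error, threshold, mean(left), mean(right))
    _, threshold, left_value, right_value = best
    return threshold, left_value, right_value

def predict_stump(stump, x):
    threshold, left_value, right_value = stump
    return left_value if x < threshold else right_value

# Hypothetical data with a jump between x = 3 and x = 4
xs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
ys = [0.20, 0.21, 0.19, 0.40, 0.41, 0.39]

stump = fit_stump(xs, ys)
print("If (x < {0}) predict {1:.2f} else predict {2:.2f}".format(*stump))
```

A deeper tree repeats the same search recursively inside each side, which is what the `maxDepth` parameter limits.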

Instead of guessing what parameters to use, we will use [Model Selection](https://spark.apache.org/docs/1.6.2/ml-tuning.html#model-selection-aka-hyperparameter-tuning) or [Hyperparameter Tuning](https://spark.apache.org/docs/1.6.2/ml-tuning.html#model-selection-aka-hyperparameter-tuning) to create the best model.

We can reuse the existing [CrossValidator](https://spark.apache.org/docs/1.6.2/ml-tuning.html#cross-validation) by replacing the Estimator with our new `dtPipeline` (the number of folds remains 3).

In [54]:
# Let's just reuse our CrossValidator with the new dtPipeline,  RegressionEvaluator regEval, and 3 fold cross validation
crossval.setEstimator(dtPipeline)

# Let's tune over our dt.maxDepth parameter on the values 2 and 3, create a parameter grid using the ParamGridBuilder
paramGrid = ParamGridBuilder().addGrid(dt.maxDepth, [2, 3]).build()
# Add the grid to the CrossValidator
crossval.setEstimatorParamMaps(paramGrid)

# Now let's find and return the best model
dtModel = crossval.fit(trainingSetDF).bestModel

Now let's see how our tuned DecisionTreeRegressor model's RMSE and \\(r^2\\) values compare to our tuned LinearRegression model.

In [56]:
# Now let's use dtModel to compute an evaluation metric for our test dataset: testSetDF
predictionsAndLabelsDF = dtModel.transform(testSetDF).select("Stock_Price", "Strike_Price", "Time_to_Maturity","Option_Price",'Option_Type','Volatility','IV', "prediction")

# Run the previously created RMSE evaluator, regEval, on the predictionsAndLabelsDF DataFrame
rmseDT =  regEval.evaluate(predictionsAndLabelsDF)

# Now let's compute the r2 evaluation metric for our test dataset
r2DT = regEval.evaluate(predictionsAndLabelsDF, {regEval.metricName: "r2"})

print("LR Root Mean Squared Error: {0:.2f}".format(rmseNew))
print("DT Root Mean Squared Error: {0:.2f}".format(rmseDT))
print("LR r2: {0:.2f}".format(r2New))
print("DT r2: {0:.2f}".format(r2DT))

The line below will pull the Decision Tree model from the Pipeline and display it as an if-then-else string. Again, we have to "reach through" to the JVM API to make this one work.

**ToDo**: Run the next cell

In [58]:
print(dtModel.stages[-1]._java_obj.toDebugString())

[Random forests](https://en.wikipedia.org/wiki/Random_forest) or random decision tree forests are an ensemble learning method for regression that operates by constructing a multitude of decision trees at training time and outputting the mean prediction of the individual trees. Random decision forests correct for decision trees' habit of overfitting to their training set.

Spark ML Pipeline provides [RandomForestRegressor()](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.regression.RandomForestRegressor) as an implementation of [Random forests](https://en.wikipedia.org/wiki/Random_forest).

In [60]:
from pyspark.ml.regression import RandomForestRegressor

# Create a RandomForestRegressor
rf = RandomForestRegressor()

rf.setLabelCol("IV")\
  .setPredictionCol("prediction")\
  .setFeaturesCol("features")\
  .setSeed(100088121)\
  .setMaxDepth(8)\
  .setNumTrees(30)

# Create a Pipeline
rfPipeline = Pipeline()

# Set the stages of the Pipeline
rfPipeline.setStages([vectorizer, rf])
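
The ensemble idea can be sketched in plain Python: train many small trees, each on a bootstrap sample of the data, and average their predictions. This is a conceptual illustration with hypothetical data and helpers; it uses depth-1 trees on one feature and omits the random feature selection that a full random forest also performs.

```python
import random

def mean(values):
    return sum(values) / float(len(values))

def fit_stump(xs, ys):
    """Depth-1 regression tree: the single threshold with the lowest squared error."""
    best = None
    for threshold in sorted(set(xs))[1:]:
        left = [y for x, y in zip(xs, ys) if x < threshold]
        right = [y for x, y in zip(xs, ys) if x >= threshold]
        error = (sum((y - mean(left)) ** 2 for y in left)
                 + sum((y - mean(right)) ** 2 for y in right))
        if best is None or error < best[0]:
            best = (error, threshold, mean(left), mean(right))
    if best is None:
        # The sample had a single distinct x, so always predict the mean
        return (float("inf"), mean(ys), mean(ys))
    return best[1:]

def predict_stump(stump, x):
    threshold, left_value, right_value = stump
    return left_value if x < threshold else right_value

def fit_forest(xs, ys, num_trees, seed):
    """Train each stump on a bootstrap sample (rows drawn with replacement)."""
    rng = random.Random(seed)
    n = len(xs)
    forest = []
    for _ in range(num_trees):
        idx = [rng.randrange(n) for _ in range(n)]
        forest.append(fit_stump([xs[i] for i in idx], [ys[i] for i in idx]))
    return forest

def predict_forest(forest, x):
    """The forest's prediction is the mean of the individual tree predictions."""
    return mean([predict_stump(stump, x) for stump in forest])

# Hypothetical data with a jump between x = 3 and x = 4
xs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
ys = [0.20, 0.21, 0.19, 0.40, 0.41, 0.39]

forest = fit_forest(xs, ys, num_trees=30, seed=100088121)
print("{0:.2f}".format(predict_forest(forest, 1.0)))
print("{0:.2f}".format(predict_forest(forest, 6.0)))
```

Averaging over trees trained on different samples smooths out the quirks of any single tree, which is the intuition behind the reduced overfitting.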

As with Decision Trees, instead of guessing what parameters to use, we will use [Model Selection](https://spark.apache.org/docs/1.6.2/ml-tuning.html#model-selection-aka-hyperparameter-tuning) or [Hyperparameter Tuning](https://spark.apache.org/docs/1.6.2/ml-tuning.html#model-selection-aka-hyperparameter-tuning) to create the best model.

We can reuse the existing [CrossValidator](https://spark.apache.org/docs/1.6.2/ml-tuning.html#cross-validation) by replacing the Estimator with our new `rfPipeline` (the number of folds remains 3).

In [62]:
# Let's just reuse our CrossValidator with the new rfPipeline,  RegressionEvaluator regEval, and 3 fold cross validation
crossval.setEstimator(rfPipeline)

# Let's tune over our rf.maxBins parameter on the values 50 and 100, create a parameter grid using the ParamGridBuilder
paramGrid = ParamGridBuilder().addGrid(rf.maxBins, [50, 100]).build()

# Add the grid to the CrossValidator
crossval.setEstimatorParamMaps(paramGrid)

# Now let's find and return the best model
rfModel = crossval.fit(trainingSetDF).bestModel

Now let's see how our tuned RandomForestRegressor model's RMSE and \\(r^2\\) values compare to our tuned LinearRegression and tuned DecisionTreeRegressor models.

Complete and run the next cell.

In [64]:
# Now let's use rfModel to compute an evaluation metric for our test dataset: testSetDF
predictionsAndLabelsDF = rfModel.transform(testSetDF).select("Stock_Price", "Strike_Price", "Time_to_Maturity","Option_Price",'Option_Type','Volatility','IV', "prediction")

# Run the previously created RMSE evaluator, regEval, on the predictionsAndLabelsDF DataFrame
rmseRF = regEval.evaluate(predictionsAndLabelsDF)

# Now let's compute the r2 evaluation metric for our test dataset
r2RF = regEval.evaluate(predictionsAndLabelsDF, {regEval.metricName: "r2"})

print("LR Root Mean Squared Error: {0:.2f}".format(rmseNew))
print("DT Root Mean Squared Error: {0:.2f}".format(rmseDT))
print("RF Root Mean Squared Error: {0:.2f}".format(rmseRF))
print("LR r2: {0:.2f}".format(r2New))
print("DT r2: {0:.2f}".format(r2DT))
print("RF r2: {0:.2f}".format(r2RF))

In [65]:
print(rfModel.stages[-1]._java_obj.toDebugString())

### Conclusion

The best model is our Random Forest model, which uses an ensemble of 30 trees with a maximum depth of 8 to construct a better model than the single decision tree.