# Spark MLlib
## Lab assignment: Example with the MLlib

The aim of this notebook is to play with the MLlib of Apache Spark to create a Machine Learning pipeline that preprocess a dataset, train a model and make predictions. In particular, we are going to deal with a regression problem. The content of this lab has been inspired by the sample provided in the [DataBricks documentation](https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/2854662143668609/2084788691983918/6837869239396014/latest.html).


## Appliances Energy Prediction

**Data**: This dataset contains Energy consumption from appliances at 10 min resolution for about 4.5 months. The house temperature and humidity conditions were monitored with a wireless sensor network. Each wireless node transmitted the temperature and humidity conditions around 3.3 min. Then, the wireless data was averaged for 10 minutes periods. The energy data was logged every 10 minutes. Weather from the nearest airport weather station (Chievres Airport, Belgium) was also logged. This dataset is from [Candanedo et al](http://dx.doi.org/10.1016/j.enbuild.2017.01.083) and is hosted by the UCI Machine Learning Repository. [UCI Machine Learning Repository](http://archive.ics.uci.edu/ml/datasets/Appliances+energy+prediction).


**Goal**: We want to learn to predict appliances' energy consumption based on weather information. It would also be nice to know which input features are the most relevant to make predictions.

**Approach**: We will use Spark ML Pipelines, which help users piece together parts of a workflow such as feature processing and model training. We will also demonstrate model selection (a.k.a. hyperparameter tuning) using Cross Validation in order to fine-tune and improve our ML model.

## Submission and marking criteria

You should complete this notebook and add your solutions to it. When you are done, rename your completed notebook as `ex04.ipynb`. 

Important notes:
- The **group leader** must submit the `ex04.ipynb` file on Moodle.
- **Each member of the group** must complete the peer review survey and their contribution statement using this [link](https://forms.office.com/Pages/ResponsePage.aspx?id=7qe9Z4D970GskTWEGCkKHjZupmfSK6JKqlvGZrucaoBURFJJWllYWVhQS09PMFNBVzlCT05JUjM4VCQlQCN0PWcu). **You can only submit this survey ONCE**.
- This lab is marked out of 100 marks, and each section is allocated a number of marks that are indicated below.
- The marking will be focused on the efficiency of the solution and its functionality. Minor mistakes will deduct marks from each exercise.

- **Submission deadline: 25th March 2022 at 3pm**

In [0]:
%matplotlib inline 
import matplotlib.pyplot as plt
import pyspark.sql.functions as F
from pyspark.sql import Row

# Helper function to test the correctness of the solutions
def test(var, val, msg=""):
    print("1 test passed.") if var == val else print("1 test failed. " + msg)

## Load and understand the data  [10 marks]

We begin by loading the data, which is in Comma-Separated Value (CSV) format. For that, you should use `spark.read` to read the file. Then, you should also cache the data so that we only read it from disk once. You will need to upload the data to your group's storage accout.

In [0]:
df = spark.read.option("header","true").csv("/mnt/data/energydata_complete.csv")

df.cache()

In [0]:
test(df.count(), 19735, 'Incorrect number of rows')
test(df.is_cached, True, 'df not cached')

#### Data description

From the UCI ML Repository description, we know that the columns have the following meanings.

**Attribute information**:
```
date time year-month-day hour:minute:second
Appliances, energy use in Wh
lights, energy use of light fixtures in the house in Wh
T1, Temperature in kitchen area, in Celsius
RH_1, Humidity in kitchen area, in %
T2, Temperature in living room area, in Celsius
RH_2, Humidity in living room area, in %
T3, Temperature in laundry room area
RH_3, Humidity in laundry room area, in %
T4, Temperature in office room, in Celsius
RH_4, Humidity in office room, in %
T5, Temperature in bathroom, in Celsius
RH_5, Humidity in bathroom, in %
T6, Temperature outside the building (north side), in Celsius
RH_6, Humidity outside the building (north side), in %
T7, Temperature in ironing room , in Celsius
RH_7, Humidity in ironing room, in %
T8, Temperature in teenager room 2, in Celsius
RH_8, Humidity in teenager room 2, in %
T9, Temperature in parents room, in Celsius
RH_9, Humidity in parents room, in %
To, Temperature outside (from Chievres weather station), in Celsius
Pressure (from Chievres weather station), in mm Hg
RH_out, Humidity outside (from Chievres weather station), in %
Wind speed (from Chievres weather station), in m/s
Visibility (from Chievres weather station), in km
Tdewpoint (from Chievres weather station), Â°C
rv1, Random variable 1, nondimensional
rv2, Random variable 2, nondimensional
```

**The target variable is the energy use of the Appliances.**

For now, we will leave the two variables `rv1` and `rv2` in our dataset, to see if they are affecting much our methods, then we can try to remove them and see if we improve the results.

Use the `display` (databricks only) or `show` commands to visualise the data. 
- Remember if you use `display` you can also easily make graphs
- If you are using `show` be careful not to show the entire data frame :-), only 5 rows!

In [0]:
df.display()

# Data preprocessing [10 marks]

This dataset is nicely prepared for Machine Learning and required very little preprocessing. However, rather than keeping the date as a timestamp, we would like to have some additional columns, including 'day of the year', 'hour', and 'month of the year'. Please use the naming `dayofyear`, `hour` and `month`, respectively.

**Hint**: Of course the SparkSQL library has a function to transform strings with datetime!

Would you like to have any other information from the datetime? Feel free to add other features.

In [0]:
df = df.withColumn('dayofyear', F.lit(F.dayofyear('date'))) \
                    .withColumn('hour', F.lit(F.hour('date'))) \
                    .withColumn('month', F.lit(F.month('date')))

When you dataframe `df` has the additional columns, please remove the column `date`:

In [0]:
df = df.drop("date")

In [0]:
test("date" in df.columns, False, "Column date has not been remove!")
test("hour" in df.columns, True, "The hour hasn't been added")
test("dayofyear" in df.columns, True, "The dayofyear hasn't been added")
test("month" in df.columns, True, "The month hasn't been added")

Check the schema of your dataframe:

In [0]:
df.printSchema()

Dammit, all the input features have been inferred as strings rather than numeric values. This is because we read the data from a CSV file with the data in between quotes.

Your task now is to transform that into numerical values. All of the features are actually numeric, so you could cast all of them. You are recommended to use functions like `cast` and `col` to do this. You could try to leave out the datetime columns we created, but it's fine if you transform them to float.

In [0]:
for c in df.columns[:-3]:
    # add condition for the cols to be type cast
    df = df.withColumn(c, df[c].cast('double'))

In [0]:
df.printSchema()

## Split data into training and test sets [10 marks]

Our final data preparation step is to split our dataset into training and test sets. We will train and tune our model on the training set, and then see how well we do in the test.

Your task is to split the dataframe `df` into 70% for training and 30% for test. Please use the same random seed.

In [0]:
train, test = df.randomSplit([0.7, 0.3])

Even though we have fixed the random seed, you will not get the exact same split.

Note that this is the simplest way of validating your results. You may want to carry out a [k-fold cross validation](https://en.wikipedia.org/wiki/Cross-validation_(statistics)) and split the dataset into *k* folds, and build and test *k* models. We will do later cross validation but for parameter tuning! not to validate our approach!

## Data visualisation [10 marks]

Before applying any machine learning algorithm, it is a good practice to try to visualise your data. For example, we could see how much energy is spent in appliances depending on the month.

In [0]:
df.display()

In [0]:
# create a variable `hist_elect` that contains the histogram of total energy consumed by the Appliances, grouped by month
hist_elect = df.groupBy("month").sum("Appliances").collect()

In [0]:
(x_values, y_values) = zip(*hist_elect)
plt.bar(x_values, y_values)
plt.title('Histogram of repetitions (up to 10)')
plt.xlabel('Month')
plt.ylabel('Sum of Appliance Energy')
plt.show()

As we don't seem to have all the data from the 1st of January, well, we have less consumption in that particular months, so probably the month is not a good feature, don't you think? we are going to remove it from the dataframe and the training and test data frames too!

In [0]:
# update the variables `df`, `train` and `test`, removing the column 'month'
df = df.filter((df.month != '1'))
train, test = df.randomSplit([0.7, 0.3])

You could do other plots to understand better the data and practice with Spark :-). This is a good opportunity to practice with the DataFrame API, although if you are running this on Databricks, some plots can be generated very easily by yourself.

In [0]:
hist_elect2 = df.groupBy("month").sum("Appliances").collect()
(x_values, y_values) = zip(*hist_elect2)
plt.bar(x_values, y_values)
plt.title('Histogram of repetitions (up to 10)')
plt.xlabel('Month')
plt.ylabel('Sum of Appliance Energy')
plt.show()

## Create a Pipeline with Spark ML [20 marks]

As you know, we can't feed the data frame directly to a machine learning algorithm, as we need to put all the input features as an Array, and indicate which one is the output feature (in our case, the 'Appliances' column!).

We will put together a simple Pipeline with the following stages:

- VectorAssembler: To combine all the input columns into a single vector column (i.e. all the columns but the 'Appliances' one.
- Learning algorithm: I feel like using Gradient-Boosted Trees [GBTs](https://en.wikipedia.org/wiki/Gradient_boosting) for this example, but feel free to use anything else.
- CrossValidator: I will use cross validation to tune the parameters of the GBT model. Yes, this can be added as part of a pipeline!  This is going to change the way we access the best model later!

Step 1: Create the `VectorAssembler`:

In [0]:
from pyspark.ml.feature import VectorAssembler

feature_cols = train.columns
feature_cols.remove("Appliances")
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
assembler.transform(train).show(2)

Step 2: Create an instance of `GBTRegressor` in which you don't indicate any parameters but the class label (i.e. `labelCol`) to 'Appliances'.

In [0]:
from pyspark.ml.regression import GBTRegressor

gbt = GBTRegressor(labelCol="Appliances")

Step 3: Create a `CrossValidator` for `gbt`.

You can explore the parameters you like for GBT. Full documentation [here](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.regression.GBTRegressor.html). I would suggest to create a 'grid' for at least the depth of the tree and the number of iterations. Don't investigate more than 4-8 combinations! :-)

Import the right libraries:

In [0]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [0]:
# define a variable `paramGrid` with some parameters, e.g. for maxDepth, range [5,8], and for maxIter [10,20]

paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [2]) \
    .addGrid(gbt.maxIter, [5,8,10,20]) \
    .build()

Create a RegressionEvaluator that uses the [Root Mean Squared Error (RMSE)](https://en.wikipedia.org/wiki/Root-mean-square_deviation) as our performance metric. Import the right package first:

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
# create a variable `evaluator` with a rmse as metric for the prediction column given by `gbt`.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="Appliances", predictionCol="prediction")

We can now create a CrossValidator `cv` that uses the `gbt` as estimator, as well as the evaluator and grid we defined above.

In [0]:
cv = CrossValidator(estimator=gbt, estimatorParamMaps=paramGrid,
                          evaluator = RegressionEvaluator(metricName="rmse", labelCol="Appliances", predictionCol="prediction"),
                          numFolds=3)

Step 4. Create a Pipeline (`pipeline`) that contains the two stages: `vectorAssembler` and `cv`

In [0]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[assembler, cv])

Step 5. Finally, `fit` the model, and store in a `pipelineModel` variable. This might take quite a bit of time!

In [0]:
pipelineModel = pipeline.fit(train)

It could be a good idea to save this to disk, in case it takes too long, so we can read it later. Remember to save it to your group's storage account.

In [0]:
#pipelineModel.save("/mnt/pirates-of-the-carribbytes/Model-lab4_sahil_5")

## Evaluate the results [10 marks]

To obtain the predictions in the test set, apply the method `transform()` of the trained pipeline on the test DataFrame! This will not apply the cross-validation of course!

In [0]:
# create a variable `predictions`:
prediction = pipelineModel.transform(test)

It is easier to view the results when we limit the columns displayed to:

- `Appliances`: the consumption of the Appliances in Wh
- `prediction`: our predicted prediction

Find a way to show the output only for these two features (only for the first 5 rows):

In [0]:
prediction.select("Appliances","prediction").show(3)

Are these results any good? Let's compute the RMSE using the evaluator we created before! Store the result in a `rmse` variable.

In [0]:
rmse = evaluator.evaluate(prediction)

In [0]:
print(rmse)

Seems a bit high?  Well, this number is closer to what it is reported in the original paper with RF (RMSE around 69).

But maybe you can investigate a bit more if you can improve that. Can you find out the importance of the features from the GBTs?

You first need to find out the best model!! In the way we trained the pipeline, you can find the trained cvModel as one of the stages of the `pipelineModel`:

In [0]:
cvModel = pipelineModel.stages[1]

In [0]:
cvModel.bestModel

Check the feature importances of that `cvModel`:

In [0]:
cvModel.bestModel.featureImportances

Uhm, looks like our model gave feature #28 quite a bit of importance, which is the hour of the day if I recall correctly.  Features #25 and #26 were random, and the model noticed that 26 was completely useless, but gave some importance to 'rv1'. GBTs perform somehow an implicit feature selection, so those low importance features won't affect much their performance, but I wonder if we could just remove low importance features?

*Task*: Create a list of those features with less than for example 0.05.

In [0]:
pairs = cvModel.bestModel.featureImportances

In [0]:
# create a list `to_remove` that contains the feature names that must be removed because their confidence is less than 0.05 
to_remove = []

for i in range(len(pairs)):
  if pairs[i]<0.05:
    to_remove.append(i)


In [0]:
print(to_remove)

## Removing low importance features [10 marks]

Check the current schema of the training data:

In [0]:
train.printSchema()

You don't need to remove the columns from train and test partitions as you won't be able to re-train the pipeline without modifying the VectorAssemble. You simply need to indicate the columns you want to use when creating a new `vectorAssembler2`:

In [0]:
vectorAssembler2 = VectorAssembler(inputCols=["T2", "T3", "RH_3", "T9", "dayofyear", "hour"], outputCol="features")
vectorAssembler2.transform(train).show(2)

and create a new pipeline, `pipeline2`, with that new `vectorAssemble2`:

In [0]:
pipeline2 = Pipeline(stages=[vectorAssembler2, cv])

d
And `fit` the new pipeline:

In [0]:
pipelineModel2 = pipeline2.fit(train)

Finally, make predictions and compute the error:

In [0]:
prediction2 = pipelineModel2.transform(test)
rmse2 = evaluator.evaluate(prediction2)

In [0]:
print(rmse2)

Check the features of the best model and feature importances:

In [0]:
cvModel2 = pipelineModel2.stages[1]

In [0]:
cvModel2.bestModel

In [0]:
cvModel2.bestModel.featureImportances

In my case, I got the same performance as before, but this time my model only considered 3 input features! This might not have made our model more precise (Because GBTs already ignored those features), but makes it more interpretable!

# Improving further your model [20 marks]

There might be many ways to improve the results we obtained here. 

A few ideas for you to think about:

- Parameter tuning: I have used a relatively small set of parameters, and I haven't investigated what happened in training and test, is there overfitting of the training? would we be able to use a large number of trees?
- The features of this dataset are numerical, are there other classifiers that may be more appropriate than GBTs?
- We haven't really done any careful pre-processing of the data. Are there outliers or noise that might be having an impact in the results? 
- Do we need any normalisation?

## Check for Overfitting

In [0]:
prediction_train = pipelineModel2.transform(train)
rmse_train = evaluator.evaluate(prediction_train)

In [0]:
print(rmse2)
print(rmse_train) # so not overfitting

## Normalise and Remove Outliers

In [0]:
# find outliers - box plot
boxplot = df.toPandas().boxplot(column=["T2", "T3", "RH_3", "T9", "dayofyear", "hour"])

In [0]:
# remove outliers functions
def remove_outliers(df_in, cols):
  for col in cols:
    quantiles = df_in.approxQuantile(col, [0.25, 0.75], 0)
    if ((quantiles) and (quantiles[1]!=quantiles[0])):
      iqr = quantiles[1]-quantiles[0] #Interquartile range
      fence_low  = quantiles[0]-1.5*iqr
      fence_high = quantiles[1]+1.5*iqr
      df_in = df_in.filter((df_in[col]>fence_low) & (df_in[col]<fence_high))
  return df_in

def remove_outliers_iteratively(df, cols):
  while (True):
    len1 = df.count()
    df = remove_outliers(df, cols)
    len2 = df.count()
    if (len1==len2):
      break
  return df

In [0]:
# Normalizer function
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

def normalizeDF(df, cols):
  # UDF for converting column type from vector to double type
  unlist = udf(lambda x: round(float(list(x)[0]),3), DoubleType())

  # Iterating over columns to be scaled
  for i in cols: #[ "T2", "T3", "RH_3", "T9", "dayofyear", "hour"]:
      # VectorAssembler Transformation - Converting column to vector type
      assembler = VectorAssembler(inputCols=[i],outputCol=i+"_Vect")

      # MinMaxScaler Transformation
      scaler = MinMaxScaler(inputCol=i+"_Vect", outputCol=i+"_Scaled")

      # Pipeline of VectorAssembler and MinMaxScaler
      pipeline = Pipeline(stages=[assembler, scaler])

      # Fitting pipeline on dataframe
      df = pipeline.fit(df).transform(df)\
                                .withColumn(i+"_Scaled",unlist(i+"_Scaled")).drop(i+"_Vect")

  return df

In [0]:
# normalise then remove outliers in train
old_df = df
df = normalizeDF(df, ['lights', 'T1', 'RH_1', 'T2', 'RH_2', 'T3', 'RH_3', 'T4',
                      'RH_4', 'T5', 'RH_5', 'T6', 'RH_6', 'T7', 'RH_7', 'T8', 'RH_8', 'T9',
                      'RH_9', 'T_out', 'Press_mm_hg', 'RH_out', 'Windspeed', 'Visibility',
                      'Tdewpoint', 'rv1', 'rv2', 'dayofyear', 'hour', 'month'])

df.display()

In [0]:
train, test = df.randomSplit([0.7, 0.3])
train.display()

In [0]:
import pandas as pd
for col in ["T2_Scaled", "T3_Scaled", "RH_3_Scaled", "T9_Scaled", "dayofyear_Scaled", "hour_Scaled"]:
  gre_histogram = train.select(col).rdd.flatMap(lambda x: x).histogram(11)

  # Loading the Computed Histogram into a Pandas Dataframe for plotting
  pd.DataFrame(
      list(zip(*gre_histogram)), 
      columns=['bin', 'frequency']
  ).set_index(
      'bin'
  ).plot(kind='bar')

In [0]:
#remove outliers in train BETTER IF NOT DONE
'''train = train.select('Appliances','lights_Scaled', 'T1_Scaled', 'RH_1_Scaled', 'T2_Scaled', 'RH_2_Scaled', 'T3_Scaled', 'RH_3_Scaled', 'T4_Scaled',
                      'RH_4_Scaled', 'T5_Scaled', 'RH_5_Scaled', 'T6_Scaled', 'RH_6_Scaled', 'T7_Scaled', 'RH_7_Scaled', 'T8_Scaled', 'RH_8_Scaled',                       'T9_Scaled',
                      'RH_9_Scaled', 'T_out_Scaled', 'Press_mm_hg_Scaled', 'RH_out_Scaled', 'Windspeed_Scaled', 'Visibility_Scaled',
                      'Tdewpoint_Scaled', 'rv1_Scaled', 'rv2_Scaled', 'dayofyear_Scaled', 'hour_Scaled', 'month_Scaled')'''

train = train.select("Appliances","T2_Scaled", "T3_Scaled", "RH_3_Scaled", 'RH_7_Scaled', "T9_Scaled",'Windspeed_Scaled', "dayofyear_Scaled", "hour_Scaled")
train.display()

## Change Parameters and Build New Model

In [0]:
gbt = GBTRegressor(labelCol="Appliances", stepSize=0.15, subsamplingRate=1.0, impurity="variance", featureSubsetStrategy="all", validationTol=0.01)

# Parmeter experimentation
paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [2,20,2]) \
    .addGrid(gbt.maxIter, [20,30]) \
    .build()

cv = CrossValidator(estimator=gbt, estimatorParamMaps=paramGrid,
                          evaluator = RegressionEvaluator(metricName="rmse", labelCol="Appliances", predictionCol="prediction"),
                          numFolds=10)

In [0]:
feature_cols = train.columns
feature_cols.remove("Appliances")
vectorAssembler3 = VectorAssembler(inputCols=feature_cols,outputCol="features")
pipeline3 = Pipeline(stages=[vectorAssembler3, cv])
pipelineModel3 = pipeline3.fit(train)

In [0]:
prediction_train2 = pipelineModel3.transform(train)
rmse_train2 = evaluator.evaluate(prediction_train2)

In [0]:
print(rmse)
print(rmse2)
print(rmse_train2)

In [0]:
prediction3 = pipelineModel3.transform(test)
rmse3 = evaluator.evaluate(prediction3)

In [0]:
print(rmse)
print(rmse_train)
print(rmse2)
print(rmse_train2)
print(rmse3)

In [0]:
cvModel3 = pipelineModel3.stages[1]
print(cvModel3.bestModel)
pairs = cvModel3.bestModel.featureImportances
to_remove = []

for i in range(len(pairs)):
  if pairs[i]<0.05:
    to_remove.append(i)
print(to_remove)
#train.display()

In [0]:
train.display()

## New Model - Linear Regression

In [0]:
train, test = old_df.randomSplit([0.7, 0.3])

In [0]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
from pyspark.sql.functions import *

feature_cols = train.columns
feature_cols.remove("Appliances")

vectorAssembler = VectorAssembler(inputCols=feature_cols, outputCol="unscaled_features")
standardScaler = StandardScaler(inputCol="unscaled_features", outputCol="features")
lr = LinearRegression(labelCol="Appliances",)

# Parmeter experimentation
paramGrid = ParamGridBuilder() \
    .addGrid(lr.aggregationDepth, [2,3]) \
    .addGrid(lr.maxIter, [100,150,200]) \
    .addGrid(lr.regParam, [0.0,0.05,0.1,0.15]) \
    .build()

evaluator = RegressionEvaluator(metricName="rmse", labelCol="Appliances", predictionCol="prediction")

cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid,
                          evaluator = RegressionEvaluator(metricName="rmse", labelCol="Appliances", predictionCol="prediction"),
                          numFolds=10)

stages = [vectorAssembler, standardScaler, cv]
pipeline = Pipeline(stages=stages)

In [0]:
pipelineModel_lr = pipeline.fit(train)

In [0]:
prediction_train_lr = pipelineModel_lr.transform(train)
rmse_train_lr = evaluator.evaluate(prediction_train_lr)

In [0]:
prediction_lr = pipelineModel_lr.transform(test)
rmse_lr = evaluator.evaluate(prediction_lr)

In [0]:
print(rmse)
print(rmse_train)
print(rmse2)
print(rmse_train_lr)
print(rmse_lr)