# Big Data Intelligence: Methods and Technologies

# Assignment 3: PySpark Assignment
## Authors: Liana Mehrabyan and Elsa Scola Martín
### Objective:
Using a dataset from the Kaggle competition "[AMS 2013-2014 Solar Energy Prediction Contest](https://www.kaggle.com/c/ams-2014-solar-energy-prediction-contest/)", the goal of this assignment is to use meteorological variables forecast by the Global Forecast System (GFS) as input attributes to a machine learning model that estimates how much solar energy will be produced at one of the solar plants in Oklahoma.

To solve this problem, we use PySpark on Databricks together with some techniques seen in class, such as Pipelines.


### What is done in the Notebook: 
- Load the data.
- Split the train data into train and validation sets.
- Structure the data.
- Explore values of k (the number of PCA components).
- Evaluate the results by plotting the MAE for each number of components k.
- Merge the train and validation sets.
- Fit the final linear regressor with k=4.
- Perform linear regression on all 1200 PCA components.

### Import the libraries

In [3]:
import matplotlib.pyplot as plt
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import PCA, StandardScaler, VectorAssembler
from pyspark.ml.regression import LinearRegression

### Loading the data

In [5]:
train_sd = spark.read.csv(path='/FileStore/tables/trainst1ns16.csv', header=True, inferSchema=True)
test_sd = spark.read.csv(path='/FileStore/tables/testst1ns16.csv', header=True, inferSchema=True)

### Divide the train partition into train and validation

In [7]:
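# Rows with counter >= 3650 form the validation set; the remaining rows stay in train.
validation_sd = train_sd.filter(train_sd['counter'] >= 3650)
train_sd = train_sd.filter(train_sd['counter'] < 3650)
# 'counter' is only used for the split, so drop it before building the feature vector.
train_sd = train_sd.drop('counter')
validation_sd = validation_sd.drop('counter')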
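
The split logic can be illustrated on a few in-memory rows. This is only a plain-Python sketch, not PySpark: the row values and the helper `split_by_counter` are hypothetical, and only the `counter` column name and the 3650 threshold come from the notebook.

```python
def split_by_counter(rows, threshold=3650):
    """Split rows into (train, validation) by the 'counter' field and drop it."""
    train, validation = [], []
    for row in rows:
        # Copy every field except 'counter', which is only used for splitting.
        stripped = {key: value for key, value in row.items() if key != "counter"}
        if row["counter"] < threshold:
            train.append(stripped)
        else:
            validation.append(stripped)
    return train, validation


# Hypothetical rows; the real data has many more columns.
rows = [
    {"counter": 3648, "energy": 11.0},
    {"counter": 3649, "energy": 12.5},
    {"counter": 3650, "energy": 9.0},
]
train_rows, validation_rows = split_by_counter(rows)
print(len(train_rows), len(validation_rows))
```

Splitting on a threshold rather than at random keeps every validation row after every training row in `counter` order.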

### Structure the data

In [9]:
ignore = ['energy']

assembler = VectorAssembler(
    inputCols=[x for x in train_sd.columns if x not in ignore],
    outputCol='features')

train_sd_new = assembler.transform(train_sd).select(['energy', 'features'])
validation_sd_new = assembler.transform(validation_sd).select(['energy', 'features'])
test_df = assembler.transform(test_sd).select(['energy', 'features'])

### Explore k values

In [11]:
results_mae = []
# The evaluator does not depend on k, so it is created once.
evaluator = RegressionEvaluator(labelCol="energy", predictionCol="prediction", metricName="mae")
for i in range(1, 21):
  # Center the features (no scaling), project onto i principal components, then fit a linear regression.
  scaler1 = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=False, withMean=True)
  pca1 = PCA(k=i, inputCol="scaledFeatures")
  lr = LinearRegression(featuresCol=pca1.getOutputCol(), labelCol='energy')
  pipeline1 = Pipeline(stages=[scaler1, pca1, lr])

  # Fit on the train split and score on the validation split.
  model1 = pipeline1.fit(train_sd_new)
  predictions = model1.transform(validation_sd_new)

  mae1 = evaluator.evaluate(predictions)
  results_mae.append(mae1)
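
The `mae` metric requested from the evaluator is the mean absolute error: the average of the absolute differences between label and prediction. A minimal plain-Python sketch of that definition follows; the function name and the example values are hypothetical and not taken from the dataset.

```python
def mean_absolute_error(labels, predictions):
    """Average of |label - prediction| over all rows."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    if not labels:
        raise ValueError("at least one row is required")
    return sum(abs(y - p) for y, p in zip(labels, predictions)) / len(labels)


# Hypothetical values, for illustration only.
example_mae = mean_absolute_error([10.0, 12.0, 8.0], [9.0, 14.0, 8.0])
print(example_mae)
```

Lower values are better, and the result is expressed in the same units as the `energy` column.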

### Plot MAE for each number of components k

In [13]:
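# k runs from 1 to 20, so the x axis must start at 1 to line up with results_mae.
x = list(range(1, len(results_mae) + 1))
fig, ax = plt.subplots()
ax.plot(x, results_mae)
ax.set_xlabel('Number of PCA components (k)')
ax.set_ylabel('Validation MAE')
ax.set_title('MAE for each number of components k')
# display() is the Databricks notebook helper for rendering a matplotlib figure.
display(fig)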

As the plot shows, the optimal number of components is around 4.
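
The choice of k can also be read off the list of validation errors directly instead of from the plot. The sketch below assumes a list laid out like `results_mae`, where the first entry corresponds to k=1; the helper `best_k` and the numbers are hypothetical and are not the notebook's results.

```python
def best_k(mae_per_k, first_k=1):
    """Return (k, mae) with the lowest MAE; mae_per_k[0] corresponds to k = first_k."""
    if not mae_per_k:
        raise ValueError("mae_per_k must not be empty")
    index = min(range(len(mae_per_k)), key=lambda i: mae_per_k[i])
    return first_k + index, mae_per_k[index]


# Hypothetical MAE values for k = 1..6, for illustration only.
hypothetical_mae = [5.2, 4.1, 3.6, 3.0, 3.1, 3.3]
k, mae = best_k(hypothetical_mae)
print(k, mae)
```

Since the errors come from a single validation split, small differences between neighbouring values of k may not be meaningful.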

### Put together train and validation sets

In [16]:
train_df = train_sd_new.union(validation_sd_new)
print((train_df.count(), len(train_df.columns)))

### Final linear regressor (k=4)

In [18]:
# Spark's PCA does not center the data, so center it first with a StandardScaler (withMean=True, withStd=False):
scaler1 = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=False, withMean=True)
pca1 = PCA(k=4, inputCol="scaledFeatures")

lr = LinearRegression(featuresCol = pca1.getOutputCol(), labelCol='energy')

pipeline1 = Pipeline(stages=[scaler1, pca1, lr])

model1 = pipeline1.fit(train_df)
predictions = model1.transform(test_df)

evaluator = RegressionEvaluator(labelCol="energy", predictionCol="prediction", metricName="mae")
mae1 = evaluator.evaluate(predictions)

In [19]:
print(mae1)
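
The scaler in this pipeline is configured with `withMean=True` and `withStd=False`, so it only subtracts each feature's mean and leaves the scale untouched. A plain-Python sketch of that operation is shown below; the helper `center_columns` and the small matrix are hypothetical.

```python
def center_columns(rows):
    """Subtract each column's mean from that column (no scaling)."""
    n_rows = len(rows)
    n_cols = len(rows[0])
    means = [sum(row[j] for row in rows) / n_rows for j in range(n_cols)]
    centered = [[row[j] - means[j] for j in range(n_cols)] for row in rows]
    return centered, means


# Hypothetical 3x2 feature matrix.
features = [[1.0, 10.0], [2.0, 20.0], [3.0, 30.0]]
centered, means = center_columns(features)
print(means)
print(centered)
```

In the fitted pipeline the means are computed on the training data and then reused when transforming the test data.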

### Linear Regression on all the 1200 PCA components

In [21]:

scaler1 = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=False, withMean=True)
pca1 = PCA(k=1200, inputCol="scaledFeatures")

lr = LinearRegression(featuresCol = pca1.getOutputCol(), labelCol='energy')

pipeline1 = Pipeline(stages=[scaler1, pca1, lr])

model1 = pipeline1.fit(train_df)
predictions = model1.transform(test_df)

evaluator = RegressionEvaluator(labelCol="energy", predictionCol="prediction", metricName="mae")
mae1 = evaluator.evaluate(predictions)

In [22]:
print(mae1)

As can be seen, using all of the principal components gives a higher error than using k=4. This suggests that the input attributes contain redundancy that PCA can remove.
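
One way to read this comparison is sketched below. It assumes that 1200 is the total number of input attributes $d$ and that the regression is unregularized (the default `regParam` of `LinearRegression` is 0). The symbols $X_c$ (centered feature matrix), $V$ (matrix of principal directions), $Z$ (projected data) and $\beta$ (coefficient vector) are introduced here for illustration only.

$$
Z = X_c V, \qquad V^\top V = V V^\top = I_d
\;\;\Longrightarrow\;\;
X_c \beta = Z \,(V^\top \beta) \quad \text{for every } \beta \in \mathbb{R}^d
$$

With $k = d$ the projection is therefore only a rotation: $X_c$ and $Z$ span the same column space, so a least-squares fit gives the same predictions on either, up to numerical effects. Under these assumptions, the run with all 1200 components behaves like a linear regression on all of the original attributes, and the comparison with k=4 is between a low-dimensional model and the full-dimensional one.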