**ALPER KOCABIYIK**

In [65]:
import sys
import os
import os.path
import glob

# Root of the local Spark installation. An existing SPARK_HOME environment variable takes
# precedence; otherwise fall back to the original author's path (CHANGE THIS PATH TO YOURS!).
SPARK_HOME = os.environ.get("SPARK_HOME", """/Users/gulnur/spark-2.1.0-bin-hadoop2.7""")

_spark_py_lib = os.path.join(SPARK_HOME, "python", "lib")
# Locate the bundled py4j zip instead of hard-coding its version (the old "BEWARE WITH py4j
# version" trap); fall back to the version this notebook originally hard-coded.
_py4j_zips = sorted(glob.glob(os.path.join(_spark_py_lib, "py4j-*-src.zip")))
_py4j_zip = _py4j_zips[-1] if _py4j_zips else os.path.join(_spark_py_lib, "py4j-0.10.4-src.zip")
# Append each path only once so re-running this cell does not keep growing sys.path.
for _path in (_py4j_zip, os.path.join(_spark_py_lib, "pyspark.zip")):
    if _path not in sys.path:
        sys.path.append(_path)
os.environ["SPARK_HOME"] = SPARK_HOME

from pyspark.sql import SparkSession

# Stop a session left over from an earlier run of this cell so the builder settings below
# take effect. On a fresh kernel `spark` does not exist yet, and an unconditional
# spark.stop() raised NameError.
if "spark" in globals():
    spark.stop()

spark = SparkSession \
    .builder \
    .appName("wind") \
    .config("spark.sql.caseSensitive", "false") \
    .getOrCreate()

sc = spark.sparkContext

In [None]:
# Import some libraries
import matplotlib.pyplot as plt 
# For plotting data
import numpy as np              
import os

from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Display the working directory; relative paths such as "wind.csv" are resolved against it.
os.getcwd()

In [67]:
# Read the raw wind data: the first row is the header, and column types are inferred from the values.
wind_sd = spark.read.csv("wind.csv", header=True, inferSchema=True)

In [68]:
#wind_sd.printSchema()
#wind_sd.take(3)

In [69]:
# Redistribute the rows across 4 partitions and report the partition count
# before and after, so the effect of the repartition is visible.
num_partitions_before = wind_sd.rdd.getNumPartitions()
print("Number of partition before repartition: {}".format(num_partitions_before))
wind_sd = wind_sd.repartition(4)
num_partitions_after = wind_sd.rdd.getNumPartitions()
print("Number of partition after repartition: {}".format(num_partitions_after))

Number of partition before repartition: 4
Number of partition after repartition: 4


**Dividing the data into train, validation, and test sets** (train: years up to 2006, validation: 2007–2008, test: 2009 onwards)

In [70]:
# Chronological split: train on years up to 2006, validate on 2007-2008, test on 2009 onwards.
# XtrainVal_sd (train + validation) is used later to refit the chosen model before testing.
Xtrain_sd = wind_sd.filter("year <= 2006")
Xval_sd = wind_sd.filter("year IN (2007, 2008)")
XtrainVal_sd = Xtrain_sd.union(Xval_sd)
Xtest_sd = wind_sd.filter("year >= 2009")

# Row count of each split. Uses the print() form so the cell also runs under Python 3,
# consistent with the other cells.
for split_sd in (Xtrain_sd, Xval_sd, XtrainVal_sd, Xtest_sd):
    print(split_sd.count())

2528
1299
3827
2110


In [71]:
from pyspark.ml.feature import VectorAssembler

# Columns that are not model inputs: the target ('energy') plus bookkeeping/time columns.
ignore = ['energy', 'steps', 'year', 'month', 'day', 'hour']

# All four splits are row filters/unions of wind_sd and therefore share one schema. A single
# assembler built from the training columns guarantees every split packs its features in the
# same order, so the model never sees misaligned feature vectors.
assembler = VectorAssembler(
    inputCols=[x for x in Xtrain_sd.columns if x not in ignore],
    outputCol='features')

# Keep only the target and the assembled feature vector for each split.
Xtrain_sd, Xval_sd, XtrainVal_sd, Xtest_sd = [
    assembler.transform(split_sd).select(['energy', 'features'])
    for split_sd in (Xtrain_sd, Xval_sd, XtrainVal_sd, Xtest_sd)]

In [72]:
# change the column name 'energy' to 'label'
Xtrain_sd = Xtrain_sd.selectExpr("energy as label", "features as features")
Xval_sd = Xval_sd.selectExpr("energy as label", "features as features")
XtrainVal_sd = XtrainVal_sd.selectExpr("energy as label", "features as features")
Xtest_sd = Xtest_sd.selectExpr("energy as label", "features as features")

print(Xtrain_sd.printSchema())
print(Xval_sd.printSchema())
print(XtrainVal_sd.printSchema())
print(Xtest_sd.printSchema())

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)

None
root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)

None
root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)

None
root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)

None


## 1 - Baseline results:

**Train a decision tree on the combined train+validation dataset (XtrainVal_sd) and evaluate it
on the test dataset. The performance measure is MAE (mean absolute error).**

In [74]:
# Baseline: one decision tree on the full assembled feature vector, default hyper-parameters
# (the column names spelled out below are Spark's defaults), fitted on train+validation.
dt = DecisionTreeRegressor(featuresCol="features", labelCol="label", predictionCol="prediction")
model = dt.fit(XtrainVal_sd)

**Predictions are computed on the test set.** 

In [86]:
# Score the baseline tree on the test split and show a sample of its predictions.
# `model` must still be the baseline tree fitted on XtrainVal_sd in the previous cell.
predictions_sd = model.transform(Xtest_sd)
predictions_sd.show()

# Mean absolute error between the 'label' and 'prediction' columns.
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="mae")
mae = evaluator.evaluate(predictions_sd)
# %.2f instead of %d: %d silently truncated the MAE to an integer.
print("Error for Decision Tree model : %.2f" % mae)

+-------+--------------------+--------------------------------+------------------+
|  label|            features|PCA_4cf38b628ce512accb1c__output|        prediction|
+-------+--------------------+--------------------------------+------------------+
|1191.99|[2450130.01534,24...|            [4.66284887452031...| 410.4751086956523|
| 916.83|[2496225.43889,24...|            [1.50038343889636...|1408.7215853658538|
|    2.9|[2514298.53184,25...|            [3766248.55510174...| 304.9938283828383|
|  73.69|[2499306.90124,24...|            [1.14180887567692...|  945.900981308411|
|1276.97|[2468415.46712,24...|            [6698163.84459003...|  311.455965665236|
| 786.37|[2435648.94961,24...|            [4901080.40461038...|1108.1148979591835|
| 151.61|[2454996.19561,24...|            [8183811.76678010...|  630.459054054054|
|1351.63|[2465849.08793,24...|            [6284468.07125426...|  630.459054054054|
| 943.94|[2462993.53924,24...|            [1564626.61208993...|124.55350364963505|
| 22

## 2/3. For different k values (k = number of PCA components to use)
### a. Compute k PCA components from the training set

In [106]:
from pyspark.ml.feature import PCA
from pyspark.ml import Pipeline

# Build one candidate pipeline per k in {10, 20, ..., 10*limit}:
#   PCA(k) on the assembled 'features' vector  ->  decision tree on the k projected components.
# NOTE(review): no scaling stage precedes PCA, so large-magnitude input columns will likely
# dominate the leading components — consider a StandardScaler stage if that is unintended.
limit = 10
pllist = []
for kval in range(10, 10 * limit + 1, 10):
    pca = PCA(inputCol="features", k=kval)
    # PCA's output column name is auto-generated, so ask the stage for it instead of hard-coding it.
    dt = DecisionTreeRegressor(labelCol="label", featuresCol=pca.getOutputCol())
    pl = Pipeline(stages=[pca, dt])
    pllist.append(pl)

<bound method Pipeline.getStages of Pipeline_433d946954114661d510>
<bound method Pipeline.getStages of Pipeline_4335b5a20abee503be3c>
<bound method Pipeline.getStages of Pipeline_4ef5955999360e13999c>
<bound method Pipeline.getStages of Pipeline_4cc8a81d098449a7f5e1>
<bound method Pipeline.getStages of Pipeline_413c82b186478320c72b>
<bound method Pipeline.getStages of Pipeline_4e06942dc77e28d7b3fd>
<bound method Pipeline.getStages of Pipeline_4a57a94b9e6a5afe08b8>
<bound method Pipeline.getStages of Pipeline_4d6e8d3646e586e63189>
<bound method Pipeline.getStages of Pipeline_41f0a5dda2b93f8902f8>
<bound method Pipeline.getStages of Pipeline_4fd19162672fad07b45e>


### b. Train a decision tree with them

In [88]:
# Fit every candidate PCA -> tree pipeline on the training years only; the validation split
# is held out for choosing k. The comprehension avoids rebinding the global `model` (the
# baseline tree), which the old loop overwrote with the last PCA pipeline.
modellist = [candidate_pl.fit(Xtrain_sd) for candidate_pl in pllist]

### c. Evaluate each decision tree on the validation set, using MAE as the performance measure.

In [96]:
# Score every fitted PCA -> tree pipeline on the validation split and record its MAE.
# `mae` is a list aligned with pllist/modellist; the next section uses it to select the best k.
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mae")
predictions = []
mae = []
for i, pca_tree_model in enumerate(modellist):
    predictions.append(pca_tree_model.transform(Xval_sd))
    mae.append(evaluator.evaluate(predictions[i]))
    # Report the k actually configured on the PCA stage rather than assuming k = (i+1)*10,
    # and keep two decimals instead of truncating the MAE with %d.
    k = pllist[i].getStages()[0].getK()
    print("Error for PCA model with %d principal components: %.2f" % (k, mae[i]))

Error for PCA model with 10 principal components: 470
Error for PCA model with 20 principal components: 474
Error for PCA model with 30 principal components: 474
Error for PCA model with 40 principal components: 447
Error for PCA model with 50 principal components: 448
Error for PCA model with 60 principal components: 448
Error for PCA model with 70 principal components: 445
Error for PCA model with 80 principal components: 446
Error for PCA model with 90 principal components: 449
Error for PCA model with 100 principal components: 444


## 4. Training the final decision tree with the best PCA setting and evaluating it on the test set

In [94]:
# Choose the k with the lowest validation MAE (mae.index returns the first minimum on ties),
# then refit that pipeline on train+validation (XtrainVal_sd) and score it once on the test set.
bestPCAindex = mae.index(min(mae))
best_pca_pl = pllist[bestPCAindex]
# Read k from the pipeline's PCA stage instead of recomputing it as (index+1)*10.
best_k = best_pca_pl.getStages()[0].getK()
model_bestPCA = best_pca_pl.fit(XtrainVal_sd)

predictions_bestPCA = model_bestPCA.transform(Xtest_sd)

# Store the test MAE in its own variable. Assigning it to `mae` (as before) replaced the
# validation-MAE list, so re-running this cell failed at mae.index(...).
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mae")
mae_bestPCA = evaluator.evaluate(predictions_bestPCA)
print("Error for best PCA model with %d components: %.2f" % (best_k, mae_bestPCA))

Error for best PCA model with 100 components: 464


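Baseline model MAE: 482

Best PCA model MAE: 464

We get a lower MAE with PCA while using fewer attributes. This suggests there is a lot of redundancy in the input attributes that PCA can remove. (Caveat: the baseline prediction table above contains a PCA output column, so it may have been produced by a stale PCA pipeline. Re-run the notebook from a fresh kernel to confirm the baseline figure.)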

In [261]:
# Stop the Spark session created in the setup cell now that the analysis is finished.
spark.stop()