In [11]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
#Import Spark CountVectorizer
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import CountVectorizer

from pyspark.sql.functions import col
from pyspark.sql.types import (StructField, DoubleType, StringType,
                               IntegerType, TimestampType,
                               FloatType, StructType)


from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor,GBTRegressor,RandomForestRegressor
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.ml.regression import LinearRegression 

# Connect to the standalone Spark master at spark://master:7077.
# getOrCreate() returns an already-active session instead of starting a new one.
# NOTE(review): the app name "mongodb" looks like a leftover from another project — confirm.
spark = (
    SparkSession.builder
    .appName("mongodb")
    .master("spark://master:7077")
    .getOrCreate()
)

# Linear Regression 

## Importing and preparing the dataset 

- First, we import the libraries and ML models we will be using.

In [12]:
# Explicit schema for power.csv (supplying it also means Spark does not have
# to infer column types). The KWH column is named "KWH/hh_per_half hour" so it
# matches the label column name that all later cells select and train on.
data_schema = [StructField('LCLid', StringType(), True),
               StructField('stdorToU', StringType(), True),
               StructField('DateTime', TimestampType(), True),
               StructField('KWH/hh_per_half hour', FloatType(), True),
               StructField('Acorn', StringType(), True),
               StructField('Acorn_grouped', StringType(), True)]

final_struc = StructType(fields=data_schema)

- We can load the data either from a CSV file in the persistent Jupyter /work folder, or from files in HDFS that were prepopulated during spark-submit.

In [13]:
# Read power.csv using the explicit schema above; header=True skips the
# file's header line instead of treating it as data.
power = spark.read.csv("power.csv", schema=final_struc, header=True)

# Alternative source: the Parquet file prepopulated in HDFS.
#power= spark.read.parquet("hdfs://hadoop:8020/employees51.parquet")


- We do some cleanup: first renaming the columns to more descriptive titles, then removing a row that has some odd values, and finally dropping rows with NA values.

In [15]:
# from pyspark.sql.functions import col
# power= power.select(col("_c0").alias("LCLid"), col("_c1").alias("stdorToU"), col("_c4").alias("Acorn"), col("_c2").alias("DateTime"), col("_c3").alias("KWH/hh_per_half hour"), col("_c5").alias("Acorn_grouped"))
# power.show()

In [16]:
#power = power.withColumn("KWH/hh_per_half hour", power["KWH/hh_per_half hour"].cast(DoubleType()))


In [17]:
power=power.filter (power["LCLid"] != 'LCLid')

In [18]:
power = power.na.drop()

- Next, we encode the columns that have string values, fitting the encoders to our dataset.

In [21]:
# Categorical variable encoder: map each string column to a numeric index column.
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="LCLid", outputCol="LCL_cat")
indexer2 = StringIndexer(inputCol="Acorn_grouped", outputCol="Acorn_cat")

# A Pipeline fits each indexer on the output of the previous stage and then
# applies both, in order, to the input frame.
indexed = Pipeline(stages=[indexer, indexer2]).fit(power).transform(power)
indexed.show(5)

- Next, we assemble the newly encoded features into a vector. To save space, we then keep only the columns we will use: the KWH column as the label we want to predict, and the features vector column. We call the final dataset final_power.

In [None]:
assembler = VectorAssembler(
  inputCols=['LCL_cat','Acorn_cat'],
    outputCol="features")

In [None]:
output = assembler.transform(indexed)

In [None]:
output.select("features", "KWH/hh_per_half hour").show(500)

In [None]:
final_power=output.select("features", "KWH/hh_per_half hour")

In [None]:
final_power.printSchema()

- Next, we split final_power into training and test sets: 80% for training and 20% for testing.

In [128]:
# 80/20 train/test split. The fixed seed makes the split, and every metric
# reported below, reproducible across runs.
train_data, test_data = final_power.randomSplit([0.8, 0.2], seed=42)

- Next, we apply linear regression to our dataset and train our model.

In [130]:
lr = LinearRegression(labelCol='KWH/hh_per_half hour',featuresCol='features')

In [131]:
# Fit the model to the data and call this model lrModel
lrModel = lr.fit(train_data)

- Let's look at our test results. We use the evaluate function to create test_results, and in the next cells we use it to see our mean absolute error, mean squared error and root mean squared error.

In [22]:
 test_results = lrModel.evaluate(test_data)

In [133]:
test_results.meanAbsoluteError


0.2069325348776791

In [134]:
print("RMSE: {}".format(test_results.rootMeanSquaredError))
print("MSE: {}".format(test_results.meanSquaredError))

RMSE: 0.38524109376479526
MSE: 0.14841070032509576


In [136]:
# Only needed when the data came from HDFS.
from pyspark.sql.types import DoubleType
label_col = "KWH/hh_per_half hour"
# randomSplit has already produced train_data/test_data from the uncast
# final_power. Reassigning final_power alone would never reach the data the
# tree models below are trained and evaluated on, so cast the splits as well.
final_power = final_power.withColumn(label_col, final_power[label_col].cast(DoubleType()))
train_data = train_data.withColumn(label_col, train_data[label_col].cast(DoubleType()))
test_data = test_data.withColumn(label_col, test_data[label_col].cast(DoubleType()))

## Decision Tree, Random Forest and Gradient-Boosted Trees

- We create three models with default values and, in the next cell, train all three on our dataset (this takes a while).

In [137]:
# Use mostly defaults to make this comparison "fair"

dtc = DecisionTreeRegressor(labelCol='KWH/hh_per_half hour',featuresCol='features')
rfc = RandomForestRegressor(labelCol='KWH/hh_per_half hour',featuresCol='features')
gbt = GBTRegressor(labelCol='KWH/hh_per_half hour',featuresCol='features')

In [138]:
# Train the models (its three models, so it might take some time)
dtc_model = dtc.fit(train_data)
rfc_model = rfc.fit(train_data)
gbt_model = gbt.fit(train_data)

In [139]:
dtc_predictions = dtc_model.transform(test_data)
rfc_predictions = rfc_model.transform(test_data)
gbt_predictions = gbt_model.transform(test_data)

- Now, to interpret the results, we import the evaluation library and, in the next cell, specify our label, the name of our prediction field, and the metric.

In [140]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [144]:
acc_evaluator = MulticlassClassificationEvaluator(labelCol="KWH/hh_per_half hour", predictionCol="prediction", metricName="accuracy")

In [145]:
dtc_acc = acc_evaluator.evaluate(dtc_predictions)
rfc_acc = acc_evaluator.evaluate(rfc_predictions)
gbt_acc = acc_evaluator.evaluate(gbt_predictions)

In [146]:
print("Here are the results!")
print('-'*80)
print('A single decision tree had an accuracy of: {0:2.2f}%'.format(dtc_acc*100))
print('-'*80)
print('A random forest ensemble had an accuracy of: {0:2.2f}%'.format(rfc_acc*100))
print('-'*80)
print('A ensemble using GBT had an accuracy of: {0:2.2f}%'.format(gbt_acc*100))

Here are the results!
--------------------------------------------------------------------------------
A single decision tree had an accuracy of: 0.00%
--------------------------------------------------------------------------------
A random forest ensemble had an accuracy of: 0.00%
--------------------------------------------------------------------------------
A ensemble using GBT had an accuracy of: 0.00%


In [44]:
# Unused alternative encoding, kept for reference (every line is commented out):
# split Acorn_grouped into tokens and encode them with CountVectorizer into an
# "Acorn_OneHotEncoded" column, instead of the StringIndexer used earlier.
# NOTE(review): vocabSize=4 presumably assumes at most 4 distinct Acorn_grouped tokens — confirm.
# from pyspark.sql.functions import col, split
# power = power.withColumn("Acorn_Array", split(col("Acorn_grouped")," "))

#power.show(1000)

In [45]:
#AcornVectorizer = CountVectorizer(inputCol="Acorn_Array", outputCol="Acorn_OneHotEncoded", vocabSize=4, minDF=1.0)

In [46]:
#AcornVectorizer_model = AcornVectorizer.fit(power)

In [47]:
#df_power = AcornVectorizer_model.transform(power)

In [48]:
#df_power.show()