In [1]:
# Load the power data CSV once. The reader is configured for a header row,
# ';' as the field separator, '?' as the marker for missing values, and
# schema inference for the column types.
DATA_PATH = "/FileStore/tables/powerDataForTP.csv"

dfraw = (
    spark.read
    .option("header", "true")
    .option("nullValue", "?")
    .option("inferSchema", "true")
    .option("sep", ";")
    .csv(DATA_PATH)
)

# Spark DataFrames are immutable, so the working frame can share the raw
# frame's handle instead of re-reading the file and inferring the schema
# a second time.
df = dfraw


In [2]:
from pyspark.sql.functions import col, avg

# Mean imputation: compute the mean of every column and use it to replace
# the missing values of that column.
# The comprehension variable is called `name` so it does not shadow the
# imported pyspark.sql.functions.col.
mean_dict = {name: 'mean' for name in df.columns}
mean_row = df.agg(mean_dict).collect()[0].asDict()

# The aggregate columns come back labelled "avg(<name>)"; [4:-1] strips the
# "avg(" prefix and the ")" suffix to recover the source column name.
# .items() works on both Python 2 and 3 (iteritems() exists only on Python 2).
# A mean of None (e.g. a column with no non-null values) is skipped because
# fillna only accepts int/float/bool/string replacement values.
col_avgs = {
    label[4:-1]: mean
    for label, mean in mean_row.items()
    if mean is not None
}
dfmodified = df.fillna(col_avgs)

In [3]:
# Render the mean-imputed DataFrame using the notebook-provided display helper.
display(dfmodified)

In [4]:

from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer, VectorAssembler, VectorIndexer
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# Assemble the four predictor columns into the single 'features' vector
# column consumed by the regressors below.
FEATURE_COLUMNS = ['AT', 'V', 'AP', 'RH']
vec = VectorAssembler(inputCols=FEATURE_COLUMNS, outputCol='features')

data = vec.transform(dfmodified)
print(data)

# Keep only what the regressors need: the feature vector and the PE label.
modelData = data.select('features', 'PE')

# 70/30 train/test split. The fixed seed keeps the split, and therefore
# every metric computed from it, stable from one run to the next.
trainData, testData = modelData.randomSplit([0.7, 0.3], seed=42)

# Summary statistics for the full modelling set, then for each split
for frame in (modelData, trainData, testData):
    frame.describe().show()

# Baseline model: one linear regression on PE with a fixed regParam of 0.01
lr = LinearRegression(
    featuresCol='features',
    labelCol='PE',
    regParam=0.01,
)

# Train on the training split
lrModel = lr.fit(trainData)

# Training summary of the fitted model; print its predictions table
# without truncating the cell contents
summary = lrModel.summary
summary.predictions.show(truncate=False)


# Evaluator: RMSE between the PE label and the 'prediction' column
evaluator = RegressionEvaluator(labelCol="PE", predictionCol="prediction", metricName="rmse")

# One estimator per regularisation family. Each grid below must be built
# from the params of the estimator it is tuned on, because Spark params are
# bound to their parent estimator.
lr1 = LinearRegression(labelCol='PE', featuresCol='features')  # Lasso
lr2 = LinearRegression(labelCol='PE', featuresCol='features')  # Ridge
lr3 = LinearRegression(labelCol='PE', featuresCol='features')  # Elastic net

# Regularisation strengths shared by the three searches
REG_PARAMS = [0.0, 0.2, 0.4, 0.6, 0.8, 1.0]

# Parameter grids.
# elasticNetParam is the L1/L2 mixing weight: 1 is a pure L1 penalty (Lasso),
# 0 is a pure L2 penalty (Ridge), values in between are elastic net.
# Models 1/2/3 are reported downstream as Lasso/Ridge/Elastic, so the grids
# are set up to match those labels.
param_grid1 = (ParamGridBuilder()
               .addGrid(lr1.regParam, REG_PARAMS)
               .addGrid(lr1.elasticNetParam, [1.0])
               .build())
param_grid2 = (ParamGridBuilder()
               .addGrid(lr2.regParam, REG_PARAMS)
               .addGrid(lr2.elasticNetParam, [0.0])
               .build())
# Built from lr3's own params so the grid actually applies to the estimator
# that cv3 fits.
param_grid3 = (ParamGridBuilder()
               .addGrid(lr3.regParam, REG_PARAMS)
               .addGrid(lr3.elasticNetParam, [0.2, 0.4, 0.6, 0.8])
               .build())

# 4-fold cross-validation for each family, selecting on RMSE
cv1 = CrossValidator(estimator=lr1, estimatorParamMaps=param_grid1, evaluator=evaluator, numFolds=4)
cv2 = CrossValidator(estimator=lr2, estimatorParamMaps=param_grid2, evaluator=evaluator, numFolds=4)
cv3 = CrossValidator(estimator=lr3, estimatorParamMaps=param_grid3, evaluator=evaluator, numFolds=4)

# Run the three cross-validated searches on the training split, in order
fitted_searches = [search.fit(trainData) for search in (cv1, cv2, cv3)]
cv_model1, cv_model2, cv_model3 = fitted_searches

# Apply each selected model to the training split and to the held-out split
pred_training_cv1, pred_test_cv1 = cv_model1.transform(trainData), cv_model1.transform(testData)
pred_training_cv2, pred_test_cv2 = cv_model2.transform(trainData), cv_model2.transform(testData)
pred_training_cv3, pred_test_cv3 = cv_model3.transform(trainData), cv_model3.transform(testData)

# Report the evaluator's metric for every model: training split first,
# then the held-out test split.
training_reports = [
    ("Train data performance Lasso = ", pred_training_cv1),
    ("Train data performance Ridge = ", pred_training_cv2),
    ("Train data performance Elastic = ", pred_training_cv3),
]
test_reports = [
    ("Test data performance Lasso = ", pred_test_cv1),
    ("Test data performance Ridge = ", pred_test_cv2),
    ("Test data performance Elastic = ", pred_test_cv3),
]
for message, predictions in training_reports + test_reports:
    print(message, evaluator.evaluate(predictions))

# Intercept of the best model found by each search
intercept_reports = [
    ("Best intercept Lasso = ", cv_model1),
    ("Best intercept Ridge = ", cv_model2),
    ("Best intercept Elastic = ", cv_model3),
]
for message, search_result in intercept_reports:
    print(message, search_result.bestModel.intercept)

# Coefficient output is currently disabled:
#print("Best coefficients Lasso = ", cv_model1.bestModel.coefficients)
#print("Best coefficients Ridge = ", cv_model2.bestModel.coefficients)
#print("Best coefficients Elastic = ", cv_model3.bestModel.coefficients)


In [5]:
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.ml.regression import DecisionTreeRegressor,RandomForestRegressor
from pyspark.ml.classification import DecisionTreeClassifier

# 70/30 split of the imputed (not yet assembled) data. The fixed seed keeps
# the split stable from one run to the next.
trainData2, testData2 = dfmodified.randomSplit([0.7, 0.3], seed=42)

# Tree-based regressors for PE. Label, features and prediction columns are
# passed to the constructors directly instead of through chained setters.
dt = DecisionTreeRegressor(
    labelCol="PE",
    featuresCol="features",
    predictionCol="prediction",
    maxDepth=3,
)
rf = RandomForestRegressor(
    labelCol="PE",
    featuresCol="features",
    predictionCol="prediction",
    numTrees=3,
    maxDepth=3,
)

# Each pipeline runs the feature assembler (vec) and then the regressor, so
# it can be fitted directly on the un-assembled DataFrame.
dtPipeline = Pipeline(stages=[vec, dt])
rfPipeline = Pipeline(stages=[vec, rf])

# RMSE evaluator used for model selection. It is deliberately not named
# `eval`, which would shadow the Python builtin for the rest of the notebook.
tree_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="PE", metricName="rmse")

# Decision tree: search over maxDepth
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [1, 2, 3, 4, 5])
             .build())

# Random forest: search over maxBins (maxDepth stays at the estimator's value)
paramGrid2 = (ParamGridBuilder()
              .addGrid(rf.maxBins, [20, 40, 60, 80, 100])
              .build())

# 4-fold cross-validation over each pipeline; estimator and grid are supplied
# up front, so no follow-up setter calls are needed.
crossval = CrossValidator(estimator=dtPipeline, estimatorParamMaps=paramGrid,
                          evaluator=tree_evaluator, numFolds=4)
crossval2 = CrossValidator(estimator=rfPipeline, estimatorParamMaps=paramGrid2,
                           evaluator=tree_evaluator, numFolds=4)

# Cross-validate both pipelines on the training split and keep each winner
dtModel, rfModel = [search.fit(trainData2).bestModel for search in (crossval, crossval2)]

# Predictions on the training and held-out splits.
# NOTE(review): pred_training_cv3 / pred_test_cv3 are also bound to the
# elastic-net predictions in the linear-regression cell; this rebinding
# replaces them with the decision-tree predictions.
pred_training_cv3, pred_test_cv3 = dtModel.transform(trainData2), dtModel.transform(testData2)
pred_training_cv4, pred_test_cv4 = rfModel.transform(trainData2), rfModel.transform(testData2)

# Report the evaluator's metric for both tree models: training split first,
# then the held-out test split.
tree_reports = [
    ("Train data performance Decision Tree = ", pred_training_cv3),
    ("Train data performance Random Forest = ", pred_training_cv4),
    ("Test data performance Decision Tree = ", pred_test_cv3),
    ("Test data performance Random Forest = ", pred_test_cv4),
]
for message, predictions in tree_reports:
    print(message, evaluator.evaluate(predictions))
