<a href="https://colab.research.google.com/github/IrfanKhalid/BigData-Spark/blob/master/Pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# Load the CSV as an RDD of raw text lines and preview the first five of them.
rawTextRdd = sc.textFile("/FileStore/tables/Folds5x2_pp.csv")
rawTextRdd.take(5)

In [0]:
from collections import namedtuple

# One record per observation; the field names become the DataFrame columns later on.
PowerPlantRow = namedtuple("PowerPlantTable", ["AT", "V", "AP", "RH", "PE"])

# Split each text line on commas, drop the header line (its first field is "AT"),
# and convert the first five fields of every remaining line to floats.
powerPlant = (
    rawTextRdd
    .map(lambda raw: raw.split(","))
    .filter(lambda fields: fields[0] != "AT")
    .map(lambda fields: PowerPlantRow(*[float(fields[i]) for i in range(5)]))
)
powerPlant.take(5)

In [0]:
# ANSWER
# Convert the RDD of PowerPlantRow records into a DataFrame and register it
# under the name "power_plant" so the %sql cells below can query it.
# NOTE(review): registerTempTable is reported as deprecated on Spark 2.0+ in
# favour of createOrReplaceTempView -- confirm the target Spark version before
# switching.
powerPlantDF = powerPlant.toDF()
powerPlantDF.registerTempTable("power_plant")

In [0]:
%sql
-- Query every row and column of the registered power_plant table via the %sql magic.
SELECT * FROM power_plant

AT,V,AP,RH,PE
8.34,40.77,1010.84,90.01,480.48
23.64,58.49,1011.4,74.2,445.75
29.74,56.9,1007.15,41.91,438.76
19.07,49.69,1007.22,76.79,453.09
11.8,40.66,1017.13,97.2,464.43
13.97,39.16,1016.05,84.6,470.96
22.1,71.29,1008.2,75.38,442.35
14.47,41.76,1021.98,78.41,464.0
31.25,69.51,1010.25,36.83,428.77
6.77,38.18,1017.8,81.13,484.31


In [0]:
%sql
-- List the column names and data types of the registered table.
DESCRIBE power_plant

col_name,data_type,comment
AT,double,
V,double,
AP,double,
RH,double,
PE,double,


In [0]:
# Summary statistics (count, mean, stddev, min, max) for each column of the table.
display(sqlContext.table("power_plant").describe())

summary,AT,V,AP,RH,PE
count,9568.0,9568.0,9568.0,9568.0,9568.0
mean,19.65123118729093,54.30580372073596,1013.2590781772544,73.30897784280923,454.365009406354
stddev,7.452473229611076,12.707892998326807,5.938783705811634,14.600268756728957,17.066994999803416
min,1.81,25.36,992.89,25.56,420.26
max,37.11,81.56,1033.3,100.16,495.76


In [0]:
%sql
-- ANSWER: Scatter plot of Power (PE) as a function of Temperature (AT).
-- BONUS: the aliases name the x-axis "Temperature" and the y-axis "Power".
SELECT AT AS Temperature, PE AS Power FROM power_plant

Temperature,Power
8.34,480.48
23.64,445.75
29.74,438.76
19.07,453.09
11.8,464.43
13.97,470.96
22.1,442.35
14.47,464.0
31.25,428.77
6.77,484.31


In [0]:
%sql
-- ANSWER: Scatter plot of Power (PE) as a function of ExhaustVacuum (V).
-- OPTIONAL: the aliases name the x-axis "ExhaustVacuum" and the y-axis "Power".
SELECT V AS ExhaustVacuum, PE AS Power FROM power_plant;

ExhaustVacuum,Power
40.77,480.48
58.49,445.75
56.9,438.76
49.69,453.09
40.66,464.43
39.16,470.96
71.29,442.35
41.76,464.0
69.51,428.77
38.18,484.31


In [0]:
%sql
-- ANSWER: Scatter plot of Power (PE) as a function of Pressure (AP).
-- OPTIONAL: the aliases name the x-axis "Pressure" and the y-axis "Power".
SELECT AP AS Pressure, PE AS Power FROM power_plant;

Pressure,Power
1010.84,480.48
1011.4,445.75
1007.15,438.76
1007.22,453.09
1017.13,464.43
1016.05,470.96
1008.2,442.35
1021.98,464.0
1010.25,428.77
1017.8,484.31


In [0]:
%sql
-- ANSWER: Scatter plot of Power (PE) as a function of Humidity (RH).
-- OPTIONAL: the aliases name the x-axis "Humidity" and the y-axis "Power".
SELECT RH AS Humidity, PE AS Power FROM power_plant;

Humidity,Power
90.01,480.48
74.2,445.75
41.91,438.76
76.79,453.09
97.2,464.43
84.6,470.96
75.38,442.35
78.41,464.0
36.83,428.77
81.13,484.31


In [0]:
# ANSWER
from pyspark.ml.feature import VectorAssembler

# Work from the table registered as "power_plant".
dataset = sqlContext.table("power_plant")

# Pack the four predictor columns into a single column named "features".
vectorizer = VectorAssembler(
    inputCols=["AT", "V", "AP", "RH"],
    outputCol="features",
)
vectorizer

In [0]:
# ANSWER
# Hold out 20% of the data for testing and keep 80% for training.
# The seed is fixed so the split is repeatable. It is written as a plain int:
# the former `L` suffix is Python 2-only syntax and a SyntaxError on Python 3.
(split20, split80) = dataset.randomSplit([0.20, 0.80], seed=1800009193)

# Both splits are reused by several later cells, so cache them.
testSet = split20.cache()
trainingSet = split80.cache()

In [0]:
# ***** LINEAR REGRESSION MODEL ****

from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression, LinearRegressionModel

# Start from an estimator with default parameters; a later cell configures it.
lr = LinearRegression()

In [0]:
# Dump the parameters the linear regression estimator accepts (explainParams
# returns them as text, which we print).
print(lr.explainParams())

In [0]:
# Configure the estimator: label column, prediction column and iteration cap.
lr.setLabelCol("PE")
lr.setPredictionCol("Predicted_PE")
lr.setMaxIter(100)

# spark.ml pipeline API (familiar if you have used scikit-learn):
# stage 0 assembles the feature vector, stage 1 is the regression itself.
lrPipeline = Pipeline(stages=[vectorizer, lr])

# Fit on the training split only.
lrModel = lrPipeline.fit(trainingSet)

In [0]:

def to_equation(model, feature_names=None):
  """Render a fitted regression pipeline as a human-readable equation string.

  Args:
    model: Fitted pipeline model whose second stage (``model.stages[1]``)
      exposes ``intercept`` and ``coefficients`` (an object with ``toArray()``).
    feature_names: Optional sequence of names, one per coefficient and in the
      same order as the coefficient vector. When omitted, every column of the
      notebook-level ``dataset`` except the label column ``"PE"`` is used.

  Returns:
    A string such as ``"y = 1.5 - (3.0 * b) + (2.0 * a)"``; terms are listed
    in ascending order of their signed coefficient.

  Raises:
    ValueError: If the number of names differs from the number of coefficients.
  """
  # Read from the `model` argument. The previous version ignored it and always
  # used the global `lrModel`, so it could never describe any other model.
  regression = model.stages[1]
  intercept = regression.intercept
  weights = [float(w) for w in regression.coefficients.toArray()]

  if feature_names is None:
    feature_names = [col for col in dataset.columns if col != "PE"]
  feature_names = list(feature_names)

  if len(weights) != len(feature_names):
    raise ValueError(
        "expected {} feature names, got {}".format(len(weights), len(feature_names)))

  # A handful of coefficients does not need Spark: pair and sort them locally.
  # Zipping two separately parallelised RDDs only works when both happen to be
  # partitioned identically.
  # Order: ascending signed weight (most negative first), as before.
  ordered = sorted(zip(weights, feature_names), key=lambda term: term[0])

  equation = "y = {intercept}".format(intercept=intercept)
  for weight, name in ordered:
    symbol = "+" if (weight > 0) else "-"
    equation += " {} ({} * {})".format(symbol, abs(weight), name)

  # The model expressed as an equation.
  return equation

# Print the fitted linear-regression pipeline as an equation string.
print("Linear Regression Equation: " + to_equation(lrModel))

In [0]:
# Run the fitted pipeline on the test split and show the inputs, the actual
# output (PE) and the model's prediction (Predicted_PE) side by side.
predictionsAndLabels = lrModel.transform(testSet)
display(predictionsAndLabels.select("AT", "V", "AP", "RH", "PE", "Predicted_PE"))

AT,V,AP,RH,PE,Predicted_PE
2.34,39.42,1028.47,69.68,490.34,493.629539930555
3.69,38.44,1016.74,82.87,490.78,488.3785259927835
3.94,39.9,1008.06,97.49,488.81,484.691474692394
3.98,35.47,1017.22,86.53,489.64,487.9346259847827
3.99,39.64,1011.53,83.58,492.06,487.0856508061317
4.11,38.44,1015.9,81.79,488.05,487.66982360835
4.62,38.44,1016.09,73.14,486.84,488.0563569524397
4.84,38.5,1011.96,81.62,491.23,486.0048597066192
4.97,40.64,1020.91,94.28,485.43,483.7475225106078
5.02,40.64,1021.2,93.27,485.28,483.8273128894411


In [0]:
# Compute evaluation metrics for the predictions made on the test split.
from pyspark.mllib.evaluation import RegressionMetrics

# Feed RegressionMetrics (Predicted_PE, PE) pairs, each value cast to float.
metrics = RegressionMetrics(
    predictionsAndLabels
    .select("Predicted_PE", "PE")
    .rdd
    .map(lambda row: (float(row[0]), float(row[1])))
)

rmse = metrics.rootMeanSquaredError
explainedVariance = metrics.explainedVariance
r2 = metrics.r2

print("Root Mean Squared Error: {}".format(rmse))
print("Explained Variance: {}".format(explainedVariance))
print("R2: {}".format(r2))

In [0]:
# For every test row, compute the residual error (PE - Predicted_PE) and the
# same residual divided by the RMSE, then register the result for %sql cells.
predictionsAndLabels.selectExpr(
    "PE",
    "Predicted_PE",
    "PE - Predicted_PE Residual_Error",
    "(PE - Predicted_PE) / {} Within_RSME".format(rmse),
).registerTempTable("Power_Plant_RMSE_Evaluation")

In [0]:
%sql
-- Show actual vs. predicted power together with the raw and RMSE-scaled residuals.
SELECT * FROM Power_Plant_RMSE_Evaluation

PE,Predicted_PE,Residual_Error,Within_RSME
490.34,493.629539930555,-3.2895399305550086,-0.7145511538315741
490.78,488.3785259927835,2.40147400721645,0.5216462055420407
488.81,484.691474692394,4.118525307606035,0.8946226745263759
489.64,487.9346259847827,1.7053740152172736,0.3704399387188135
492.06,487.0856508061317,4.974349193868306,1.0805240340828022
488.05,487.66982360835,0.3801763916500249,0.0825816026094616
486.84,488.0563569524397,-1.216356952439753,-0.2642160551886781
491.23,486.0048597066192,5.225140293380832,1.1350006701202098
485.43,483.7475225106078,1.6824774893922267,0.3654663742409804
485.28,483.8273128894411,1.452687110558884,0.3155514974493628


In [0]:
%sql
-- Display the residual error, expressed in multiples of the RMSE, as a histogram.
-- The residuals are centered around 0, with the vast majority within 2 RMSEs.
SELECT Within_RSME FROM Power_Plant_RMSE_Evaluation

Within_RSME
-0.7145511538315741
0.5216462055420407
0.8946226745263759
0.3704399387188135
1.0805240340828022
0.0825816026094616
-0.2642160551886781
1.1350006701202098
0.3654663742409804
0.3155514974493628


In [0]:
%sql
-- Count the test rows by how far the residual lies from zero:
--   1 = within 1 RMSE, 2 = within 2 RMSEs, 3 = everything else.
SELECT
  CASE
    WHEN Within_RSME BETWEEN -1.0 AND 1.0 THEN 1
    WHEN Within_RSME BETWEEN -2.0 AND 2.0 THEN 2
    ELSE 3
  END AS RSME_Multiple,
  COUNT(*) AS count
FROM Power_Plant_RMSE_Evaluation
GROUP BY
  CASE
    WHEN Within_RSME BETWEEN -1.0 AND 1.0 THEN 1
    WHEN Within_RSME BETWEEN -2.0 AND 2.0 THEN 2
    ELSE 3
  END

RSME_Multiple,count
1,1336
3,52
2,534


In [0]:
# ANSWER
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Candidate regularization strengths: 0.01, 0.02, ..., 0.10
regParam = [x / 100.0 for x in range(1, 11)]

# One parameter map per candidate value of lr.regParam.
paramGrid = ParamGridBuilder().addGrid(lr.regParam, regParam).build()

# Candidate models are judged by root mean squared error on the PE label.
regEval = RegressionEvaluator(
    predictionCol="Predicted_PE",
    labelCol="PE",
    metricName="rmse",
)

# 3-fold cross-validation over the linear-regression pipeline and the grid above.
crossval = CrossValidator(
    estimator=lrPipeline,
    estimatorParamMaps=paramGrid,
    evaluator=regEval,
    numFolds=3,
)

# Fit on the training split.
cvModel = crossval.fit(trainingSet)

In [0]:
# Evaluate the cross-validated linear-regression model on the test split.
predictionsAndLabels = cvModel.transform(testSet)

# Pass plain (Predicted_PE, PE) float pairs, as the first metrics cell does.
metrics = RegressionMetrics(
    predictionsAndLabels
    .select("Predicted_PE", "PE")
    .rdd
    .map(lambda r: (float(r[0]), float(r[1])))
)

rmse = metrics.rootMeanSquaredError
explainedVariance = metrics.explainedVariance
r2 = metrics.r2

# print() with a single argument works on Python 2 and 3; the former
# `print "..."` statements are a SyntaxError on Python 3.
print("Root Mean Squared Error: {0}".format(rmse))
print("Explained Variance: {0}".format(explainedVariance))
print("R2: {0}".format(r2))

In [0]:
# ANSWER
from pyspark.ml.regression import DecisionTreeRegressor

# Decision tree regressor reading "features" and writing "Predicted_PE".
dt = DecisionTreeRegressor(
    featuresCol="features",
    labelCol="PE",
    predictionCol="Predicted_PE",
    maxBins=100,
)

# Same feature assembler as before, followed by the tree.
dtPipeline = Pipeline(stages=[vectorizer, dt])

# Reuse the existing CrossValidator: swap in the tree pipeline and a grid
# over the tree depth (2 and 3).
crossval.setEstimator(dtPipeline)
paramGrid = ParamGridBuilder().addGrid(dt.maxDepth, [2, 3]).build()
crossval.setEstimatorParamMaps(paramGrid)

dtModel = crossval.fit(trainingSet)

In [0]:
# Evaluate the best decision-tree pipeline found by cross-validation on the test split.
predictionsAndLabels = dtModel.bestModel.transform(testSet)

# Pass plain (Predicted_PE, PE) float pairs, as the first metrics cell does.
metrics = RegressionMetrics(
    predictionsAndLabels
    .select("Predicted_PE", "PE")
    .rdd
    .map(lambda r: (float(r[0]), float(r[1])))
)

rmse = metrics.rootMeanSquaredError
explainedVariance = metrics.explainedVariance
r2 = metrics.r2

# print() with a single argument works on Python 2 and 3; the former
# `print "..."` statements are a SyntaxError on Python 3.
print("Root Mean Squared Error: {0}".format(rmse))
print("Explained Variance: {0}".format(explainedVariance))
print("R2: {0}".format(r2))

In [0]:
# Print the debug string of the last stage of the best pipeline (the fitted tree).
# Uses print() because the print statement is a SyntaxError on Python 3.
print(dtModel.bestModel.stages[-1]._java_obj.toDebugString())

In [0]:
from pyspark.ml.regression import RandomForestRegressor

# Random forest regressor reading "features" and writing "Predicted_PE".
rf = RandomForestRegressor()
rf.setLabelCol("PE")
rf.setPredictionCol("Predicted_PE")
rf.setFeaturesCol("features")
# Fixed seed, written as a plain int: the former `L` suffix is Python 2-only
# syntax and a SyntaxError on Python 3.
rf.setSeed(100088121)
rf.setMaxDepth(8)
rf.setNumTrees(30)

rfPipeline = Pipeline()
rfPipeline.setStages([vectorizer, rf])

# Let's just reuse our CrossValidator
crossval.setEstimator(rfPipeline)

# Tune over the number of bins (50 and 100).
paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxBins, [50, 100])
             .build())
crossval.setEstimatorParamMaps(paramGrid)

rfModel = crossval.fit(trainingSet)

In [0]:
# Evaluate the cross-validated random-forest model on the test split.
predictionsAndLabels = rfModel.transform(testSet)

# Pass plain (Predicted_PE, PE) float pairs, as the first metrics cell does.
metrics = RegressionMetrics(
    predictionsAndLabels
    .select("Predicted_PE", "PE")
    .rdd
    .map(lambda r: (float(r[0]), float(r[1])))
)

rmse = metrics.rootMeanSquaredError
explainedVariance = metrics.explainedVariance
r2 = metrics.r2

# print() with a single argument works on Python 2 and 3; the former
# `print "..."` statements are a SyntaxError on Python 3.
print("Root Mean Squared Error: {0}".format(rmse))
print("Explained Variance: {0}".format(explainedVariance))
print("R2: {0}".format(r2))

In [0]:
# Print the parameter descriptions of the estimator behind the best model's last stage.
# Uses print() because the print statement is a SyntaxError on Python 3.
print(rfModel.bestModel.stages[-1]._java_obj.parent().explainParams())

In [0]:
# Print the maxBins value of the estimator behind the best model's last stage.
# Uses print() because the print statement is a SyntaxError on Python 3.
print(rfModel.bestModel.stages[-1]._java_obj.parent().getMaxBins())