In [87]:
!pip install pyspark



**Step-1Import Library and Start Spark Session**

In [88]:
import pyspark

In [89]:
#Start the Spark Session 
from pyspark.sql import SparkSession
spark1 = SparkSession.builder.appName('Basics').getOrCreate()

In [90]:
#Check is it assign 
spark1

**Step-2 Load The Dataset**

In [91]:
#Load Dataset
data=spark1.read.option('header','true').csv('/content/complete_plant.csv').cache()

In [92]:
data

DataFrame[AT: string, V: string, AP: string, RH: string, EP: string]

In [93]:
data.show()

+-----+-----+-------+-----+------+
|   AT|    V|     AP|   RH|    EP|
+-----+-----+-------+-----+------+
|14.96|41.76|1024.07|73.17|463.26|
|25.18|62.96|1020.04|59.08|444.37|
| 5.11| 39.4|1012.16|92.14|488.56|
|20.86|57.32|1010.24|76.64|446.48|
|10.82| 37.5|1009.23|96.62| 473.9|
|26.27|59.44|1012.23|58.77|443.67|
|15.89|43.96|1014.02|75.24|467.35|
| 9.48|44.71|1019.12|66.43|478.42|
|14.64|   45|1021.78|41.25|475.98|
|11.74|43.56|1015.14|70.72| 477.5|
|17.99|43.72|1008.64|75.04|453.02|
|20.14|46.93|1014.66|64.22|453.99|
|24.34| 73.5|1011.31|84.15|440.29|
|25.71|58.59|1012.77|61.83|451.28|
|26.19|69.34|1009.48|87.59|433.99|
|21.42|43.79|1015.76|43.08|462.19|
|18.21|   45|1022.86|48.84|467.54|
|11.04|41.74| 1022.6|77.51| 477.2|
|14.45|52.75|1023.97|63.59|459.85|
|13.97|38.47|1015.15|55.28| 464.3|
+-----+-----+-------+-----+------+
only showing top 20 rows



In [94]:
#Check the Datatype 
data.dtypes

[('AT', 'string'),
 ('V', 'string'),
 ('AP', 'string'),
 ('RH', 'string'),
 ('EP', 'string')]

In [95]:
#Show schema of movies dataframe
data.printSchema()

root
 |-- AT: string (nullable = true)
 |-- V: string (nullable = true)
 |-- AP: string (nullable = true)
 |-- RH: string (nullable = true)
 |-- EP: string (nullable = true)



In [96]:
data.columns

['AT', 'V', 'AP', 'RH', 'EP']

In [97]:
#Describe 
data.describe().show()

+-------+-----------------+------------------+------------------+-----------------+------------------+
|summary|               AT|                 V|                AP|               RH|                EP|
+-------+-----------------+------------------+------------------+-----------------+------------------+
|  count|            47840|             47840|             47840|            47840|             47840|
|   mean|19.65144021739117| 54.30580372073524|1013.2590781772577| 73.3089778428094|454.36500919732975|
| stddev| 7.45201547800603|12.707361709685806| 5.938535418520848|14.59965835208147| 17.06628146634786|
|    min|             1.81|             25.36|              1000|              100|            420.26|
|    max|             9.99|             81.56|            999.99|            99.97|            495.76|
+-------+-----------------+------------------+------------------+-----------------+------------------+



In [98]:
data.summary().show()

+-------+-----------------+------------------+------------------+-----------------+------------------+
|summary|               AT|                 V|                AP|               RH|                EP|
+-------+-----------------+------------------+------------------+-----------------+------------------+
|  count|            47840|             47840|             47840|            47840|             47840|
|   mean|19.65144021739117| 54.30580372073524|1013.2590781772577| 73.3089778428094|454.36500919732975|
| stddev| 7.45201547800603|12.707361709685806| 5.938535418520848|14.59965835208147| 17.06628146634786|
|    min|             1.81|             25.36|              1000|              100|            420.26|
|    25%|            13.51|             41.74|            1009.1|            63.32|            439.75|
|    50%|            20.34|             52.08|           1012.94|            74.97|            451.52|
|    75%|            25.72|             66.54|           1017.26|        

In [99]:
#These methods return some or all rows as a Python list.
data.take(2)

[Row(AT='14.96', V='41.76', AP='1024.07', RH='73.17', EP='463.26'),
 Row(AT='25.18', V='62.96', AP='1020.04', RH='59.08', EP='444.37')]

In [100]:
#Convert the Datatype from String To Int 
from pyspark.sql.types import IntegerType
data = data.withColumn("AT", data["AT"].cast(IntegerType()))
data = data.withColumn("V", data["V"].cast(IntegerType()))
data = data.withColumn("AP", data["AP"].cast(IntegerType()))
data = data.withColumn("RH", data["RH"].cast(IntegerType()))
data = data.withColumn("EP", data["EP"].cast(IntegerType()))

In [101]:
data.dtypes

[('AT', 'int'), ('V', 'int'), ('AP', 'int'), ('RH', 'int'), ('EP', 'int')]

**Step-3 Checek the NULL Values**

In [102]:
### Get count of null values in pyspark
from pyspark.sql.functions import isnan, when, count, col
data.select([count(when(col(c).isNull(), c)).alias(c) for c in data.columns]).show()

+---+---+---+---+---+
| AT|  V| AP| RH| EP|
+---+---+---+---+---+
|  0|  0|  0|  0|  0|
+---+---+---+---+---+



In [103]:
### Get count of nan or missing values in pyspark
from pyspark.sql.functions import isnan, when, count, col
data.select([count(when(isnan(c), c)).alias(c) for c in data.columns]).show()

+---+---+---+---+---+
| AT|  V| AP| RH| EP|
+---+---+---+---+---+
|  0|  0|  0|  0|  0|
+---+---+---+---+---+



In [104]:
### Get count of both null and missing values in pyspark
from pyspark.sql.functions import isnan, when, count, col
data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns]).show()

+---+---+---+---+---+
| AT|  V| AP| RH| EP|
+---+---+---+---+---+
|  0|  0|  0|  0|  0|
+---+---+---+---+---+



**Step-4 Verctorize the features**

In [105]:
from pyspark.ml.feature import *

In [106]:
#Use Vectorize Assembler to convert the indepenedent column to independent Feature 
from pyspark.ml.feature import VectorAssembler
featureassembler=VectorAssembler(inputCols=['AT', 'V', 'AP', 'RH'],outputCol="Independent Feature")

In [107]:
output=featureassembler.transform(data)

In [108]:
output.show()

+---+---+----+---+---+--------------------+
| AT|  V|  AP| RH| EP| Independent Feature|
+---+---+----+---+---+--------------------+
| 14| 41|1024| 73|463|[14.0,41.0,1024.0...|
| 25| 62|1020| 59|444|[25.0,62.0,1020.0...|
|  5| 39|1012| 92|488|[5.0,39.0,1012.0,...|
| 20| 57|1010| 76|446|[20.0,57.0,1010.0...|
| 10| 37|1009| 96|473|[10.0,37.0,1009.0...|
| 26| 59|1012| 58|443|[26.0,59.0,1012.0...|
| 15| 43|1014| 75|467|[15.0,43.0,1014.0...|
|  9| 44|1019| 66|478|[9.0,44.0,1019.0,...|
| 14| 45|1021| 41|475|[14.0,45.0,1021.0...|
| 11| 43|1015| 70|477|[11.0,43.0,1015.0...|
| 17| 43|1008| 75|453|[17.0,43.0,1008.0...|
| 20| 46|1014| 64|453|[20.0,46.0,1014.0...|
| 24| 73|1011| 84|440|[24.0,73.0,1011.0...|
| 25| 58|1012| 61|451|[25.0,58.0,1012.0...|
| 26| 69|1009| 87|433|[26.0,69.0,1009.0...|
| 21| 43|1015| 43|462|[21.0,43.0,1015.0...|
| 18| 45|1022| 48|467|[18.0,45.0,1022.0...|
| 11| 41|1022| 77|477|[11.0,41.0,1022.0...|
| 14| 52|1023| 63|459|[14.0,52.0,1023.0...|
| 13| 38|1015| 55|464|[13.0,38.0

In [109]:
print(featureassembler.explainParams())

handleInvalid: How to handle invalid data (NULL and NaN values). Options are 'skip' (filter out rows with invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the output). Column lengths are taken from the size of ML Attribute Group, which can be set using `VectorSizeHint` in a pipeline before `VectorAssembler`. Column lengths can also be inferred from first rows of the data since it is safe to do so but only in case of 'error' or 'skip'). (default: error)
inputCols: input column names. (current: ['AT', 'V', 'AP', 'RH'])
outputCol: output column name. (default: VectorAssembler_89c8c020a26c__output, current: Independent Feature)


In [110]:
finaldata=output.select('Independent Feature','EP')

In [111]:
#Final data after vectorize assembler
finaldata.show()

+--------------------+---+
| Independent Feature| EP|
+--------------------+---+
|[14.0,41.0,1024.0...|463|
|[25.0,62.0,1020.0...|444|
|[5.0,39.0,1012.0,...|488|
|[20.0,57.0,1010.0...|446|
|[10.0,37.0,1009.0...|473|
|[26.0,59.0,1012.0...|443|
|[15.0,43.0,1014.0...|467|
|[9.0,44.0,1019.0,...|478|
|[14.0,45.0,1021.0...|475|
|[11.0,43.0,1015.0...|477|
|[17.0,43.0,1008.0...|453|
|[20.0,46.0,1014.0...|453|
|[24.0,73.0,1011.0...|440|
|[25.0,58.0,1012.0...|451|
|[26.0,69.0,1009.0...|433|
|[21.0,43.0,1015.0...|462|
|[18.0,45.0,1022.0...|467|
|[11.0,41.0,1022.0...|477|
|[14.0,52.0,1023.0...|459|
|[13.0,38.0,1015.0...|464|
+--------------------+---+
only showing top 20 rows



**Step-5 Fit Linear Regression Model**

In [112]:
#Use Linear Regression 
from pyspark.ml.regression import LinearRegression
train_data,test_data=finaldata.randomSplit([0.75,0.25])
regressor=LinearRegression(featuresCol="Independent Feature",labelCol='EP')
regressor=regressor.fit(train_data)

In [113]:
type(regressor)

pyspark.ml.regression.LinearRegressionModel

**Step-6 View model summary**

In [114]:
#R2 Score 
print("R2:", regressor.summary.r2)
print("Intercept: ", regressor.intercept, "Coefficients", regressor.coefficients)

R2: 0.9273227793762697
Intercept:  446.68289902375756 Coefficients [-1.9593709705583748,-0.24044851439115086,0.06808856328819865,-0.1551897262064853]


**Step-7 Predict the Value**

In [115]:
#Predict the value 
df_pred = regressor.transform(finaldata)
df_pred.show()

+--------------------+---+------------------+
| Independent Feature| EP|        prediction|
+--------------------+---+------------------+
|[14.0,41.0,1024.0...|463| 467.7871551399451|
|[25.0,62.0,1020.0...|444| 443.0849575753268|
|[5.0,39.0,1012.0,...|488| 482.1367233463712|
|[20.0,57.0,1010.0...|446| 450.7649440216822|
|[10.0,37.0,1009.0...|473| 471.9957409276711|
|[26.0,59.0,1012.0...|443| 441.4574133678428|
|[15.0,43.0,1014.0...|467| 464.3556220553095|
|[9.0,44.0,1019.0,...|478|477.60854971656795|
|[14.0,45.0,1021.0...|475|471.58716663112347|
|[11.0,43.0,1015.0...|477| 473.0371431318636|
|[17.0,43.0,1008.0...|453| 460.0283487344635|
|[20.0,46.0,1014.0...|453| 455.5445086476155|
|[24.0,73.0,1011.0...|440| 437.9068546628266|
|[25.0,58.0,1012.0...|451|443.19166367417284|
|[26.0,69.0,1009.0...|433| 434.3481604740786|
|[21.0,43.0,1015.0...|462|  457.633556033855|
|[18.0,45.0,1022.0...|467|462.73144322873276|
|[11.0,41.0,1022.0...|477| 472.9083320202179|
|[14.0,52.0,1023.0...|459| 466.626

**Step-8 Evaluate It**

In [116]:
from pyspark.ml.evaluation import RegressionEvaluator

In [117]:
#Evalute the model using regressor evaluator 
evaluator = RegressionEvaluator()
print(evaluator.explainParams())

labelCol: label column name. (default: label)
metricName: metric name in evaluation - one of:
                       rmse - root mean squared error (default)
                       mse - mean squared error
                       r2 - r^2 metric
                       mae - mean absolute error
                       var - explained variance. (default: rmse)
predictionCol: prediction column name. (default: prediction)
throughOrigin: whether the regression is through the origin. (default: False)
weightCol: weight column name. If this is not set or empty, we treat all instance weights as 1.0. (undefined)


In [118]:
evaluator = RegressionEvaluator(labelCol = "EP", predictionCol = "prediction", metricName = "rmse")
evaluator.evaluate(df_pred)

4.601174400411695

**Step-9  Build a pipeline**

In [119]:
from pyspark.ml.pipeline import Pipeline, PipelineModel

In [120]:
pipeline = Pipeline()
print(pipeline.explainParams())
pipeline.setStages([featureassembler, regressor])
pipelineModel = pipeline.fit(finaldata)


stages: a list of pipeline stages (undefined)


In [121]:
pipeline.getStages()

[VectorAssembler_89c8c020a26c,
 LinearRegressionModel: uid=LinearRegression_53515e35339d, numFeatures=4]

In [122]:
lr_model = pipelineModel.stages[1]
lr_model .coefficients

DenseVector([-1.9594, -0.2404, 0.0681, -0.1552])

In [123]:
pipelineModel.transform(data).show()

+---+---+----+---+---+--------------------+------------------+
| AT|  V|  AP| RH| EP| Independent Feature|        prediction|
+---+---+----+---+---+--------------------+------------------+
| 14| 41|1024| 73|463|[14.0,41.0,1024.0...| 467.7871551399451|
| 25| 62|1020| 59|444|[25.0,62.0,1020.0...| 443.0849575753268|
|  5| 39|1012| 92|488|[5.0,39.0,1012.0,...| 482.1367233463712|
| 20| 57|1010| 76|446|[20.0,57.0,1010.0...| 450.7649440216822|
| 10| 37|1009| 96|473|[10.0,37.0,1009.0...| 471.9957409276711|
| 26| 59|1012| 58|443|[26.0,59.0,1012.0...| 441.4574133678428|
| 15| 43|1014| 75|467|[15.0,43.0,1014.0...| 464.3556220553095|
|  9| 44|1019| 66|478|[9.0,44.0,1019.0,...|477.60854971656795|
| 14| 45|1021| 41|475|[14.0,45.0,1021.0...|471.58716663112347|
| 11| 43|1015| 70|477|[11.0,43.0,1015.0...| 473.0371431318636|
| 17| 43|1008| 75|453|[17.0,43.0,1008.0...| 460.0283487344635|
| 20| 46|1014| 64|453|[20.0,46.0,1014.0...| 455.5445086476155|
| 24| 73|1011| 84|440|[24.0,73.0,1011.0...| 437.9068546

In [126]:
evaluator.evaluate(pipelineModel.transform(data))

4.601174400411695

**Step-10 Save The Pipeline**

In [127]:
#Save the Model 
pipelineModel.save("mode.pipeline")

In [128]:
!/content/model.pipeline

/bin/bash: /content/model.pipeline: Is a directory


In [129]:
saved_model = PipelineModel.load("model.pipeline")
saved_model.stages[1].coefficients

DenseVector([-1.9572, -0.2404, 0.0692, -0.1556])

In [130]:
saved_model.transform(data).show()

+---+---+----+---+---+--------------------+------------------+
| AT|  V|  AP| RH| EP| Independent Feature|        prediction|
+---+---+----+---+---+--------------------+------------------+
| 14| 41|1024| 73|463|[14.0,41.0,1024.0...|467.80901267624444|
| 25| 62|1020| 59|444|[25.0,62.0,1020.0...|443.13283898143794|
|  5| 39|1012| 92|488|[5.0,39.0,1012.0,...|482.11885083705505|
| 20| 57|1010| 76|446|[20.0,57.0,1010.0...| 450.7843722060337|
| 10| 37|1009| 96|473|[10.0,37.0,1009.0...| 471.9835950951943|
| 26| 59|1012| 58|443|[26.0,59.0,1012.0...| 441.4989958735198|
| 15| 43|1014| 75|467|[15.0,43.0,1014.0...| 464.3681737535729|
|  9| 44|1019| 66|478|[9.0,44.0,1019.0,...| 477.6172968124296|
| 14| 45|1021| 41|475|[14.0,45.0,1021.0...| 471.6186186864442|
| 11| 43|1015| 70|477|[11.0,43.0,1015.0...|473.04420556822123|
| 17| 43|1008| 75|453|[17.0,43.0,1008.0...| 460.0386954252518|
| 20| 46|1014| 64|453|[20.0,46.0,1014.0...| 455.5722414550841|
| 24| 73|1011| 84|440|[24.0,73.0,1011.0...| 437.9338008

In [131]:
df_train, df_test = data.randomSplit(weights=[0.7, 0.3], seed = 200)

In [132]:
pipelineModel = pipeline.fit(df_train)
evaluator.evaluate(pipelineModel.transform(df_test))

4.6074541991106335