In [1]:
# findspark.init() is called before `import pyspark` on purpose: keep this order.
# NOTE(review): the Spark home is a hard-coded absolute path for this machine;
# presumably it should come from SPARK_HOME or a config cell — confirm before sharing.
import findspark
findspark.init('/usr/local/spark/')
import pyspark

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Obtain the notebook's SparkSession (an existing one is reused if present).
spark = (
    SparkSession.builder
    .appName("Python Linear Regression example")
    .getOrCreate()
)

In [4]:
# Model, feature-engineering and pipeline classes used by the cells below.
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
# NOTE(review): wildcard import — it pulls every pyspark.sql.functions name into the
# notebook namespace (including ones that shadow Python built-ins such as min/max/sum).
# Only `col` is used in the cells visible here; consider importing it explicitly.
from pyspark.sql.functions import *

In [6]:
# Read the power-plant CSV (first row is the header, column types are inferred)
# and print the resulting schema.
# NOTE(review): absolute, machine-specific path — confirm whether it should be configurable.
csv_reader = (
    spark.read
    .format("csv")
    .option("sep", ",")
    .option("inferSchema", "true")
    .option("header", "true")
)
data = csv_reader.load("/home/hduser/Downloads/sharedfolder/linregdata1.csv")
data.printSchema()

root
 |-- temperature: double (nullable = true)
 |-- exhaust_vacuum: double (nullable = true)
 |-- ambient_pressure: double (nullable = true)
 |-- relative_humidity: double (nullable = true)
 |-- energy_output: double (nullable = true)



In [7]:
# Summary statistics (count, mean, stddev, min, max) for every column of the raw data.
data.describe().show()

+-------+------------------+------------------+------------------+------------------+-----------------+
|summary|       temperature|    exhaust_vacuum|  ambient_pressure| relative_humidity|    energy_output|
+-------+------------------+------------------+------------------+------------------+-----------------+
|  count|              9568|              9568|              9568|              9568|             9568|
|   mean|19.651231187291014|54.305803720735966|1013.2590781772483| 73.30897784280928|454.3650094063547|
| stddev| 7.452473229611075|12.707892998326807| 5.938783705811638|14.600268756728957|17.06699499980342|
|    min|              1.81|             25.36|            992.89|             25.56|           420.26|
|    max|             37.11|             81.56|            1033.3|            100.16|           495.76|
+-------+------------------+------------------+------------------+------------------+-----------------+



In [8]:
# Predictor columns; every column of `data` except the target `energy_output`.
# The order here is the order of the entries in the assembled feature vector.
features = ["temperature", "exhaust_vacuum", "ambient_pressure", "relative_humidity"]

In [9]:
# Expose the target `energy_output` under the name `label`, followed by the
# predictor columns, then print the schema of the result.
label_column = col("energy_output").alias("label")
lr_data = data.select(label_column, *features)
lr_data.printSchema()

root
 |-- label: double (nullable = true)
 |-- temperature: double (nullable = true)
 |-- exhaust_vacuum: double (nullable = true)
 |-- ambient_pressure: double (nullable = true)
 |-- relative_humidity: double (nullable = true)



In [10]:
# Preview the first rows of the label + predictor frame.
lr_data.show()

+------+-----------+--------------+----------------+-----------------+
| label|temperature|exhaust_vacuum|ambient_pressure|relative_humidity|
+------+-----------+--------------+----------------+-----------------+
|480.48|       8.34|         40.77|         1010.84|            90.01|
|445.75|      23.64|         58.49|          1011.4|             74.2|
|438.76|      29.74|          56.9|         1007.15|            41.91|
|453.09|      19.07|         49.69|         1007.22|            76.79|
|464.43|       11.8|         40.66|         1017.13|             97.2|
|470.96|      13.97|         39.16|         1016.05|             84.6|
|442.35|       22.1|         71.29|          1008.2|            75.38|
| 464.0|      14.47|         41.76|         1021.98|            78.41|
|428.77|      31.25|         69.51|         1010.25|            36.83|
|484.31|       6.77|         38.18|          1017.8|            81.13|
|435.29|      28.28|         68.67|         1006.36|             69.9|
|451.4

VectorAssembler is a transformer that combines a given list of columns into a single vector column.

In [11]:
# Packs the predictor columns listed in `features` into one vector column
# named `unscaled_features`.
vectorAssembler = VectorAssembler(inputCols=features, outputCol="unscaled_features")

In [12]:
# Append the assembled `unscaled_features` vector column to every row of lr_data.
va_data = vectorAssembler.transform(lr_data)

In [13]:
# truncate=False so the full vector contents are printed instead of being cut off.
va_data.show(truncate=False)

+------+-----------+--------------+----------------+-----------------+---------------------------+
|label |temperature|exhaust_vacuum|ambient_pressure|relative_humidity|unscaled_features          |
+------+-----------+--------------+----------------+-----------------+---------------------------+
|480.48|8.34       |40.77         |1010.84         |90.01            |[8.34,40.77,1010.84,90.01] |
|445.75|23.64      |58.49         |1011.4          |74.2             |[23.64,58.49,1011.4,74.2]  |
|438.76|29.74      |56.9          |1007.15         |41.91            |[29.74,56.9,1007.15,41.91] |
|453.09|19.07      |49.69         |1007.22         |76.79            |[19.07,49.69,1007.22,76.79]|
|464.43|11.8       |40.66         |1017.13         |97.2             |[11.8,40.66,1017.13,97.2]  |
|470.96|13.97      |39.16         |1016.05         |84.6             |[13.97,39.16,1016.05,84.6] |
|442.35|22.1       |71.29         |1008.2          |75.38            |[22.1,71.29,1008.2,75.38]  |
|464.0 |14

StandardScaler transforms a dataset of Vector rows, normalizing each feature to have unit standard deviation and/or zero mean.
By default only `withStd` is enabled (`withMean` is off), i.e. each feature is scaled to unit standard deviation but is not centered around zero.

In [14]:
# Scaler from `unscaled_features` to `features`. The two flags below are the
# library defaults written out explicitly: scale to unit standard deviation,
# do not subtract the mean.
standardScaler = StandardScaler(
    inputCol="unscaled_features",
    outputCol="features",
    withStd=True,
    withMean=False,
)

In [15]:
# Learn the per-feature scaling statistics from va_data.
# NOTE(review): va_data is the full dataset — the train/test split happens in a
# later cell — so the scaler sees the rows that later become the test set.
# Consider splitting first and fitting the scaler on the training part only.
ss_model = standardScaler.fit(va_data)

In [16]:
# Apply the fitted scaler; adds the scaled vector column `features`.
ss_data = ss_model.transform(va_data)

In [17]:
# Show unscaled and scaled vectors side by side, untruncated.
ss_data.show(truncate=False)

+------+-----------+--------------+----------------+-----------------+---------------------------+-----------------------------------------------------------------------------+
|label |temperature|exhaust_vacuum|ambient_pressure|relative_humidity|unscaled_features          |features                                                                     |
+------+-----------+--------------+----------------+-----------------+---------------------------+-----------------------------------------------------------------------------+
|480.48|8.34       |40.77         |1010.84         |90.01            |[8.34,40.77,1010.84,90.01] |[1.1190915744403476,3.208242310929751,170.20993692880273,6.164955008688884]  |
|445.75|23.64      |58.49         |1011.4          |74.2             |[23.64,58.49,1011.4,74.2]  |[3.1721012973345104,4.602651281978933,170.30423233131958,5.082098229582438]  |
|438.76|29.74      |56.9          |1007.15         |41.91            |[29.74,56.9,1007.15,41.91] |[3.99062151365179

In [18]:
# 70/30 train/test split with a fixed seed. Without a seed every kernel run
# draws a different split, so the coefficients and all metrics reported below
# cannot be reproduced.
SPLIT_SEED = 42
(training, test) = ss_data.randomSplit([.7, .3], seed=SPLIT_SEED)

In [19]:
# Summary statistics of the training split (row count, value ranges per column).
training.describe().show()

+-------+------------------+-----------------+------------------+-----------------+------------------+
|summary|             label|      temperature|    exhaust_vacuum| ambient_pressure| relative_humidity|
+-------+------------------+-----------------+------------------+-----------------+------------------+
|  count|              6726|             6726|              6726|             6726|              6726|
|   mean| 454.2914674397851|19.65789771037765| 54.30651352958652|1013.255960451979| 73.30990336009505|
| stddev|17.125013519182264|7.468379334537977|12.757515979574965|5.926938415466322|14.621178631375274|
|    min|            420.26|             1.81|             25.36|           992.89|             25.89|
|    max|            495.76|            35.77|             81.56|          1033.29|            100.16|
+-------+------------------+-----------------+------------------+-----------------+------------------+



In [20]:
# Summary statistics of the test split, for comparison with the training split.
test.describe().show()

+-------+------------------+------------------+------------------+------------------+-----------------+
|summary|             label|       temperature|    exhaust_vacuum|  ambient_pressure|relative_humidity|
+-------+------------------+------------------+------------------+------------------+-----------------+
|  count|              2842|              2842|              2842|              2842|             2842|
|   mean| 454.5390570021112|19.635453905700206| 54.30412385643922|1013.2664567206191|73.30678747361011|
| stddev|16.930620469850304|  7.41597979879169|12.591906946329521| 5.967762981931267|14.55323060123222|
|    min|            425.14|              2.34|             25.88|            993.31|            25.56|
|    max|            494.91|             37.11|             80.18|            1033.3|           100.13|
+-------+------------------+------------------+------------------+------------------+-----------------+



In [21]:
# Linear regression estimator: at most 10 iterations, regularisation strength 0.01.
# featuresCol/labelCol are the library defaults, written out so the dependency
# on the `features` and `label` column names is visible.
lr = LinearRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=10,
    regParam=0.01,
)

In [22]:
# Fit the regression on the training split only; the test split stays held out.
lr_model = lr.fit(training)

Now that the linear regression model is built, we can apply it to the test data using the `transform` method.
Before that, we can look at the characteristics of our model, i.e. its coefficients and other parameters.

In [23]:
# One weight per entry of the scaled `features` vector, in the order of the
# `features` list (temperature, exhaust_vacuum, ambient_pressure, relative_humidity).
lr_model.coefficients

DenseVector([-14.6918, -3.041, 0.349, -2.3524])

In [24]:
# Constant term of the fitted model.
lr_model.intercept

458.30690761619087

In [25]:
# Fit summary of the model; the metrics read from it in the next cells are
# computed on the data the model was fitted on (the training split).
trainingSummary = lr_model.summary

In [26]:
# Root mean squared error from the training summary.
trainingSummary.rootMeanSquaredError

4.65550060758686

In [27]:
# Mean absolute error from the training summary.
trainingSummary.meanAbsoluteError

3.6834050251856625

In [28]:
# Mean squared error from the training summary (the square of the RMSE above).
trainingSummary.meanSquaredError

21.673685907241623

In [29]:
# Coefficient of determination (R^2) from the training summary.
trainingSummary.r2

0.9260845026248437

In [30]:
# Score the held-out test split; the result carries an extra `prediction` column.
prediction_df = lr_model.transform(test)

In [31]:
# Full untruncated view of the scored test rows (inputs, vectors and prediction).
prediction_df.show(truncate=False)

+------+-----------+--------------+----------------+-----------------+---------------------------+----------------------------------------------------------------------------+------------------+
|label |temperature|exhaust_vacuum|ambient_pressure|relative_humidity|unscaled_features          |features                                                                    |prediction        |
+------+-----------+--------------+----------------+-----------------+---------------------------+----------------------------------------------------------------------------+------------------+
|425.14|29.67      |71.98         |1005.16         |67.75            |[29.67,71.98,1005.16,67.75]|[3.9812286587104455,5.664196260503396,169.25351213184615,4.640325539814152] |430.7443133109189 |
|425.28|31.91      |67.83         |1008.76         |53.22            |[31.91,67.83,1008.76,53.22]|[4.281800016833512,5.337627568073706,169.85969686231158,3.6451383797624977] |429.8741206850499 |
|425.35|30.97      |67.69

In [32]:
# Narrow view: actual value (`label`) next to the model's `prediction`.
prediction_df.select("label","prediction").show(truncate=False)

+------+------------------+
|label |prediction        |
+------+------------------+
|425.14|430.7443133109189 |
|425.28|429.8741206850499 |
|425.35|431.9526511577449 |
|425.35|427.5706479806339 |
|425.41|426.1607032876693 |
|425.48|430.25462677485615|
|425.58|428.54010704105724|
|425.75|430.102024408512  |
|425.91|431.59143011149484|
|426.13|433.51262571453935|
|426.15|430.1990197756726 |
|426.2 |432.77253149075966|
|426.48|431.87770680007907|
|426.5 |429.87729291336683|
|426.55|429.5845863737253 |
|426.6 |435.2817813141445 |
|426.64|432.6019648204355 |
|426.84|430.4946991690191 |
|426.87|432.86027478110344|
|426.9 |428.1503230827946 |
+------+------------------+
only showing top 20 rows



In [33]:
# NOTE(review): this import sits mid-notebook; the other imports are in the cell near the top.
from pyspark.ml.evaluation import RegressionEvaluator
# Evaluator comparing `label` with `prediction`; RMSE is its configured metric.
# NOTE(review): the name `eval` shadows Python's built-in eval() from here on.
# The following cells reference it, so a rename has to touch all of them together.
eval = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")

In [34]:
# Test-set RMSE. The metric is named explicitly, mirroring the other metric
# cells; it is the same metric the evaluator was constructed with.
rmse = eval.evaluate(prediction_df, {eval.metricName: "rmse"})
print("RMSE: %.3f" % rmse)

RMSE: 4.318


In [35]:
# Test-set MSE: reuse the evaluator, overriding its metric for this call only.
mse_override = {eval.metricName: "mse"}
mse = eval.evaluate(prediction_df, mse_override)
print("MSE: %.3f" % mse)

MSE: 18.649


In [36]:
# Test-set MAE: same evaluator, metric overridden through the per-call param map.
mae = eval.evaluate(
    prediction_df,
    params={eval.metricName: "mae"},
)
print("MAE: %.3f" % mae)

MAE: 3.490


In [None]:
# Test-set R^2: same evaluator, metric overridden for this call only.
r2_override = {eval.metricName: "r2"}
r2 = eval.evaluate(prediction_df, r2_override)
print("r2: %.3f" % r2)