In [1]:
import os

# Point findspark at the HDP Spark 2 client and run Hadoop operations as the
# "spark" user. These must be set before findspark.init() locates Spark.
os.environ.update({
    "HADOOP_USER_NAME": "spark",
    "SPARK_MAJOR_VERSION": "2",
    "SPARK_HOME": "/usr/hdp/current/spark2-client",
})

import findspark
findspark.init()  # puts the pyspark found under SPARK_HOME on sys.path

import pyspark
from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.regression import LinearRegression
# NOTE(review): this wildcard shadows Python builtins such as sum, max, min,
# abs and round with Spark column functions; later cells only visibly use `col`.
from pyspark.sql.functions import *
from pyspark.sql import SQLContext

In [2]:
# Input CSV path, resolved against Spark's default filesystem.
# Presumably the UCI Combined Cycle Power Plant (CCPP) dataset; confirm.
file_name = "/user/maria_dev/datasets/CCPP/data.csv"

# getOrCreate() reuses an already-running context instead of raising
# "Cannot run multiple SparkContexts at once" when this cell is re-executed.
# On a fresh kernel it builds the same context as SparkContext(appName="LR").
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setAppName("LR"))

In [3]:
# Legacy DataFrame entry point wrapping the SparkContext created above.
# NOTE(review): SQLContext is superseded by SparkSession in Spark 2.x.
sqlContext = SQLContext(sparkContext=sc)

In [4]:
# CSV reader: first row is the header, column types are inferred from the data,
# fields are comma-separated.
csv_reader = sqlContext.read.options(header='true', inferschema='true', delimiter=',')
data = csv_reader.csv(file_name)

# Mark the frame for in-memory caching; it is reused by describe() and by the
# column selection that feeds the train/test split.
data.cache()

DataFrame[AT: double, V: double, AP: double, RH: double, PE: double]

In [8]:
# count / mean / stddev / min / max for the loaded columns.
summary_stats = data.describe()
summary_stats.show()

+-------+------------------+------------------+------------------+------------------+------------------+
|summary|                AT|                 V|                AP|                RH|                PE|
+-------+------------------+------------------+------------------+------------------+------------------+
|  count|              9568|              9568|              9568|              9568|              9568|
|   mean| 19.65123118729102| 54.30580372073601|1013.2590781772603| 73.30897784280926| 454.3650094063554|
| stddev|7.4524732296110825|12.707892998326784| 5.938783705811581|14.600268756728964|17.066994999803402|
|    min|              1.81|             25.36|            992.89|             25.56|            420.26|
|    max|             37.11|             81.56|            1033.3|            100.16|            495.76|
+-------+------------------+------------------+------------------+------------------+------------------+



In [9]:
# Predictor columns fed to the VectorAssembler in the pipeline cell.
features = ["AT", "V", "AP", "RH"]

# The target PE is exposed as "label", the column name LinearRegression
# reads by default.
label_col = data["PE"].alias("label")
lr_data = data.select(label_col, *features)
lr_data.printSchema()

root
 |-- label: double (nullable = true)
 |-- AT: double (nullable = true)
 |-- V: double (nullable = true)
 |-- AP: double (nullable = true)
 |-- RH: double (nullable = true)



In [10]:
# Fixed seed: without it randomSplit draws a different 70/30 partition on every
# run, so the model and every metric reported below would not be reproducible.
SPLIT_SEED = 42
(training, test) = lr_data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

In [11]:
# Stage 1: pack the predictor columns into one vector column.
vectorAssembler = VectorAssembler(
    inputCols=features,
    outputCol="unscaled_features",
)

# Stage 2: rescale that vector. With default settings the features are divided
# by their std-dev but not mean-centred (e.g. AT 31.74 -> ~4.26 in the output).
standardScaler = StandardScaler(
    inputCol=vectorAssembler.getOutputCol(),
    outputCol="features",
)

# Stage 3: regularised linear regression on the scaled vector; featuresCol and
# labelCol are the LinearRegression defaults, written out for clarity.
lr = LinearRegression(
    featuresCol=standardScaler.getOutputCol(),
    labelCol="label",
    maxIter=10,
    regParam=0.01,
)

stages = [vectorAssembler, standardScaler, lr]
pipeline = Pipeline(stages=stages)

In [12]:
# Fit assembler, scaler and regression on the training split only, then score
# the held-out test split with the fitted PipelineModel.
model = pipeline.fit(dataset=training)
prediction = model.transform(dataset=test)

In [13]:
# First 20 scored rows with long cells truncated (DataFrame.show defaults).
prediction.show(n=20, truncate=True)

+------+-----+-----+-------+-----+--------------------+--------------------+------------------+
| label|   AT|    V|     AP|   RH|   unscaled_features|            features|        prediction|
+------+-----+-----+-------+-----+--------------------+--------------------+------------------+
|425.12|31.74|72.58|1007.26|59.58|[31.74,72.58,1007...|[4.25925684300654...| 428.0324809437086|
|425.14|29.67|71.98|1005.16|67.75|[29.67,71.98,1005...|[3.98147922281046...|430.82367997457794|
|425.21|32.19|69.13|1000.45|48.22|[32.19,69.13,1000...|[4.31964328217960...| 429.2244676866211|
|425.28|31.91|67.83|1008.76|53.22|[31.91,67.83,1008...|[4.28206949780525...| 429.9086812268348|
| 425.3| 30.9| 69.4|1003.53|66.69|[30.9,69.4,1003.5...|[4.14653548988349...|429.06578322910303|
|425.34|31.73|74.67|1016.38|44.51|[31.73,74.67,1016...|[4.25791492213603...| 430.5394185266408|
| 425.5| 29.0|69.13|1001.22|52.96|[29.0,69.13,1001....|[3.89157052448613...|434.80667035218136|
|425.55|29.27|69.89|1015.21|67.16|[29.27

In [14]:
from pyspark.ml.evaluation import RegressionEvaluator

# Named `evaluator` rather than `eval` so the Python builtin eval() is not
# shadowed for the rest of the notebook.
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")


def _score(metric):
    """Evaluate the test-set predictions under `metric`, overriding metricName for this call only."""
    return evaluator.evaluate(prediction, {evaluator.metricName: metric})


rmse = _score("rmse")  # Root Mean Square Error
mse = _score("mse")    # Mean Square Error
mae = _score("mae")    # Mean Absolute Error
r2 = _score("r2")      # coefficient of determination

for metric_label, metric_value in [("RMSE", rmse), ("MSE", mse), ("MAE", mae), ("r2", r2)]:
    print("%s: %.3f" % (metric_label, metric_value))

RMSE: 4.557
MSE: 20.768
MAE: 3.587
r2: 0.929


In [15]:
# Show the Python type of each stage's frame: projected input, both splits,
# and the scored output.
for frame in (lr_data, training, test, prediction):
    print(type(frame))

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>
