In [1]:
import findspark
findspark.init('/usr/local/spark')
import pyspark

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName("Python Linear Regression example").getOrCreate()

In [4]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
from pyspark.sql.functions import *

In [5]:
data = spark.read.load("linregdata1.csv", format="csv", sep=",", inferSchema="true", header="true")
data.printSchema()

root
 |-- temperature: double (nullable = true)
 |-- exhaust_vacuum: double (nullable = true)
 |-- ambient_pressure: double (nullable = true)
 |-- relative_humidity: double (nullable = true)
 |-- energy_output: double (nullable = true)



In [6]:
data.describe()

DataFrame[summary: string, temperature: string, exhaust_vacuum: string, ambient_pressure: string, relative_humidity: string, energy_output: string]

In [7]:
features = ["temperature", "exhaust_vacuum", "ambient_pressure", "relative_humidity"]

In [8]:
lr_data = data.select(col("energy_output").alias("label"), *features)
lr_data.printSchema()

root
 |-- label: double (nullable = true)
 |-- temperature: double (nullable = true)
 |-- exhaust_vacuum: double (nullable = true)
 |-- ambient_pressure: double (nullable = true)
 |-- relative_humidity: double (nullable = true)



In [9]:
lr_data.show()

+------+-----------+--------------+----------------+-----------------+
| label|temperature|exhaust_vacuum|ambient_pressure|relative_humidity|
+------+-----------+--------------+----------------+-----------------+
|480.48|       8.34|         40.77|         1010.84|            90.01|
|445.75|      23.64|         58.49|          1011.4|             74.2|
|438.76|      29.74|          56.9|         1007.15|            41.91|
|453.09|      19.07|         49.69|         1007.22|            76.79|
|464.43|       11.8|         40.66|         1017.13|             97.2|
|470.96|      13.97|         39.16|         1016.05|             84.6|
|442.35|       22.1|         71.29|          1008.2|            75.38|
| 464.0|      14.47|         41.76|         1021.98|            78.41|
|428.77|      31.25|         69.51|         1010.25|            36.83|
|484.31|       6.77|         38.18|          1017.8|            81.13|
|435.29|      28.28|         68.67|         1006.36|             69.9|
|451.4

VectorAssembler is a transformer that combines a given list of columns into a single vector column.

In [None]:
vectorAssembler = VectorAssembler(inputCols=features, outputCol="unscaled_features")

In [None]:
va_data = vectorAssembler.transform(lr_data)

In [None]:
va_data.show(truncate=False)

StandardScaler transforms a dataset of Vector rows, normalizing each feature to have unit standard deviation or zero mean.
Uses 'withStd' by default i.e. scales the data to unit standard deviation.

In [None]:
standardScaler = StandardScaler(inputCol="unscaled_features", outputCol="features")

In [None]:
ss_model = standardScaler.fit(va_data)

In [None]:
ss_data = ss_model.transform(va_data)

In [None]:
ss_data.show(truncate=False)

In [None]:
(training, test) = ss_data.randomSplit([.7, .3])

In [None]:
training.describe().show()

In [None]:
test.describe().show()

In [None]:
lr = LinearRegression(maxIter=10, regParam=.01)

In [None]:
lr_model = lr.fit(training)

Now that the linear regression model is built we can apply it on the test data using transform method.
Before that we can look at the characteristics of our model i.e. coefficients and other parameters.

In [None]:
lr_model.coefficients

In [None]:
lr_model.intercept

In [None]:
trainingSummary = lr_model.summary

In [None]:
trainingSummary.rootMeanSquaredError

In [None]:
trainingSummary.meanAbsoluteError

In [None]:
trainingSummary.meanSquaredError

In [None]:
trainingSummary.r2

In [None]:
prediction_df = lr_model.transform(test)

In [None]:
prediction_df.show(truncate=False)

In [None]:
prediction_df.select("label","prediction").show(truncate=False)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
eval = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")

In [None]:
rmse = eval.evaluate(prediction_df)
print("RMSE: %.3f" % rmse)

In [None]:
mse = eval.evaluate(prediction_df, {eval.metricName: "mse"})
print("MSE: %.3f" % mse)

In [None]:
mae = eval.evaluate(prediction_df, {eval.metricName: "mae"})
print("MAE: %.3f" % mae)

In [None]:
r2 = eval.evaluate(prediction_df, {eval.metricName: "r2"})
print("r2: %.3f" %r2)