In [72]:
#importing pyspark and starting a spark session
import findspark
findspark.init('/usr/local/spark')
import pyspark
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Regression in spark').getOrCreate()
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler,StandardScaler
from pyspark.sql.types import StructType,StructField,FloatType

In [73]:
#Loading the csv data into the Dataframe
df = spark.read.csv('/home/siva/datasets/CCPP/data.csv',inferSchema = True)
#See how the schema looks like
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)



In [74]:

df_structure = StructType([StructField("AT",FloatType(),True),
                          StructField("V",FloatType(),True),
                          StructField("AP",FloatType(),True),
                          StructField("RH",FloatType(),True),
                          StructField("PE",FloatType(),True)])

df = spark.read.csv('/home/siva/datasets/CCPP/data.csv',schema = df_structure)
df.select([count(when(isnan(c) | col(c).isNull(), 1)).alias(c) for c in df.columns]).show()

+---+---+---+---+---+
| AT|  V| AP| RH| PE|
+---+---+---+---+---+
|  6|  6|  6|  6|  6|
+---+---+---+---+---+



In [75]:
from pyspark.ml.feature import Imputer

impute_function = Imputer(inputCols = ['AT','V','AP','RH','PE'],outputCols =['T','V2','P','H','E'] ).setStrategy("median")
model = impute_function.fit(df)
imputed_df = model.transform(df)

In [76]:
imputed_df.select([count(when(col(c).isNull(),1)).alias(c) for c in imputed_df.columns]).show()

+---+---+---+---+---+---+---+---+---+---+
| AT|  V| AP| RH| PE|  T| V2|  P|  H|  E|
+---+---+---+---+---+---+---+---+---+---+
|  6|  6|  6|  6|  6|  0|  0|  0|  0|  0|
+---+---+---+---+---+---+---+---+---+---+



In [77]:
imputed_df = imputed_df[['T','V2','P','H','E']]

In [78]:
assembler = VectorAssembler(inputCols = ['T','V2','P','H'],outputCol = 'features')
data= assembler.transform(imputed_df)

In [79]:
data_lr = data.select('features','E')
data_lr.show()
#Scaler =
#processed_data = Scaler.transform(imputed_df)

+--------------------+------+
|            features|     E|
+--------------------+------+
|[8.34000015258789...|480.48|
|[23.6399993896484...|445.75|
|[29.7399997711181...|438.76|
|[19.0699996948242...|453.09|
|[11.8000001907348...|464.43|
|[13.9700002670288...|470.96|
|[22.1000003814697...|442.35|
|[14.4700002670288...| 464.0|
|[31.25,69.5100021...|428.77|
|[6.76999998092651...|484.31|
|[28.2800006866455...|435.29|
|[22.9899997711181...|451.41|
|[29.2999992370605...|426.25|
|[20.3199996948242...|451.49|
|[16.9200000762939...|460.17|
|[22.7199993133544...|453.13|
|[18.1399993896484...|461.71|
|[11.4899997711181...|471.08|
|[20.3199996948242...|451.49|
|[23.5400009155273...|448.56|
+--------------------+------+
only showing top 20 rows



In [80]:
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=True).fit(data_lr)
scaled_data = scaler.transform(data_lr)
mean,std= data_lr.select(mean('E'),stddev('E')).first()

In [81]:
scaled_data = scaled_data.withColumn("Scaled_label",(col('E')-mean)/std)
scaled_data = scaled_data.select('scaledFeatures','Scaled_label')

In [82]:
scaled_data


DataFrame[scaledFeatures: vector, Scaled_label: double]

In [83]:
train_data,test_data = scaled_data.randomSplit([0.7,0.3])
train_data.show()

+--------------------+------------------+
|      scaledFeatures|      Scaled_label|
+--------------------+------------------+
|[-2.3955148828863...| 2.120300532079899|
|[-2.3243655249099...| 2.107994980121641|
|[-2.2921469446936...|2.0113049237741394|
|[-2.2626132434948...|1.6579454822708364|
|[-2.2357644213134...|1.8067906203376582|
|[-2.2075731596234...|2.2597694153992793|
|[-2.2008609620796...| 2.051739239974724|
|[-2.1941487645359...| 1.923404547396823|
|[-2.1847516559683...| 2.024783625286913|
|[-2.1847516559683...|2.0359160276906563|
|[-2.1820667769508...|0.3218586883329826|
|[-2.1672999423546...|2.0335733052570975|
|[-2.1552179867758...|2.0282994972749866|
|[-2.1511906362433...|  1.96325228882015|
|[-2.1444784386995...|2.0892424706267567|
|[-2.1377662411558...| 2.004857966237514|
|[-2.1364238016470...|2.1173712348690827|
|[-2.1256842855770...|2.0318153692630605|
|[-2.1216569670507...|2.0751789826744615|
|[-2.1136022979919...|  2.01013356255736|
+--------------------+------------

In [84]:
lr = LinearRegression( featuresCol="scaledFeatures", labelCol="Scaled_label",standardization = False)


In [85]:
lr_model = lr.fit(train_data)

In [86]:
predictions_test = lr_model.evaluate(test_data)

In [87]:
predictions_test.rootMeanSquaredError

0.26328647841061426

In [88]:
predictions_test.r2

0.9297322185902106

<pyspark.ml.regression.LinearRegressionTrainingSummary at 0x7f44639263c8>