In [22]:
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Conduct_data_mining_linear_regression_training').getOrCreate()

df = spark.read.csv("../Bike-Sharing-Dataset-hour_lag1.csv",inferSchema=True,header=True)

In [23]:
import pandas as pd
new_df = df.select('reinstant', 'mnth', 'hr', 'weekday', 'holiday', 'workingday', 'hum', 
                   'temp', 'atemp', 'windspeed', 'weathersit', 'cnt_lag1')
new_df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
reinstant,17544,8772.5,5064.66089684196,1,17544
mnth,17544,6.519835841313269,3.4496492053764083,1,12
hr,17544,11.5,6.922383841604305,0,23
weekday,17544,2.997264021887825,2.0034722846903734,0,6
holiday,17544,0.028499772001823985,0.1664004605297057,0,1
workingday,17544,0.6842225262197903,0.46483801154182497,0,1
hum,17544,0.6272285681714603,0.19202039360213635,0.0,1.0
temp,17544,0.49698634860921004,0.19164844257608665,0.02,1.0
atemp,17544,0.4757743958048299,0.17104014126878642,0.0,1.0


In [24]:
from pyspark.ml.feature import VectorAssembler

vector_assembler = VectorAssembler(inputCols = ['reinstant', 'mnth', 'hr', 'weekday', 'holiday', 'workingday', 'hum', 
                                                'temp', 'atemp', 'windspeed', 'weathersit'], outputCol = 'features')
vector_output = vector_assembler.transform(new_df)
vector_output.printSchema()
vector_output.head(1)

root
 |-- reinstant: integer (nullable = true)
 |-- mnth: integer (nullable = true)
 |-- hr: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- holiday: integer (nullable = true)
 |-- workingday: integer (nullable = true)
 |-- hum: double (nullable = true)
 |-- temp: double (nullable = true)
 |-- atemp: double (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- weathersit: integer (nullable = true)
 |-- cnt_lag1: integer (nullable = true)
 |-- features: vector (nullable = true)



[Row(reinstant=1, mnth=1, hr=0, weekday=6, holiday=0, workingday=0, hum=0.81, temp=0.24, atemp=0.2879, windspeed=0.0, weathersit=1, cnt_lag1=0, features=DenseVector([1.0, 1.0, 0.0, 6.0, 0.0, 0.0, 0.81, 0.24, 0.2879, 0.0, 1.0]))]

In [25]:
vector_output = vector_output.select(['features', 'cnt_lag1'])

print(vector_output.head(3))
pd.DataFrame(vector_output.take(10), columns=vector_output.columns)

[Row(features=DenseVector([1.0, 1.0, 0.0, 6.0, 0.0, 0.0, 0.81, 0.24, 0.2879, 0.0, 1.0]), cnt_lag1=0), Row(features=DenseVector([2.0, 1.0, 1.0, 6.0, 0.0, 0.0, 0.8, 0.22, 0.2727, 0.0, 1.0]), cnt_lag1=16), Row(features=DenseVector([3.0, 1.0, 2.0, 6.0, 0.0, 0.0, 0.8, 0.22, 0.2727, 0.0, 1.0]), cnt_lag1=40)]


Unnamed: 0,features,cnt_lag1
0,"[1.0, 1.0, 0.0, 6.0, 0.0, 0.0, 0.81, 0.24, 0.2...",0
1,"[2.0, 1.0, 1.0, 6.0, 0.0, 0.0, 0.8, 0.22, 0.27...",16
2,"[3.0, 1.0, 2.0, 6.0, 0.0, 0.0, 0.8, 0.22, 0.27...",40
3,"[4.0, 1.0, 3.0, 6.0, 0.0, 0.0, 0.75, 0.24, 0.2...",32
4,"[5.0, 1.0, 4.0, 6.0, 0.0, 0.0, 0.75, 0.24, 0.2...",13
5,"[6.0, 1.0, 5.0, 6.0, 0.0, 0.0, 0.75, 0.24, 0.2...",1
6,"[7.0, 1.0, 6.0, 6.0, 0.0, 0.0, 0.8, 0.22, 0.27...",1
7,"[8.0, 1.0, 7.0, 6.0, 0.0, 0.0, 0.86, 0.2, 0.25...",2
8,"[9.0, 1.0, 8.0, 6.0, 0.0, 0.0, 0.75, 0.24, 0.2...",3
9,"[10.0, 1.0, 9.0, 6.0, 0.0, 0.0, 0.76, 0.32, 0....",8


In [26]:
train_data,test_data = vector_output.randomSplit([0.8,0.2])

train_df = train_data.describe()
pd.DataFrame(train_df.take(5), columns=train_df.columns)

Unnamed: 0,summary,cnt_lag1
0,count,14031.0
1,mean,188.6397263202908
2,stddev,180.4763290091313
3,min,0.0
4,max,977.0


In [27]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol='features', labelCol='cnt_lag1')
lr_model = lr.fit(train_data)
print("Coefficients: ") 
x = 0
for coe in str(lr_model.coefficients)[1:].replace("]","").split(','):
    print("\t(cnt/"+new_df.columns[x]+"): "+coe)
    x += 1

print("Intercept: " + str(lr_model.intercept) + "\n")

data_summary = lr_model.summary
print("RMSE: " + str(data_summary.rootMeanSquaredError))
print("R2: " + str(data_summary.r2))
print("\n")

data_resi = data_summary.residuals
pd.DataFrame(data_resi.take(10), columns=data_resi.columns)

Coefficients: 
	(cnt/reinstant): 0.009167278917365063
	(cnt/mnth): -2.348981255586306
	(cnt/hr): 9.733779760067153
	(cnt/weekday): 0.9740387273275691
	(cnt/holiday): -23.180808076612724
	(cnt/workingday): 5.131264626150019
	(cnt/hum): -177.0526574582699
	(cnt/temp): 20.478778132184402
	(cnt/atemp): 307.36340126344084
	(cnt/windspeed): 29.390524518414953
	(cnt/weathersit): -2.0742990562689996
Intercept: -41.36144989986178

RMSE: 136.0088167803868
R2: 0.4320310667828279




Unnamed: 0,residuals
0,87.169238
1,84.48581
2,70.839813
3,87.595229
4,118.705269
5,140.246255
6,115.533236
7,-10.476663
8,39.426989
9,100.17978


In [28]:
test_df = test_data.describe()
pd.DataFrame(test_df.take(5), columns=test_df.columns)

Unnamed: 0,summary,cnt_lag1
0,count,3513.0
1,mean,192.6689439225733
2,stddev,180.7598080195601
3,min,1.0
4,max,963.0


In [29]:
lr = LinearRegression(featuresCol='features', labelCol='cnt_lag1')
lr_model = lr.fit(test_data)
print("Coefficients: ") 
x = 0
for coe in str(lr_model.coefficients)[1:].replace("]","").split(','):
    print("\t(cnt/"+new_df.columns[x]+"): "+coe)
    x += 1

print("Intercept: " + str(lr_model.intercept) + "\n")

data_summary = lr_model.summary
print("RMSE: " + str(data_summary.rootMeanSquaredError))
print("R2: " + str(data_summary.r2))
print("\n")

data_resi = data_summary.residuals
pd.DataFrame(data_resi.take(10), columns=data_resi.columns)

Coefficients: 
	(cnt/reinstant): 0.008778303878578973
	(cnt/mnth): -0.3410613475898231
	(cnt/hr): 10.159723990623938
	(cnt/weekday): 3.627954214619184
	(cnt/holiday): -26.112704714363034
	(cnt/workingday): -6.629542287461042
	(cnt/hum): -183.45057216074343
	(cnt/temp): 44.54450965497793
	(cnt/atemp): 269.25618820410733
	(cnt/windspeed): 0.05946874274210533
	(cnt/weathersit): -4.963131048758279
Intercept: -36.083100861788914

RMSE: 135.52602946190086
R2: 0.43770356570232116




Unnamed: 0,residuals
0,130.649313
1,74.829658
2,-28.423166
3,23.134279
4,-32.573321
5,-13.443748
6,-34.584977
7,-42.788768
8,103.682307
9,93.3005
