In [1]:
# findspark.init() has to run before the pyspark import below
# (presumably it makes the local Spark installation importable -- confirm
# against the findspark docs).
import findspark
findspark.init()
from pyspark.sql import SparkSession

# Build the session for the 'Regression' app, or get the existing one.
spark = (
    SparkSession.builder
    .appName('Regression')
    .getOrCreate()
)

In [2]:
import pandas as pd
import numpy as np
from sklearn import datasets, linear_model
from sklearn.linear_model import LinearRegression
import statsmodels.api as sm
from scipy import stats

In [8]:
# Import the whitespace-delimited data set from file.
# Every record must hold exactly N_COLUMNS numeric fields: the last field is
# the value to predict, the preceding ones are the independent variables.
N_COLUMNS = 14

# Read the data into the `rows` list of lists (one list of floats per record).
rows = []
# The context manager closes the file even if parsing fails part-way through.
with open('housing.txt', 'r') as f:
    for line_no, line in enumerate(f, start=1):
        # Split on any whitespace (including tab characters)
        fields = line.split()
        if not fields:
            # Tolerate blank lines (e.g. a trailing newline at end of file).
            continue
        if len(fields) != N_COLUMNS:
            # Fail loudly instead of building a ragged / partly-string table.
            raise ValueError(
                "housing.txt line %d: expected %d fields, found %d"
                % (line_no, N_COLUMNS, len(fields)))
        # Convert strings to numeric values and append to our list of lists:
        rows.append([float(field) for field in fields])

# Construct the independent variables X: every column except the last one.
X = np.delete(rows, N_COLUMNS - 1, 1)
print("independent variables : ")
print(X)
# The dependent variable y is the last column of every record.
y = [row[N_COLUMNS - 1] for row in rows]
print("data precise result : ")
print(y)

independent variables : 
[[6.3200e-03 1.8000e+01 2.3100e+00 ... 1.5300e+01 3.9690e+02 4.9800e+00]
 [2.7310e-02 0.0000e+00 7.0700e+00 ... 1.7800e+01 3.9690e+02 9.1400e+00]
 [2.7290e-02 0.0000e+00 7.0700e+00 ... 1.7800e+01 3.9283e+02 4.0300e+00]
 ...
 [6.0760e-02 0.0000e+00 1.1930e+01 ... 2.1000e+01 3.9690e+02 5.6400e+00]
 [1.0959e-01 0.0000e+00 1.1930e+01 ... 2.1000e+01 3.9345e+02 6.4800e+00]
 [4.7410e-02 0.0000e+00 1.1930e+01 ... 2.1000e+01 3.9690e+02 7.8800e+00]]
data precise result : 
[24.0, 21.6, 34.7, 33.4, 36.2, 28.7, 22.9, 27.1, 16.5, 18.9, 15.0, 18.9, 21.7, 20.4, 18.2, 19.9, 23.1, 17.5, 20.2, 18.2, 13.6, 19.6, 15.2, 14.5, 15.6, 13.9, 16.6, 14.8, 18.4, 21.0, 12.7, 14.5, 13.2, 13.1, 13.5, 18.9, 20.0, 21.0, 24.7, 30.8, 34.9, 26.6, 25.3, 24.7, 21.2, 19.3, 20.0, 16.6, 14.4, 19.4, 19.7, 20.5, 25.0, 23.4, 18.9, 35.4, 24.7, 31.6, 23.3, 19.6, 18.7, 16.0, 22.2, 25.0, 33.0, 23.5, 19.4, 22.0, 17.4, 20.9, 24.2, 21.7, 22.8, 23.4, 24.1, 21.4, 20.0, 20.8, 21.2, 20.3, 28.0, 23.9, 24.8, 22.9, 23.

In [9]:
# Significant variables: fit an ordinary least squares model on all
# predictors and print the summary table.
X2 = sm.add_constant(data=X)  # adds the constant (intercept) column
est = sm.OLS(endog=y, exog=X2)
est2 = est.fit()
print(est2.summary())

                            OLS Regression Results                            
Dep. Variable:                      y   R-squared:                       0.741
Model:                            OLS   Adj. R-squared:                  0.734
Method:                 Least Squares   F-statistic:                     108.1
Date:                Fri, 05 Jun 2020   Prob (F-statistic):          6.72e-135
Time:                        14:48:47   Log-Likelihood:                -1498.8
No. Observations:                 506   AIC:                             3026.
Df Residuals:                     492   BIC:                             3085.
Df Model:                          13                                         
Covariance Type:            nonrobust                                         
                 coef    std err          t      P>|t|      [0.025      0.975]
------------------------------------------------------------------------------
const         36.4595      5.103      7.144      0.0

In [11]:
# Drop the non-significant variable X7 (P = 0.958 > 0.05) and refit.
# X is rebuilt from the raw `rows` rather than deleted from the current X, so
# re-running this cell cannot silently remove a further predictor.
# 0-based column 6 is X7; column 13 is the target, which never belongs in X.
X = np.delete(rows, [6, 13], 1)
X2 = sm.add_constant(X)
est = sm.OLS(y, X2)
est2 = est.fit()
print(est2.summary())

                            OLS Regression Results                            
Dep. Variable:                      y   R-squared:                       0.741
Model:                            OLS   Adj. R-squared:                  0.734
Method:                 Least Squares   F-statistic:                     117.3
Date:                Fri, 05 Jun 2020   Prob (F-statistic):          6.08e-136
Time:                        14:53:09   Log-Likelihood:                -1498.8
No. Observations:                 506   AIC:                             3024.
Df Residuals:                     493   BIC:                             3079.
Df Model:                          12                                         
Covariance Type:            nonrobust                                         
                 coef    std err          t      P>|t|      [0.025      0.975]
------------------------------------------------------------------------------
const         36.4369      5.080      7.172      0.0

In [12]:
# Drop the non-significant variable X3 (P = 0.738 > 0.05) as well and refit.
# X is rebuilt from the raw `rows` rather than deleted from the current X, so
# re-running this cell cannot silently remove a further predictor.
# 0-based columns 2 and 6 are X3 and X7 (X7 was dropped in the previous step);
# column 13 is the target, which never belongs in X.
X = np.delete(rows, [2, 6, 13], 1)
X2 = sm.add_constant(X)
est = sm.OLS(y, X2)
est2 = est.fit()
print(est2.summary())

                            OLS Regression Results                            
Dep. Variable:                      y   R-squared:                       0.741
Model:                            OLS   Adj. R-squared:                  0.735
Method:                 Least Squares   F-statistic:                     128.2
Date:                Fri, 05 Jun 2020   Prob (F-statistic):          5.54e-137
Time:                        14:54:50   Log-Likelihood:                -1498.9
No. Observations:                 506   AIC:                             3022.
Df Residuals:                     494   BIC:                             3072.
Df Model:                          11                                         
Covariance Type:            nonrobust                                         
                 coef    std err          t      P>|t|      [0.025      0.975]
------------------------------------------------------------------------------
const         36.3411      5.067      7.171      0.0

In [14]:
# Append the target y as the last column next to the remaining predictors.
result = np.column_stack([X, y])
print(result)
# Save our clean data set as a header-less CSV.
# column_stack already returns an ndarray, so it is written directly with the
# notebook's existing `np` alias (no second numpy import or asarray copy).
np.savetxt("housing_data_cleaned.csv", result, delimiter=",")

[[6.3200e-03 1.8000e+01 0.0000e+00 ... 3.9690e+02 4.9800e+00 2.4000e+01]
 [2.7310e-02 0.0000e+00 0.0000e+00 ... 3.9690e+02 9.1400e+00 2.1600e+01]
 [2.7290e-02 0.0000e+00 0.0000e+00 ... 3.9283e+02 4.0300e+00 3.4700e+01]
 ...
 [6.0760e-02 0.0000e+00 0.0000e+00 ... 3.9690e+02 5.6400e+00 2.3900e+01]
 [1.0959e-01 0.0000e+00 0.0000e+00 ... 3.9345e+02 6.4800e+00 2.2000e+01]
 [4.7410e-02 0.0000e+00 0.0000e+00 ... 3.9690e+02 7.8800e+00 1.1900e+01]]


In [15]:
# Load our clean dataset as a Spark DataFrame.
# The CSV has no header row; column types are inferred from the values.
data = spark.read.csv(path='housing_data_cleaned.csv', header=False, inferSchema=True)
data.show(n=10)

+-------+----+---+-----+-----+------+---+-----+----+------+-----+----+
|    _c0| _c1|_c2|  _c3|  _c4|   _c5|_c6|  _c7| _c8|   _c9| _c10|_c11|
+-------+----+---+-----+-----+------+---+-----+----+------+-----+----+
|0.00632|18.0|0.0|0.538|6.575|  4.09|1.0|296.0|15.3| 396.9| 4.98|24.0|
|0.02731| 0.0|0.0|0.469|6.421|4.9671|2.0|242.0|17.8| 396.9| 9.14|21.6|
|0.02729| 0.0|0.0|0.469|7.185|4.9671|2.0|242.0|17.8|392.83| 4.03|34.7|
|0.03237| 0.0|0.0|0.458|6.998|6.0622|3.0|222.0|18.7|394.63| 2.94|33.4|
|0.06905| 0.0|0.0|0.458|7.147|6.0622|3.0|222.0|18.7| 396.9| 5.33|36.2|
|0.02985| 0.0|0.0|0.458| 6.43|6.0622|3.0|222.0|18.7|394.12| 5.21|28.7|
|0.08829|12.5|0.0|0.524|6.012|5.5605|5.0|311.0|15.2| 395.6|12.43|22.9|
|0.14455|12.5|0.0|0.524|6.172|5.9505|5.0|311.0|15.2| 396.9|19.15|27.1|
|0.21124|12.5|0.0|0.524|5.631|6.0821|5.0|311.0|15.2|386.63|29.93|16.5|
|0.17004|12.5|0.0|0.524|6.004|6.5921|5.0|311.0|15.2|386.71| 17.1|18.9|
+-------+----+---+-----+-----+------+---+-----+----+------+-----+----+
only s

In [16]:
# Group the predictor columns into a single vector column.
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# _c0 .. _c10 are assembled as features; _c11 is left out of the vector
# (it is selected as the label in a later cell).
featureassembler = VectorAssembler(
    inputCols=[
        "_c0", "_c1", "_c2", "_c3", "_c4", "_c5",
        "_c6", "_c7", "_c8", "_c9", "_c10",
    ],
    outputCol="Independent Features",
)
output = featureassembler.transform(data)
output.show()

+-------+----+---+-----+-----+------+---+-----+----+------+-----+----+--------------------+
|    _c0| _c1|_c2|  _c3|  _c4|   _c5|_c6|  _c7| _c8|   _c9| _c10|_c11|Independent Features|
+-------+----+---+-----+-----+------+---+-----+----+------+-----+----+--------------------+
|0.00632|18.0|0.0|0.538|6.575|  4.09|1.0|296.0|15.3| 396.9| 4.98|24.0|[0.00632,18.0,0.0...|
|0.02731| 0.0|0.0|0.469|6.421|4.9671|2.0|242.0|17.8| 396.9| 9.14|21.6|[0.02731,0.0,0.0,...|
|0.02729| 0.0|0.0|0.469|7.185|4.9671|2.0|242.0|17.8|392.83| 4.03|34.7|[0.02729,0.0,0.0,...|
|0.03237| 0.0|0.0|0.458|6.998|6.0622|3.0|222.0|18.7|394.63| 2.94|33.4|[0.03237,0.0,0.0,...|
|0.06905| 0.0|0.0|0.458|7.147|6.0622|3.0|222.0|18.7| 396.9| 5.33|36.2|[0.06905,0.0,0.0,...|
|0.02985| 0.0|0.0|0.458| 6.43|6.0622|3.0|222.0|18.7|394.12| 5.21|28.7|[0.02985,0.0,0.0,...|
|0.08829|12.5|0.0|0.524|6.012|5.5605|5.0|311.0|15.2| 395.6|12.43|22.9|[0.08829,12.5,0.0...|
|0.14455|12.5|0.0|0.524|6.172|5.9505|5.0|311.0|15.2| 396.9|19.15|27.1|[0.14455,1

In [17]:
# Keep only the assembled feature vector and the observed result column.
selected_columns = ("Independent Features", "_c11")
finalized_data = output.select(*selected_columns)
finalized_data.show()

+--------------------+----+
|Independent Features|_c11|
+--------------------+----+
|[0.00632,18.0,0.0...|24.0|
|[0.02731,0.0,0.0,...|21.6|
|[0.02729,0.0,0.0,...|34.7|
|[0.03237,0.0,0.0,...|33.4|
|[0.06905,0.0,0.0,...|36.2|
|[0.02985,0.0,0.0,...|28.7|
|[0.08829,12.5,0.0...|22.9|
|[0.14455,12.5,0.0...|27.1|
|[0.21124,12.5,0.0...|16.5|
|[0.17004,12.5,0.0...|18.9|
|[0.22489,12.5,0.0...|15.0|
|[0.11747,12.5,0.0...|18.9|
|[0.09378,12.5,0.0...|21.7|
|[0.62976,0.0,0.0,...|20.4|
|[0.63796,0.0,0.0,...|18.2|
|[0.62739,0.0,0.0,...|19.9|
|[1.05393,0.0,0.0,...|23.1|
|[0.7842,0.0,0.0,0...|17.5|
|[0.80271,0.0,0.0,...|20.2|
|[0.7258,0.0,0.0,0...|18.2|
+--------------------+----+
only showing top 20 rows



In [18]:
# NOTE: this import rebinds the name LinearRegression, which was previously
# imported from sklearn.linear_model.
from pyspark.ml.regression import LinearRegression

# Fixed seed so the 80/20 split -- and every number derived from it -- is the
# same on each run.
train_data, test_data = finalized_data.randomSplit([0.8, 0.2], seed=42)

# Keep the estimator and the fitted model under separate names.
lr_estimator = LinearRegression(featuresCol='Independent Features', labelCol='_c11')
# `regressor` is the fitted model; later cells read its intercept and call
# evaluate() on it.
regressor = lr_estimator.fit(train_data)
regressor.coefficients

DenseVector([-0.1042, 0.0466, 2.3501, -17.3043, 3.9503, -1.5758, 0.2947, -0.0128, -0.9299, 0.0095, -0.5407])

In [19]:
# Display the intercept of the fitted Spark regression model.
regressor.intercept

36.05762507361847

In [23]:
# Evaluate the fitted model on the held-out split and list up to 100
# predictions next to the observed values.
pred_results = regressor.evaluate(test_data)
pred_results.predictions.show(n=100)

+--------------------+----+------------------+
|Independent Features|_c11|        prediction|
+--------------------+----+------------------+
|[0.01096,55.0,0.0...|22.0|27.400220157086036|
|[0.01778,95.0,0.0...|32.9|30.822278282770107|
|[0.02187,60.0,0.0...|31.1| 32.39470360427489|
|[0.02899,40.0,0.0...|26.6|22.267307453962978|
|[0.03466,35.0,0.0...|19.4|23.371369668538485|
|[0.03502,80.0,0.0...|28.5| 33.87624616917972|
|[0.03551,25.0,0.0...|22.9|25.269605362499576|
|[0.03584,80.0,0.0...|23.5|30.473506928092192|
|[0.03738,0.0,0.0,...|20.7| 21.76758904342919|
|[0.03961,0.0,0.0,...|21.1|20.824310348261815|
|[0.0456,0.0,1.0,0...|23.3|26.363938217279728|
|[0.04684,0.0,0.0,...|22.6|27.616222081530346|
|[0.04741,0.0,0.0,...|11.9|22.798130000928438|
|[0.05059,0.0,0.0,...|23.9| 25.08321192689147|
|[0.0578,0.0,0.0,0...|37.2| 33.63254918509893|
|[0.05789,12.5,0.0...|22.0|21.117471409580535|
|[0.06263,0.0,0.0,...|22.4|24.047703557274346|
|[0.06417,0.0,0.0,...|18.9|  24.1487955883097|
|[0.06664,0.0

In [25]:
# Compare pyspark results with sklearn results.
# Re-imported because the pyspark class of the same name was imported earlier.
from sklearn.linear_model import LinearRegression

# Fit a linear regression model to the data; fit() returns the estimator itself.
# NOTE: this model is fitted on all of X / y, not on the Spark training split.
model = LinearRegression().fit(X, y)
print(model)

# Make predictions on the same rows the model was fitted on.
expected = y
predicted = model.predict(X)
result1 = np.column_stack((expected, predicted))
print(result1)

LinearRegression(copy_X=True, fit_intercept=True, n_jobs=None, normalize=False)
[[24.         30.12428141]
 [21.6        24.99652756]
 [34.7        30.53337038]
 ...
 [23.9        27.6099988 ]
 [22.         26.11133299]
 [11.9        22.34084267]]


In [27]:
# Summarize the fit of the sklearn model: mean squared error over the same
# rows it was fitted on, plus intercept, coefficients and model.score(X, y).
mse = np.mean(np.square(predicted - expected))

# Each entry is printed as a label line followed by its value line.
report = [
    ("intercept:", model.intercept_),
    ("Coefficients:", model.coef_),
    ("MSE:", mse),
    ("score: ", model.score(X, y)),
]
for label, value in report:
    print(label)
    print(value)

intercept:
36.34114500447117
Coefficients:
[-1.08413345e-01  4.58449292e-02  2.71871630e+00 -1.73760234e+01
  3.80157884e+00 -1.49271146e+00  2.99608454e-01 -1.17779735e-02
 -9.46524570e-01  9.29084477e-03 -5.22553457e-01]
MSE:
21.899928759752196
score: 
0.7405822802569575
