In [34]:
import pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('RegressionWithPySpark').getOrCreate()

In [35]:
import pandas as pd
data = pd.read_csv('data.csv')
print(data.isna().sum())

CRIM       0
ZN         0
INDUS      0
CHAS       0
NOX        0
RM         5
AGE        0
DIS        0
RAD        0
TAX        0
PTRATIO    0
B          0
LSTAT      0
MEDV       0
dtype: int64


**Explore and Defining Dataset**

In [36]:
# import dataset
df=spark.read.format('csv') \
.option('inferSchema','true') \
.option('header','true') \
.option('sep',',') \
.load('data.csv')

display(df)

DataFrame[CRIM: double, ZN: double, INDUS: double, CHAS: int, NOX: double, RM: double, AGE: double, DIS: double, RAD: int, TAX: int, PTRATIO: double, B: double, LSTAT: double, MEDV: double]

In [37]:
df.columns

['CRIM',
 'ZN',
 'INDUS',
 'CHAS',
 'NOX',
 'RM',
 'AGE',
 'DIS',
 'RAD',
 'TAX',
 'PTRATIO',
 'B',
 'LSTAT',
 'MEDV']

CRIM per capita crime rate by town

ZN proportion of residential land zoned for lots over
25,000 sq.ft.

INDUS proportion of non-retail business acres per town
CHAS Charles River dummy variable (= 1 if tract bounds
river; 0 otherwise)

NOX nitric oxides concentration (parts per 10 million)

RM average number of rooms per dwelling

AGE proportion of owner-occupied units built prior to 1940

DIS weighted distances to five Boston employment centres

RAD index of accessibility to radial highways

TAX full-value property-tax rate per $10,000

PTRATIO pupil-teacher ratio by town

B 1000(Bk - 0.63)^2 where Bk is the proportion of blacks
by town

LSTAT % lower status of the population

MEDV Median value of owner-occupied homes in $1000's

In [38]:
print(df.count(),len(df.columns))

511 14


In [39]:
df.describe().show()

+-------+-----------------+------------------+-----------------+-------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+------------------+-----------------+------------------+-----------------+
|summary|             CRIM|                ZN|            INDUS|               CHAS|                NOX|                RM|               AGE|               DIS|              RAD|               TAX|           PTRATIO|                B|             LSTAT|             MEDV|
+-------+-----------------+------------------+-----------------+-------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+------------------+-----------------+------------------+-----------------+
|  count|              511|               511|              511|                511|                511|               506|               511|               511|              511|  

Make our target an output label

In [40]:
df=df.withColumnRenamed('MEDV','label')
df.show()

+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+-----+
|   CRIM|  ZN|INDUS|CHAS|  NOX|   RM|  AGE|   DIS|RAD|TAX|PTRATIO|     B|LSTAT|label|
+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+-----+
|0.00632|18.0| 2.31|   0|0.538|6.575| 65.2|  4.09|  1|296|   15.3| 396.9| 4.98| 24.0|
|0.02731| 0.0| 7.07|   0|0.469|6.421| 78.9|4.9671|  2|242|   17.8| 396.9| 9.14| 21.6|
|0.02729| 0.0| 7.07|   0|0.469|7.185| 61.1|4.9671|  2|242|   17.8|392.83| 4.03| 34.7|
|0.03237| 0.0| 2.18|   0|0.458|6.998| 45.8|6.0622|  3|222|   18.7|394.63| 2.94| 33.4|
|0.06905| 0.0| 2.18|   0|0.458|7.147| 54.2|6.0622|  3|222|   18.7| 396.9| 5.33| 36.2|
|0.02985| 0.0| 2.18|   0|0.458| 6.43| 58.7|6.0622|  3|222|   18.7|394.12| 5.21| 28.7|
|0.08829|12.5| 7.87|   0|0.524|6.012| 66.6|5.5605|  5|311|   15.2| 395.6|12.43| 22.9|
|0.14455|12.5| 7.87|   0|0.524|6.172| 96.1|5.9505|  5|311|   15.2| 396.9|19.15| 27.1|
|0.21124|12.5| 7.87|   0|0.524|5.631|100.0|6.0821|  5|

In [41]:
from pyspark.sql.functions import corr

#shows correlation
df.select(corr('RM','label')).show()

+------------------+
|   corr(RM, label)|
+------------------+
|0.6676949056434682|
+------------------+



**Preparing Data with Feature Vector**

In [42]:
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler

# we are turning all of our features into a single vector
vec_assembler = VectorAssembler(inputCols=['CRIM','ZN','INDUS', \
                'CHAS','NOX','AGE','DIS','RAD','TAX',
                'PTRATIO','B','LSTAT'],outputCol='features') 

# add it to the dataframe
features_df = vec_assembler.transform(df)  
#check if its added  
features_df.printSchema()                                                               

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: integer (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: integer (nullable = true)
 |-- TAX: integer (nullable = true)
 |-- PTRATIO: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



In [43]:
#check if its empty
features_df.select('features').show(5,False) 

+-------------------------------------------------------------------+
|features                                                           |
+-------------------------------------------------------------------+
|[0.00632,18.0,2.31,0.0,0.538,65.2,4.09,1.0,296.0,15.3,396.9,4.98]  |
|[0.02731,0.0,7.07,0.0,0.469,78.9,4.9671,2.0,242.0,17.8,396.9,9.14] |
|[0.02729,0.0,7.07,0.0,0.469,61.1,4.9671,2.0,242.0,17.8,392.83,4.03]|
|[0.03237,0.0,2.18,0.0,0.458,45.8,6.0622,3.0,222.0,18.7,394.63,2.94]|
|[0.06905,0.0,2.18,0.0,0.458,54.2,6.0622,3.0,222.0,18.7,396.9,5.33] |
+-------------------------------------------------------------------+
only showing top 5 rows



**Building the Linear Regression Model**

In [44]:
#pipeline
model_df = features_df.select('features','label') 
model_df.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.00632,18.0,2.3...| 24.0|
|[0.02731,0.0,7.07...| 21.6|
|[0.02729,0.0,7.07...| 34.7|
|[0.03237,0.0,2.18...| 33.4|
|[0.06905,0.0,2.18...| 36.2|
+--------------------+-----+
only showing top 5 rows



In [47]:
#split data into train and test dataframe
train_df , test_df=model_df.randomSplit([0.8,0.2])

train_df.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.00906,90.0,2.9...| 32.2|
|[0.01096,55.0,2.2...| 22.0|
|[0.01301,35.0,1.5...| 32.7|
|[0.0136,75.0,4.0,...| 18.9|
|[0.01381,80.0,0.4...| 50.0|
+--------------------+-----+
only showing top 5 rows



In [46]:
from pyspark.ml.regression import LinearRegression

lin_Reg = LinearRegression(labelCol='label')
lr_model = lin_Reg.fit(train_df)
print(lr_model.coefficients)

23/05/07 17:42:01 WARN Instrumentation: [50585841] regParam is zero, which might cause numerical instability and overfitting.
23/05/07 17:42:01 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/05/07 17:42:01 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
23/05/07 17:42:01 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


[-0.1725921392653662,0.07907607038823869,-0.1590591915346227,5.040943152814686,-20.803424017326016,-0.04814025976036571,-2.298479856806332,0.3150624883418397,-0.015040121976451997,-0.9068985368112062,0.007420014142907691,-0.27286816545861187]
