In [110]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.classification import LogisticRegression

In [111]:
spark = SparkSession.builder.appName('Exam1').getOrCreate()

In [112]:
housing_df = spark.read.csv("D:/housing.csv", inferSchema = True, header = True)

In [113]:
print(housing_df.count(), len(housing_df.columns))

506 13


This implies it has 506 rows and 13 columns

In [114]:
housing_df.printSchema()

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: string (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: integer (nullable = true)
 |-- TAX: integer (nullable = true)
 |-- PTRATIO: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- MEDVAL: double (nullable = true)



From the above schema we can see that column 'CHAS' has a string datatype i.e., categorical in nature.This needs to be conevrted into numerical form as MAchine mearning model cannot understand the categorical values, they only take data in numerical format. 
The conversion can be done using dummification ie., creating a dummy variable to make the feature regression-complaint.

In [115]:
from pyspark.sql.functions import corr

In [116]:
housing_df.select(corr('MEDVAL','CRIM')).show()

+--------------------+
|  corr(MEDVAL, CRIM)|
+--------------------+
|-0.38830460858681154|
+--------------------+



In [117]:
housing_df.select(corr('MEDVAL','ZN')).show()

+------------------+
|  corr(MEDVAL, ZN)|
+------------------+
|0.3604453424505433|
+------------------+



In [118]:
housing_df.select(corr('MEDVAL','INDUS')).show()

+-------------------+
|corr(MEDVAL, INDUS)|
+-------------------+
|-0.4837251600283728|
+-------------------+



In [119]:
housing_df.select(corr('MEDVAL','NOX')).show()

+-------------------+
|  corr(MEDVAL, NOX)|
+-------------------+
|-0.4273207723732821|
+-------------------+



In [120]:
housing_df.select(corr('MEDVAL','RM')).show()

+------------------+
|  corr(MEDVAL, RM)|
+------------------+
|0.6953599470715401|
+------------------+



In [121]:
housing_df.select(corr('MEDVAL','AGE')).show()

+-------------------+
|  corr(MEDVAL, AGE)|
+-------------------+
|-0.3769545650045961|
+-------------------+



In [122]:
housing_df.select(corr('MEDVAL','DIS')).show()

+-----------------+
|corr(MEDVAL, DIS)|
+-----------------+
|0.249928734085904|
+-----------------+



In [123]:
housing_df.select(corr('MEDVAL','RAD')).show()

+--------------------+
|   corr(MEDVAL, RAD)|
+--------------------+
|-0.38162623063977735|
+--------------------+



In [124]:
housing_df.select(corr('MEDVAL','TAX')).show()

+--------------------+
|   corr(MEDVAL, TAX)|
+--------------------+
|-0.46853593356776674|
+--------------------+



In [125]:
housing_df.select(corr('MEDVAL','PTRATIO')).show()

+---------------------+
|corr(MEDVAL, PTRATIO)|
+---------------------+
|  -0.5077866855375622|
+---------------------+



In [126]:
housing_df.select(corr('MEDVAL','LSTAT')).show()

+-------------------+
|corr(MEDVAL, LSTAT)|
+-------------------+
|-0.7376627261740145|
+-------------------+



In [127]:
CHAS_indexer = StringIndexer(inputCol = 'CHAS', outputCol = 'CHAS_num').fit(housing_df)
housing_df = CHAS_indexer.transform(housing_df) 

In [128]:
housing_df.select(corr('MEDVAL','CHAS_num')).show()

+----------------------+
|corr(MEDVAL, CHAS_num)|
+----------------------+
|    0.1752601771902987|
+----------------------+



In [129]:
housing_df.printSchema()

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: string (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: integer (nullable = true)
 |-- TAX: integer (nullable = true)
 |-- PTRATIO: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- MEDVAL: double (nullable = true)
 |-- CHAS_num: double (nullable = false)



In [130]:
housing_predict_df = housing_df.select('MEDVAL','LSTAT', 'PTRATIO','TAX', 'RM', 'INDUS')

In [131]:
housing_predict_df.printSchema()

root
 |-- MEDVAL: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- PTRATIO: double (nullable = true)
 |-- TAX: integer (nullable = true)
 |-- RM: double (nullable = true)
 |-- INDUS: double (nullable = true)



Correlation coefficient values must be closer to +1 or -1 which then indicates they have an impact on the output variable. 
The columns 'LSTAT', 'PTRATIO','TAX', 'RM', 'INDUS' are useful in predicting the median value as they have a mgnitude greater than 0.45. 
LSTAT has a negative coefficient meaning increase in Lstat will mark a decrease in MEDVAL column. They have a strong negative correaltion in between them.
PTRATIO, INDUS, TAX and MEDVAL have a weak negative correlation.
RM and MEDVAL have a strong positive correlation which means increase in RM will increase the MEDVAL. 

In [132]:
housing_predict_df.describe().show()

+-------+------------------+------------------+------------------+------------------+------------------+------------------+
|summary|            MEDVAL|             LSTAT|           PTRATIO|               TAX|                RM|             INDUS|
+-------+------------------+------------------+------------------+------------------+------------------+------------------+
|  count|               506|               506|               506|               506|               506|               506|
|   mean|22.532806324110698|12.653063241106723|18.455533596837967| 408.2371541501976| 6.284634387351787|11.136778656126504|
| stddev| 9.197104087379815| 7.141061511348571|2.1649455237144455|168.53711605495903|0.7026171434153232| 6.860352940897589|
|    min|               5.0|              1.73|              12.6|               187|             3.561|              0.46|
|    max|              50.0|             37.97|              22.0|               711|              8.78|             27.74|
+-------

In [133]:
DF_statistics = housing_predict_df.describe()
DF_statistics.show()

+-------+------------------+------------------+------------------+------------------+------------------+------------------+
|summary|            MEDVAL|             LSTAT|           PTRATIO|               TAX|                RM|             INDUS|
+-------+------------------+------------------+------------------+------------------+------------------+------------------+
|  count|               506|               506|               506|               506|               506|               506|
|   mean|22.532806324110698|12.653063241106723|18.455533596837967| 408.2371541501976| 6.284634387351787|11.136778656126504|
| stddev| 9.197104087379815| 7.141061511348571|2.1649455237144455|168.53711605495903|0.7026171434153232| 6.860352940897589|
|    min|               5.0|              1.73|              12.6|               187|             3.561|              0.46|
|    max|              50.0|             37.97|              22.0|               711|              8.78|             27.74|
+-------

In [134]:
from pyspark.sql.functions import round, col

In [135]:
DF_statistics.select('summary', round(col('MEDVAL'),2)).show()

+-------+----------------+
|summary|round(MEDVAL, 2)|
+-------+----------------+
|  count|           506.0|
|   mean|           22.53|
| stddev|             9.2|
|    min|             5.0|
|    max|            50.0|
+-------+----------------+



In [136]:
DF_statistics.select('summary', round(col('LSTAT'),2)).show()

+-------+---------------+
|summary|round(LSTAT, 2)|
+-------+---------------+
|  count|          506.0|
|   mean|          12.65|
| stddev|           7.14|
|    min|           1.73|
|    max|          37.97|
+-------+---------------+



In [137]:
DF_statistics.select('summary', round(col('PTRATIO'),2)).show()

+-------+-----------------+
|summary|round(PTRATIO, 2)|
+-------+-----------------+
|  count|            506.0|
|   mean|            18.46|
| stddev|             2.16|
|    min|             12.6|
|    max|             22.0|
+-------+-----------------+



In [138]:
DF_statistics.select('summary', round(col('TAX'),2)).show()

+-------+-------------+
|summary|round(TAX, 2)|
+-------+-------------+
|  count|        506.0|
|   mean|       408.24|
| stddev|       168.54|
|    min|        187.0|
|    max|        711.0|
+-------+-------------+



In [139]:
DF_statistics.select('summary', round(col('RM'),2)).show()

+-------+------------+
|summary|round(RM, 2)|
+-------+------------+
|  count|       506.0|
|   mean|        6.28|
| stddev|         0.7|
|    min|        3.56|
|    max|        8.78|
+-------+------------+



In [140]:
DF_statistics.select('summary', round(col('INDUS'),2)).show()

+-------+---------------+
|summary|round(INDUS, 2)|
+-------+---------------+
|  count|          506.0|
|   mean|          11.14|
| stddev|           6.86|
|    min|           0.46|
|    max|          27.74|
+-------+---------------+



Rounded values for the descriptive statistics for the relevant attributes

In [141]:
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler

In [142]:
vec_assemble = VectorAssembler(inputCols = ['LSTAT','PTRATIO','TAX','RM','INDUS'], outputCol = 'features')

In [143]:
housing_df_features = vec_assemble.transform(housing_predict_df)

In [144]:
housing_df_features.printSchema()

root
 |-- MEDVAL: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- PTRATIO: double (nullable = true)
 |-- TAX: integer (nullable = true)
 |-- RM: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- features: vector (nullable = true)



In [145]:
print(housing_df_features.count(),len(housing_df_features.columns))

506 7


In [146]:
housing_df_model = housing_df_features.select('MEDVAL', 'features')

In [147]:
print(housing_df_model.count(),len(housing_df_model.columns))

506 2


In [148]:
housing_df_model.printSchema()

root
 |-- MEDVAL: double (nullable = true)
 |-- features: vector (nullable = true)



In [149]:
housing_df_model.show(5,False)

+------+----------------------------+
|MEDVAL|features                    |
+------+----------------------------+
|24.0  |[4.98,15.3,296.0,6.575,2.31]|
|21.6  |[9.14,17.8,242.0,6.421,7.07]|
|34.7  |[4.03,17.8,242.0,7.185,7.07]|
|33.4  |[2.94,18.7,222.0,6.998,2.18]|
|36.2  |[5.33,18.7,222.0,7.147,2.18]|
+------+----------------------------+
only showing top 5 rows



Here the feature variable comtains all the input variables combined into a single vector variable.

### Training the regression model -- 80% of the data will be used to train the model and the remaining 20% to evaluate the model.

In [150]:
train_df, test_df = housing_df_model.randomSplit([0.80, 0.20])

In [151]:
print(train_df.count(), len(train_df.columns))

413 2


In [152]:
print(test_df.count(), len(test_df.columns))

93 2


In [153]:
train_df.show(5, False)

+------+------------------------------+
|MEDVAL|features                      |
+------+------------------------------+
|5.0   |[30.59,20.2,666.0,5.453,18.1] |
|6.3   |[29.97,20.2,666.0,5.852,18.1] |
|7.0   |[23.97,20.1,711.0,5.414,27.74]|
|7.0   |[36.98,20.2,666.0,4.519,18.1] |
|7.2   |[20.32,20.2,666.0,6.343,18.1] |
+------+------------------------------+
only showing top 5 rows



In [154]:
test_df.show(5, False)

+------+-----------------------------+
|MEDVAL|features                     |
+------+-----------------------------+
|5.0   |[22.98,20.2,666.0,5.683,18.1]|
|5.6   |[26.77,20.2,666.0,5.987,18.1]|
|10.4  |[26.64,20.2,666.0,5.304,18.1]|
|10.9  |[14.52,20.2,666.0,6.202,18.1]|
|10.9  |[21.08,20.2,666.0,6.545,18.1]|
+------+-----------------------------+
only showing top 5 rows



In [155]:
from pyspark.ml.regression import LinearRegression

In [156]:
lin_model = LinearRegression(labelCol = 'MEDVAL').fit(train_df)

In [157]:
print(lin_model.intercept)

23.58871278865642


In [158]:
print(lin_model.coefficients)

[-0.5921080681324046,-0.8773195845711795,-0.0026603001575036807,3.70572478743791,0.03370495247596223]


Coefficient of LSTAT     : -0.5935931763329011
Coefficient of PTRATIO   : -0.9244809589377402
Coefficient of TAX       : -0.0019753625410759835
Coefficient of RM        : 4.252680924351072
Coefficient of INDUS     : 0.03555874125405691

In the linear equation y = b0 + b1x1 + b2x2 + b3x3 + b4x4 +b5x5 , each of the x coefficients represent the above coefficient values.

In regression with multiple independent variables, the coefficients tells us how much the dependent variable(s) are expected to increase when that independent variable increases by one, holding all the other independent variables constant. 

So, when LSTAT increase by one, MEDVAL decreases by '0.5921080681324046' holding all other variables constants.
Similarly, for the positive coefficient 'RM'; when it increases by a unit, MEDVAL will increase by '4.252680924351072' times keeping all other variables constants.
And when the TAX increases by one, MEDVAL is supposed to decrease by '-0.0019753625410759835' keeping all other variables constant.
When INDUS increases by one, MEDVAL is supposed to be increased by '0.03555874125405691' keeping all other variables constant.

In [166]:
training_predictions = lin_model.evaluate(train_df)

In [167]:
print(training_predictions.r2)

0.6534480200897385


In [168]:
print(training_predictions.rootMeanSquaredError)

5.237472000352082


In [169]:
print(training_predictions.meanSquaredError)

27.43111295447204


R-Square value implies it has a better prediction power.


In [163]:
test_predictions = lin_model.evaluate(test_df)

In [164]:
print(test_predictions.r2)

0.7511148715928186


In [165]:
print(test_predictions.rootMeanSquaredError)

5.1490764501604716


In [170]:
print(test_predictions.meanSquaredError)

26.512988289597164


R-Square value implies it has better prediction power.
RMSE of test model is lesser/almost equal to the trained model which implies the model a good fit.