### pyspark -Linear Regression model for House Price prediction

In [10]:
#set environment
import os
import sys
 
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In below two lines, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/local/anaconda/bin/python" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/local/anaconda/bin/python"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.4-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [11]:
#import Sparksession driver
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Classification of Car Dataset") \
    .getOrCreate()

The preprocecced data in sklearn was submitted to a csv file and that is imported here.

In [12]:
df = spark.read.csv('new_regression2_try.csv',inferSchema=True,header=True)
df.show(3)

+----------------+-------------------+-------------------------+----------------------------+---------------+-----------+
|Avg. Area Income|Avg. Area House Age|Avg. Area Number of Rooms|Avg. Area Number of Bedrooms|Area Population|      Price|
+----------------+-------------------+-------------------------+----------------------------+---------------+-----------+
|     79545.45857|        5.682861322|              7.009188143|                        4.09|     23086.8005|1059033.558|
|     79248.64245|        6.002899808|              6.730821019|                        3.09|    40173.07217|1505890.915|
|     61287.06718|         5.86588984|               8.51272743|                        5.13|     36882.1594|1058987.988|
+----------------+-------------------+-------------------------+----------------------------+---------------+-----------+
only showing top 3 rows



In [13]:
#SQL like schema
df.printSchema()

root
 |-- Avg. Area Income: double (nullable = true)
 |-- Avg. Area House Age: double (nullable = true)
 |-- Avg. Area Number of Rooms: double (nullable = true)
 |-- Avg. Area Number of Bedrooms: double (nullable = true)
 |-- Area Population: double (nullable = true)
 |-- Price: double (nullable = true)



In [21]:
#renaming the columns
df = df.toDF('Avg_Income', 'Avg_House_Age', 'Avg_No_of_Rooms', 'Avg_No_of_Bedrooms', 'Population', 'price')

In [22]:
#Check for missing values
for col in df.columns:
    print("no. of cells in column", col, "with null values:", df.filter(df[col].isNull()).count())

no. of cells in column Avg_Income with null values: 0
no. of cells in column Avg_House_Age with null values: 0
no. of cells in column Avg_No_of_Rooms with null values: 0
no. of cells in column Avg_No_of_Bedrooms with null values: 0
no. of cells in column Population with null values: 0
no. of cells in column price with null values: 0


In [31]:
#all the independent variables need to be packed into one column of vector type
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["Avg_Income","Avg_House_Age","Avg_No_of_Rooms","Avg_No_of_Bedrooms","Population"], 
                            outputCol="features")
feature_vec=assembler.transform(df).select('features','Price')
feature_vec.show(5)
df = feature_vec.select(['features','Price'])
df.show(3)

+--------------------+-----------+
|            features|      Price|
+--------------------+-----------+
|[79545.45857,5.68...|1059033.558|
|[79248.64245,6.00...|1505890.915|
|[61287.06718,5.86...|1058987.988|
|[63345.24005,7.18...|1260616.807|
|[59982.19723,5.04...|630943.4893|
+--------------------+-----------+
only showing top 5 rows

+--------------------+-----------+
|            features|      Price|
+--------------------+-----------+
|[79545.45857,5.68...|1059033.558|
|[79248.64245,6.00...|1505890.915|
|[61287.06718,5.86...|1058987.988|
+--------------------+-----------+
only showing top 3 rows



In [32]:
#Standard scaling for data
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

In [33]:
scaler_model = scaler.fit(feature_vec)

In [34]:
scaled_data = scaler_model.transform(feature_vec)

In [35]:
#import PCA 
from pyspark.ml.feature import PCA
pca = PCA(k=5, inputCol='features', outputCol='features_pca')

In [36]:
#Pca model on feature vector
pca_model = pca.fit(feature_vec)

In [37]:
pca_data = pca_model.transform(feature_vec).select('features_pca')
pca_data.show(3)

+--------------------+
|        features_pca|
+--------------------+
|[-76466.360363686...|
|[-74261.531550111...|
|[-56780.370568401...|
+--------------------+
only showing top 3 rows



In [38]:
#concatenating label column to feature_pca columns
from pyspark.sql.functions import concat
from pyspark.sql.functions import col
res = df.withColumn("Price",concat(col("Price")))
res.show(5)

+--------------------+-----------+
|            features|      Price|
+--------------------+-----------+
|[79545.45857,5.68...|1059033.558|
|[79248.64245,6.00...|1505890.915|
|[61287.06718,5.86...|1058987.988|
|[63345.24005,7.18...|1260616.807|
|[59982.19723,5.04...|630943.4893|
+--------------------+-----------+
only showing top 5 rows



In [39]:
#
from pyspark.sql.functions import monotonically_increasing_id 
df = df.withColumn("row_id", monotonically_increasing_id())
pca_data = pca_data.withColumn("row_id", monotonically_increasing_id())
result_df = df.join(pca_data, ("row_id")).drop("row_id")
result_df.show()

+--------------------+-----------+--------------------+
|            features|      Price|        features_pca|
+--------------------+-----------+--------------------+
|[79545.45857,5.68...|1059033.558|[-76466.360363686...|
|[79248.64245,6.00...|1505890.915|[-74261.531550111...|
|[61287.06718,5.86...|1058987.988|[-56780.370568401...|
|[63345.24005,7.18...|1260616.807|[-59113.129456245...|
|[59982.19723,5.04...|630943.4893|[-56660.484529818...|
|[80175.75416,4.98...|1068138.074|[-76683.415807619...|
|[64698.46343,6.02...|1502055.817|[-57493.737275412...|
|[78394.33928,6.98...|1573936.564|[-73821.322982044...|
|[59927.66081,5.36...|798869.5328|[-56267.234551529...|
|[81885.92718,4.42...|1545154.813|[-76884.871691391...|
|[80527.47208,8.09...|1707045.722|[-74744.166707544...|
|[50593.6955,4.496...|663732.3969|[-46437.724134040...|
|[39033.80924,7.67...|1042814.098|[-34405.209757966...|
|[73163.66344,6.91...|1291331.518|[-69091.803888298...|
|[69391.38018,5.34...| 1402818.21|[-64986.010383

In [40]:
#dropping insignificant features
result_df = result_df.drop("features")
result_df.show(5)

+-----------+--------------------+
|      Price|        features_pca|
+-----------+--------------------+
|1059033.558|[-76466.360363686...|
|1505890.915|[-74261.531550111...|
|1058987.988|[-56780.370568401...|
|1260616.807|[-59113.129456245...|
|630943.4893|[-56660.484529818...|
+-----------+--------------------+
only showing top 5 rows



In [42]:
# Split the data into train and test sets
train_data, test_data = result_df.randomSplit([.75,.25],seed=0)
train_data.show(3)

+-----------+--------------------+
|      Price|        features_pca|
+-----------+--------------------+
|31140.51762|[-34014.637469225...|
|88591.77016|[-57911.475322092...|
|151527.0826|[-46230.447778747...|
+-----------+--------------------+
only showing top 3 rows



In [43]:
#import linear regression model
from pyspark.ml.regression import LinearRegression
# Create initial LinearRegression model
lr = LinearRegression(labelCol="Price", featuresCol="features_pca",  
                        maxIter=10, regParam=0.0001,  
                        elasticNetParam=0.0)
#fit linear regression model on training data
lrModel = lr.fit(train_data)
predictions = lrModel.transform(test_data)
predictions.printSchema()
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

root
 |-- Price: double (nullable = true)
 |-- features_pca: vector (nullable = true)
 |-- prediction: double (nullable = true)

Coefficients: [-19.626514833787713,17.214897309177925,-65817.10532679415,-161436.90740462617,107241.46474226534]
Intercept: -2628693.7946493393


In [7]:
# Split the data into train and test sets
train_data, test_data = feature_vec.randomSplit([.75,.25],seed=0)

In [8]:
splits = feature_vec.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [9]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='Price', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [21.55934064044135,165622.22550399075,120450.23504599131,3116.88534896747,15.264745652303285]
Intercept: -2644194.4727492738


In [44]:
#Accuracy Metrics R2 & RMSE
trainingSummary = lrModel.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 101313.892997
r2: 0.918205


### INTERPRETATION

R2Score is 0.91 which shows our model is a very good fit for the data it is a decent enough model to predict the independ variable

This score is very similar to the score that we got in Skleran Linear Regression.