<a href="https://colab.research.google.com/github/asudebaykal/Machine-Learning-Projects/blob/main/HousePricePrediction_wit_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Install the dependencies needed to run PySpark

In [3]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar xf spark-3.0.0-bin-hadoop3.2.tgz
!pip install -q findspark

Point the JAVA_HOME and SPARK_HOME environment variables at the Java and Spark installations

In [4]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"
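
If either path is wrong (for example, because a different Spark release was downloaded), the session start-up in the next step will fail. A minimal sketch of a check, using only the standard library; the helper name `missing_dirs` is hypothetical and not part of findspark or PySpark:

```python
import os

def missing_dirs(var_names, environ=os.environ):
    """Return the variables that are unset or do not point to an existing directory."""
    missing = []
    for name in var_names:
        path = environ.get(name)
        if not path or not os.path.isdir(path):
            missing.append(name)
    return missing

# An empty list means both locations set above were found on disk.
print(missing_dirs(["JAVA_HOME", "SPARK_HOME"]))
```

Any name that appears in the printed list should be corrected before calling `findspark.init()`.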

Start a local Spark session to test the environment


In [5]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

# House Price Prediction
Dataset: Boston Housing dataset

Model: Linear regression

In [7]:
from pyspark.ml.feature import VectorAssembler  # combines all feature columns into one vector column
from pyspark.ml.regression import LinearRegression
dataset = spark.read.csv('BostonHousing.csv', inferSchema=True, header=True)

In [9]:
dataset.printSchema()

root
 |-- crim: double (nullable = true)
 |-- zn: double (nullable = true)
 |-- indus: double (nullable = true)
 |-- chas: integer (nullable = true)
 |-- nox: double (nullable = true)
 |-- rm: double (nullable = true)
 |-- age: double (nullable = true)
 |-- dis: double (nullable = true)
 |-- rad: integer (nullable = true)
 |-- tax: integer (nullable = true)
 |-- ptratio: double (nullable = true)
 |-- b: double (nullable = true)
 |-- lstat: double (nullable = true)
 |-- medv: double (nullable = true)



In [11]:
# Combine all features into one vector
assembler = VectorAssembler(inputCols=['crim', 'zn', 'indus', 'chas', 'nox',
                                       'rm', 'age', 'dis', 'rad', 'tax',
                                       'ptratio', 'b', 'lstat'], outputCol = 'Features')
output = assembler.transform(dataset)

In [17]:
data = output.select('Features', 'medv')
# Features holds the input feature vector and medv is the target variable
data.show()

+--------------------+----+
|            Features|medv|
+--------------------+----+
|[0.00632,18.0,2.3...|24.0|
|[0.02731,0.0,7.07...|21.6|
|[0.02729,0.0,7.07...|34.7|
|[0.03237,0.0,2.18...|33.4|
|[0.06905,0.0,2.18...|36.2|
|[0.02985,0.0,2.18...|28.7|
|[0.08829,12.5,7.8...|22.9|
|[0.14455,12.5,7.8...|27.1|
|[0.21124,12.5,7.8...|16.5|
|[0.17004,12.5,7.8...|18.9|
|[0.22489,12.5,7.8...|15.0|
|[0.11747,12.5,7.8...|18.9|
|[0.09378,12.5,7.8...|21.7|
|[0.62976,0.0,8.14...|20.4|
|[0.63796,0.0,8.14...|18.2|
|[0.62739,0.0,8.14...|19.9|
|[1.05393,0.0,8.14...|23.1|
|[0.7842,0.0,8.14,...|17.5|
|[0.80271,0.0,8.14...|20.2|
|[0.7258,0.0,8.14,...|18.2|
+--------------------+----+
only showing top 20 rows
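
Conceptually, the assembler takes the listed columns of each row, in the order given in inputCols, and packs them into one numeric vector; columns that are not listed (here medv) stay outside the vector. A simplified plain-Python sketch of that idea, not Spark's implementation; the function `assemble` and the row values are hypothetical:

```python
def assemble(row, input_cols):
    """Concatenate the chosen columns of one row, in order, into a single feature list."""
    return [float(row[col]) for col in input_cols]

# Hypothetical row with only three of the thirteen feature columns, for illustration.
row = {"crim": 0.1, "zn": 12.5, "chas": 0, "medv": 20.0}
features = assemble(row, ["crim", "zn", "chas"])
print(features)  # the integer column chas becomes a float; medv is left out
```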



In [18]:
# Split into training (0.8) and testing (0.2) data
train_data,test_data = data.randomSplit([0.8,0.2])
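
Because no seed is given, each run produces a different split, so the predictions and metrics further down will vary from run to run. The split sizes are also only approximately 80/20. The sketch below illustrates both points in plain Python with a seeded, per-row random assignment; it illustrates the idea and is not Spark's implementation, and the function name `random_split` and the row count are hypothetical:

```python
import random

def random_split(rows, train_fraction, seed):
    """Assign each row to train or test independently, using a seeded generator."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(500))  # stand-in for the rows of the dataset
train_a, test_a = random_split(rows, 0.8, seed=42)
train_b, test_b = random_split(rows, 0.8, seed=42)
print(len(train_a), len(test_a))  # roughly, not exactly, 80/20
print(train_a == train_b)         # the same seed reproduces the same split
```

In the notebook, the corresponding change would be `data.randomSplit([0.8, 0.2], seed=42)`; the value 42 is arbitrary.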

# Model

In [19]:
model = LinearRegression(featuresCol = 'Features', labelCol = 'medv')

# Fit the model on the training set
model = model.fit(train_data)

# Evaluate the fitted model on the testing set
pred = model.evaluate(test_data)
# Show the predictions
pred.predictions.show()

+--------------------+----+------------------+
|            Features|medv|        prediction|
+--------------------+----+------------------+
|[0.00906,90.0,2.9...|32.2|  31.3468028766858|
|[0.01096,55.0,2.2...|22.0| 27.27703041974309|
|[0.01381,80.0,0.4...|50.0|40.412516980480916|
|[0.01951,17.5,1.3...|33.0|22.883394296139247|
|[0.02009,95.0,2.6...|50.0| 42.66094129017251|
|[0.02498,0.0,1.89...|16.5|22.340249495848536|
|[0.02899,40.0,1.2...|26.6|22.104875572344426|
|[0.0315,95.0,1.47...|34.9| 30.09442618561892|
|[0.03306,0.0,5.19...|20.6| 22.03988057408082|
|[0.03445,82.5,2.0...|24.1|29.465477633767257|
|[0.03584,80.0,3.3...|23.5| 30.30832126971645|
|[0.04297,52.5,5.3...|24.8| 26.47492241157351|
|[0.04301,80.0,1.9...|18.2| 14.37187950767677|
|[0.04741,0.0,11.9...|11.9|22.659120616397626|
|[0.04981,21.0,5.6...|23.4| 23.71717112612974|
|[0.05023,35.0,6.0...|17.1|20.027184905628747|
|[0.05083,0.0,5.19...|22.2|21.854711757031147|
|[0.05302,0.0,3.41...|28.7| 30.86567833853092|
|[0.05561,70.

In [20]:
# Coefficients of the regression model, one per feature
coeff = model.coefficients

# Intercept (the prediction when every feature is zero)
intr = model.intercept
print("The coefficient of the model is : %a" % coeff)
print("The Intercept of the model is : %f" % intr)

The coefficient of the model is : DenseVector([-0.1125, 0.0481, -0.0105, 1.8906, -17.3943, 3.5722, 0.0034, -1.5492, 0.2548, -0.0103, -0.9201, 0.0084, -0.5301])
The Intercept of the model is : 37.545056
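
Each prediction is the intercept plus the dot product of the coefficients with the feature vector. The sketch below recomputes that by hand from the values printed above (which are rounded by the print-out, so results will differ slightly from Spark's); the function name `predict` and the example feature row are hypothetical:

```python
# Coefficients and intercept as printed above (rounded by the print-out).
coefficients = [-0.1125, 0.0481, -0.0105, 1.8906, -17.3943, 3.5722, 0.0034,
                -1.5492, 0.2548, -0.0103, -0.9201, 0.0084, -0.5301]
intercept = 37.545056

def predict(features):
    """Linear model: intercept plus the dot product of coefficients and features."""
    if len(features) != len(coefficients):
        raise ValueError("expected %d features" % len(coefficients))
    return intercept + sum(c * x for c, x in zip(coefficients, features))

# Hypothetical feature row, in the column order passed to VectorAssembler.
example = [0.03, 0.0, 5.0, 0, 0.5, 6.5, 60.0, 4.0, 4, 300, 18.0, 390.0, 10.0]
print("Predicted medv: %.2f" % predict(example))
```

Read this way, each coefficient is the change in the predicted medv for a one-unit increase in that feature with the others held fixed; for instance, the sixth coefficient (rm) is 3.5722.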


# Performance Analysis

In [21]:
from pyspark.ml.evaluation import RegressionEvaluator
# Named evaluator rather than eval to avoid shadowing Python's built-in eval()
evaluator = RegressionEvaluator(labelCol="medv", predictionCol="prediction", metricName="rmse")

In [24]:
# Root Mean Square Error
rmse = evaluator.evaluate(pred.predictions)
print("RMSE: %.3f" % rmse)

# Mean Square Error
mse = evaluator.evaluate(pred.predictions, {evaluator.metricName: "mse"})
print("MSE: %.3f" % mse)
# Mean Absolute Error
mae = evaluator.evaluate(pred.predictions, {evaluator.metricName: "mae"})
print("MAE: %.3f" % mae)
# r2 - coefficient of determination
r2 = evaluator.evaluate(pred.predictions, {evaluator.metricName: "r2"})
print("r2: %.3f" % r2)

RMSE: 4.580
MSE: 20.972
MAE: 3.324
r2: 0.772
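
All four metrics are derived from the residuals (medv minus prediction). The sketch below computes them with the usual textbook definitions on the first four rows of the prediction table above, so its numbers will not match the full test-set values; the function name `regression_metrics` is hypothetical:

```python
import math

# (medv, prediction) pairs copied from the first rows of the prediction table above.
pairs = [(32.2, 31.3468028766858),
         (22.0, 27.27703041974309),
         (50.0, 40.412516980480916),
         (33.0, 22.883394296139247)]

def regression_metrics(pairs):
    """Return RMSE, MSE, MAE and r2 for (label, prediction) pairs."""
    n = len(pairs)
    errors = [label - pred for label, pred in pairs]
    ss_res = sum(e * e for e in errors)               # sum of squared residuals
    mse = ss_res / n
    mae = sum(abs(e) for e in errors) / n
    mean_label = sum(label for label, _ in pairs) / n
    ss_tot = sum((label - mean_label) ** 2 for label, _ in pairs)
    r2 = 1.0 - ss_res / ss_tot                        # share of variance explained
    return {"rmse": math.sqrt(mse), "mse": mse, "mae": mae, "r2": r2}

for name, value in regression_metrics(pairs).items():
    print("%s: %.3f" % (name, value))
```

The reported values are consistent with these definitions: squaring the RMSE of 4.580 gives about 20.98, which matches the MSE of 20.972 up to rounding.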


References:

https://towardsdatascience.com/pyspark-in-google-colab-6821c2faf41c