Install necessary packages.

In [2]:
# Install the headless OpenJDK 8 package; -qq and the redirect to /dev/null keep the cell output empty.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [3]:
# Download the Spark 3.1.2 / Hadoop 3.2 binary distribution.
# The release is fetched from archive.apache.org: downloads.apache.org only
# serves currently supported releases, and because of -q a missing file would
# fail silently and only surface later at the `tar` step.
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz

In [10]:
# List the working directory to check that the Spark archive was downloaded.
!ls

sample_data  spark-3.1.2-bin-hadoop3.2.tgz


In [11]:
# Unpack the downloaded Spark archive into the current working directory.
!tar xf /content/spark-3.1.2-bin-hadoop3.2.tgz

In [12]:
# List the working directory again to check that the archive was extracted.
!ls


sample_data  spark-3.1.2-bin-hadoop3.2	spark-3.1.2-bin-hadoop3.2.tgz


In [13]:
!pip install -q findspark

Import the necessary libraries and set the JAVA_HOME and SPARK_HOME environment variables.

In [14]:
import os

# Record where the JDK and the Spark distribution are located, using the
# environment variables JAVA_HOME and SPARK_HOME.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.2-bin-hadoop3.2",
})

Start a local Spark session to test our installation.

In [15]:
# findspark.init() is called before the pyspark import below; keep that order.
# NOTE(review): pyspark is not pip-installed in this notebook, so the import
# presumably relies on findspark making the SPARK_HOME distribution
# importable -- confirm before reordering.
import findspark
findspark.init()

from pyspark.sql import SparkSession
# Reuse an existing session if there is one, otherwise create one with the
# master set to "local[*]".
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [16]:
# Fetch the Boston Housing CSV into the working directory.
# -nc (no-clobber) skips the download when the file is already present, so
# re-running this cell does not leave a duplicate BostonHousing.csv.1 behind.
!wget -nc https://raw.githubusercontent.com/asifahmed90/pyspark-ML-in-Colab/master/BostonHousing.csv

--2021-06-30 10:55:31--  https://raw.githubusercontent.com/asifahmed90/pyspark-ML-in-Colab/master/BostonHousing.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 35735 (35K) [text/plain]
Saving to: ‘BostonHousing.csv’


2021-06-30 10:55:32 (11.2 MB/s) - ‘BostonHousing.csv’ saved [35735/35735]



For our linear regression model, we need to import the VectorAssembler and LinearRegression modules from the PySpark API. VectorAssembler is a transformer that assembles the features from multiple numeric columns into a single vector column. If any of our columns contained string values, we would have to use StringIndexer first to convert them into numeric values. Luckily, the BostonHousing dataset contains only numeric columns (doubles and integers), so we can skip StringIndexer for now.

In [17]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Load the housing CSV as a Spark DataFrame: the first line supplies the
# column names and the column types are inferred from the data.
dataset = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('BostonHousing.csv')
)

In [18]:
# Print the column names and the types inferred while reading the CSV.
dataset.printSchema()

root
 |-- crim: double (nullable = true)
 |-- zn: double (nullable = true)
 |-- indus: double (nullable = true)
 |-- chas: integer (nullable = true)
 |-- nox: double (nullable = true)
 |-- rm: double (nullable = true)
 |-- age: double (nullable = true)
 |-- dis: double (nullable = true)
 |-- rad: integer (nullable = true)
 |-- tax: integer (nullable = true)
 |-- ptratio: double (nullable = true)
 |-- b: double (nullable = true)
 |-- lstat: double (nullable = true)
 |-- medv: double (nullable = true)



In [19]:
import pandas as pd

# Preview the raw CSV with pandas. The file is read through the same relative
# path the Spark reader uses, so the cell does not depend on the working
# directory being /content.
df = pd.read_csv('BostonHousing.csv')

df.head()

Unnamed: 0,crim,zn,indus,chas,nox,rm,age,dis,rad,tax,ptratio,b,lstat,medv
0,0.00632,18.0,2.31,0,0.538,6.575,65.2,4.09,1,296,15.3,396.9,4.98,24.0
1,0.02731,0.0,7.07,0,0.469,6.421,78.9,4.9671,2,242,17.8,396.9,9.14,21.6
2,0.02729,0.0,7.07,0,0.469,7.185,61.1,4.9671,2,242,17.8,392.83,4.03,34.7
3,0.03237,0.0,2.18,0,0.458,6.998,45.8,6.0622,3,222,18.7,394.63,2.94,33.4
4,0.06905,0.0,2.18,0,0.458,7.147,54.2,6.0622,3,222,18.7,396.9,5.33,36.2


Combine all the features into one vector column

In [20]:
# Predictor columns to pack into a single vector; the label column `medv`
# is deliberately left out.
feature_columns = [
    'crim', 'zn', 'indus', 'chas', 'nox', 'rm', 'age',
    'dis', 'rad', 'tax', 'ptratio', 'b', 'lstat',
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol='Attributes')

In [21]:
# Add the assembled `Attributes` vector column to the dataset.
output = assembler.transform(dataset)

# Keep only what the model needs: the feature vector and the label.
finalized_data = output.select(["Attributes", "medv"])

finalized_data.show()

+--------------------+----+
|          Attributes|medv|
+--------------------+----+
|[0.00632,18.0,2.3...|24.0|
|[0.02731,0.0,7.07...|21.6|
|[0.02729,0.0,7.07...|34.7|
|[0.03237,0.0,2.18...|33.4|
|[0.06905,0.0,2.18...|36.2|
|[0.02985,0.0,2.18...|28.7|
|[0.08829,12.5,7.8...|22.9|
|[0.14455,12.5,7.8...|27.1|
|[0.21124,12.5,7.8...|16.5|
|[0.17004,12.5,7.8...|18.9|
|[0.22489,12.5,7.8...|15.0|
|[0.11747,12.5,7.8...|18.9|
|[0.09378,12.5,7.8...|21.7|
|[0.62976,0.0,8.14...|20.4|
|[0.63796,0.0,8.14...|18.2|
|[0.62739,0.0,8.14...|19.9|
|[1.05393,0.0,8.14...|23.1|
|[0.7842,0.0,8.14,...|17.5|
|[0.80271,0.0,8.14...|20.2|
|[0.7258,0.0,8.14,...|18.2|
+--------------------+----+
only showing top 20 rows



In [22]:
# Hold out roughly 20% of the rows for evaluation. The seed pins the random
# split so the fitted coefficients and the metrics are reproducible when the
# notebook is re-run.
train_data, test_data = finalized_data.randomSplit([0.8, 0.2], seed=42)

# Keep the unfitted estimator and the fitted model under separate names.
# `regressor` stays bound to the fitted model because a later cell reads
# `regressor.coefficients` and `regressor.intercept`.
lr = LinearRegression(featuresCol='Attributes', labelCol='medv')
regressor = lr.fit(train_data)

# Evaluate the fitted model on the held-out rows; the result exposes a
# `predictions` DataFrame with a `prediction` column next to the label.
pred = regressor.evaluate(test_data)

pred.predictions.show()

+--------------------+----+------------------+
|          Attributes|medv|        prediction|
+--------------------+----+------------------+
|[0.00632,18.0,2.3...|24.0|30.242606096984897|
|[0.00906,90.0,2.9...|32.2| 31.80694389241852|
|[0.01096,55.0,2.2...|22.0| 27.60133697004764|
|[0.0136,75.0,4.0,...|18.9|14.483707374388317|
|[0.01709,90.0,2.0...|30.1| 25.63902766704069|
|[0.01778,95.0,1.4...|32.9| 30.64126675591632|
|[0.0187,85.0,4.15...|23.1|24.984209141878736|
|[0.02177,82.5,2.0...|42.3| 37.63121996468902|
|[0.02543,55.0,3.7...|23.9|27.626035377121287|
|[0.03041,0.0,5.19...|18.5|19.068947961047435|
|[0.03113,0.0,4.39...|17.5| 17.06862079771622|
|[0.03768,80.0,1.5...|34.6| 35.47461921465357|
|[0.03961,0.0,5.19...|21.1|20.489476977365168|
|[0.04011,80.0,1.5...|33.3|36.850231675055085|
|[0.04113,25.0,4.8...|28.0|28.609207133649953|
|[0.04203,28.0,15....|22.9|29.018646327783134|
|[0.04527,0.0,11.9...|20.6|21.965871042349292|
|[0.0459,52.5,5.32...|22.3|27.270210571207535|
|[0.04819,80.

In [23]:
# Fitted weights and intercept of the linear model.
coeff, intr = regressor.coefficients, regressor.intercept

# One value per line: coefficients first, then the intercept.
print(coeff, intr, sep="\n")

[-0.12062201976923195,0.03137711216764952,0.048271839359598126,2.3601983025344495,-19.106037893947775,4.6220996167126875,0.0018698420115204582,-1.3393309539520353,0.29820582352660324,-0.011721200240248046,-1.0575769480376835,0.007873314758871288,-0.5065618243992043]
33.56172178005755


In [26]:
from pyspark.ml.evaluation import RegressionEvaluator

# Named `evaluator` rather than `eval` so the Python builtin `eval` is not
# shadowed for the rest of the kernel session.
evaluator = RegressionEvaluator(labelCol="medv", predictionCol="prediction", metricName="rmse")

# Root-mean-squared error of the predictions against the `medv` label.
rmse = evaluator.evaluate(pred.predictions)
print(rmse)

5.1650640640985594
