In [2]:
# install java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [7]:
# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

In [8]:
# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

In [9]:
# install findspark 
!pip install -q findspark

In [10]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

In [15]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[]").getOrCreate()
# Test the spark
df = spark.createDataFrame([{"hello": "world"} for x in range(1000)])
df.show(4, )

+-----+
|hello|
+-----+
|world|
|world|
|world|
|world|
+-----+
only showing top 4 rows





# **Linear Regression**

In [19]:
from google.colab import files
files.upload()

Saving Boston.csv to Boston.csv


{'Boston.csv': b'ID,CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT,MEDV\r\n1,7.52601,0,18.1,0,0.713,6.417,98.3,2.185,24,666,20.2,304.21,19.31,13\r\n2,0.0136,75,4,0,0.41,5.888,47.6,7.3197,3,469,21.1,396.9,14.8,18.9\r\n3,0.05789,12.5,6.07,0,0.409,5.878,21.4,6.498,4,345,18.9,396.21,8.1,22\r\n4,0.05646,0,12.83,0,0.437,6.232,53.7,5.0141,5,398,18.7,386.4,12.34,21.2\r\n5,2.14918,0,19.58,0,0.871,5.709,98.5,1.6232,5,403,14.7,261.95,15.79,19.4\r\n6,0.22876,0,8.56,0,0.52,6.405,85.4,2.7147,5,384,20.9,70.8,10.63,18.6\r\n7,0.00906,90,2.97,0,0.4,7.088,20.8,7.3073,1,285,15.3,394.72,7.85,32.2\r\n8,0.03961,0,5.19,0,0.515,6.037,34.5,5.9853,5,224,20.2,396.9,8.01,21.1\r\n9,0.53412,20,3.97,0,0.647,7.52,89.4,2.1398,5,264,13,388.37,7.26,43.1\r\n10,0.08873,21,5.64,0,0.439,5.963,45.7,6.8147,4,243,16.8,395.56,13.45,19.7\r\n11,0.17446,0,10.59,1,0.489,5.96,92.1,3.8771,4,277,18.6,393.25,17.27,21.7\r\n12,0.1029,30,4.93,0,0.428,6.358,52.9,7.0355,6,300,16.6,372.75,11.22,22.2\r\n13,0.03932,0,3.41,0,0.489,6.4

In [20]:
!ls

 Boston.csv		        spark-3.0.0-bin-hadoop3.2
'Boston housing prices.ipynb'   spark-3.0.0-bin-hadoop3.2.tgz
 sample_data


In [21]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
dataset = spark.read.csv('Boston.csv',inferSchema=True, header =True)

In [22]:
# data types of each column
dataset.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: integer (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: integer (nullable = true)
 |-- TAX: integer (nullable = true)
 |-- PTRATIO: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- MEDV: double (nullable = true)



**Convert all the features from different columns into a single column and we can call the new vector column as ‘Attributes’ in the outputCol.**

In [25]:
#Input all the features in one vector column
assembler = VectorAssembler(inputCols=['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT'], outputCol = 'Attributes')
output = assembler.transform(dataset)
#Input vs Output
finalized_data = output.select("Attributes","MEDV")
finalized_data.show()

+--------------------+----+
|          Attributes|MEDV|
+--------------------+----+
|[7.52601,0.0,18.1...|13.0|
|[0.0136,75.0,4.0,...|18.9|
|[0.05789,12.5,6.0...|22.0|
|[0.05646,0.0,12.8...|21.2|
|[2.14918,0.0,19.5...|19.4|
|[0.22876,0.0,8.56...|18.6|
|[0.00906,90.0,2.9...|32.2|
|[0.03961,0.0,5.19...|21.1|
|[0.53412,20.0,3.9...|43.1|
|[0.08873,21.0,5.6...|19.7|
|[0.17446,0.0,10.5...|21.7|
|[0.1029,30.0,4.93...|22.2|
|[0.03932,0.0,3.41...|22.0|
|[0.85204,0.0,8.14...|19.6|
|[0.15445,25.0,5.1...|23.3|
|[0.06417,0.0,5.96...|18.9|
|[4.03841,0.0,18.1...|19.6|
|[14.3337,0.0,18.1...|21.4|
|[6.96215,0.0,18.1...|15.1|
|[0.19802,0.0,10.5...|25.0|
+--------------------+----+
only showing top 20 rows



**Attributes is the feature column and MEDV is the target column**

In [26]:
#Split training and testing data
train_data,test_data = finalized_data.randomSplit([0.8,0.2])
regressor = LinearRegression(featuresCol = 'Attributes', labelCol = 'MEDV')

In [27]:
#Learn to fit the model from training set
regressor = regressor.fit(train_data)
#To predict the prices on testing set
pred = regressor.evaluate(test_data)
#Predict the model
pred.predictions.show()

+--------------------+----+------------------+
|          Attributes|MEDV|        prediction|
+--------------------+----+------------------+
|[0.02187,60.0,2.9...|31.1| 32.10248937304404|
|[0.03359,75.0,2.9...|34.9| 34.61655270843251|
|[0.03548,80.0,3.6...|20.9| 23.07951738337617|
|[0.03659,25.0,4.8...|24.8|26.242025611877345|
|[0.04297,52.5,5.3...|24.8|27.170675658810353|
|[0.0459,52.5,5.32...|22.3| 27.54744566025304|
|[0.04684,0.0,3.41...|22.6| 26.51163067144139|
|[0.04819,80.0,3.6...|21.9|24.950603679345505|
|[0.05561,70.0,2.2...|29.0|  31.6518295782811|
|[0.05646,0.0,12.8...|21.2|20.855179018627986|
|[0.06588,0.0,2.46...|39.8| 32.68897384999043|
|[0.06664,0.0,4.05...|29.4|29.841466591843677|
|[0.06724,0.0,3.24...|22.6| 23.58863846020079|
|[0.07013,0.0,13.8...|28.7|27.044255427779337|
|[0.07022,0.0,4.05...|23.2|25.368673997220135|
|[0.07503,33.0,2.1...|33.4| 34.75138755133083|
|[0.07978,40.0,6.4...|29.1|30.583412526590656|
|[0.08014,0.0,5.96...|21.0| 23.34023803054133|
|[0.08826,0.0

In [28]:
#coefficient of the regression model
coeff = regressor.coefficients
print ("The coefficient of the model is : %a" %coeff)

The coefficient of the model is : DenseVector([-0.1143, 0.0686, 0.002, 3.749, -18.4109, 2.7639, -0.0042, -1.5415, 0.3218, -0.0131, -0.8121, 0.0092, -0.4873])


In [29]:
#X and Y intercept
intr = regressor.intercept
print ("The Intercept of the model is : %f" %intr)

The Intercept of the model is : 40.840287


In [32]:
from pyspark.ml.evaluation import RegressionEvaluator
eval = RegressionEvaluator(labelCol="MEDV", predictionCol="prediction", metricName="rmse")

In [33]:
# Root Mean Square Error
rmse = eval.evaluate(pred.predictions)
print("RMSE: %.3f" % rmse)

RMSE: 5.263


In [34]:
# Mean Square Error
mse = eval.evaluate(pred.predictions, {eval.metricName: "mse"})
print("MSE: %.3f" % mse)

MSE: 27.699


In [35]:
# Mean Absolute Error
mae = eval.evaluate(pred.predictions, {eval.metricName: "mae"})
print("MAE: %.3f" % mae)

MAE: 3.607


In [36]:
# r2 - coefficient of determination
r2 = eval.evaluate(pred.predictions, {eval.metricName: "r2"})
print("r2: %.3f" %r2)

r2: 0.720
