<a href="https://colab.research.google.com/github/beccycole/BigDataProject2021/blob/main/Ridge.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Install PySpark

In [None]:
# download Java, install Apache Spark 3.1.1 with Hadoop 2.7 and unzip folder
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-3.0.2/spark-3.0.2-bin-hadoop2.7.tgz
!tar xf spark-3.0.2-bin-hadoop2.7.tgz

# install the findspark library
!pip install -q findspark

# set up environment path to enable PySpark to run in Colab
import os
os.environ["SPARK_HOME"] = "/content/spark-3.0.2-bin-hadoop2.7"

# import findspark
import findspark
findspark.init()

## Create a SparkSession

In [None]:
from pyspark.sql import SparkSession

# Start (or reuse) a local Spark session that uses all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

# Eager evaluation renders DataFrames as formatted tables when they are a cell's last expression.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark

## Download the Boston Housing dataset from GitHub

In [None]:
!wget https://raw.githubusercontent.com/beccycole/BigDataProject2021/main/Boston.csv

--2021-04-24 10:43:31--  https://raw.githubusercontent.com/beccycole/BigDataProject2021/main/Boston.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.108.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 37123 (36K) [text/plain]
Saving to: ‘Boston.csv’


2021-04-24 10:43:31 (8.97 MB/s) - ‘Boston.csv’ saved [37123/37123]



## Load data into Spark dataframe

In [None]:
# Load the CSV into a Spark DataFrame. The first line holds the column names, and
# inferSchema asks Spark to scan the data and pick a type for each column.
df2 = spark.read.options(header=True, inferSchema=True).csv("Boston.csv")

# preview the first 6 rows with full (untruncated) cell values
df2.show(n=6, truncate=False)

+---+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+
|_c0|crim   |zn  |indus|chas|nox  |rm   |age |dis   |rad|tax|ptratio|black |lstat|medv|
+---+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+
|1  |0.00632|18.0|2.31 |0   |0.538|6.575|65.2|4.09  |1  |296|15.3   |396.9 |4.98 |24.0|
|2  |0.02731|0.0 |7.07 |0   |0.469|6.421|78.9|4.9671|2  |242|17.8   |396.9 |9.14 |21.6|
|3  |0.02729|0.0 |7.07 |0   |0.469|7.185|61.1|4.9671|2  |242|17.8   |392.83|4.03 |34.7|
|4  |0.03237|0.0 |2.18 |0   |0.458|6.998|45.8|6.0622|3  |222|18.7   |394.63|2.94 |33.4|
|5  |0.06905|0.0 |2.18 |0   |0.458|7.147|54.2|6.0622|3  |222|18.7   |396.9 |5.33 |36.2|
|6  |0.02985|0.0 |2.18 |0   |0.458|6.43 |58.7|6.0622|3  |222|18.7   |394.12|5.21 |28.7|
+---+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+
only showing top 6 rows



## Rename columns

In [None]:
# Map the terse original column names to descriptive ones.
# `age` and `black` are already readable and keep their names.
column_renames = {
    "_c0": "ID",
    "crim": "crime_rate",
    "zn": "large_zones",
    "indus": "industrial",
    "chas": "charles_river",
    "nox": "nitric_oxide",
    "rm": "rooms",
    "dis": "distance_employ",
    "rad": "highway_access",
    "tax": "property_tax",
    "ptratio": "pupil_teacher_ratio",
    "lstat": "lower_status",
    "medv": "house_price",
}

# each withColumnRenamed returns a new DataFrame; df2 itself is left untouched
df3 = df2
for old_name, new_name in column_renames.items():
    df3 = df3.withColumnRenamed(old_name, new_name)

df3.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- crime_rate: double (nullable = true)
 |-- large_zones: double (nullable = true)
 |-- industrial: double (nullable = true)
 |-- charles_river: integer (nullable = true)
 |-- nitric_oxide: double (nullable = true)
 |-- rooms: double (nullable = true)
 |-- age: double (nullable = true)
 |-- distance_employ: double (nullable = true)
 |-- highway_access: integer (nullable = true)
 |-- property_tax: integer (nullable = true)
 |-- pupil_teacher_ratio: double (nullable = true)
 |-- black: double (nullable = true)
 |-- lower_status: double (nullable = true)
 |-- house_price: double (nullable = true)



## Import PySpark MLlib modules

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Summarizer

# import VectorAssembler and LinearRegression modules from the PySpark API
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors

from pyspark.ml.feature import MinMaxScaler

# import LinearRegression
from pyspark.ml.regression import LinearRegression

# import regression metrics for evaluation
from pyspark.mllib.evaluation import RegressionMetrics


## Assemble the features into a single vector column

In [None]:
df3.columns  # column names after renaming, shown as the cell output

['ID',
 'crime_rate',
 'large_zones',
 'industrial',
 'charles_river',
 'nitric_oxide',
 'rooms',
 'age',
 'distance_employ',
 'highway_access',
 'property_tax',
 'pupil_teacher_ratio',
 'black',
 'lower_status',
 'house_price']

In [None]:
# Every predictor column (everything except the row ID and the target house_price),
# in the order they will appear inside the assembled vector.
feature_cols = [
    'crime_rate',
    'large_zones',
    'industrial',
    'charles_river',
    'nitric_oxide',
    'rooms',
    'age',
    'distance_employ',
    'highway_access',
    'property_tax',
    'pupil_teacher_ratio',
    'black',
    'lower_status',
]

# Spark ML estimators expect all inputs packed into a single vector column.
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
output = assembler.transform(df3)

# keep only the model inputs and the label
vector_data = output.select("features", "house_price")
vector_data.show()

+--------------------+-----------+
|            features|house_price|
+--------------------+-----------+
|[0.00632,18.0,2.3...|       24.0|
|[0.02731,0.0,7.07...|       21.6|
|[0.02729,0.0,7.07...|       34.7|
|[0.03237,0.0,2.18...|       33.4|
|[0.06905,0.0,2.18...|       36.2|
|[0.02985,0.0,2.18...|       28.7|
|[0.08829,12.5,7.8...|       22.9|
|[0.14455,12.5,7.8...|       27.1|
|[0.21124,12.5,7.8...|       16.5|
|[0.17004,12.5,7.8...|       18.9|
|[0.22489,12.5,7.8...|       15.0|
|[0.11747,12.5,7.8...|       18.9|
|[0.09378,12.5,7.8...|       21.7|
|[0.62976,0.0,8.14...|       20.4|
|[0.63796,0.0,8.14...|       18.2|
|[0.62739,0.0,8.14...|       19.9|
|[1.05393,0.0,8.14...|       23.1|
|[0.7842,0.0,8.14,...|       17.5|
|[0.80271,0.0,8.14...|       20.2|
|[0.7258,0.0,8.14,...|       18.2|
+--------------------+-----------+
only showing top 20 rows



## Scale the features to the [0, 1] range (min-max scaling)

In [None]:
# Rescale each entry of the 'features' vector independently (MinMaxScaler defaults
# to a [0, 1] target range) into a new 'scaled_features' column.
# NOTE(review): the scaler is fit on the full dataset before the train/test split,
# so min/max values from test rows leak into the scaling. Consider fitting it on
# the training split only.
features_MinMaxScaler = MinMaxScaler(outputCol='scaled_features', inputCol='features')
output_scaled = features_MinMaxScaler.fit(vector_data)  # the fitted scaler model
scaled_features_df = output_scaled.transform(vector_data)
scaled_features_df.show(10)

+--------------------+-----------+--------------------+
|            features|house_price|     scaled_features|
+--------------------+-----------+--------------------+
|[0.00632,18.0,2.3...|       24.0|[0.0,0.18,0.06781...|
|[0.02731,0.0,7.07...|       21.6|[2.35922539178427...|
|[0.02729,0.0,7.07...|       34.7|[2.35697744000553...|
|[0.03237,0.0,2.18...|       33.4|[2.92795719180468...|
|[0.06905,0.0,2.18...|       36.2|[7.05070075400798...|
|[0.02985,0.0,2.18...|       28.7|[2.64471526768385...|
|[0.08829,12.5,7.8...|       22.9|[9.21323036515279...|
|[0.14455,12.5,7.8...|       27.1|[0.00155367187187...|
|[0.21124,12.5,7.8...|       16.5|[0.00230325139249...|
|[0.17004,12.5,7.8...|       18.9|[0.00184017332607...|
+--------------------+-----------+--------------------+
only showing top 10 rows



## Create a training and test set

In [None]:
# Split the dataset 80/20 into training and testing data.
# A fixed seed makes the split (and therefore every metric reported below)
# reproducible across kernel restarts; without it each run gets a different split.
splits = scaled_features_df.randomSplit([0.8, 0.2], seed=42)
train_df, test_df = splits

## Ridge regression model

In [None]:
# elasticNetParam=0 makes the penalty pure L2, i.e. ridge regression;
# regParam sets the strength of that penalty.
ridge = LinearRegression(
    featuresCol='scaled_features',
    labelCol='house_price',
    elasticNetParam=0,
    regParam=0.1,
)

# fit on the training split, then score the held-out test split
ridge_model = ridge.fit(train_df)
predictions = ridge_model.transform(test_df)

predictions.show(10)

+--------------------+-----------+--------------------+------------------+
|            features|house_price|     scaled_features|        prediction|
+--------------------+-----------+--------------------+------------------+
|[0.01311,90.0,1.2...|       35.4|[7.63179628881144...| 30.44140446362487|
|[0.01381,80.0,0.4...|       50.0|[8.41857941136933...|    39.45556312547|
|[0.01501,90.0,1.2...|       50.0|[9.76735047861141...| 42.20269986080041|
|[0.0187,85.0,4.15...|       23.1|[1.39148215103808...|25.062252707645126|
|[0.02187,60.0,2.9...|       31.1|[1.74778250796786...|31.632215334426007|
|[0.02543,55.0,3.7...|       23.9|[2.14791792458301...| 27.36053053204761|
|[0.03359,75.0,2.9...|       34.9|[3.06508225030763...|  33.5024066138044|
|[0.03578,20.0,3.3...|       45.4|[3.31123297007931...|37.656666820877575|
|[0.03738,0.0,5.19...|       20.7|[3.49106911237825...| 21.98747643395368|
|[0.0536,21.0,5.64...|       25.0|[5.31415800493380...|27.548507453322454|
+--------------------+---

## Evaluate the Model
The model is evaluated with R² and RMSE, reported first on the training data (from the fitted model's summary) and then on the held-out test data.

In [None]:
# ridge_model.summary holds metrics Spark computed on the *training* data during fit,
# so label them explicitly to avoid confusing them with test-set metrics.
trainingSummary = ridge_model.summary
print("Training RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("Training r2: %f" % trainingSummary.r2)

RMSE: 4.437426
r2: 0.738069


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# score the held-out test split and preview a few predictions next to the true prices
ridge_predictions = ridge_model.transform(test_df)
ridge_predictions.select("prediction", "house_price", "features").show(5)

# coefficient of determination (R²) on the test split
ridge_evaluator = RegressionEvaluator(
    labelCol="house_price",
    predictionCol="prediction",
    metricName="r2",
)
test_r2 = ridge_evaluator.evaluate(ridge_predictions)
print("R Squared (R2) on test data = %g" % test_r2)

+------------------+-----------+--------------------+
|        prediction|house_price|            features|
+------------------+-----------+--------------------+
| 30.44140446362487|       35.4|[0.01311,90.0,1.2...|
|    39.45556312547|       50.0|[0.01381,80.0,0.4...|
| 42.20269986080041|       50.0|[0.01501,90.0,1.2...|
|25.062252707645126|       23.1|[0.0187,85.0,4.15...|
|31.632215334426007|       31.1|[0.02187,60.0,2.9...|
+------------------+-----------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.724866


In [None]:
# evaluate() scores the fitted model on the held-out test split
test_result = ridge_model.evaluate(test_df)
test_rmse = test_result.rootMeanSquaredError
print("Root Mean Squared Error (RMSE) on test data = %g" % test_rmse)

Root Mean Squared Error (RMSE) on test data = 5.83992
