# Using Models with MLlib in PySpark

The data used in this project is from the Kaggle competition: Housing Values in Suburbs of Boston. For each house observation, we have the following information:

CRIM — per capita crime rate by town.

ZN — proportion of residential land zoned for lots over 25,000 sq.ft.

INDUS — proportion of non-retail business acres per town.

CHAS — Charles River dummy variable (= 1 if tract bounds river; 0 otherwise).

NOX — nitrogen oxides concentration (parts per 10 million).

RM — average number of rooms per dwelling.

AGE — proportion of owner-occupied units built prior to 1940.

DIS — weighted mean of distances to five Boston employment centres.

RAD — index of accessibility to radial highways.

TAX — full-value property-tax rate per $10,000.

PTRATIO — pupil-teacher ratio by town.

BLACK — 1000(Bk - 0.63)², where Bk is the proportion of Black residents by town. This column is named B in the data.

LSTAT — lower status of the population (percent).

MEDV — median value of owner-occupied homes in $1000s. This is the target variable.

The input data set describes housing in the suburbs of Boston. The goal is to build a model that predicts the median home value (MEDV) in an area from the other variables.

In [None]:
# !pip install pyspark

### Import the necessary Packages:

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
import warnings
warnings.filterwarnings('ignore')

## Important related concepts:

PySpark is a Python API for working with Spark.

A Python API lets you use the syntax and agility of Python to interact with, and send commands to, a system that is not itself based on Python.

As a rule of thumb, data suits PySpark when it would not fit into the storage of a single machine, let alone its RAM.

**Distributed computing** - splitting a task into several smaller tasks and running all of them at the same time. PySpark lets you do this across multiple machines, but it can also be done on a single machine with several threads.

**Cluster** - a network of machines that can take on tasks from a user, interact with one another and return results.

**Resilient Distributed Dataset (RDD)** - Spark's primary user-facing API from its inception. At its core, an RDD is an immutable distributed collection of elements of your data, partitioned across the nodes of your cluster, that can be operated on in parallel through a low-level API offering transformations and actions.
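
To make these concepts concrete, here is a minimal single-machine sketch in plain Python rather than Spark. Threads stand in for worker nodes, and the helper names (`partition`, `square_partition`, `parallel_sum_of_squares`) are made up for this illustration. It shows the structure of partitioned work, not a real speed-up.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def partition(data, num_partitions):
    """Split a list into roughly equal contiguous chunks."""
    size, extra = divmod(len(data), num_partitions)
    chunks, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < extra else 0)
        chunks.append(data[start:end])
        start = end
    return chunks


def square_partition(chunk):
    """A 'transformation': builds a new chunk and leaves the input untouched."""
    return [x * x for x in chunk]


def parallel_sum_of_squares(data, num_partitions=4):
    chunks = partition(data, num_partitions)
    # Each chunk is handed to a worker thread (a stand-in for a worker node).
    with ThreadPoolExecutor(max_workers=num_partitions) as pool:
        squared_chunks = list(pool.map(square_partition, chunks))
    # An 'action': combine the per-partition results into a single value.
    partial_sums = [sum(chunk) for chunk in squared_chunks]
    return reduce(lambda a, b: a + b, partial_sums, 0)


total = parallel_sum_of_squares(list(range(1, 11)))
print(total)
```

The input list is never modified: every step returns a new collection, which mirrors the immutability of an RDD.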

### Spark Architecture

To send commands to a cluster and receive results from it, you need to start a Spark session. This object is your tool for interacting with Spark.

Each user of the cluster has their own Spark session, which lets them use the cluster in isolation. All sessions communicate with the main node of the cluster. The main node assigns tasks to the other computers in the cluster, the worker nodes, and coordinates them. Before the main node can hand work to a worker node, that node's computing power has to be allocated to it; this allocation of cluster resources is performed by a cluster manager. Each worker node runs its tasks in parallel with the other worker nodes and has its own cache for storing results.

In [4]:
spark = SparkSession.builder.appName('BostonHousing').getOrCreate() # returns the existing session if one was already created

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/02 10:09:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
spark

### Data exploration

Read the CSV file and print its schema in a tree format.

In [6]:
house_df = spark.read.csv("data/Boston_Housing.csv", inferSchema=True, header=True)
house_df.printSchema()

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: integer (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: integer (nullable = true)
 |-- TAX: integer (nullable = true)
 |-- PTRATIO: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- MEDV: double (nullable = true)



In [7]:
# show head of table
house_df.show(3)

+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+
|   CRIM|  ZN|INDUS|CHAS|  NOX|   RM| AGE|   DIS|RAD|TAX|PTRATIO|     B|LSTAT|MEDV|
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+
|0.00632|18.0| 2.31|   0|0.538|6.575|65.2|  4.09|  1|296|   15.3| 396.9| 4.98|24.0|
|0.02731| 0.0| 7.07|   0|0.469|6.421|78.9|4.9671|  2|242|   17.8| 396.9| 9.14|21.6|
|0.02729| 0.0| 7.07|   0|0.469|7.185|61.1|4.9671|  2|242|   17.8|392.83| 4.03|34.7|
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+
only showing top 3 rows



In [8]:
# column dtypes as list of tuples
house_df.dtypes

[('CRIM', 'double'),
 ('ZN', 'double'),
 ('INDUS', 'double'),
 ('CHAS', 'int'),
 ('NOX', 'double'),
 ('RM', 'double'),
 ('AGE', 'double'),
 ('DIS', 'double'),
 ('RAD', 'int'),
 ('TAX', 'int'),
 ('PTRATIO', 'double'),
 ('B', 'double'),
 ('LSTAT', 'double'),
 ('MEDV', 'double')]

## Pandas DataFrame vs PySpark DataFrame
Both tools represent a table of data with rows and columns. Under the hood, however, they are different: a PySpark DataFrame has to support distributed computation. As we move forward, we will see more and more of its features that are not present in a Pandas DataFrame.

Perform descriptive analytics:

In [None]:
house_df.describe().toPandas().transpose()

A scatter matrix is a quick way to check roughly whether there are linear relationships among several variables.

In [None]:
import pandas as pd

numeric_features = [t[0] for t in house_df.dtypes if t[1] == 'int' or t[1] == 'double']
sampled_data = house_df.select(numeric_features).sample(False, 0.8).toPandas()
axs = pd.plotting.scatter_matrix(sampled_data, figsize=(10, 10))
n = len(sampled_data.columns)
for i in range(n):
    v = axs[i, 0]
    v.yaxis.label.set_rotation(0)
    v.yaxis.label.set_ha('right')
    v.set_yticks(())
    h = axs[n-1, i]
    h.xaxis.label.set_rotation(90)
    h.set_xticks(())

The plot is hard to read. Let's instead compute the correlation between each independent variable and the target variable.

In [None]:
# Correlation of every non-string column with the target, MEDV
for col_name, col_type in house_df.dtypes:
    if col_type != 'string':
        print("Correlation to MEDV for", col_name, house_df.stat.corr('MEDV', col_name))
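
`stat.corr` computes the Pearson correlation coefficient by default. As a rough illustration of what that number means, here is a plain-Python sketch of the same formula. The function name `pearson_corr` and the toy values are made up for this example and are not taken from the housing data.

```python
import math


def pearson_corr(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)


# Hypothetical toy values, not taken from the housing data.
rooms = [4.0, 5.0, 6.0, 7.0]
price = [10.0, 14.0, 18.0, 22.0]  # exactly linear in rooms
print(pearson_corr(rooms, price))                # perfectly positive relationship
print(pearson_corr(rooms, [-p for p in price]))  # perfectly negative relationship
```

Values close to 1 or -1 indicate a strong linear relationship with the target. Values near 0 indicate little linear relationship, although a non-linear one may still exist.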

## SQL queries in PySpark

In [None]:
# run an SQL query on the data
house_df.createOrReplaceTempView("df") # register the DataFrame under the name "df" for use in SQL queries
spark.sql("""SELECT CRIM from df""").show(2)


In [None]:
house_df.selectExpr("CRIM >= 0.004", "CRIM").show(2)


We are going to keep all the variables for now.

Next, prepare the data for machine learning. We need only two columns: features and the label ("MEDV"):

In [None]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT'], outputCol = 'features')
vhouse_df = vectorAssembler.transform(house_df)
vhouse_df = vhouse_df.select(['features', 'MEDV'])
vhouse_df.show(3)
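
Conceptually, the assembler packs the chosen input columns of each row into one vector, in the order given. Here is a plain-Python sketch of that idea, assuming rows are represented as dictionaries. The `assemble` helper is hypothetical and is not part of PySpark.

```python
def assemble(rows, input_cols, label_col):
    """Turn each row (a dict) into a (features, label) pair.

    The features list keeps the order of input_cols, so a position in the
    vector can later be traced back to a column name.
    """
    return [([float(row[c]) for c in input_cols], row[label_col]) for row in rows]


# First two rows of the table shown earlier, restricted to three columns.
rows = [
    {"CRIM": 0.00632, "RM": 6.575, "LSTAT": 4.98, "MEDV": 24.0},
    {"CRIM": 0.02731, "RM": 6.421, "LSTAT": 9.14, "MEDV": 21.6},
]
for features, label in assemble(rows, ["CRIM", "RM", "LSTAT"], "MEDV"):
    print(features, label)
```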

In [None]:
splits = vhouse_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]
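
A weighted random split assigns each row to one of the splits by an independent random draw, so the resulting sizes are only approximately 70/30. The sketch below illustrates that idea in plain Python. It is an assumption-laden stand-in, not Spark's actual implementation, and `random_split` is a made-up helper name.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row to one split by an independent weighted random draw."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative, normalized boundaries, e.g. roughly [0.7, 1.0] for [0.7, 0.3].
    bounds, running = [], 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point rounding gap.
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, test = random_split(list(range(100)), [0.7, 0.3], seed=42)
print(len(train), len(test))  # close to 70/30, but not guaranteed to be exact
```

Fixing the seed makes the split reproducible. In PySpark, `randomSplit` accepts a `seed` argument for the same purpose, for example `vhouse_df.randomSplit([0.7, 0.3], seed=42)`.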

## Linear regression

In [None]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='MEDV', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))
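
The `regParam` and `elasticNetParam` arguments control an elastic-net penalty that is added to the squared-error loss. The sketch below assumes the form given in the Spark ML documentation, with `regParam` as the overall strength and `elasticNetParam` as the mix between the L1 and L2 terms. The function name and the coefficient values are hypothetical.

```python
def elastic_net_penalty(coefficients, reg_param, elastic_net_param):
    """Penalty = reg_param * (alpha * L1 + (1 - alpha) / 2 * L2^2)."""
    l1 = sum(abs(w) for w in coefficients)
    l2_squared = sum(w * w for w in coefficients)
    alpha = elastic_net_param
    return reg_param * (alpha * l1 + (1.0 - alpha) / 2.0 * l2_squared)


# Hypothetical coefficients, with the regParam / elasticNetParam values used above.
weights = [1.0, -2.0]
print(elastic_net_penalty(weights, reg_param=0.3, elastic_net_param=0.8))
```

With `elasticNetParam=1.0` the penalty reduces to pure L1 (lasso), which tends to push some coefficients to exactly zero. With `elasticNetParam=0.0` it reduces to pure L2 (ridge).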

Summarize the model over the training set and print out some metrics:

In [None]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE measures the difference between the values predicted by the model and the actual values. On its own, however, RMSE is hard to interpret: it needs to be compared with the scale of the actual "MEDV" values, such as their mean, min and max. After such a comparison, our RMSE looks pretty good.

In [None]:
train_df.describe().show()
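
To make the two metrics and the comparison against the label scale concrete, here is a plain-Python sketch. The function names and the toy labels and predictions are hypothetical. They are not output from the model above.

```python
import math


def rmse(actual, predicted):
    """Root mean squared error."""
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


def r2(actual, predicted):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_actual = sum(actual) / len(actual)
    ss_res = sum((a - p) ** 2 for a, p in zip(actual, predicted))
    ss_tot = sum((a - mean_actual) ** 2 for a in actual)
    return 1.0 - ss_res / ss_tot


# Hypothetical labels and predictions, not output from the model above.
actual = [3.0, 5.0, 7.0]
predicted = [2.0, 5.0, 8.0]
error = rmse(actual, predicted)
mean_actual = sum(actual) / len(actual)
print("RMSE:", error)
print("R2:", r2(actual, predicted))
print("RMSE as a fraction of the mean label:", error / mean_actual)
```

Dividing the RMSE by the mean label gives a unit-free sense of how large the typical error is relative to the values being predicted.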


In [None]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","MEDV","features").show(5)

from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction",
                                   labelCol="MEDV", metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

In [None]:
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

## Decision tree regression


In [None]:
from pyspark.ml.regression import DecisionTreeRegressor

dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'MEDV')
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(
    labelCol="MEDV", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

### Feature importance

In [None]:
train_df.take(1)


In [None]:
dt_model.featureImportances
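
The importances come back as a vector indexed by position, in the same order as the assembler's input columns. Pairing them with the column names avoids matching positions by eye. The helper below is a hypothetical sketch, and the scores are made up for illustration.

```python
def rank_features(feature_names, importances):
    """Pair each importance with its column name, most important first."""
    if len(feature_names) != len(importances):
        raise ValueError("names and importances must have the same length")
    pairs = zip(feature_names, importances)
    return sorted(pairs, key=lambda pair: pair[1], reverse=True)


# Hypothetical importances for three columns, for illustration only.
names = ["CRIM", "RM", "LSTAT"]
scores = [0.1, 0.6, 0.3]
for name, score in rank_features(names, scores):
    print(f"{name}: {score:.2f}")
```

With the fitted model, the same helper could be called as `rank_features(vectorAssembler.getInputCols(), dt_model.featureImportances.toArray())`.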


In [44]:
house_df.take(1)

[Row(CRIM=0.00632, ZN=18.0, INDUS=2.31, CHAS=0, NOX=0.538, RM=6.575, AGE=65.2, DIS=4.09, RAD=1, TAX=296, PTRATIO=15.3, B=396.9, LSTAT=4.98, MEDV=24.0)]

The number of rooms is the most important feature to predict the house price.

## Gradient-boosted tree regression


In [48]:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol = 'features', labelCol = 'MEDV', maxIter=10)
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select('prediction', 'MEDV', 'features').show(5)

+-----------------+----+--------------------+
|       prediction|MEDV|            features|
+-----------------+----+--------------------+
|23.90713808229739|24.0|[0.00632,18.0,2.3...|
|34.85150956609258|35.4|[0.01311,90.0,1.2...|
|46.25376624965973|50.0|[0.01381,80.0,0.4...|
|25.30726781974368|29.1|[0.01439,60.0,2.9...|
|47.63891166847676|50.0|[0.01501,90.0,1.2...|
+-----------------+----+--------------------+
only showing top 5 rows



In [49]:
gbt_evaluator = RegressionEvaluator(
    labelCol="MEDV", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 3.5941
