<a href="https://colab.research.google.com/github/NicoEssi/ID2223_Scaleable_Machine_Learning_and_Deep_Learning/blob/master/sparkml_lab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 0. Dependencies & Setup

## 0.1. Related dependencies and environment variables

In [1]:
# Install spark-related dependencies
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://mirrors.viethosting.com/apache/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz

!pip install -q findspark
!pip install pyspark

# Set up required environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/87/21/f05c186f4ddb01d15d0ddc36ef4b7e3cedbeb6412274a41f26b55a650ee5/pyspark-2.4.4.tar.gz (215.7MB)
     |████████████████████████████████| 215.7MB 61kB/s 
Collecting py4j==0.10.7
  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
     |████████████████████████████████| 204kB 48.3MB/s 
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-2.4.4-py2.py3-none-any.whl size=216130387 sha256=7c674b03e72093fbf8effbed42eb794d5ff5e40fa45e21dc47194d056118357d
  Stored in directory: /root/.cache/pip/wheels/ab/09/4d/0d184230058e654eb1b04467dbc1292f00eaa186544604b471
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.4


## 0.2. Initialize Spark

In [2]:
import findspark
findspark.init('spark-2.4.4-bin-hadoop2.7')
 
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName("sparkml_lab") \
.master("local[*]") \
.getOrCreate()

# Check
spark.sparkContext.getConf().getAll()

[('spark.rdd.compress', 'True'),
 ('spark.app.name', 'sparkml_lab'),
 ('spark.driver.host', '33281ebc5548'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.app.id', 'local-1574456891177'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.driver.port', '42765')]

In [3]:
spark

# 1. Import Housing Dataset

In [0]:
import tarfile
import urllib.request  # the request submodule must be imported explicitly

DOWNLOAD_ROOT = "https://raw.githubusercontent.com/ageron/handson-ml2/master/"
HOUSING_PATH = os.path.join("datasets", "housing")
HOUSING_URL = DOWNLOAD_ROOT + "datasets/housing/housing.tgz"

def fetch_housing_data(housing_url=HOUSING_URL, housing_path=HOUSING_PATH):
    os.makedirs(housing_path, exist_ok=True)
    tgz_path = os.path.join(housing_path, "housing.tgz")
    urllib.request.urlretrieve(housing_url, tgz_path)
    housing_tgz = tarfile.open(tgz_path)
    housing_tgz.extractall(path=housing_path)
    housing_tgz.close()

fetch_housing_data()

In [84]:
import pandas as pd

def load_housing_data(housing_path=HOUSING_PATH):
    csv_path = os.path.join(housing_path, "housing.csv")
    return pd.read_csv(csv_path)

housing_pd = load_housing_data()
housing_pd.info()  # summary of columns, non-null counts, and dtypes

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 20640 entries, 0 to 20639
Data columns (total 10 columns):
longitude             20640 non-null float64
latitude              20640 non-null float64
housing_median_age    20640 non-null float64
total_rooms           20640 non-null float64
total_bedrooms        20433 non-null float64
population            20640 non-null float64
households            20640 non-null float64
median_income         20640 non-null float64
median_house_value    20640 non-null float64
ocean_proximity       20640 non-null object
dtypes: float64(9), object(1)
memory usage: 1.6+ MB


# 2. Data Discovery

In [0]:
#housing = spark.read.csv("/content/datasets/housing/housing.csv", header = True)
housing = spark.createDataFrame(housing_pd)

## 2.1. Schema and dimensions

Printing schema of the dataset

In [90]:
housing.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



Printing number of records in the dataset

In [91]:
housing.count()

20640

## 2.2. Look at the data

Printing first five records of the dataset

In [92]:
housing.take(5)

[Row(longitude=-122.23, latitude=37.88, housing_median_age=41.0, total_rooms=880.0, total_bedrooms=129.0, population=322.0, households=126.0, median_income=8.3252, median_house_value=452600.0, ocean_proximity='NEAR BAY'),
 Row(longitude=-122.22, latitude=37.86, housing_median_age=21.0, total_rooms=7099.0, total_bedrooms=1106.0, population=2401.0, households=1138.0, median_income=8.3014, median_house_value=358500.0, ocean_proximity='NEAR BAY'),
 Row(longitude=-122.24, latitude=37.85, housing_median_age=52.0, total_rooms=1467.0, total_bedrooms=190.0, population=496.0, households=177.0, median_income=7.2574, median_house_value=352100.0, ocean_proximity='NEAR BAY'),
 Row(longitude=-122.25, latitude=37.85, housing_median_age=52.0, total_rooms=1274.0, total_bedrooms=235.0, population=558.0, households=219.0, median_income=5.6431, median_house_value=341300.0, ocean_proximity='NEAR BAY'),
 Row(longitude=-122.25, latitude=37.85, housing_median_age=52.0, total_rooms=1627.0, total_bedrooms=280.0,

Printing the records with a population greater than 10000

In [93]:
housing.where(housing.population > 10000).collect()

[Row(longitude=-121.92, latitude=37.53, housing_median_age=7.0, total_rooms=28258.0, total_bedrooms=3864.0, population=12203.0, households=3701.0, median_income=8.4045, median_house_value=451100.0, ocean_proximity='<1H OCEAN'),
 Row(longitude=-117.78, latitude=34.03, housing_median_age=8.0, total_rooms=32054.0, total_bedrooms=5290.0, population=15507.0, households=5050.0, median_income=6.0191, median_house_value=253900.0, ocean_proximity='<1H OCEAN'),
 Row(longitude=-117.87, latitude=34.04, housing_median_age=7.0, total_rooms=27700.0, total_bedrooms=4179.0, population=15037.0, households=4072.0, median_income=6.6288, median_house_value=339700.0, ocean_proximity='<1H OCEAN'),
 Row(longitude=-117.88, latitude=33.96, housing_median_age=16.0, total_rooms=19059.0, total_bedrooms=3079.0, population=10988.0, households=3061.0, median_income=5.5469, median_house_value=265200.0, ocean_proximity='<1H OCEAN'),
 Row(longitude=-118.78, latitude=34.16, housing_median_age=9.0, total_rooms=30405.0, to

## 2.3. Statistical summary

Printing summary of the table statistics for the attributes housing_median_age, total_rooms, median_house_value, and population.

In [94]:
housing.describe(["housing_median_age", "total_rooms", "median_house_value", "population"]).show()

+-------+------------------+------------------+------------------+------------------+
|summary|housing_median_age|       total_rooms|median_house_value|        population|
+-------+------------------+------------------+------------------+------------------+
|  count|             20640|             20640|             20640|             20640|
|   mean|28.639486434108527|2635.7630813953488|206855.81690891474|1425.4767441860465|
| stddev|12.585557612111632|2181.6152515827957|115395.61587441375|1132.4621217653405|
|    min|               1.0|               2.0|           14999.0|               3.0|
|    max|              52.0|           39320.0|          500001.0|           35682.0|
+-------+------------------+------------------+------------------+------------------+



Print the maximum age (housing_median_age), the minimum number of rooms (total_rooms), and the average of house values (median_house_value).

In [95]:
value1 = housing.agg({"housing_median_age": "max"}).collect()[0]
value2 = housing.agg({"total_rooms" : "min"}).collect()[0]
value3 = housing.agg({"median_house_value" : "mean"}).collect()[0]

print("Maximum housing age: " + str(value1["max(housing_median_age)"]) + 
      "\nMinimum number of rooms: " + str(value2["min(total_rooms)"]) +
      "\nAverage house value: " + str(value3["avg(median_house_value)"]))

Maximum housing age: 52.0
Minimum number of rooms: 2.0
Average house value: 206855.81690891474


## 2.4. Data breakdown by categorical data

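Printing the number of houses in each area (ocean_proximity), sorted in descending order. Because cube is used instead of groupBy, the result also includes a grand-total row in which ocean_proximity is null.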

In [96]:
housing.cube("ocean_proximity").count().sort('count', ascending=False).show()

+---------------+-----+
|ocean_proximity|count|
+---------------+-----+
|           null|20640|
|      <1H OCEAN| 9136|
|         INLAND| 6551|
|     NEAR OCEAN| 2658|
|       NEAR BAY| 2290|
|         ISLAND|    5|
+---------------+-----+



Printing the average value of the houses (median_house_value) in different areas (ocean_proximity); calling the new column avg_value when printing it.

In [97]:
from pyspark.sql import functions as F

housing.\
select(["ocean_proximity", "median_house_value"]).\
groupBy("ocean_proximity").\
agg(F.mean("median_house_value").alias("avg_value")).\
show()

+---------------+------------------+
|ocean_proximity|         avg_value|
+---------------+------------------+
|         ISLAND|          380440.0|
|     NEAR OCEAN|249433.97742663656|
|       NEAR BAY|259212.31179039303|
|      <1H OCEAN|240084.28546409807|
|         INLAND|124805.39200122119|
+---------------+------------------+



The same aggregation, rewritten in SQL.

In [98]:
housing.createOrReplaceTempView("housing")
spark.sql('''
          SELECT ocean_proximity, avg(median_house_value)
          AS avg_value
          FROM housing
          GROUP BY ocean_proximity
          ''').show()

+---------------+------------------+
|ocean_proximity|         avg_value|
+---------------+------------------+
|         ISLAND|          380440.0|
|     NEAR OCEAN|249433.97742663656|
|       NEAR BAY|259212.31179039303|
|      <1H OCEAN|240084.28546409807|
|         INLAND|124805.39200122119|
+---------------+------------------+



## 2.5. Correlation among attributes

Printing the correlation among the attributes housing_median_age, total_rooms, median_house_value, and population. To do so, first we need to put these attributes into one vector. Then, compute the standard correlation coefficient (Pearson) between every pair of attributes in this new vector. To make a vector of these attributes, we use the VectorAssembler Transformer.

In [103]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=["housing_median_age",
                                       "total_rooms",
                                       "median_house_value",
                                       "population"],
                             outputCol="attributes")

housing_attributes = assembler.transform(housing)

housing_attributes.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|          attributes|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+--------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|[41.0,880.0,45260...|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|[21.0,7099.0,3585...|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|[52.0,1467.0,3521...|
|  -122.25|   37.85|              52.0|     12

In [104]:
from pyspark.ml.stat import Correlation

corr = Correlation.corr(housing_attributes, "attributes")

corr.collect()[0]["pearson({})".format("attributes")].values

array([ 1.        , -0.3612622 ,  0.10562341, -0.29624424, -0.3612622 ,
        1.        ,  0.13415311,  0.85712597,  0.10562341,  0.13415311,
        1.        , -0.02464968, -0.29624424,  0.85712597, -0.02464968,
        1.        ])
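
The flat array above is easier to read as a 4 x 4 matrix. The following is a minimal plain-Python sketch that reshapes the values copied from that output and labels them with the attribute names, in the order passed to VectorAssembler. The names `flat`, `matrix`, and `corr_by_pair` are illustrative. Since a correlation matrix is symmetric, the row-major versus column-major layout of the values does not matter here.

```python
# Attribute order matches the inputCols passed to VectorAssembler above.
names = ["housing_median_age", "total_rooms", "median_house_value", "population"]

# Flat values copied from the output above (16 values for a 4 x 4 matrix).
flat = [
    1.0, -0.3612622, 0.10562341, -0.29624424,
    -0.3612622, 1.0, 0.13415311, 0.85712597,
    0.10562341, 0.13415311, 1.0, -0.02464968,
    -0.29624424, 0.85712597, -0.02464968, 1.0,
]

# Cut the flat list into rows of length n.
n = len(names)
matrix = [flat[i * n:(i + 1) * n] for i in range(n)]

# Look up the coefficient for any pair of attributes by name.
corr_by_pair = {
    (names[i], names[j]): matrix[i][j] for i in range(n) for j in range(n)
}

# Print one labeled row per attribute.
for name, row in zip(names, matrix):
    print(f"{name:>20}", " ".join(f"{value:8.4f}" for value in row))
```

Read this way, total_rooms and population are strongly correlated (about 0.86), while median_house_value is only weakly correlated with the other three attributes.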

## 2.6. Combine and make new attributes

Now, let's try out various attribute combinations. In the given dataset, the total number of rooms in a block is not very useful if we don't know how many households there are. What we really want is the number of rooms per household. Similarly, the total number of bedrooms by itself is not very useful, and we want to compare it to the number of rooms. The population per household also seems like an interesting attribute combination to look at. To do so, we add the three new columns to the dataset as below. We will call the new dataset new_housing.

```
rooms_per_household = total_rooms / households
bedrooms_per_room = total_bedrooms / total_rooms
population_per_household = population / households
```
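
As a quick sanity check on these formulas, the sketch below applies them in plain Python to the first record shown in section 2.2. The dictionary `row` is only an illustrative stand-in for a Spark Row; the results can be compared with the first row of the table printed by the next cell.

```python
# Values of the first record shown in section 2.2.
row = {
    "total_rooms": 880.0,
    "total_bedrooms": 129.0,
    "population": 322.0,
    "households": 126.0,
}

# The three derived attributes, exactly as defined above.
rooms_per_household = row["total_rooms"] / row["households"]
bedrooms_per_room = row["total_bedrooms"] / row["total_rooms"]
population_per_household = row["population"] / row["households"]

print(rooms_per_household, bedrooms_per_room, population_per_household)
```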



In [105]:
new_attributes1 = housing.withColumn("rooms_per_household",
                                    housing.total_rooms / housing.households)

new_attributes2 = new_attributes1.withColumn("bedrooms_per_room",
                                    new_attributes1.total_bedrooms / new_attributes1.total_rooms)

new_housing = new_attributes2.withColumn("population_per_household",
                                    new_attributes2.population / new_attributes2.households)

new_housing.select("rooms_per_household",
                   "bedrooms_per_room",
                   "population_per_household").show()

+-------------------+-------------------+------------------------+
|rooms_per_household|  bedrooms_per_room|population_per_household|
+-------------------+-------------------+------------------------+
|  6.984126984126984|0.14659090909090908|      2.5555555555555554|
|  6.238137082601054|0.15579659106916466|       2.109841827768014|
|  8.288135593220339|0.12951601908657123|      2.8022598870056497|
| 5.8173515981735155|0.18445839874411302|       2.547945205479452|
|  6.281853281853282| 0.1720958819913952|      2.1814671814671813|
|  4.761658031088083|0.23177366702937977|       2.139896373056995|
| 4.9319066147859925|0.19289940828402366|      2.1284046692607004|
|  4.797527047913447|0.22132731958762886|      1.7882534775888717|
|  4.294117647058823| 0.2602739726027397|       2.026890756302521|
|  4.970588235294118| 0.1992110453648915|       2.172268907563025|
|  5.477611940298507|0.19709355131698456|       2.263681592039801|
|  4.772479564032698| 0.2146731373108764|      2.0490463215258

# 3. Data Preparation

Before going through the Machine Learning steps, let's first rename the label column from median_house_value to label.


In [0]:
housing_renamed = new_housing.withColumnRenamed("median_house_value", "label")


Now, we want to separate the numerical attributes from the categorical attribute (ocean_proximity) and keep their column names in two different lists. Moreover, since we don't want to apply the same transformations to the predictors (features) and the label, we should remove the label attribute from the list of predictors.

In [0]:
columns_features_num = housing_renamed.columns

columns_features_num.remove("ocean_proximity")
columns_features_num.remove("label")

columns_features_cat = ["ocean_proximity"]

## 3.1. Prepare Continuous Attributes

### Data cleaning
Most Machine Learning algorithms cannot work with missing features, so we should take care of them. As a first step, let's find the columns with missing values. To do so, we can print the number of missing values in each column; the continuous attributes are the ones listed in columns_features_num.

In [108]:
null_df = housing_renamed.select([F.count(F.when(F.isnan(i) | \
                                          F.col(i).contains('NA') | \
                                          F.col(i).contains('NULL') | \
                                          F.col(i).isNull(), i)).alias(i) \
                                  for i in housing_renamed.columns])

null_df.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+-----+---------------+-------------------+-----------------+------------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|label|ocean_proximity|rooms_per_household|bedrooms_per_room|population_per_household|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+-----+---------------+-------------------+-----------------+------------------------+
|        0|       0|                 0|          0|           207|         0|         0|            0|    0|              0|                  0|              207|                       0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+-----+---------------+-------------------+-----------------+------------------------+



As we observed above, the total_bedrooms and bedrooms_per_room attributes have some missing values. One way to take care of missing values is to use the Imputer Estimator, which completes missing values in a dataset using either the mean or the median of the column in which the missing values are located. To use it, create an Imputer instance and pass strategy = "median" to replace each attribute's missing values with the median of that attribute. Note that the cell below leaves strategy at its default, "mean".

In [109]:
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols = ["total_bedrooms", "bedrooms_per_room"],
                  outputCols = ["total_bedrooms", "bedrooms_per_room"])

housing_imputed = imputer.fit(housing_renamed).transform(housing_renamed)

housing_imputed.select(["total_bedrooms", "bedrooms_per_room"]).show(5)

+--------------+-------------------+
|total_bedrooms|  bedrooms_per_room|
+--------------+-------------------+
|         129.0|0.14659090909090908|
|        1106.0|0.15579659106916466|
|         190.0|0.12951601908657123|
|         235.0|0.18445839874411302|
|         280.0| 0.1720958819913952|
+--------------+-------------------+
only showing top 5 rows
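
To make the two imputation strategies concrete, here is a minimal plain-Python sketch of the idea. It is not Spark's implementation: the function `impute` and the gap placed in the sample list are made up for illustration, and Spark's Imputer computes an approximate median, so its result can differ slightly from `statistics.median`.

```python
import statistics


def impute(values, strategy="mean"):
    """Replace None entries with the mean or median of the observed values."""
    observed = [v for v in values if v is not None]
    if strategy == "mean":
        fill = statistics.mean(observed)
    elif strategy == "median":
        fill = statistics.median(observed)
    else:
        raise ValueError("strategy must be 'mean' or 'median'")
    return [fill if v is None else v for v in values]


# First five total_bedrooms values, with a hypothetical gap in the third slot.
total_bedrooms = [129.0, 1106.0, None, 235.0, 280.0]

print(impute(total_bedrooms, strategy="mean"))
print(impute(total_bedrooms, strategy="median"))
```

With a skewed column such as this one, a single large value (1106.0) pulls the mean well above the median, which is why the median is often the more robust fill value.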



### Scaling
One of the most important transformations we need to apply to our data is feature scaling. With few exceptions, Machine Learning algorithms don't perform well when the input numerical attributes have very different scales. This is the case for the housing data: the total number of rooms ranges from 2 to 39,320, while the median incomes only range from roughly 0 to 15. Note that scaling the label attribute is generally not required.

One way to get all attributes onto the same scale is standardization: for each value, subtract the mean (so standardized values always have zero mean), then divide by the standard deviation so that the resulting distribution has unit variance. To do this, we can use the StandardScaler Estimator. As before, we first convert all the numerical attributes into one big feature vector using VectorAssembler, and then apply StandardScaler to that vector. Note that StandardScaler's withMean parameter defaults to False, so the cells below only divide by the standard deviation; pass withMean = True to also center the data.
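
The arithmetic behind standardization can be sketched in a few lines of plain Python. This is an illustration of the formula, not Spark's code: the function `standardize` and its `with_mean` / `with_std` flags are hypothetical names chosen to mirror StandardScaler's withMean and withStd parameters, and the sample standard deviation (dividing by n - 1) is used.

```python
import statistics


def standardize(values, with_mean=True, with_std=True):
    """Return (v - mean) / std for each value, with each step optional."""
    mean = statistics.mean(values) if with_mean else 0.0
    std = statistics.stdev(values) if with_std else 1.0  # sample std (n - 1)
    return [(v - mean) / std for v in values]


values = [2.0, 4.0, 6.0]  # mean 4.0, sample standard deviation 2.0

centered_and_scaled = standardize(values)
scaled_only = standardize(values, with_mean=False)

print(centered_and_scaled)  # [-1.0, 0.0, 1.0]
print(scaled_only)          # [1.0, 2.0, 3.0]
```

Scaling without centering keeps the sign and relative position of the original values, which is why a column of negative longitudes stays negative after it is scaled this way.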

In [0]:
vectas = VectorAssembler(inputCols = columns_features_num,
                         outputCol = "features")

housing_featured = vectas.transform(housing_imputed)

In [0]:
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol = "features",
                        outputCol = "scaled",
                        withStd = True)

scalerModel = scaler.fit(housing_featured)

housing_scaled = scalerModel.transform(housing_featured)

In [112]:
housing_scaled.printSchema()

housing_scaled.show(5)

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- label: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)
 |-- rooms_per_household: double (nullable = true)
 |-- bedrooms_per_room: double (nullable = true)
 |-- population_per_household: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- scaled: vector (nullable = true)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+--------------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|medi

## 3.2. Prepare categorical attributes

After imputing and scaling the continuous attributes, we should take care of the categorical attributes. Let's first print the number of distinct values of the categorical attribute ocean_proximity.

In [113]:
from pyspark.sql.functions import countDistinct

housing_renamed.agg(countDistinct("ocean_proximity")).show()

+-------------------------------+
|count(DISTINCT ocean_proximity)|
+-------------------------------+
|                              5|
+-------------------------------+



### String indexer

Most Machine Learning algorithms prefer to work with numbers. So let's convert the categorical attribute ocean_proximity to numbers. To do so, we can use the StringIndexer that encodes a string column of labels to a column of label indices. The indices are in [0, numLabels), ordered by label frequencies, so the most frequent label gets index 0.
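
The frequency-based ordering can be illustrated with a small plain-Python sketch. It rebuilds the label column from the category counts printed in section 2.4 and assigns indices from most to least frequent. The function `fit_string_index` is a hypothetical helper, and breaking ties alphabetically is an assumption of this sketch rather than a statement about Spark's behavior.

```python
from collections import Counter


def fit_string_index(labels):
    """Map each label to an index, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending count; ties are broken alphabetically (an assumption).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Category counts taken from the table in section 2.4.
category_counts = {
    "<1H OCEAN": 9136,
    "INLAND": 6551,
    "NEAR OCEAN": 2658,
    "NEAR BAY": 2290,
    "ISLAND": 5,
}

# Rebuild a label column with those frequencies and fit the index on it.
labels = [label for label, n in category_counts.items() for _ in range(n)]
index = fit_string_index(labels)

for label, i in index.items():
    print(f"{label:>10} -> {i}")
```

Under this ordering NEAR BAY is the fourth most frequent category and therefore receives index 3.0.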

In [114]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol = "ocean_proximity", 
                        outputCol = "ocean_proximity_indexed")

housing_indexed = indexer.fit(housing_renamed).transform(housing_renamed)

housing_indexed.show(10)
housing_indexed.agg(countDistinct("ocean_proximity_indexed")).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+-----------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|   label|ocean_proximity|rooms_per_household|  bedrooms_per_room|population_per_household|ocean_proximity_indexed|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+-----------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|452600.0|       NEAR BAY|  6.984126984126984|0.14659090909090908|      2.5555555555555554|                    3.0|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|358500.0|       NEAR BAY|  6.2

### One-hot encoding
Now we convert the label indices built in the last step into one-hot vectors. To do this, we can take advantage of the OneHotEncoderEstimator Estimator.
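
As a rough illustration of what the encoder produces, the plain-Python sketch below turns a label index into a one-hot vector. The function `one_hot` is hypothetical; its `drop_last` flag mirrors the encoder's dropLast parameter, which defaults to True, so five categories yield vectors of length four and the last category is encoded as all zeros.

```python
def one_hot(index, num_categories, drop_last=True):
    """Return a dense one-hot list for the given category index."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    # With drop_last, the last category has no slot and stays all zeros.
    if index < size:
        vector[index] = 1.0
    return vector


near_bay = one_hot(3, 5)  # index 3 of 5 categories
island = one_hot(4, 5)    # the last category

print(near_bay)  # [0.0, 0.0, 0.0, 1.0]
print(island)    # [0.0, 0.0, 0.0, 0.0]
```

Spark prints such vectors in sparse form as (size, [indices], [values]), so the dense list [0.0, 0.0, 0.0, 1.0] corresponds to (4,[3],[1.0]).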

In [115]:
from pyspark.ml.feature import OneHotEncoderEstimator

encoder = OneHotEncoderEstimator(inputCols = ["ocean_proximity_indexed"],
                                 outputCols = ["ocean_proximity_hot"])

housing_encoded = encoder.fit(housing_indexed).transform(housing_indexed)

housing_encoded.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+-----------------------+-------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|   label|ocean_proximity|rooms_per_household|  bedrooms_per_room|population_per_household|ocean_proximity_indexed|ocean_proximity_hot|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+-----------------------+-------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|452600.0|       NEAR BAY|  6.984126984126984|0.14659090909090908|      2.5555555555555554|                    3.0|      (4,[3],[1.0])|
|  -122.22|   37.86|              21.0|     7099.0| 

# 4. Pipeline

As you can see, there are many data transformation steps that need to be executed in the right order. For example, we called the Imputer, VectorAssembler, and StandardScaler in that order. Instead of calling each step by hand, we can use the Pipeline class to define a sequence of Transformers/Estimators and run them in order. A Pipeline is an Estimator; thus, after a Pipeline's fit() method runs, it produces a PipelineModel, which is a Transformer.
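
The Estimator/Transformer pattern described above can be sketched without Spark. In this toy version every class name (`MeanImputer`, `Doubler`, `ToyPipeline`, and so on) is made up for illustration and data is a plain list: an estimator has fit() and returns a fitted model, a transformer only has transform(), and fitting the pipeline fits each stage on the output of the previous one.

```python
class MeanImputer:
    """Toy estimator: fit() learns the mean and returns a transformer."""

    def fit(self, values):
        observed = [v for v in values if v is not None]
        return MeanImputerModel(sum(observed) / len(observed))


class MeanImputerModel:
    """Toy transformer produced by MeanImputer.fit()."""

    def __init__(self, mean):
        self.mean = mean

    def transform(self, values):
        return [self.mean if v is None else v for v in values]


class Doubler:
    """Toy transformer: nothing to learn, so it only has transform()."""

    def transform(self, values):
        return [2 * v for v in values]


class ToyPipeline:
    """Toy estimator that chains stages and fits them in order."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, values):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted first; plain transformers are used as-is.
            model = stage.fit(values) if hasattr(stage, "fit") else stage
            values = model.transform(values)  # feed the next stage
            fitted.append(model)
        return ToyPipelineModel(fitted)


class ToyPipelineModel:
    """Toy transformer holding the fitted stages."""

    def __init__(self, stages):
        self.stages = stages

    def transform(self, values):
        for stage in self.stages:
            values = stage.transform(values)
        return values


model = ToyPipeline([MeanImputer(), Doubler()]).fit([1.0, None, 3.0])
result = model.transform([None, 5.0])
print(result)  # [4.0, 10.0]
```

The mean learned during fit (2.0) is reused when transforming new data, which is the point of separating fit() from transform().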

Now, let's create a pipeline called numPipeline that chains the numerical stages (imp, vec, and sca, re-created below) in the right order from left to right, as well as a pipeline called catPipeline that chains the categorical stages (ind and enc). Then, we put these two pipelines, numPipeline and catPipeline, into one pipeline.

In [0]:
from pyspark.ml import Pipeline, PipelineModel

# Imputer
imp = Imputer(inputCols = ["total_bedrooms", "bedrooms_per_room"],
              outputCols = ["total_bedrooms", "bedrooms_per_room"])

# VectorAssembler
vec = VectorAssembler(inputCols = columns_features_num,
                      outputCol = "vfeatures")

# StandardScaler
sca = StandardScaler(inputCol = "vfeatures",
                     outputCol = "scaled",
                     withStd = True)

# numPipeline
numPipeline = Pipeline(stages = [imp, vec, sca])

In [0]:
# Indexer
ind = StringIndexer(inputCol = "ocean_proximity", 
                    outputCol = "ocean_proximity_indexed")

# Encoder
enc = OneHotEncoderEstimator(inputCols = ["ocean_proximity_indexed"],
                             outputCols = ["ocean_proximity_hot"])

# catPipeline
catPipeline = Pipeline(stages = [ind, enc])

In [128]:
# Pipeline Assembly
pipeline = Pipeline(stages = [numPipeline, catPipeline])

# Fit and transform
housing_new = pipeline.fit(housing_renamed).transform(housing_renamed)

housing_new.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+--------------------+--------------------+-----------------------+-------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|   label|ocean_proximity|rooms_per_household|  bedrooms_per_room|population_per_household|           vfeatures|              scaled|ocean_proximity_indexed|ocean_proximity_hot|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+--------------------+--------------------+-----------------------+-------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|452600.0|       NEAR BAY|  6.984126984126984|0.14659090909090

Now, we use VectorAssembler to combine the scaled numerical vector (scaled) and the one-hot vector (ocean_proximity_hot) of the final dataset housing_new into one big vector, and call the new column features.

In [129]:
newVec = VectorAssembler(inputCols = ["scaled", "ocean_proximity_hot"],
                         outputCol = "features")

dataset = newVec.transform(housing_new).select("features", "label")

dataset.show(5)

+--------------------+--------+
|            features|   label|
+--------------------+--------+
|[-61.007269596069...|452600.0|
|[-61.002278409814...|358500.0|
|[-61.012260782324...|352100.0|
|[-61.017251968579...|341300.0|
|[-61.017251968579...|342200.0|
+--------------------+--------+
only showing top 5 rows



# 5. Make a model

Here we are going to build four different regression models:

*   Linear regression model
*   Decision tree regression
*   Random forest regression
*   Gradient-boosted tree regression

But before training a Machine Learning model, let's first split the data into a training dataset (training_set) with 80% of the data and a test dataset (test_set) with the remaining 20%.

In [0]:
training_set, test_set = dataset.randomSplit([0.8, 0.2], 1)

## 5.1. Linear regression model

Now, we train a Linear Regression model using the LinearRegression class. Then we print the model's coefficients and its RMSE on the training set, which is available through the model's summary property.

In [0]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression()

lrModel = lr.fit(training_set)

In [142]:
print(lrModel.coefficients)
print(lrModel.summary.rootMeanSquaredError)

[-54477.37158459408,-55101.840033021814,13288.061745034529,10009.769723217136,710.7269088373888,-52958.80028726112,46244.11745493459,77314.65490018381,5716.086889681724,15958.634084930614,1399.752595818576,-175517.12411436523,-211742.57639437815,-171874.4024450686,-180607.27387192386]
67516.54425720421


Now, we use RegressionEvaluator to measure the root-mean-square error (RMSE) of the model on the test dataset. RMSE is the evaluator's default metric.
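
For reference, RMSE is the square root of the mean of the squared differences between predictions and labels. The plain-Python sketch below shows that formula on made-up numbers; the function `rmse` and the sample values are illustrative only and are unrelated to the housing results.

```python
import math


def rmse(predictions, labels):
    """Root-mean-square error between two equally long sequences."""
    squared_errors = [(p - y) ** 2 for p, y in zip(predictions, labels)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Made-up values: the errors are 3, -4, 0, and 0.
predictions = [13.0, 6.0, 5.0, 7.0]
labels = [10.0, 10.0, 5.0, 7.0]

error = rmse(predictions, labels)
print(error)  # sqrt((9 + 16 + 0 + 0) / 4) = 2.5
```

Because the errors are squared before averaging, a few large misses raise RMSE more than many small ones, and the result is expressed in the same unit as the label.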

In [144]:
from pyspark.ml.evaluation import RegressionEvaluator

predictions = lrModel.transform(test_set)
predictions.select("prediction", "label", "features").show(5)

evaluator = RegressionEvaluator()
rmse = evaluator.evaluate(predictions)
print(rmse)

+------------------+--------+--------------------+
|        prediction|   label|            features|
+------------------+--------+--------------------+
|182697.27426641947|111400.0|[-62.020480405854...|
|146301.48449560208| 58100.0|[-61.995524474579...|
| 156240.5103589571| 72200.0|[-61.980550915813...|
|162194.05783506343| 70200.0|[-61.980550915813...|
| 199432.9147604569|128900.0|[-61.975559729558...|
+------------------+--------+--------------------+
only showing top 5 rows

69613.05587787666


## 5.2. Decision tree regression

Repeat the steps used for the linear regression model to build a Decision Tree model. We use the DecisionTreeRegressor to build the model and then measure its RMSE on the test dataset.

In [145]:
from pyspark.ml.regression import DecisionTreeRegressor

dtr = DecisionTreeRegressor()

dtrModel = dtr.fit(training_set)

predictions = dtrModel.transform(test_set)

rmse = evaluator.evaluate(predictions)
print(rmse)

68367.78311669921


## 5.3. Random forest regression

Let's measure the test error of a Random Forest model. We can use the RandomForestRegressor to build it.

In [147]:
from pyspark.ml.regression import RandomForestRegressor

rfr = RandomForestRegressor()

rfrModel = rfr.fit(training_set)

predictions = rfrModel.transform(test_set)

rmse = evaluator.evaluate(predictions)
print(rmse)

65200.74665160275


## 5.4. Gradient-boosted tree regression

Finally, we want to build a Gradient-boosted Tree Regression model and measure its RMSE on the test data. We use the GBTRegressor to build the model.

In [148]:
from pyspark.ml.regression import GBTRegressor

gbtr = GBTRegressor()

gbtrModel = gbtr.fit(training_set)

predictions = gbtrModel.transform(test_set)

rmse = evaluator.evaluate(predictions)
print(rmse)

56608.66221999432


# 6. Hyperparameter Tuning

An important task in Machine Learning is model selection, or using data to find the best model or parameters for a given task. This is also called tuning. Tuning may be done for individual Estimators such as LinearRegression, or for entire Pipelines which include multiple algorithms, featurization, and other steps. Users can tune an entire Pipeline at once, rather than tuning each element in the Pipeline separately. MLlib supports model selection tools, such as CrossValidator. These tools require the following items:

*   Estimator
*   ParamMaps
*   Evaluator

CrossValidator begins by splitting the dataset into a set of folds, which are used as separate training and test datasets. For example with k=3 folds, CrossValidator will generate 3 (training, test) dataset pairs, each of which uses 2/3 of the data for training and 1/3 for testing. To evaluate a particular ParamMap, CrossValidator computes the average evaluation metric for the 3 Models produced by fitting the Estimator on the 3 different (training, test) dataset pairs. After identifying the best ParamMap, CrossValidator finally re-fits the Estimator using the best ParamMap and the entire dataset.
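
The fold logic described above can be sketched in plain Python. This is only an illustration of the idea, not Spark's implementation: the helper names `k_fold_pairs` and `cross_validated_metric` are hypothetical, rows are assigned to folds round-robin instead of randomly, and the toy "model" is just the mean of the training values.

```python
def k_fold_pairs(rows, k):
    """Yield (training, test) lists; each row lands in the test split exactly once."""
    folds = [rows[i::k] for i in range(k)]  # round-robin assignment to k folds
    for i in range(k):
        test = folds[i]
        training = [row for j, fold in enumerate(folds) if j != i for row in fold]
        yield training, test

def cross_validated_metric(rows, k, fit, score):
    """Average score(model, test) over the k (training, test) pairs."""
    scores = [score(fit(training), test) for training, test in k_fold_pairs(rows, k)]
    return sum(scores) / len(scores)

# Toy usage: the "model" is the training mean, scored by mean absolute error.
toy_rows = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
fit_mean = lambda training: sum(training) / len(training)
mean_abs_error = lambda model, test: sum(abs(x - model) for x in test) / len(test)

pairs = list(k_fold_pairs(toy_rows, 3))
avg_error = cross_validated_metric(toy_rows, 3, fit_mean, mean_abs_error)
print(len(pairs), avg_error)
```

With k=3 and six rows, each pair trains on four rows (2/3 of the data) and tests on the remaining two (1/3), matching the description above.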

Below, we use the CrossValidator to select the best Random Forest model. To do so, we need to define a grid of parameters. Let's say we want to search over the number of trees (1, 5, and 10) and the maximum tree depth (5, 10, and 15).
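
To get a feel for the cost of this search, the grid can be enumerated in plain Python. This sketch assumes 3 folds, as in the k=3 example above, and uses plain dictionaries as a stand-in for Spark's ParamMaps.

```python
from itertools import product

num_trees_values = [1, 5, 10]
max_depth_values = [5, 10, 15]
num_folds = 3  # assumption: same k as in the example above

# Every combination of the two parameters is one candidate setting.
grid = [{"numTrees": n, "maxDepth": d}
        for n, d in product(num_trees_values, max_depth_values)]

fits_during_search = len(grid) * num_folds  # one fit per (setting, fold)
total_fits = fits_during_search + 1         # plus the final refit on all training data

print(len(grid), fits_during_search, total_fits)
```

The number of fits grows multiplicatively with every value added to the grid, which is worth keeping in mind before widening the search.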

In [151]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = ParamGridBuilder().\
            addGrid(rfr.numTrees, [1, 5, 10]).\
            addGrid(rfr.maxDepth, [5, 10, 15]).build()

cv = CrossValidator(estimator = rfr,
                    estimatorParamMaps = paramGrid,
                    evaluator = evaluator,
                    numFolds = 3)

cvModel = cv.fit(training_set)

predictions = cvModel.transform(test_set)

rmse = evaluator.evaluate(predictions)

print(rmse)

52926.69308975491
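
To see which grid point won, the averaged cross-validation metrics can be paired with their parameter settings. The helper below is a hypothetical plain-Python sketch. It assumes the metrics are listed in the same order as the parameter grid, and it picks the smallest value by default because a lower RMSE is better. The metric values in the toy call are made up.

```python
def best_param_map(param_maps, avg_metrics, larger_is_better=False):
    """Return (params, metric) for the setting with the best average metric."""
    if len(param_maps) != len(avg_metrics) or not param_maps:
        raise ValueError("need one metric per parameter setting")
    pick = max if larger_is_better else min
    index = pick(range(len(avg_metrics)), key=avg_metrics.__getitem__)
    return param_maps[index], avg_metrics[index]

# With the objects from the cell above, a call could look roughly like this
# (not executed here, since it needs the fitted cvModel):
#   readable = [{p.name: v for p, v in m.items()} for m in cv.getEstimatorParamMaps()]
#   best_param_map(readable, cvModel.avgMetrics)

# Made-up metrics for illustration only.
toy_maps = [{"numTrees": 1, "maxDepth": 5}, {"numTrees": 10, "maxDepth": 10}]
toy_metrics = [70000.0, 55000.0]
best_params, best_metric = best_param_map(toy_maps, toy_metrics)
print(best_params, best_metric)
```

The fitted model for the winning setting is also available directly as `cvModel.bestModel`.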


# 7. Custom transformer

At the end of part two, we added extra columns to the housing dataset. Here, we are going to implement a Transformer to do the same task. The Transformer should take the names of two input columns, inputCol1 and inputCol2, as well as the name of the output column, outputCol. It then computes inputCol1 divided by inputCol2 and adds the result as a new column to the dataset. The details of implementing a custom Transformer are explained [here](https://www.oreilly.com/learning/extend-spark-ml-for-your-own-modeltransformer-types). Please read it before we start the implementation.

First, define the given parameters of the Transformer and implement a method to validate their schemas (StructType).

In [0]:
from pyspark.sql.types import StructField, StructType, DoubleType, StringType
from pyspark.ml.param import Param, Params

# Code
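
As a rough guide to what the schema validation has to check, here is a plain-Python stand-in. It is not Spark code and not the exercise solution: the schema is modelled as a list of (column name, type name) pairs instead of a StructType, and the helper name, the set of numeric type names, and the `comment` column are all hypothetical.

```python
NUMERIC_TYPES = {"int", "long", "float", "double"}  # assumed numeric type names

def validate_and_extend_schema(schema, input_col1, input_col2, output_col):
    """Check the inputs and return a new schema with output_col appended as double.

    schema is a list of (column_name, type_name) pairs standing in for StructType.
    """
    types = dict(schema)
    for name in (input_col1, input_col2):
        if name not in types:
            raise ValueError(f"input column {name!r} does not exist")
        if types[name] not in NUMERIC_TYPES:
            raise TypeError(f"input column {name!r} must be numeric, got {types[name]}")
    if output_col in types:
        raise ValueError(f"output column {output_col!r} already exists")
    return list(schema) + [(output_col, "double")]

# Toy schema; "comment" is a hypothetical non-numeric column.
toy_schema = [("total_rooms", "double"), ("households", "double"), ("comment", "string")]
new_schema = validate_and_extend_schema(
    toy_schema, "total_rooms", "households", "rooms_per_household")
print(new_schema[-1])
```

The same three checks (inputs exist, inputs are numeric, output name is free) carry over when the schema is a real StructType.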

Then, we extend the class Transformer and implement its setter functions for the input and output columns, calling them setInputCol1, setInputCol2, and setOutputCol. Moreover, we need to override the methods copy, transformSchema, and transform. The details of what these methods need to cover are given [here](https://www.oreilly.com/learning/extend-spark-ml-for-your-own-modeltransformer-types).

In [0]:
# Code

Now, create an instance of MyTransformer, set the input columns to total_rooms and households and the output column to rooms_per_household, and run it over the housing dataset.

In [0]:
# myTransformer = MyTransformer(inputCol1 = "total_rooms",
#                               inputCol2 = "households",
#                               outputCol = "rooms_per_household")

# myDataset = myTransformer.transform(housing)
# myDataset.select("rooms_per_household").show()

# 8. Custom estimator (predictor)

text

In [0]:
# code

# 9. 

As the last step, we are given a dataset called data/ccdefault.csv. The dataset represents defaults of credit card clients. It has 30,000 cases and 24 different attributes. More details about the dataset are available at data/ccdefault.txt. In this task, we should build three models, compare their results, and decide which one is the best solution. Here are the suggested steps:

1.   Load the data.
2.   Carry out some exploratory analysis.
3.   Train a model to predict the target variable
4.   What else can we do with this data? Anything we can do to devise a better solution?

When training a model, we must:

*   Employ three different models (logistic regression, decision tree, and random forest).
*   Compare the models' performances (e.g. AUC).
*   Defend your choice of best model (e.g., what are the strengths and weaknesses of each of these models?).
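
Since AUC is suggested as the comparison metric, it helps to recall what it measures: the probability that a randomly chosen positive case receives a higher score than a randomly chosen negative one, with ties counted as half. The following is a small plain-Python sketch of that definition. The helper name `auc_score` and the labels and scores are made up for illustration and are not taken from ccdefault.csv.

```python
def auc_score(labels, scores):
    """Area under the ROC curve via pairwise comparison of positives and negatives."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0   # positive ranked above negative
            elif p == n:
                wins += 0.5   # ties count as half
    return wins / (len(positives) * len(negatives))

# Illustrative labels (1 = default) and predicted scores, not real data.
toy_labels = [0, 0, 1, 1]
toy_scores = [0.1, 0.4, 0.35, 0.8]
toy_auc = auc_score(toy_labels, toy_scores)
print(toy_auc)
```

This pairwise version is quadratic in the number of rows, so it is only meant to explain the metric. For the 30,000 cases in the dataset, Spark's BinaryClassificationEvaluator (metric areaUnderROC) is the practical choice.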






In [0]:
# code