# Machine Learning With Spark ML
In this lab assignment, you will complete a project by going through the following steps:
1. Get the data.
2. Discover the data to gain insights.
3. Prepare the data for Machine Learning algorithms.
4. Select a model and train it.
5. Fine-tune your model.
6. Present your solution.

As a dataset, we use the California Housing Prices dataset from the StatLib repository. This dataset was based on data from the 1990 California census. The dataset has the following columns:
1. `longitude`: a measure of how far west a house is (a higher value is farther west)
2. `latitude`: a measure of how far north a house is (a higher value is farther north)
3. `housing_median_age`: median age of a house within a block (a lower number is a newer building)
4. `total_rooms`: total number of rooms within a block
5. `total_bedrooms`: total number of bedrooms within a block
6. `population`: total number of people residing within a block
7. `households`: total number of households, a group of people residing within a home unit, for a block
8. `median_income`: median income for households within a block of houses
9. `median_house_value`: median house value for households within a block
10. `ocean_proximity`: location of the house with respect to the ocean/sea

---
# 1. Get the data
Let's start the lab by loading the dataset. You can find the dataset at `data/housing.csv`. To infer column types automatically when reading the file, set the `inferSchema` option to `true`. Moreover, enable the `header` option to read the column names from the first line of the file.

In [73]:
// TODO: Replace <FILL IN> with appropriate code

val housing = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load("data/housing.csv")

housing = [longitude: double, latitude: double ... 8 more fields]



---
# 2. Discover the data to gain insights
Now it is time to take a look at the data. In this step, we are going to look at the data in a few different ways:
* See the schema and dimension of the dataset
* Look at the data itself
* Statistical summary of the attributes
* Breakdown of the data by the categorical attribute
* Find the correlation among different attributes
* Make new attributes by combining existing attributes

## 2.1. Schema and dimension
Print the schema of the dataset.

In [2]:
// TODO: Replace <FILL IN> with appropriate code

housing.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



Print the number of records in the dataset.

In [3]:
// TODO: Replace <FILL IN> with appropriate code

housing.count()

20640

## 2.2. Look at the data
Print the first five records of the dataset.

In [4]:
// TODO: Replace <FILL IN> with appropriate code

housing.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
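|  -122.25|   37.85|              52.0|     1627.0|         280.0|     565.0|     259.0|       3.8462|          342200.0|       NEAR BAY|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only showing top 5 rows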

Print the number of records with population more than 10000.

In [5]:
// TODO: Replace <FILL IN> with appropriate code

housing.filter($"population" > 10000).show()
// To get only the number of such records, use: housing.filter($"population" > 10000).count()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -121.92|   37.53|               7.0|    28258.0|        3864.0|   12203.0|    3701.0|       8.4045|          451100.0|      <1H OCEAN|
|  -117.78|   34.03|               8.0|    32054.0|        5290.0|   15507.0|    5050.0|       6.0191|          253900.0|      <1H OCEAN|
|  -117.87|   34.04|               7.0|    27700.0|        4179.0|   15037.0|    4072.0|       6.6288|          339700.0|      <1H OCEAN|
|  -117.88|   33.96|              16.0|    19059.0|        3079.0|   10988.0|    3061.0|       5.5469|          265200.0|      <1H OCEAN|
|  -118.78|   34.16|              

## 2.3. Statistical summary
Print a summary of the table statistics for the attributes `housing_median_age`, `total_rooms`, `median_house_value`, and `population`. You can use the `describe` command.
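To make the meaning of the `describe` rows concrete, here is a minimal plain-Scala sketch (no Spark involved) that computes the same five statistics (`count`, `mean`, `stddev`, `min`, `max`) for a local sequence of numbers. It assumes that `stddev` is the sample standard deviation (dividing by n - 1), which is what `describe` reports; the names `DescribeSketch` and `Summary` are hypothetical.

```scala
// Plain-Scala sketch of the five statistics that `describe` reports for one numeric column.
// Hypothetical names; assumes stddev is the sample standard deviation (divides by n - 1).
object DescribeSketch {
  final case class Summary(count: Long, mean: Double, stddev: Double, min: Double, max: Double)

  def describe(xs: Seq[Double]): Summary = {
    require(xs.nonEmpty, "describe needs at least one value")
    val n = xs.size
    val mean = xs.sum / n
    // Sample variance: sum of squared deviations divided by (n - 1); NaN here for a single value.
    val variance = xs.map(x => (x - mean) * (x - mean)).sum / (n - 1)
    Summary(n.toLong, mean, math.sqrt(variance), xs.min, xs.max)
  }
}
```

For example, on the first five `housing_median_age` values shown above (41, 21, 52, 52, 52), it returns a count of 5, a mean of 43.6, a minimum of 21, and a maximum of 52.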

#### Note:
I am printing them separately so that you can see the full output.

In [6]:
// TODO: Replace <FILL IN> with appropriate code

housing.describe("housing_median_age")

[summary: string, housing_median_age: string]

In [7]:
housing.describe("total_rooms")

[summary: string, total_rooms: string]

In [8]:
housing.describe("median_house_value")

[summary: string, median_house_value: string]

In [9]:
housing.describe("population")

[summary: string, population: string]

And all of them at once:

In [10]:
housing.describe("housing_median_age", "total_rooms", "median_house_value", "population")

[summary: string, housing_median_age: string ... 3 more fields]

Print the maximum age (`housing_median_age`), the minimum number of rooms (`total_rooms`), and the average of house values (`median_house_value`).

In [11]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.sql.functions._

housing.agg(max("housing_median_age")).show()

+-----------------------+
|max(housing_median_age)|
+-----------------------+
|                   52.0|
+-----------------------+



In [12]:
housing.agg(min("total_rooms")).show()

+----------------+
|min(total_rooms)|
+----------------+
|             2.0|
+----------------+



In [13]:
housing.agg(avg("median_house_value")).show()

+-----------------------+
|avg(median_house_value)|
+-----------------------+
|     206855.81690891474|
+-----------------------+



## 2.4. Break down the data by the categorical attribute
Print the number of houses in different areas (`ocean_proximity`), and sort them in descending order.

In [14]:
// TODO: Replace <FILL IN> with appropriate code

housing.groupBy("ocean_proximity").count().sort(desc("count")).show()

+---------------+-----+
|ocean_proximity|count|
+---------------+-----+
|      <1H OCEAN| 9136|
|         INLAND| 6551|
|     NEAR OCEAN| 2658|
|       NEAR BAY| 2290|
|         ISLAND|    5|
+---------------+-----+



Print the average value of the houses (`median_house_value`) in different areas (`ocean_proximity`), and call the new column `avg_value` when printing it.

In [15]:
// TODO: Replace <FILL IN> with appropriate code

housing.groupBy("ocean_proximity").agg(avg("median_house_value").as("avg_value")).show()

+---------------+------------------+
|ocean_proximity|         avg_value|
+---------------+------------------+
|         ISLAND|          380440.0|
|     NEAR OCEAN|249433.97742663656|
|       NEAR BAY|259212.31179039303|
|      <1H OCEAN|240084.28546409807|
|         INLAND|124805.39200122119|
+---------------+------------------+



Rewrite the above query in SQL.

In [16]:
// TODO: Replace <FILL IN> with appropriate code

housing.createOrReplaceTempView("df")
spark.sql("SELECT ocean_proximity, AVG(median_house_value) as avg_value FROM df GROUP BY ocean_proximity").show()

+---------------+------------------+
|ocean_proximity|         avg_value|
+---------------+------------------+
|         ISLAND|          380440.0|
|     NEAR OCEAN|249433.97742663656|
|       NEAR BAY|259212.31179039303|
|      <1H OCEAN|240084.28546409807|
|         INLAND|124805.39200122119|
+---------------+------------------+



## 2.5. Correlation among attributes
Print the correlation among the attributes `housing_median_age`, `total_rooms`, `median_house_value`, and `population`. To do so, first you need to put these attributes into one vector. Then, compute the standard correlation coefficient (Pearson) between every pair of attributes in this new vector. To make a vector of these attributes, you can use the `VectorAssembler` Transformer.
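Before running `Correlation.corr`, it may help to see what a single entry of the resulting matrix means. The sketch below is a plain-Scala version of the Pearson coefficient (the covariance of two attributes divided by the product of their standard deviations), plus a helper that builds the full symmetric matrix. The object and method names are hypothetical; Spark computes the same quantity over distributed data.

```scala
// Plain-Scala sketch of the Pearson correlation coefficient and a full correlation matrix.
// Hypothetical helper names; not Spark's implementation.
object PearsonSketch {
  def pearson(xs: Seq[Double], ys: Seq[Double]): Double = {
    require(xs.size == ys.size && xs.size > 1, "need two equally long series")
    val n = xs.size
    val mx = xs.sum / n
    val my = ys.sum / n
    // The 1/(n - 1) factors of covariance and standard deviations cancel, so they are omitted.
    val cov = xs.zip(ys).map { case (x, y) => (x - mx) * (y - my) }.sum
    val sx = math.sqrt(xs.map(x => (x - mx) * (x - mx)).sum)
    val sy = math.sqrt(ys.map(y => (y - my) * (y - my)).sum)
    cov / (sx * sy)
  }

  // columns(i) holds all values of attribute i; the result is symmetric, with 1.0 on the
  // diagonal up to floating-point rounding.
  def corrMatrix(columns: Seq[Seq[Double]]): Vector[Vector[Double]] =
    columns.toVector.map(a => columns.toVector.map(b => pearson(a, b)))
}
```

Values close to 1 or -1 indicate a strong linear relationship; for example, the matrix printed below pairs `total_rooms` and `population` at about 0.857.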

In [17]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.feature.VectorAssembler

val input_columns = Array("housing_median_age", "total_rooms", "median_house_value", "population")
val va = new VectorAssembler().setInputCols(input_columns).setOutputCol("attributes")

val housingAttrs = va.transform(housing)

housingAttrs.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|          attributes|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+--------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|[41.0,880.0,45260...|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|[21.0,7099.0,3585...|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|[52.0,1467.0,3521...|
|  -122.25|   37.85|              52.0|     12

input_columns = Array(housing_median_age, total_rooms, median_house_value, population)
va = vecAssembler_75bab98477f3
housingAttrs = [longitude: double, latitude: double ... 9 more fields]



In [18]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.linalg.Matrix
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.sql.Row

val Row(coeff: Matrix) = Correlation.corr(housingAttrs, "attributes").head

println(s"The standard correlation coefficient:\n ${coeff}")

The standard correlation coefficient:
 1.0                   -0.36126220122231784  0.10562341249318154   -0.2962442397735293   
-0.36126220122231784  1.0                   0.13415311380654338   0.8571259728659772    
0.10562341249318154   0.13415311380654338   1.0                   -0.02464967888891235  
-0.2962442397735293   0.8571259728659772    -0.02464967888891235  1.0                   



## 2.6. Combine and make new attributes
Now, let's try out various attribute combinations. In the given dataset, the total number of rooms in a block is not very useful if we don't know how many households there are; what we really want is the number of rooms per household. Similarly, the total number of bedrooms by itself is not very useful, and we want to compare it to the number of rooms. The population per household also seems like an interesting attribute combination to look at. To do so, add the three new columns below to the dataset, and call the new dataset `housingExtra`.
```
rooms_per_household = total_rooms / households
bedrooms_per_room = total_bedrooms / total_rooms
population_per_household = population / households
```
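As a quick arithmetic check of these formulas, the sketch below computes the three ratios for the first record shown earlier (`total_rooms` = 880, `total_bedrooms` = 129, `population` = 322, `households` = 126). It is plain Scala with a hypothetical `RatioCheck` object, not Spark code.

```scala
// Worked check of the three combined attributes for the first record of the dataset.
// Hypothetical object; plain Scala arithmetic, no Spark.
object RatioCheck {
  def ratios(totalRooms: Double, totalBedrooms: Double, population: Double, households: Double): (Double, Double, Double) =
    (totalRooms / households, totalBedrooms / totalRooms, population / households)

  // rooms_per_household, bedrooms_per_room, population_per_household for the first row
  val (roomsPerHousehold, bedroomsPerRoom, populationPerHousehold) = ratios(880.0, 129.0, 322.0, 126.0)
}
```

The results (about 6.984, 0.1466, and 2.556) match the first row of the `housingExtra` output below. In Spark, an arithmetic expression involving a null yields null, which is why `bedrooms_per_room` ends up with exactly as many missing values as `total_bedrooms`.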

In [19]:
// TODO: Replace <FILL IN> with appropriate code

val housingCol1 = housing.withColumn("rooms_per_household", housing("total_rooms") / housing("households"))

val housingCol2 = housingCol1.withColumn("bedrooms_per_room", housingCol1("total_bedrooms") / housingCol1("total_rooms"))

val housingExtra = housingCol2.withColumn("population_per_household", housingCol2("population") / housingCol2("households"))

housingExtra.select("rooms_per_household", "bedrooms_per_room", "population_per_household").show(5)

+-------------------+-------------------+------------------------+
|rooms_per_household|  bedrooms_per_room|population_per_household|
+-------------------+-------------------+------------------------+
|  6.984126984126984|0.14659090909090908|      2.5555555555555554|
|  6.238137082601054|0.15579659106916466|       2.109841827768014|
|  8.288135593220339|0.12951601908657123|      2.8022598870056497|
| 5.8173515981735155|0.18445839874411302|       2.547945205479452|
|  6.281853281853282| 0.1720958819913952|      2.1814671814671813|
+-------------------+-------------------+------------------------+
only showing top 5 rows



housingCol1 = [longitude: double, latitude: double ... 9 more fields]
housingCol2 = [longitude: double, latitude: double ... 10 more fields]
housingExtra = [longitude: double, latitude: double ... 11 more fields]



---
# 3. Prepare the data for Machine Learning algorithms
Before going through the Machine Learning steps, let's first rename the label column from `median_house_value` to `label`.

In [20]:
// TODO: Replace <FILL IN> with appropriate code

val renamedHousing = housingExtra.withColumnRenamed("median_house_value", "label")

renamedHousing = [longitude: double, latitude: double ... 11 more fields]



In [21]:
renamedHousing.show(1)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|   label|ocean_proximity|rooms_per_household|  bedrooms_per_room|population_per_household|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|452600.0|       NEAR BAY|  6.984126984126984|0.14659090909090908|      2.5555555555555554|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+
only showing top 1 row



Now, we want to separate the numerical attributes from the categorical attribute (`ocean_proximity`) and keep their column names in two different lists. Moreover, since we don't want to apply the same transformations to the predictors (features) and the label, we should remove the label attribute from the list of predictors.

In [22]:
// label column
val colLabel = "label"

// categorical column
val colCat = "ocean_proximity"

// numerical columns
val colNum = renamedHousing.columns.filter(_ != colLabel).filter(_ != colCat)

colLabel = label
colCat = ocean_proximity
colNum = Array(longitude, latitude, housing_median_age, total_rooms, total_bedrooms, population, households, median_income, rooms_per_household, bedrooms_per_room, population_per_household)




## 3.1. Prepare continuous attributes
### Data cleaning
Most Machine Learning algorithms cannot work with missing features, so we should take care of them. As a first step, let's find the columns with missing values in the numerical attributes. To do so, we can print the number of missing values of each continuous attribute listed in `colNum`.

In [24]:
// TODO: Replace <FILL IN> with appropriate code

for (c <- colNum) {
    val count_null_column_elements = renamedHousing.filter(s"${c} is null").count()
    println(s"Number of missing values in column ${c}: ${count_null_column_elements}")
}

Number of missing values in column longitude: 0
Number of missing values in column latitude: 0
Number of missing values in column housing_median_age: 0
Number of missing values in column total_rooms: 0
Number of missing values in column total_bedrooms: 207
Number of missing values in column population: 0
Number of missing values in column households: 0
Number of missing values in column median_income: 0
Number of missing values in column rooms_per_household: 0
Number of missing values in column bedrooms_per_room: 207
Number of missing values in column population_per_household: 0


As we observed above, the `total_bedrooms` and `bedrooms_per_room` attributes have some missing values. One way to take care of missing values is to use the `Imputer` Estimator, which completes missing values in a dataset using either the mean or the median of the column in which the missing values are located. To use it, create an `Imputer` instance and specify that each attribute's missing values should be replaced with the median of that attribute.
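Conceptually, the median strategy has a fit step that learns one number per column and a transform step that fills the gaps with it. The plain-Scala sketch below illustrates this for a single column, modelling missing values as `None`; the names are hypothetical. It computes the exact median, whereas Spark's `Imputer` computes an approximate median (via `approxQuantile`), so on large data the two can differ slightly.

```scala
// Plain-Scala sketch of median imputation for one column; None stands for a missing value.
// Hypothetical names; computes the exact median, unlike Spark's approximate one.
object MedianImputeSketch {
  def median(xs: Seq[Double]): Double = {
    require(xs.nonEmpty, "cannot take the median of an empty column")
    val sorted = xs.sorted
    val mid = sorted.size / 2
    // Odd count: the middle value; even count: the average of the two middle values.
    if (sorted.size % 2 == 1) sorted(mid) else (sorted(mid - 1) + sorted(mid)) / 2.0
  }

  // "Fit" learns the median of the observed values; "transform" fills every None with it.
  def impute(column: Seq[Option[Double]]): Seq[Double] = {
    val m = median(column.flatten)
    column.map(_.getOrElse(m))
  }
}
```

For example, `impute(Seq(Some(129.0), None, Some(190.0), Some(235.0)))` fills the gap with 190.0, the median of the three observed values.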

In [25]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.feature.Imputer

val imputer = new Imputer().setStrategy("median")
    .setInputCols(colNum)
    .setOutputCols(colNum)

val imputedHousing = imputer.fit(renamedHousing).transform(renamedHousing)

imputedHousing.select("total_bedrooms", "bedrooms_per_room").show(5)

+--------------+-------------------+
|total_bedrooms|  bedrooms_per_room|
+--------------+-------------------+
|         129.0|0.14659090909090908|
|        1106.0|0.15579659106916466|
|         190.0|0.12951601908657123|
|         235.0|0.18445839874411302|
|         280.0| 0.1720958819913952|
+--------------+-------------------+
only showing top 5 rows



imputer = imputer_e61c2a16b993
imputedHousing = [longitude: double, latitude: double ... 11 more fields]



In [26]:
imputedHousing.show(1)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|   label|ocean_proximity|rooms_per_household|  bedrooms_per_room|population_per_household|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|452600.0|       NEAR BAY|  6.984126984126984|0.14659090909090908|      2.5555555555555554|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+
only showing top 1 row



### Scaling
One of the most important transformations you need to apply to your data is feature scaling. With few exceptions, Machine Learning algorithms don't perform well when the input numerical attributes have very different scales. This is the case for the housing data: the total number of rooms ranges from 2 to 39,320, while the median incomes only range from 0 to 15. Note that scaling the label attribute is generally not required.

One way to get all attributes to the same scale is standardization: for each value, it first subtracts the mean value (so standardized values always have a zero mean), and then divides by the standard deviation so that the resulting distribution has unit variance. To do this, we can use the `StandardScaler` Estimator. Note that, by default, Spark's `StandardScaler` only divides by the standard deviation (`withStd` is `true`, `withMean` is `false`); call `setWithMean(true)` to also center the data. To use `StandardScaler`, we again need to combine all the numerical attributes into one big feature vector using `VectorAssembler`, and then apply `StandardScaler` to that vector.
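The sketch below shows standardization of a single column in plain Scala, with a `withMean` flag that mirrors the `StandardScaler` parameter of the same name. It assumes the sample (n - 1) standard deviation, which Spark's documentation describes as the corrected sample standard deviation; the object name is hypothetical, and mapping a constant column to 0.0 is this sketch's own choice.

```scala
// Plain-Scala sketch of standardizing one column; hypothetical object, not Spark's code.
object StandardizeSketch {
  def standardize(xs: Seq[Double], withMean: Boolean): Seq[Double] = {
    require(xs.size > 1, "need at least two values to estimate a standard deviation")
    val n = xs.size
    val mean = xs.sum / n
    // Sample standard deviation (divides by n - 1).
    val std = math.sqrt(xs.map(x => (x - mean) * (x - mean)).sum / (n - 1))
    // withMean = false (Spark's default) skips centering and only rescales.
    val shift = if (withMean) mean else 0.0
    // This sketch maps a constant column to 0.0 instead of dividing by zero.
    if (std == 0.0) xs.map(_ => 0.0) else xs.map(x => (x - shift) / std)
  }
}
```

With `withMean = false`, `Seq(2.0, 4.0, 6.0)` becomes `Seq(1.0, 2.0, 3.0)`: the values are only divided by the standard deviation (2.0). This matches the `scaledFeatures` output in the next cell, where longitude -122.23 becomes about -61.007 rather than a value centered around 0.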

In [27]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.feature.{VectorAssembler, StandardScaler}

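// Every column except the categorical one. Note that this still includes "label",
// so the target leaks into scaledFeatures; filtering with colNum would keep it out.
val input_and_output_columns = imputedHousing.columns.filter(_ != "ocean_proximity")

val va = new VectorAssembler()
    .setInputCols(input_and_output_columns)
    .setOutputCol("features")

val featuredHousing = va.transform(imputedHousing)

// Defaults: withStd = true, withMean = false, i.e. values are divided by the std but not centered.
val scaler = new StandardScaler()
    .setInputCol("features")
    .setOutputCol("scaledFeatures")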

val scaledHousing = scaler.fit(featuredHousing).transform(featuredHousing).drop("features")

scaledHousing.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|   label|ocean_proximity|rooms_per_household|  bedrooms_per_room|population_per_household|      scaledFeatures|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+--------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|452600.0|       NEAR BAY|  6.984126984126984|0.14659090909090908|      2.5555555555555554|[-61.007269596069...|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|358500.0|       NEAR BAY|  6.2381370826010

input_and_output_columns = Array(longitude, latitude, housing_median_age, total_rooms, total_bedrooms, population, households, median_income, label, rooms_per_household, bedrooms_per_room, population_per_household)
va = vecAssembler_1984cde5a53e
featuredHousing = [longitude: double, latitude: double ... 12 more fields]
scaler = stdScal_4e70257c2438
scaledHousing = [longitude: double, latitude: double ... 12 more fields]



## 3.2. Prepare categorical attributes
After imputing and scaling the continuous attributes, we should take care of the categorical attribute. Let's first print the number of distinct values of the categorical attribute `ocean_proximity`.

The instructions say 'print the number', but the template contains a `show` statement, so I print both the distinct categories of `ocean_proximity` and their number.

In [28]:
// TODO: Replace <FILL IN> with appropriate code

renamedHousing.select("ocean_proximity").distinct().show

+---------------+
|ocean_proximity|
+---------------+
|         ISLAND|
|     NEAR OCEAN|
|       NEAR BAY|
|      <1H OCEAN|
|         INLAND|
+---------------+



In [29]:
renamedHousing.select("ocean_proximity").distinct().count()

5

### String indexer
Most Machine Learning algorithms prefer to work with numbers. So let's convert the categorical attribute `ocean_proximity` to numbers. To do so, we can use the `StringIndexer` that encodes a string column of labels to a column of label indices. The indices are in [0, numLabels), ordered by label frequencies, so the most frequent label gets index 0.
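The frequency ordering can be sketched in plain Scala as below: count each label, sort by descending count, and use the position in that list as the index. The names are hypothetical, and breaking ties alphabetically is an assumption of this sketch. As with `StringIndexer`'s default `handleInvalid` setting of `"error"`, an unseen label fails here (with a `NoSuchElementException`).

```scala
// Plain-Scala sketch of frequency-based string indexing; hypothetical names, not Spark's code.
object StringIndexSketch {
  // Labels sorted by descending frequency (ties broken alphabetically in this sketch).
  def fitLabels(values: Seq[String]): Vector[String] =
    values.groupBy(identity)
      .map { case (label, occurrences) => (label, occurrences.size) }
      .toVector
      .sortBy { case (label, count) => (-count, label) }
      .map(_._1)

  // Maps each value to its position in `labels`; an unseen value throws NoSuchElementException.
  def transform(values: Seq[String], labels: Vector[String]): Seq[Double] = {
    val index = labels.zipWithIndex.toMap
    values.map(v => index(v).toDouble)
  }
}
```

Applied to the full `ocean_proximity` column, this ordering gives `<1H OCEAN` (9136 records) index 0 and `ISLAND` (5 records) index 4, consistent with the counts in Section 2.4.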

In [30]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer()
    .setInputCol("ocean_proximity")
    .setOutputCol("ocean_proximity_indexed")

val idxHousing = indexer.fit(renamedHousing).transform(renamedHousing)

idxHousing.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+-----------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|   label|ocean_proximity|rooms_per_household|  bedrooms_per_room|population_per_household|ocean_proximity_indexed|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+-----------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|452600.0|       NEAR BAY|  6.984126984126984|0.14659090909090908|      2.5555555555555554|                    3.0|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|358500.0|       NEAR BAY|  6.2

indexer = strIdx_4a78885e2e15
idxHousing = [longitude: double, latitude: double ... 12 more fields]



Now we can use this numerical data in any Machine Learning algorithm. You can look at the mapping that this indexer has learned using the `labels` method: "<1H OCEAN" is mapped to 0, "INLAND" is mapped to 1, etc.

In [31]:
indexer.fit(renamedHousing).labels

[<1H OCEAN, INLAND, NEAR OCEAN, NEAR BAY, ISLAND]

### One-hot encoding
Now, convert the label indices built in the last step into one-hot vectors. To do this, you can take advantage of the `OneHotEncoderEstimator` Estimator (in Spark 3.0 and later, this class is named `OneHotEncoder`).
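One-hot encoding turns index i into a vector with a single 1.0 at position i. The plain-Scala sketch below builds dense vectors for readability, whereas Spark prints sparse ones such as `(4,[3],[1.0])` (size 4, with 1.0 at position 3). The `dropLast` flag mirrors the encoder's parameter, which defaults to `true`: the last category gets the all-zeros vector, so 5 categories produce vectors of size 4. The object name is hypothetical.

```scala
// Plain-Scala sketch of one-hot encoding a category index into a dense vector.
// Hypothetical object; dropLast = true mirrors Spark's default.
object OneHotSketch {
  def encode(index: Int, numCategories: Int, dropLast: Boolean = true): Vector[Double] = {
    require(index >= 0 && index < numCategories, s"index $index out of range")
    // With dropLast, the vector has one slot fewer, so the last index maps to all zeros.
    val size = if (dropLast) numCategories - 1 else numCategories
    Vector.tabulate(size)(i => if (i == index) 1.0 else 0.0)
  }
}
```

For example, `encode(3, 5)` gives `Vector(0.0, 0.0, 0.0, 1.0)`, the dense form of `(4,[3],[1.0])`, which is how `NEAR BAY` (index 3) is encoded below.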

In [32]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.feature.OneHotEncoderEstimator

val encoder = new OneHotEncoderEstimator()
    .setInputCols(Array("ocean_proximity_indexed"))
    .setOutputCols(Array("ocean_proximity_one_hot"))

val ohHousing = encoder.fit(idxHousing).transform(idxHousing)

ohHousing.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+-----------------------+-----------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|   label|ocean_proximity|rooms_per_household|  bedrooms_per_room|population_per_household|ocean_proximity_indexed|ocean_proximity_one_hot|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+-----------------------+-----------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|452600.0|       NEAR BAY|  6.984126984126984|0.14659090909090908|      2.5555555555555554|                    3.0|          (4,[3],[1.0])|
|  -122.22|   37.86|              21

encoder = oneHotEncoder_8e72cd94ecf3
ohHousing = [longitude: double, latitude: double ... 13 more fields]



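Here we check the distinct values generated by the one-hot encoder. `ISLAND`, the least frequent category (index 4), is encoded as the all-zeros vector `(4,[],[])` because the encoder drops the last category by default (`dropLast` is `true`):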

In [33]:
ohHousing.select("ocean_proximity", "ocean_proximity_one_hot").distinct().collect()

0,1
ISLAND,"(4,[],[])"
INLAND,"(4,[1],[1.0])"
NEAR BAY,"(4,[3],[1.0])"
<1H OCEAN,"(4,[0],[1.0])"
NEAR OCEAN,"(4,[2],[1.0])"


---
# 4. Pipeline
As you can see, there are many data transformation steps that need to be executed in the right order. For example, you called the `Imputer`, `VectorAssembler`, and `StandardScaler` one after another. Instead, we can use the `Pipeline` class to define a sequence of Transformers/Estimators and run them in order. A `Pipeline` is an `Estimator`, so after a Pipeline's `fit()` method runs, it produces a `PipelineModel`, which is a `Transformer`.

Now, let's create a pipeline called `numPipeline` that runs the numerical stages you built above (`imputer`, `va`, and `scaler`) in the right order, as well as a pipeline called `catPipeline` that runs the categorical stages (`indexer` and `encoder`). Then, put these two pipelines, `numPipeline` and `catPipeline`, into one pipeline.
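To illustrate how a pipeline threads data through its stages, here is a deliberately tiny, hypothetical API in plain Scala (it is not Spark's `org.apache.spark.ml` API, and a `Column` here is just a `Vector[Double]`). During `fit`, each estimator is fitted on the output of the stages before it, and its fitted model is then used to transform that output; transformers are applied as they are. Because `Pipeline` is itself an `Estimator`, pipelines can be nested, just like `numPipeline` and `catPipeline` inside `pipeline`.

```scala
// Hypothetical mini-API (not Spark's) showing how a pipeline fits its stages in order.
object PipelineSketch {
  type Column = Vector[Double]

  trait Stage
  trait Transformer extends Stage { def transform(data: Column): Column }
  trait Estimator extends Stage { def fit(data: Column): Transformer }

  // A fitted pipeline simply applies its fitted stages left to right.
  final case class PipelineModel(stages: Vector[Transformer]) extends Transformer {
    def transform(data: Column): Column = stages.foldLeft(data)((d, t) => t.transform(d))
  }

  final case class Pipeline(stages: Vector[Stage]) extends Estimator {
    def fit(data: Column): PipelineModel = {
      // Walk the stages left to right, threading the transformed data through.
      val (fitted, _) = stages.foldLeft((Vector.empty[Transformer], data)) {
        case ((done, current), t: Transformer) => (done :+ t, t.transform(current))
        case ((done, current), e: Estimator) =>
          val model = e.fit(current)
          (done :+ model, model.transform(current))
      }
      PipelineModel(fitted)
    }
  }

  // Example stages: an estimator that learns the mean, and a fixed transformer.
  object CenterByMean extends Estimator {
    def fit(data: Column): Transformer = {
      val mean = data.sum / data.size
      new Transformer { def transform(d: Column): Column = d.map(_ - mean) }
    }
  }

  object TimesTen extends Transformer {
    def transform(data: Column): Column = data.map(_ * 10)
  }
}
```

Fitting `Pipeline(Vector(CenterByMean, TimesTen))` on `Vector(1.0, 2.0, 3.0)` learns the mean 2.0 once; the resulting model then maps `Vector(4.0)` to `Vector(20.0)`, reusing the mean learned at fit time.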

In [34]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}

val numPipeline = new Pipeline().setStages(Array(imputer, va, scaler))
val catPipeline = new Pipeline().setStages(Array(indexer, encoder))
val pipeline = new Pipeline().setStages(Array(numPipeline, catPipeline))
val newHousing = pipeline.fit(renamedHousing).transform(renamedHousing).drop("features", "ocean_proximity_indexed")

newHousing.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+--------------------+-----------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|   label|ocean_proximity|rooms_per_household|  bedrooms_per_room|population_per_household|      scaledFeatures|ocean_proximity_one_hot|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+--------------------+-----------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|452600.0|       NEAR BAY|  6.984126984126984|0.14659090909090908|      2.5555555555555554|[-61.007269596069...|          (4,[3],[1.0])|
|  -122.22|   37.86|              21.0|     7099

numPipeline = pipeline_238d8bd35a3b
catPipeline = pipeline_30cd89f8ca74
pipeline = pipeline_707101c6ea43
newHousing = [longitude: double, latitude: double ... 13 more fields]



Now, use `VectorAssembler` to put all attributes of the final dataset `newHousing` into a big vector, and call the new column `features`.

In [35]:
// TODO: Replace <FILL IN> with appropriate code

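// Assembles every column except ocean_proximity: the raw numeric columns, scaledFeatures,
// ocean_proximity_one_hot, and also "label" itself, so the target leaks into the features.
// Keeping only scaledFeatures and ocean_proximity_one_hot (built without label) would avoid this.
val va2 = new VectorAssembler()
    .setInputCols(newHousing.columns.filter(_ != "ocean_proximity"))
    .setOutputCol("features")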

val dataset = va2.transform(newHousing).select("features", "label")

dataset.show(5)

+--------------------+--------+
|            features|   label|
+--------------------+--------+
|[-122.23,37.88,41...|452600.0|
|[-122.22,37.86,21...|358500.0|
|[-122.24,37.85,52...|352100.0|
|[-122.25,37.85,52...|341300.0|
|[-122.25,37.85,52...|342200.0|
+--------------------+--------+
only showing top 5 rows



va2 = vecAssembler_cf898d55c56a
dataset = [features: vector, label: double]


[features: vector, label: double]
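To make the assembling step concrete, here is a Spark-free sketch of what `VectorAssembler` does to a single row: it concatenates the selected columns, in order, into one flat array, expanding vector columns such as `scaledFeatures` element by element. The `AssemblerSketch` object, its `Cell` types, and the helper `featureCols` are hypothetical names used only for illustration. The point to notice is that any column left in the input list, including the target, ends up inside `features`.

```scala
object AssemblerSketch {
  // Hypothetical, Spark-free model of a column value: a number or a vector.
  sealed trait Cell
  final case class Scalar(value: Double) extends Cell
  final case class Vec(values: Array[Double]) extends Cell

  // Columns to assemble: every column except the excluded ones.
  def featureCols(columns: Seq[String], excluded: Set[String]): Seq[String] =
    columns.filterNot(excluded.contains)

  // Concatenate the chosen columns, in order, into one flat array;
  // vector columns are expanded element by element.
  def assemble(row: Map[String, Cell], inputCols: Seq[String]): Array[Double] =
    inputCols.flatMap { name =>
      row(name) match {
        case Scalar(v) => Seq(v)
        case Vec(vs)   => vs.toSeq
      }
    }.toArray
}
```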

---
# 5. Make a model
Here, we are going to build four different regression models:
* Linear regression model
* Decision tree regression
* Random forest regression
* Gradient-boosted tree regression

Before feeding the data to a Machine Learning model, let's first split it into a training dataset (`trainSet`) with 80% of the data and a test dataset (`testSet`) with the remaining 20%.

In [36]:
// TODO: Replace <FILL IN> with appropriate code

val Array(trainSet, testSet) = dataset.randomSplit(Array[Double](0.8, 0.2))

trainSet = [features: vector, label: double]
testSet = [features: vector, label: double]


[features: vector, label: double]
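`randomSplit` does not cut the data at exactly 80/20: each row is assigned to one side by a random draw, so the split sizes are only approximately proportional to the weights, and they change between runs unless you pass a seed (for example `dataset.randomSplit(Array(0.8, 0.2), seed = 42L)`). The plain-Scala sketch below illustrates this per-row sampling idea with a fixed seed. `SplitSketch` is a hypothetical helper, not Spark's implementation, which samples partition by partition.

```scala
import scala.util.Random

object SplitSketch {
  // Assign each element to the training side with probability trainFrac,
  // independently of the others, using a fixed seed for reproducibility.
  def randomSplit[T](data: Seq[T], trainFrac: Double, seed: Long): (Seq[T], Seq[T]) = {
    val rng = new Random(seed)
    // Force one random draw per element, in order
    val draws = data.toVector.map(x => (x, rng.nextDouble() < trainFrac))
    (draws.collect { case (x, true) => x }, draws.collect { case (x, false) => x })
  }
}
```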

## 5.1. Linear regression model
Now, train a Linear Regression model using the `LinearRegression` class. Then, print the coefficients and intercept of the model, as well as the summary of the model over the training set by calling the `summary` method.

In [37]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.regression.LinearRegression

// train the model
val lr = new LinearRegression()
    .setLabelCol("label")
    .setFeaturesCol("features")

val lrModel = lr.fit(trainSet)
val trainingSummary = lrModel.summary

println(s"Coefficients: ${lrModel.coefficients}, Intercept: ${lrModel.intercept}")
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")

Coefficients: [-1.249743355461832,-1.3269477843751938,-0.009665255327551771,-0.001518141440921788,-0.002710136559370238,-0.0013077026736120117,0.015067445154838465,-0.0025506404093194085,0.49999778199791384,0.25191472565423484,-12.716264807942258,0.018251419492755373,-2.503903779372859,-2.83429482335083,-0.12164241450197057,-3.312000457734323,-1.136609308876991,-1.4809237378010078,5.760732609804465,-0.0048456537326970195,57697.55199031857,0.6232805796225882,-0.7337237611234485,0.18956013950453626,-100.37866289978578,-96.83876919881746,-101.45032751640316,-100.66664182027675], Intercept: -101.25438740564587
RMSE: 3.3681372226723925


lr = linReg_6e042fdf5e41
lrModel = linReg_6e042fdf5e41
trainingSummary = org.apache.spark.ml.regression.LinearRegressionTrainingSummary@5f2e5e63


org.apache.spark.ml.regression.LinearRegressionTrainingSummary@5f2e5e63
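The fitted model predicts with the usual linear formula: the intercept plus the dot product of the coefficient vector with the feature vector. The sketch below spells that out on plain arrays to show how the printed `coefficients` and `intercept` combine. `LinearPredictSketch` is a hypothetical name; it is not a replacement for `lrModel.transform`.

```scala
object LinearPredictSketch {
  // prediction = intercept + sum_k coefficients(k) * x(k)
  def predict(coefficients: Array[Double], intercept: Double, x: Array[Double]): Double = {
    require(coefficients.length == x.length, "coefficients and features must have the same length")
    intercept + coefficients.indices.map(k => coefficients(k) * x(k)).sum
  }
}
```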

Now, use `RegressionEvaluator` to measure the root-mean-square error (RMSE) of the model on the test dataset.

In [38]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.evaluation.RegressionEvaluator

// make predictions on the test data
val predictions = lrModel.transform(testSet)

predictions.select("prediction", "label", "features").show(5)

// select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
    .setLabelCol("label")
    .setPredictionCol("prediction")
    .setMetricName("rmse")

val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

+------------------+--------+--------------------+
|        prediction|   label|            features|
+------------------+--------+--------------------+
| 94597.26683764346| 94600.0|[-124.35,40.54,52...|
|106697.12464556242|106700.0|[-124.23,40.54,52...|
| 66894.45370113969| 66900.0|[-124.21,41.75,20...|
|  70496.6819617766| 70500.0|[-124.18,40.79,39...|
|128896.90158364514|128900.0|[-124.17,40.74,17...|
+------------------+--------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 2.7681879164915646


predictions = [features: vector, label: double ... 1 more field]
evaluator = regEval_ae40acdccb27
rmse = 2.7681879164915646


2.7681879164915646
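With `setMetricName("rmse")`, the evaluator computes the square root of the mean squared difference between `label` and `prediction`. Here is a minimal plain-Scala version of that formula, assuming the (label, prediction) pairs are already collected into a local `Seq`. `RmseSketch` is a hypothetical helper.

```scala
object RmseSketch {
  // sqrt( (1/n) * sum over pairs of (label - prediction)^2 )
  def rmse(labelsAndPreds: Seq[(Double, Double)]): Double = {
    require(labelsAndPreds.nonEmpty, "need at least one pair")
    val mse = labelsAndPreds.map { case (y, p) => (y - p) * (y - p) }.sum / labelsAndPreds.size
    math.sqrt(mse)
  }
}
```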

## 5.2. Decision tree regression
Repeat what you did for the linear regression model to build a Decision Tree model. Use the `DecisionTreeRegressor` to make a model and then measure its RMSE on the test dataset.

In [39]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.ml.evaluation.RegressionEvaluator

val dt = new DecisionTreeRegressor()
    .setLabelCol("label")
    .setFeaturesCol("features")

// train the model
val dtModel = dt.fit(trainSet)

// make predictions on the test data
val predictions = dtModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

// select (prediction, true label) and compute test error
val evaluator = new RegressionEvaluator()
    .setLabelCol("label")
    .setPredictionCol("prediction")
    .setMetricName("rmse")

val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

+------------------+--------+--------------------+
|        prediction|   label|            features|
+------------------+--------+--------------------+
|  95460.0201409869| 94600.0|[-124.35,40.54,52...|
|110895.85389930899|106700.0|[-124.23,40.54,52...|
| 69415.45105566218| 66900.0|[-124.21,41.75,20...|
| 69415.45105566218| 70500.0|[-124.18,40.79,39...|
|132096.33911368015|128900.0|[-124.17,40.74,17...|
+------------------+--------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 5971.694645288582


dt = dtr_5bdf3626a1dc
dtModel = DecisionTreeRegressionModel (uid=dtr_5bdf3626a1dc) of depth 5 with 61 nodes
predictions = [features: vector, label: double ... 1 more field]
evaluator = regEval_2090e68e6d9f
rmse = 5971.694645288582


5971.694645288582
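A regression tree grows by repeatedly choosing the split that most reduces the squared error (Spark's `DecisionTreeRegressor` uses the variance impurity for this). The sketch below finds the best single threshold on one feature by trying the midpoints between consecutive distinct values. It is a simplification: Spark considers a limited set of candidate thresholds per continuous feature, controlled by `maxBins`, and searches all features at each node. `TreeSplitSketch` is a hypothetical name.

```scala
object TreeSplitSketch {
  // Sum of squared deviations from the mean (n * variance).
  private def sse(ys: Seq[Double]): Double =
    if (ys.isEmpty) 0.0
    else {
      val m = ys.sum / ys.size
      ys.map(y => (y - m) * (y - m)).sum
    }

  /** Best split "x <= threshold" on one feature: returns (threshold, total SSE). */
  def bestSplit(points: Seq[(Double, Double)]): (Double, Double) = {
    val xs = points.map(_._1).distinct.sorted
    require(xs.size >= 2, "need at least two distinct feature values")
    val candidates = xs.zip(xs.tail).map { case (a, b) => (a + b) / 2 }
    candidates.map { t =>
      val (left, right) = points.partition(_._1 <= t)
      (t, sse(left.map(_._2)) + sse(right.map(_._2)))
    }.minBy(_._2)
  }
}
```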

## 5.3. Random forest regression
Now let's measure the test error of a Random Forest model. You can use the `RandomForestRegressor` to make a Random Forest model.

In [40]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.regression.RandomForestRegressor
import org.apache.spark.ml.evaluation.RegressionEvaluator

val rf = new RandomForestRegressor()
    .setLabelCol("label")
    .setFeaturesCol("features")

// train the model
val rfModel = rf.fit(trainSet)

// make predictions on the test data
val predictions = rfModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

// select (prediction, true label) and compute test error
val evaluator = new RegressionEvaluator()
    .setLabelCol("label")
    .setPredictionCol("prediction")
    .setMetricName("rmse")

val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

+------------------+--------+--------------------+
|        prediction|   label|            features|
+------------------+--------+--------------------+
|116351.05301960585| 94600.0|[-124.35,40.54,52...|
|123158.11669190794|106700.0|[-124.23,40.54,52...|
| 91146.31022742653| 66900.0|[-124.21,41.75,20...|
| 93586.31677437304| 70500.0|[-124.18,40.79,39...|
|141699.73747854584|128900.0|[-124.17,40.74,17...|
+------------------+--------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 11865.82249308016


rf = rfr_33132bf5316c
rfModel = RandomForestRegressionModel (uid=rfr_33132bf5316c) with 20 trees
predictions = [features: vector, label: double ... 1 more field]
evaluator = regEval_e981c3b2ed97
rmse = 11865.82249308016


11865.82249308016
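A random forest averages many trees, each trained on a resampled version of the rows (and, in Spark, on a random subset of features at each split, controlled by `featureSubsetStrategy`). The two core ingredients, bootstrap sampling and averaging the trees' predictions, can be sketched in plain Scala. `ForestSketch` is hypothetical, and the "trees" here are arbitrary functions rather than fitted models.

```scala
import scala.util.Random

object ForestSketch {
  // Bootstrap sample: n rows drawn uniformly with replacement.
  def bootstrap[T](data: IndexedSeq[T], rng: Random): IndexedSeq[T] =
    IndexedSeq.fill(data.size)(data(rng.nextInt(data.size)))

  // A regression forest predicts the mean of its trees' predictions.
  def forestPredict(trees: Seq[Array[Double] => Double], x: Array[Double]): Double =
    trees.map(tree => tree(x)).sum / trees.size
}
```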

## 5.4. Gradient-boosted tree regression
Finally, we want to build a Gradient-boosted Tree Regression model and measure its RMSE on the test data. Use the `GBTRegressor` to build the model.

In [41]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.regression.GBTRegressor
import org.apache.spark.ml.evaluation.RegressionEvaluator

val gb = new GBTRegressor()
    .setLabelCol("label")
    .setFeaturesCol("features")

// train the model
val gbModel = gb.fit(trainSet)

// make predictions on the test data
val predictions = gbModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

// select (prediction, true label) and compute test error
val evaluator = new RegressionEvaluator()
    .setLabelCol("label")
    .setPredictionCol("prediction")
    .setMetricName("rmse")

val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

+------------------+--------+--------------------+
|        prediction|   label|            features|
+------------------+--------+--------------------+
| 92245.07076123741| 94600.0|[-124.35,40.54,52...|
|108658.11846530528|106700.0|[-124.23,40.54,52...|
| 65497.06191691438| 66900.0|[-124.21,41.75,20...|
| 74558.47090946979| 70500.0|[-124.18,40.79,39...|
|123647.29284970279|128900.0|[-124.17,40.74,17...|
+------------------+--------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 5400.582708093331


gb = gbtr_460d496ad626
gbModel = GBTRegressionModel (uid=gbtr_460d496ad626) with 20 trees
predictions = [features: vector, label: double ... 1 more field]
evaluator = regEval_64c32dd06ee8
rmse = 5400.582708093331


5400.582708093331
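Gradient boosting builds its trees sequentially: with squared-error loss (the default `lossType` of `GBTRegressor`), each new tree is fitted to the residuals of the current ensemble and added with a shrinkage factor (`stepSize` in Spark). The sketch below uses one-split stumps on a single feature instead of full trees. `BoostSketch`, `Stump`, and the parameter values in the usage are illustrative assumptions, not Spark's implementation.

```scala
object BoostSketch {
  final case class Stump(threshold: Double, left: Double, right: Double) {
    def apply(x: Double): Double = if (x <= threshold) left else right
  }

  private def mean(v: Seq[Double]): Double = if (v.isEmpty) 0.0 else v.sum / v.size

  // Fit a one-split "tree" to (x, target) pairs by minimising squared error.
  def fitStump(xs: Seq[Double], targets: Seq[Double]): Stump = {
    val pairs = xs.zip(targets)
    val sorted = xs.distinct.sorted
    val candidates =
      if (sorted.size < 2) sorted
      else sorted.zip(sorted.tail).map { case (a, b) => (a + b) / 2 }
    candidates.map { t =>
      val (l, r) = pairs.partition(_._1 <= t)
      val stump = Stump(t, mean(l.map(_._2)), mean(r.map(_._2)))
      val err = pairs.map { case (x, y) =>
        val d = y - stump(x)
        d * d
      }.sum
      (stump, err)
    }.minBy(_._2)._1
  }

  // Squared-loss boosting: each stump is fitted to the current residuals
  // (the negative gradient) and its output is scaled by stepSize.
  def boost(xs: Seq[Double], ys: Seq[Double], rounds: Int, stepSize: Double): Double => Double = {
    var stumps = Vector.empty[Stump]
    def predict(x: Double): Double = stumps.map(s => stepSize * s(x)).sum
    for (_ <- 0 until rounds) {
      val residuals = xs.zip(ys).map { case (x, y) => y - predict(x) }
      stumps = stumps :+ fitStump(xs, residuals)
    }
    val fitted = stumps
    x => fitted.map(s => stepSize * s(x)).sum
  }
}
```

The `with 20 trees` in the model description above corresponds to the default `maxIter` of 20 for `GBTRegressor`.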

---
# 6. Hyperparameter tuning
An important task in Machine Learning is model selection, or using data to find the best model or parameters for a given task. This is also called tuning. Tuning may be done for individual Estimators such as LinearRegression, or for entire Pipelines which include multiple algorithms, featurization, and other steps. Users can tune an entire Pipeline at once, rather than tuning each element in the Pipeline separately. MLlib supports model selection tools, such as `CrossValidator`. These tools require the following items:
* Estimator: algorithm or Pipeline to tune (`setEstimator`)
* Set of ParamMaps: parameters to choose from, sometimes called a "parameter grid" to search over (`setEstimatorParamMaps`)
* Evaluator: metric to measure how well a fitted Model does on held-out test data (`setEvaluator`)

`CrossValidator` begins by splitting the dataset into a set of folds, which are used as separate training and test datasets. For example, with `k=3` folds, `CrossValidator` will generate 3 (training, test) dataset pairs, each of which uses 2/3 of the data for training and 1/3 for testing. To evaluate a particular `ParamMap`, `CrossValidator` computes the average evaluation metric for the 3 Models produced by fitting the Estimator on the 3 different (training, test) dataset pairs. After identifying the best `ParamMap`, `CrossValidator` finally re-fits the Estimator using the best ParamMap and the entire dataset passed to `fit` (here, `trainSet`).

Below, use the `CrossValidator` to select the best Random Forest model. To do so, you need to define a grid of parameters. Let's search over different numbers of trees (1, 5, and 10) and different tree depths (5, 10, and 15).

In [42]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.tuning.ParamGridBuilder
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.tuning.CrossValidator

// Build the grid from the estimator's own params (rf), not from the fitted model
val paramGrid = new ParamGridBuilder()
  .addGrid(rf.numTrees, Array(1, 5, 10))
  .addGrid(rf.maxDepth, Array(5, 10, 15))
  .build()

val evaluator = new RegressionEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("rmse")
val cv = new CrossValidator()
    .setEstimator(rf)
    .setEvaluator(evaluator)  // reuse the RMSE evaluator defined above
    .setEstimatorParamMaps(paramGrid)
    .setNumFolds(3)

val cvModel = cv.fit(trainSet)

val predictions = cvModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

+------------------+--------+--------------------+
|        prediction|   label|            features|
+------------------+--------+--------------------+
| 88553.95256916997| 94600.0|[-124.35,40.54,52...|
|111359.74534769834|106700.0|[-124.23,40.54,52...|
| 64504.02298850575| 66900.0|[-124.21,41.75,20...|
| 74291.66666666667| 70500.0|[-124.18,40.79,39...|
| 136732.0342205323|128900.0|[-124.17,40.74,17...|
+------------------+--------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 5916.1204091439395


paramGrid = 




[{
	rfr_33132bf5316c-maxDepth: 5,
	rfr_33132bf5316c-numTrees: 1
}, {
	rfr_33132bf5316c-maxDepth: 5,
	rfr_33132bf5316c-numTrees: 5
}, {
	rfr_33132bf5316c-maxDepth: 5,
	rfr_33132bf5316c-numTrees: 10
}, {
	rfr_33132bf5316c-maxDepth: 10,
	rfr_33132bf5316c-numTrees: 1
}, {
	rfr_33132bf5316c-maxDepth: 10,
	rfr_33132bf5316c-numTrees: 5
}, {
	rfr_33132bf5316c-maxDepth: 10,
	rfr_33132bf5316c-numTrees: 10
}, {
	rfr_33132bf5316c-maxDepth: 15,
	rfr_33132bf5316c-numTrees: 1
}, {
	rfr_33132bf5316c-maxDepth: 15,
	rfr_33132bf5316c-numTrees: 5
}, {
	rfr_33132bf5316c-maxDepth: 15,
	rfr_33132bf5316c-numTrees: 10
}]
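To see how much work this grid implies: the two `addGrid` calls produce the Cartesian product of 3 tree counts and 3 depths, i.e. 9 `ParamMap`s, and with 3 folds `CrossValidator` fits 9 × 3 = 27 models before the final refit. The sketch below reproduces the grid and a k-fold split of row indices in plain Scala. `CvSketch` is hypothetical, and unlike Spark (which assigns rows to folds at random) it deals rows out round-robin so the result is deterministic.

```scala
object CvSketch {
  // Cartesian product of two parameter lists, like two addGrid calls.
  def grid(numTrees: Seq[Int], maxDepth: Seq[Int]): Seq[(Int, Int)] =
    for (t <- numTrees; d <- maxDepth) yield (t, d)

  // k-fold split of row indices 0 until n, returned as (train, test) pairs.
  // Rows are assigned round-robin here; Spark assigns them randomly.
  def kFold(n: Int, k: Int): Seq[(Seq[Int], Seq[Int])] =
    (0 until k).map { fold =>
      val (test, train) = (0 until n).partition(_ % k == fold)
      (train, test)
    }
}
```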

---
# 7. Custom transformer
At the end of part two, we added extra columns to the `housing` dataset. Here, we are going to implement a Transformer that does the same task. The Transformer takes the names of two input columns, `inputCol1` and `inputCol2`, and the name of an output column, `outputCol`. It then computes `inputCol1` divided by `inputCol2` and adds the result to the dataset as a new column. The details of implementing a custom Transformer are explained [here](https://www.oreilly.com/learning/extend-spark-ml-for-your-own-modeltransformer-types). Please read it before starting the implementation.

First, define the parameters of the Transformer and implement a method that validates the input schema (`StructType`) and returns the output schema.

In [43]:
import org.apache.spark.sql.types.{StructField, StructType, DoubleType}
import org.apache.spark.ml.param.{Param, Params}

trait MyParams extends Params {
  final val inputCol1 = new Param[String](this, "inputCol1", "The first input column")
  final val inputCol2 = new Param[String](this, "inputCol2", "The second input column")
  final val outputCol = new Param[String](this, "outputCol", "The output column")

  protected def validateAndTransformSchema(schema: StructType): StructType = {
    // Look up the columns named by the parameter values, not the literal strings "inputCol1"/"inputCol2"
    val field1 = schema.fields(schema.fieldIndex($(inputCol1)))
    val field2 = schema.fields(schema.fieldIndex($(inputCol2)))

    if (field1.dataType != DoubleType) {
      throw new IllegalArgumentException(s"Column ${field1.name} has type ${field1.dataType}, but DoubleType is required")
    }
    if (field2.dataType != DoubleType) {
      throw new IllegalArgumentException(s"Column ${field2.name} has type ${field2.dataType}, but DoubleType is required")
    }
    // Add the output field under the configured name; it is nullable because
    // the division can produce null (e.g. when the divisor is zero)
    schema.add(StructField($(outputCol), DoubleType, nullable = true))
  }
}


defined trait MyParams


Then, extend the class `Transformer`, implement setter functions for the input and output columns, and call them `setInputCol1`, `setInputCol2`, and `setOutputCol`. Moreover, you need to override the methods `copy`, `transformSchema`, and `transform`. The details of what you need to cover in these methods are given [here](https://www.oreilly.com/learning/extend-spark-ml-for-your-own-modeltransformer-types).

In [44]:
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions.col

class MyTransformer(override val uid: String) extends Transformer with MyParams {
  def this() = this(Identifiable.randomUID("configurablewordcount"))

  def setInputCol1(value: String): this.type = set(inputCol1, value)

  def setInputCol2(value: String): this.type = set(inputCol2, value)

  def setOutputCol(value: String): this.type = set(outputCol, value)

  override def copy(extra: ParamMap): MyTransformer = defaultCopy(extra)

  override def transformSchema(schema: StructType): StructType =
    validateAndTransformSchema(schema)

  override def transform(dataset: Dataset[_]): DataFrame = {
    // Fail fast if an input column is missing or is not DoubleType
    transformSchema(dataset.schema, logging = true)
    dataset.select(col("*"), (dataset.col($(inputCol1)) / dataset.col($(inputCol2))).as($(outputCol)))
  }
}

defined class MyTransformer


Now, create an instance of `MyTransformer`, set the input columns to `total_rooms` and `households` and the output column to `rooms_per_household`, and run it over the `housing` dataset.

In [45]:
val myTransformer = new MyTransformer()
  .setInputCol1("total_rooms")
  .setInputCol2("households")
  .setOutputCol("rooms_per_household")

// Keep the transformed DataFrame (show() returns Unit), then display a few rows
val myDataset = myTransformer.transform(housing)
myDataset.select("rooms_per_household").show(5)

+-------------------+
|rooms_per_household|
+-------------------+
|  6.984126984126984|
|  6.238137082601054|
|  8.288135593220339|
| 5.8173515981735155|
|  6.281853281853282|
+-------------------+
only showing top 5 rows



myTransformer = configurablewordcount_90600747b632


configurablewordcount_90600747b632
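Per row, `MyTransformer` simply divides one column by the other. The plain-Scala sketch below shows that logic on its own, including the edge case that motivates a nullable output field: under Spark's default non-ANSI settings (Spark 3.x), dividing by zero yields `null` rather than an error, which is modelled here as `None`. `RatioSketch` is a hypothetical helper; the first row of the output above (880 rooms / 126 households) serves as a check.

```scala
object RatioSketch {
  // inputCol1 / inputCol2 for one row; None stands in for Spark's null
  // result when the divisor is zero.
  def ratio(numerator: Double, denominator: Double): Option[Double] =
    if (denominator == 0.0) None else Some(numerator / denominator)
}
```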

---
# 8. Custom estimator (predictor)
Now, it's time to implement your own linear regression, trained with gradient descent, as a brand-new Estimator. The complete code of the Estimator is given, so you do not need to implement anything; it is a sample that shows how to build a custom Estimator.

The gradient descent update for linear regression is:
$$
w_{i+1} = w_{i} - \alpha_{i} \sum\limits_{j=1}^n (w_i^\top x_j - y_j)x_j
$$

where $i$ is the iteration number of the gradient descent algorithm, $j$ identifies the observation, and $\alpha_i$ is the step size at iteration $i$ (the code below uses $\alpha_i = \frac{\alpha}{n\sqrt{i+1}}$ with $\alpha = 1$). Here, $w$ is an array of weights with the same size as the array of features; it provides a weight for each feature when computing the label prediction in the form:

$$
\text{prediction} = w^\top x
$$

where $w$ is the final array of weights computed by the gradient descent, $x$ is the array of features of the observation point and $prediction$ is the label we predict should be associated to this observation.

The given `Helper` object implements the following helper methods:
* `dot`: the dot product of two vectors, and the product of a vector and a scalar
* `sumVectors`: element-wise addition of two vectors
* `fillVector`: creates a vector of a given size with every entry set to a given value

The Linear Regression class `LR` implements the following methods:
* `calcRMSE`: computes the Root Mean Square Error of a given RDD of (label, prediction) tuples using the formula:
$$
\text{rmse} = \sqrt{\frac{1}{n}\sum\limits_{i=1}^n (\text{label}_i - \text{prediction}_i)^2}
$$
* `gradientSummand`: computes the following formula:
$$
gs_{ij} = (w_i^\top x_j - y_j)x_j
$$
* the gradient, which `linregGradientDescent` computes by summing the gradient summands over all observations:
$$
gradient = \sum\limits_{j=1}^n gs_{ij}
$$
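The formulas above can be checked outside Spark. This sketch mirrors `gradientSummand` and the update loop of `linregGradientDescent` on plain arrays, using the same step-size schedule $\alpha_i = \alpha / (n\sqrt{i+1})$. `GdSketch` is a hypothetical name, and local `Seq`s stand in for the RDD.

```scala
object GdSketch {
  // w^T x for two equal-length vectors
  def dot(a: Array[Double], b: Array[Double]): Double =
    a.indices.map(k => a(k) * b(k)).sum

  // gs_ij = (w_i^T x_j - y_j) x_j
  def gradientSummand(w: Array[Double], x: Array[Double], y: Double): Array[Double] = {
    val err = dot(w, x) - y
    x.map(_ * err)
  }

  // gradient = sum over j of gs_ij
  def gradient(w: Array[Double], data: Seq[(Array[Double], Double)]): Array[Double] =
    data
      .map { case (x, y) => gradientSummand(w, x, y) }
      .reduce((g1, g2) => g1.indices.map(k => g1(k) + g2(k)).toArray)

  // w_{i+1} = w_i - alpha_i * gradient, with alpha_i = alpha / (n * sqrt(i + 1))
  def train(data: Seq[(Array[Double], Double)], numIters: Int, alpha: Double): Array[Double] = {
    val n = data.size
    var w = Array.fill(data.head._1.length)(0.0)
    for (i <- 0 until numIters) {
      val g = gradient(w, data)
      val alphaI = alpha / (n * math.sqrt(i + 1.0))
      w = w.indices.map(k => w(k) - alphaI * g(k)).toArray
    }
    w
  }
}
```

With small feature values and a modest $\alpha$ the weights converge (fitting $y = 2x$ recovers a weight close to 2), but with features in the thousands and $\alpha = 1$ the same loop blows up within a few iterations.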

In [46]:
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.PredictorParams
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.Matrices
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.ml.{PredictionModel, Predictor}

case class Instance(label: Double, features: Vector)

object Helper extends Serializable {
  def dot(v1: Vector, v2: Vector): Double = {
    val m = Matrices.dense(1, v1.size, v1.toArray)
    m.multiply(v2).values(0)
  }

  def dot(v: Vector, s: Double): Vector = {
    val baseArray = v.toArray.map(vi => vi * s)
    Vectors.dense(baseArray)
  }

  def sumVectors(v1: Vector, v2: Vector): Vector = {
    val baseArray = ((v1.toArray) zip (v2.toArray)).map { case (val1, val2) => val1 + val2 }
    Vectors.dense(baseArray)
  }

  def fillVector(size: Int, fillVal: Double): Vector = Vectors.dense(Array.fill[Double](size)(fillVal));
}

defined class Instance
defined object Helper


In [47]:
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.linalg.Vectors

class LR() extends Serializable {
  def calcRMSE(labelsAndPreds: RDD[(Double, Double)]): Double = {
    val regressionMetrics = new RegressionMetrics(labelsAndPreds)
    regressionMetrics.rootMeanSquaredError
  }
  
  def gradientSummand(weights: Vector, lp: Instance): Vector = {
    val mult = (Helper.dot(weights, lp.features) - lp.label)
    val seq = (0 to lp.features.size - 1).map(i => lp.features(i) * mult)
    return Vectors.dense(seq.toArray)
  }
  
  def linregGradientDescent(trainData: RDD[Instance], numIters: Int): (Vector, Array[Double]) = {
    val n = trainData.count()
    val d = trainData.take(1)(0).features.size
    var w = Helper.fillVector(d, 0)
    val alpha = 1.0
    val errorTrain = Array.fill[Double](numIters)(0.0)

    for (i <- 0 until numIters) {
      val labelsAndPredsTrain = trainData.map(lp => (lp.label, Helper.dot(w, lp.features)))
      errorTrain(i) = calcRMSE(labelsAndPredsTrain)

      val gradient = trainData.map(lp => gradientSummand(w, lp)).reduce((v1, v2) => Helper.sumVectors(v1, v2))
      val alpha_i = alpha / (n * scala.math.sqrt(i + 1))
      val wAux = Helper.dot(gradient, (-1) * alpha_i)
      w = Helper.sumVectors(w, wAux)
    }
    (w, errorTrain)
  }
}

defined class LR


In [48]:
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.PredictorParams
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.Matrices
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.ml.{PredictionModel, Predictor}

abstract class MyLinearModel[FeaturesType, Model <: MyLinearModel[FeaturesType, Model]]
  extends PredictionModel[FeaturesType, Model] {
}

class MyLinearModelImpl(override val uid: String, val weights: Vector, val trainingError: Array[Double])
    extends MyLinearModel[Vector, MyLinearModelImpl] {

  override def copy(extra: ParamMap): MyLinearModelImpl = defaultCopy(extra)

  def predict(features: Vector): Double = {
    println("Predicting")
    val prediction = Helper.dot(weights, features)
    prediction
  }
}

defined class MyLinearModel
defined class MyLinearModelImpl


In [49]:
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.PredictorParams
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._

import org.apache.spark.sql.Row
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.Matrices
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.ml.{PredictionModel, Predictor}

abstract class MyLinearRegression[
    FeaturesType,
    Learner <: MyLinearRegression[FeaturesType, Learner, Model],
    Model <: MyLinearModel[FeaturesType, Model]]
  extends Predictor[FeaturesType, Learner, Model] {
}

class MyLinearRegressionImpl(override val uid: String)
    extends MyLinearRegression[Vector, MyLinearRegressionImpl, MyLinearModelImpl] {
  def this() = this(Identifiable.randomUID("linReg"))

  override def copy(extra: ParamMap): MyLinearRegressionImpl = defaultCopy(extra)
  
  def train(dataset: Dataset[_]): MyLinearModelImpl = {
    println("Training")

    val numIters = 10

    val instances: RDD[Instance] = dataset.select(
      col($(labelCol)), col($(featuresCol))).rdd.map {
        case Row(label: Double, features: Vector) =>
          Instance(label, features)
      }

    val (weights, trainingError) = new LR().linregGradientDescent(instances, numIters)

    new MyLinearModelImpl(uid, weights, trainingError)
  }
}

defined class MyLinearRegression
defined class MyLinearRegressionImpl


In [50]:
import org.apache.spark.ml.evaluation.RegressionEvaluator

val lr = new MyLinearRegressionImpl().setLabelCol("label").setFeaturesCol("features")
val model = lr.fit(trainSet)
val predictions = model.transform(trainSet)
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute the error; note that `predictions` was computed on the training set
val evaluator = new RegressionEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on training data = $rmse")

Training
+--------------------+--------+--------------------+
|          prediction|   label|            features|
+--------------------+--------+--------------------+
|-1.39163547060782...| 85800.0|[-124.3,41.8,19.0...|
|-1.68021609871910...|103600.0|[-124.3,41.84,17....|
|-1.28132559431017...| 79000.0|[-124.27,40.69,36...|
|-1.80656307224067...|111400.0|[-124.26,40.58,52...|
|-1.23408002382590...| 76100.0|[-124.25,40.28,32...|
+--------------------+--------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on training data = 3.840387582720435E109


lr = linReg_195b2de90638
model = linReg_195b2de90638
predictions = [features: vector, label: double ... 1 more field]
evaluator = regEval_11e974d9701d
rmse = 3.840387582720435E109


3.840387582720435E109
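An RMSE of about $3.8 \times 10^{109}$ means the gradient descent diverged rather than converged. A likely cause is that several features are on large scales (for example, `total_rooms` and `population` run into the thousands) while the step size starts at $\alpha = 1$, so each update overshoots. A common remedy is to standardize every feature before training, similar in spirit to Spark's `StandardScaler` with `setWithMean(true)` and `setWithStd(true)`. The plain-Scala sketch below standardizes one column using the sample (n - 1) standard deviation; `ScaleSketch` is hypothetical.

```scala
object ScaleSketch {
  // (x - mean) / std with the sample standard deviation;
  // a constant column is mapped to all zeros.
  def standardize(xs: Seq[Double]): Seq[Double] = {
    require(xs.size >= 2, "need at least two values")
    val mean = xs.sum / xs.size
    val std = math.sqrt(xs.map(x => (x - mean) * (x - mean)).sum / (xs.size - 1))
    if (std == 0.0) xs.map(_ => 0.0) else xs.map(x => (x - mean) / std)
  }
}
```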

---
# 9. An End-to-End Classification Test
As the last step, you are given a dataset called `data/ccdefault.csv`. It describes defaults of credit card clients and has 30,000 cases and 24 attributes. More details about the dataset are available in `data/ccdefault.txt`. In this task, you should build three models, compare their results, and decide on the best solution. Here are the suggested steps:
1. Load the data.
2. Carry out some exploratory analyses (e.g., how various features and the target variable are distributed).
3. Train a model to predict the target variable (risk of `default`).
  - Employ three different models (logistic regression, decision tree, and random forest).
  - Compare the models' performances (e.g., AUC).
  - Defend your choice of the best model (e.g., what are the strengths and weaknesses of each of these models?).
4. What more would you do with this data? Anything to help you devise a better solution?
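Since the task suggests comparing models by AUC, it helps to recall what the number means: the area under the ROC curve equals the probability that a randomly chosen defaulter receives a higher score than a randomly chosen non-defaulter, counting ties as one half. Spark's `BinaryClassificationEvaluator` reports it under the metric name `areaUnderROC` (computed from a binned ROC curve). The quadratic-time plain-Scala sketch below implements the pairwise definition directly; `AucSketch` is a hypothetical helper suitable only for small samples.

```scala
object AucSketch {
  // AUC = P(score of a random positive > score of a random negative),
  // with ties counted as 1/2. Labels are 1 (positive) or 0 (negative).
  def auc(scoresAndLabels: Seq[(Double, Int)]): Double = {
    val pos = scoresAndLabels.collect { case (s, 1) => s }
    val neg = scoresAndLabels.collect { case (s, 0) => s }
    require(pos.nonEmpty && neg.nonEmpty, "need both classes")
    val wins = (for (p <- pos; n <- neg) yield
      if (p > n) 1.0 else if (p == n) 0.5 else 0.0).sum
    wins / (pos.size.toDouble * neg.size)
  }
}
```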

#### Load data and describe

In [51]:
val credit_card_defaults = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load("data/ccdefault.csv")

credit_card_defaults = [ID: int, LIMIT_BAL: int ... 23 more fields]


[ID: int, LIMIT_BAL: int ... 23 more fields]

Describing the data:

In [52]:
credit_card_defaults.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- LIMIT_BAL: integer (nullable = true)
 |-- SEX: integer (nullable = true)
 |-- EDUCATION: integer (nullable = true)
 |-- MARRIAGE: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- PAY_0: integer (nullable = true)
 |-- PAY_2: integer (nullable = true)
 |-- PAY_3: integer (nullable = true)
 |-- PAY_4: integer (nullable = true)
 |-- PAY_5: integer (nullable = true)
 |-- PAY_6: integer (nullable = true)
 |-- BILL_AMT1: integer (nullable = true)
 |-- BILL_AMT2: integer (nullable = true)
 |-- BILL_AMT3: integer (nullable = true)
 |-- BILL_AMT4: integer (nullable = true)
 |-- BILL_AMT5: integer (nullable = true)
 |-- BILL_AMT6: integer (nullable = true)
 |-- PAY_AMT1: integer (nullable = true)
 |-- PAY_AMT2: integer (nullable = true)
 |-- PAY_AMT3: integer (nullable = true)
 |-- PAY_AMT4: integer (nullable = true)
 |-- PAY_AMT5: integer (nullable = true)
 |-- PAY_AMT6: integer (nullable = true)
 |-- DEFAULT: integer (nullable = true)

Confirm that the dataset has 30,000 rows, as the description states:

In [53]:
credit_card_defaults.count()

30000

Show the first 10 rows:

In [54]:
credit_card_defaults.show(10)

+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+
| ID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|DEFAULT|
+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+
|  1|    20000|  2|        2|       1| 24|    2|    2|   -1|   -1|   -2|   -2|     3913|     3102|      689|        0|        0|        0|       0|     689|       0|       0|       0|       0|      1|
|  2|   120000|  2|        2|       2| 26|   -1|    2|    0|    0|    0|    2|     2682|     1725|     2682|     3272|     3455|     3261|       0|    1000|    1000|    1000|       0|    2000|    

Compare the sizes of the two classes:

In [55]:
credit_card_defaults.filter($"DEFAULT" === 0).count()

23364

In [56]:
credit_card_defaults.filter($"DEFAULT" === 1).count()

6636

As we can see, the classes are unbalanced, with $\frac{23364}{30000} = 77.88\%$  of the elements belonging to class 0, and the rest of them $(22.12\%)$ belonging to class 1.
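With this split, a model that always predicts class 0 already reaches 77.88% accuracy, so accuracy alone is a weak yardstick here. One common mitigation is to weight each example by the inverse frequency of its class, which Spark's `LogisticRegression` can consume through `setWeightCol`. The sketch below computes the class shares and such "balanced" weights from the counts above. `ClassBalanceSketch` and the weighting scheme are illustrative assumptions, not part of the original analysis.

```scala
object ClassBalanceSketch {
  // Share of each class in the data.
  def shares(counts: Map[Int, Long]): Map[Int, Double] = {
    val total = counts.values.sum.toDouble
    counts.map { case (cls, c) => cls -> c / total }
  }

  // "Balanced" weights: total / (numClasses * classCount), so every class
  // contributes the same total weight.
  def balancedWeights(counts: Map[Int, Long]): Map[Int, Double] = {
    val total = counts.values.sum.toDouble
    counts.map { case (cls, c) => cls -> total / (counts.size * c) }
  }
}
```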

#### Analysis based on gender

Compare how often each sex defaults (`SEX` = 1 is male, `SEX` = 2 is female):

In [124]:
credit_card_defaults.filter($"DEFAULT" === 0 && $"SEX" === 1).count()

9015

In [125]:
credit_card_defaults.filter($"DEFAULT" === 1 && $"SEX" === 1).count()

2873

$\frac{9015}{9015 + 2873} = \frac{9015}{11888} = 75.83\%$ of men belong to class 0 (no default).  

In [126]:
credit_card_defaults.filter($"DEFAULT" === 0 && $"SEX" === 2).count()

14349

In [127]:
credit_card_defaults.filter($"DEFAULT" === 1 && $"SEX" === 2).count()

3763

$\frac{14349}{14349 + 3763} = \frac{14349}{18112} = 79.22\%$ of women belong to class 0 (no default), so in this dataset women default slightly less often than men.  

#### Analysis based on education

In [131]:
credit_card_defaults.filter($"DEFAULT" === 0 && $"EDUCATION" === 1).count()

8549

In [132]:
credit_card_defaults.filter($"DEFAULT" === 1 && $"EDUCATION" === 1).count()

2036

In [133]:
credit_card_defaults.filter($"DEFAULT" === 0 && $"EDUCATION" === 2).count()

10700

In [135]:
credit_card_defaults.filter($"DEFAULT" === 1 && $"EDUCATION" === 2).count()

3330

In [136]:
credit_card_defaults.filter($"DEFAULT" === 0 && $"EDUCATION" === 3).count()

3680

In [137]:
credit_card_defaults.filter($"DEFAULT" === 1 && $"EDUCATION" === 3).count()

1237

In [138]:
credit_card_defaults.filter($"DEFAULT" === 0 && $"EDUCATION" === 4).count()

116

In [139]:
credit_card_defaults.filter($"DEFAULT" === 1 && $"EDUCATION" === 4).count()

7

The default rate decreases as the education level increases. In the standard coding of this dataset, `EDUCATION` = 1 is graduate school, 2 is university, 3 is high school, and 4 is other. The default rate is $\frac{2036}{10585} \approx 19.2\%$ for graduate school, $\frac{3330}{14030} \approx 23.7\%$ for university, and $\frac{1237}{4917} \approx 25.2\%$ for high school. Category 4 contains only 123 clients (7 defaults), too few to draw conclusions from, and the remaining 345 rows use codes outside 1 to 4.

#### Missing values

Now let's check for missing (null) elements:

In [57]:
for (c <- credit_card_defaults.columns) {
    val count_null_column_elements = credit_card_defaults.filter(s"${c} is null").count()
    println(s"Number of missing values in column ${c}: ${count_null_column_elements}")
}

Number of missing values in column ID: 0
Number of missing values in column LIMIT_BAL: 0
Number of missing values in column SEX: 0
Number of missing values in column EDUCATION: 0
Number of missing values in column MARRIAGE: 0
Number of missing values in column AGE: 0
Number of missing values in column PAY_0: 0
Number of missing values in column PAY_2: 0
Number of missing values in column PAY_3: 0
Number of missing values in column PAY_4: 0
Number of missing values in column PAY_5: 0
Number of missing values in column PAY_6: 0
Number of missing values in column BILL_AMT1: 0
Number of missing values in column BILL_AMT2: 0
Number of missing values in column BILL_AMT3: 0
Number of missing values in column BILL_AMT4: 0
Number of missing values in column BILL_AMT5: 0
Number of missing values in column BILL_AMT6: 0
Number of missing values in column PAY_AMT1: 0
Number of missing values in column PAY_AMT2: 0
Number of missing values in column PAY_AMT3: 0
Number of missing values in column PAY_

We have full info about all elements!
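The loop above launches one Spark job per column. The sketch below is an alternative that counts the nulls of every column in a single aggregation. It is shown on hypothetical toy rows; on the real data, the same expression applies to `credit_card_defaults`:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, sum, when}

// Reuses the notebook's session if one already exists.
val spark = SparkSession.builder().master("local[*]").appName("null-counts").getOrCreate()

// Toy rows with one missing LIMIT_BAL (None becomes null in the DataFrame).
val toy = spark.createDataFrame(Seq[(Int, Option[Int])]((1, Some(20000)), (2, None), (3, Some(90000))))
  .toDF("ID", "LIMIT_BAL")

// For every column: 1 if the value is null, else 0, summed in one aggregation.
val nullCounts = toy.select(toy.columns.map(c =>
  sum(when(col(c).isNull, 1).otherwise(0)).alias(c)): _*)

nullCounts.show()
// On the real data: replace `toy` with credit_card_defaults.
```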

#### Setting up the data with appropriate transformations before feeding them to a model:

As a first step, we need to separate the numerical attributes from the categorical ones and preprocess each group appropriately:

In [58]:
val categorical_attributes = Array("SEX", "EDUCATION", "MARRIAGE", "AGE", "PAY_0", "PAY_2", "PAY_3", "PAY_4", "PAY_5", "PAY_6")

val numerical_attributes = credit_card_defaults.columns.filter(c => !categorical_attributes.contains(c) && c != "DEFAULT" && c != "ID")

categorical_attributes = Array(SEX, EDUCATION, MARRIAGE, AGE, PAY_0, PAY_2, PAY_3, PAY_4, PAY_5, PAY_6)
numerical_attributes = Array(LIMIT_BAL, BILL_AMT1, BILL_AMT2, BILL_AMT3, BILL_AMT4, BILL_AMT5, BILL_AMT6, PAY_AMT1, PAY_AMT2, PAY_AMT3, PAY_AMT4, PAY_AMT5, PAY_AMT6)


[LIMIT_BAL, BILL_AMT1, BILL_AMT2, BILL_AMT3, BILL_AMT4, BILL_AMT5, BILL_AMT6, PAY_AMT1, PAY_AMT2, PAY_AMT3, PAY_AMT4, PAY_AMT5, PAY_AMT6]

In [59]:
val input_and_output_columns = credit_card_defaults.columns.filter(c => numerical_attributes.contains(c))

input_and_output_columns = Array(LIMIT_BAL, BILL_AMT1, BILL_AMT2, BILL_AMT3, BILL_AMT4, BILL_AMT5, BILL_AMT6, PAY_AMT1, PAY_AMT2, PAY_AMT3, PAY_AMT4, PAY_AMT5, PAY_AMT6)


[LIMIT_BAL, BILL_AMT1, BILL_AMT2, BILL_AMT3, BILL_AMT4, BILL_AMT5, BILL_AMT6, PAY_AMT1, PAY_AMT2, PAY_AMT3, PAY_AMT4, PAY_AMT5, PAY_AMT6]

#### Applying transformations to the numerical variables

In [64]:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{VectorAssembler, StandardScaler}

val va_default = new VectorAssembler()
    .setInputCols(input_and_output_columns)
    .setOutputCol("features")

val scaler_default = new StandardScaler()
      .setInputCol("features")
      .setOutputCol("scaledFeatures")

val numPipeline_default = new Pipeline().setStages(Array(va_default, scaler_default))

va_default = vecAssembler_77cf600bc9ef
scaler_default = stdScal_99e3d3592bc5
numPipeline_default = pipeline_7b9a87137bef


pipeline_7b9a87137bef

#### Applying transformations to the categorical variables

In [69]:
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.OneHotEncoderEstimator  // Spark 2.3/2.4 name; in Spark 3.x use OneHotEncoder
import org.apache.spark.ml.Pipeline

val indexers_default = categorical_attributes.map(s => new StringIndexer().setInputCol(s).setOutputCol(s+"_temp"))

val one_hot_encoder_default = new OneHotEncoderEstimator().
    setInputCols(categorical_attributes.map(s => s+"_temp")).
    setOutputCols(categorical_attributes.map(s => s+"_one_hot"))

val stages_cat = indexers_default ++ Array(one_hot_encoder_default)

val catPipeline_default = new Pipeline().setStages(stages_cat)

indexers_default = Array(strIdx_744029679e9f, strIdx_93ac8b35daf2, strIdx_622ab123c291, strIdx_5fe03ad20aa0, strIdx_8ae402a15348, strIdx_11a465147d6d, strIdx_59fc6dc647aa, strIdx_3d94a5011c7e, strIdx_6ae598afae9e, strIdx_ff5be09a0262)
one_hot_encoder_default = oneHotEncoder_940e7c025bb2


stages_cat: Array[org.apache.spark.ml.Estimator[_ >: org.apache.spark.ml.feature.StringIndexerModel with org.apache.spark.ml.feature.OneHotEncoderModel <: org.apache.spark.ml.Model[_ >: org.apache.spark.ml.feature.StringIndexerModel with org.apache.spark.ml.featur...


oneHotEncoder_940e7c025bb2

In [71]:
val pipeline_default = new Pipeline().setStages(Array(numPipeline_default, catPipeline_default))
val scaledDefaults = pipeline_default.fit(credit_card_defaults).transform(credit_card_defaults)

pipeline_default = pipeline_85901effff5f
scaledDefaults = [ID: int, LIMIT_BAL: int ... 45 more fields]


[ID: int, LIMIT_BAL: int ... 45 more fields]

In [72]:
scaledDefaults.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- LIMIT_BAL: integer (nullable = true)
 |-- SEX: integer (nullable = true)
 |-- EDUCATION: integer (nullable = true)
 |-- MARRIAGE: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- PAY_0: integer (nullable = true)
 |-- PAY_2: integer (nullable = true)
 |-- PAY_3: integer (nullable = true)
 |-- PAY_4: integer (nullable = true)
 |-- PAY_5: integer (nullable = true)
 |-- PAY_6: integer (nullable = true)
 |-- BILL_AMT1: integer (nullable = true)
 |-- BILL_AMT2: integer (nullable = true)
 |-- BILL_AMT3: integer (nullable = true)
 |-- BILL_AMT4: integer (nullable = true)
 |-- BILL_AMT5: integer (nullable = true)
 |-- BILL_AMT6: integer (nullable = true)
 |-- PAY_AMT1: integer (nullable = true)
 |-- PAY_AMT2: integer (nullable = true)
 |-- PAY_AMT3: integer (nullable = true)
 |-- PAY_AMT4: integer (nullable = true)
 |-- PAY_AMT5: integer (nullable = true)
 |-- PAY_AMT6: integer (nullable = true)
 |-- DEFAULT: integer (nullable = tru

In [74]:
scaledDefaults.show(10)

+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+--------------------+--------------------+--------+--------------+-------------+--------+----------+----------+----------+----------+----------+----------+--------------+----------------+--------------+--------------+-------------+-------------+-----------------+---------------+-------------+--------------+
| ID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|DEFAULT|            features|      scaledFeatures|SEX_temp|EDUCATION_temp|MARRIAGE_temp|AGE_temp|PAY_0_temp|PAY_2_temp|PAY_3_temp|PAY_4_temp|PAY_5_temp|PAY_6_temp| PAY_3_one_hot|MARRIAGE_one_hot| PAY_0_one_hot| PAY_4_one_hot|PAY_5_one_hot|  SEX_one_hot|EDUCATION_one_hot|    AGE_one_hot|PAY_6_one_ho

In [75]:
val kept_elements = Array("scaledFeatures", "PAY_0_one_hot", "PAY_4_one_hot", "PAY_5_one_hot", "SEX_one_hot", "EDUCATION_one_hot", "AGE_one_hot", "PAY_6_one_hot", "PAY_2_one_hot")

val va2_default = new VectorAssembler()
    .setInputCols(kept_elements)
    .setOutputCol("features")

val default_dataset = scaledDefaults.select("features", "DEFAULT").withColumnRenamed("DEFAULT", "label")

val Array(trainSet, testSet) = default_dataset.randomSplit(Array[Double](0.8, 0.2))

kept_elements = Array(scaledFeatures, PAY_0_one_hot, PAY_4_one_hot, PAY_5_one_hot, SEX_one_hot, EDUCATION_one_hot, AGE_one_hot, PAY_6_one_hot, PAY_2_one_hot)
va2_default = vecAssembler_7f9698f9ae9e
default_dataset = [features: vector, label: int]
trainSet = [features: vector, label: int]
testSet = [features: vector, label: int]


[features: vector, label: int]
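Note that `va2_default` is defined but never applied. `scaledDefaults.select("features", "DEFAULT")` therefore picks up the `features` column written by `va_default`. That column holds only the unscaled numerical attributes, so the models below never see `scaledFeatures` or the one-hot columns. The sketch below applies the second assembler on hypothetical toy rows. It writes to a new column (`all_features`, our name) to avoid colliding with the existing `features` column:

```scala
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Reuses the notebook's session if one already exists.
val spark = SparkSession.builder().master("local[*]").appName("assemble").getOrCreate()

// Toy stand-in for scaledDefaults: a scaled numeric vector, a one-hot vector, the label.
val toyScaled = spark.createDataFrame(Seq(
  (Vectors.dense(0.5, 1.2), Vectors.sparse(1, Array(0), Array(1.0)), 0),
  (Vectors.dense(-0.3, 0.7), Vectors.sparse(1, Array.empty[Int], Array.empty[Double]), 1)
)).toDF("scaledFeatures", "SEX_one_hot", "DEFAULT")

// Assemble into a new column so the existing "features" column is not reused by mistake.
val assembler = new VectorAssembler()
  .setInputCols(Array("scaledFeatures", "SEX_one_hot"))
  .setOutputCol("all_features")

// Apply the assembler, then keep only the model inputs under the names used below.
val dataset = assembler.transform(toyScaled)
  .select(col("all_features").as("features"), col("DEFAULT").as("label"))

dataset.show(false)
```

In the notebook, the equivalent would be `va2_default.setOutputCol("all_features").transform(scaledDefaults)` followed by the same `select`. The metrics reported below come from the original cell, not from this variant.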

#### Define ROC Evaluator

In [99]:
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

val ROC_Evaluator = new BinaryClassificationEvaluator().setLabelCol("label").setMetricName("areaUnderROC")

ROC_Evaluator = binEval_8820c91d5d94


binEval_8820c91d5d94

### Logistic Regression

In [93]:
import org.apache.spark.ml.classification.LogisticRegression

val logistic_regression = 
            new LogisticRegression().
            setMaxIter(10).
            setRegParam(0.3).
            setElasticNetParam(0.8).
            setFeaturesCol("features").
            setLabelCol("label")

logistic_regression = logreg_3c640dc22320


logreg_3c640dc22320

In [94]:
val logistic_regression_model = logistic_regression.fit(trainSet)
val logistic_regression_predictions = logistic_regression_model.transform(testSet)

logistic_regression_model = logreg_9887da0b5393
logistic_regression_predictions = [features: vector, label: int ... 3 more fields]


[features: vector, label: int ... 3 more fields]

#### Evaluate on the test set

In [95]:
ROC_Evaluator.evaluate(logistic_regression_predictions)

0.5
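An area under ROC of exactly 0.5 is what a constant score produces. Every positive/negative pair is tied, and ties count as half a correct ranking. With `regParam = 0.3` and `elasticNetParam = 0.8`, the L1 part of the penalty is plausibly strong enough to shrink every coefficient to zero. This is an assumption; it can be checked by printing `logistic_regression_model.coefficients` and `logistic_regression_model.intercept`. The pure-Scala sketch below illustrates the tie effect with a hypothetical helper `aucFromScores`. It uses the pairwise (Mann-Whitney) formulation, which is O(n²) and only meant for small inputs:

```scala
// Area under ROC as the probability that a random positive example
// scores higher than a random negative one; ties count as 1/2.
def aucFromScores(scores: Seq[Double], labels: Seq[Int]): Double = {
  val pairs = scores.zip(labels)
  val pos = pairs.collect { case (s, 1) => s }
  val neg = pairs.collect { case (s, 0) => s }
  val wins = for (p <- pos; n <- neg)
    yield if (p > n) 1.0 else if (p == n) 0.5 else 0.0
  wins.sum / (pos.size.toDouble * neg.size)
}

val labels = Seq(0, 0, 1, 0, 1)

// A model that gives every client the same score (e.g. all coefficients zero).
val constantAuc = aucFromScores(Seq.fill(5)(0.22), labels)            // 0.5

// A model whose scores rank both positives above all negatives.
val perfectAuc = aucFromScores(Seq(0.1, 0.2, 0.9, 0.3, 0.8), labels)  // 1.0
```

If the coefficients do turn out to be all zero, a much smaller `regParam` is the first thing to try.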

### Decision trees

In [96]:
import org.apache.spark.ml.classification.DecisionTreeClassifier

val dt_classifier = new DecisionTreeClassifier().
                    setImpurity("gini").
                    setMaxDepth(2).
                    setLabelCol("label").
                    setFeaturesCol("features")

dt_classifier = dtc_c39c0e401178


dtc_c39c0e401178

In [97]:
val decision_tree_model = dt_classifier.fit(trainSet)
val decision_tree_predictions = decision_tree_model.transform(testSet)

decision_tree_model = DecisionTreeClassificationModel (uid=dtc_c39c0e401178) of depth 2 with 7 nodes
decision_tree_predictions = [features: vector, label: int ... 3 more fields]


[features: vector, label: int ... 3 more fields]

#### Evaluate

In [101]:
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

val ROC_Evaluator_dt = new BinaryClassificationEvaluator().setLabelCol("label").setMetricName("areaUnderROC")

ROC_Evaluator_dt = binEval_1328bd076889


binEval_1328bd076889

In [102]:
ROC_Evaluator_dt.evaluate(decision_tree_predictions)

0.49853451187389003

### Random Forests

In [111]:
import org.apache.spark.ml.classification.RandomForestClassifier

val rf_classifier = new RandomForestClassifier().
                    setImpurity("gini").
                    setMaxDepth(3).
                    setNumTrees(20).
                    setFeatureSubsetStrategy("auto").
                    setLabelCol("label").
                    setFeaturesCol("features")

rf_classifier = rfc_4f10212ec276


rfc_4f10212ec276

In [112]:
val rf_model = rf_classifier.fit(trainSet)
val rf_predictions = rf_model.transform(testSet)

rf_model = RandomForestClassificationModel (uid=rfc_4f10212ec276) with 20 trees
rf_predictions = [features: vector, label: int ... 3 more fields]


[features: vector, label: int ... 3 more fields]

In [115]:
val rf_evaluator = new BinaryClassificationEvaluator().setLabelCol("label").setMetricName("areaUnderROC")

rf_evaluator = binEval_dcd0ede80286


binEval_dcd0ede80286

In [116]:
rf_evaluator.evaluate(rf_predictions)

0.7047052260718195

#### Try with a bigger depth and more trees:

In [117]:
import org.apache.spark.ml.classification.RandomForestClassifier

val rf_classifier_2 = new RandomForestClassifier().
                    setImpurity("gini").
                    setMaxDepth(4).
                    setNumTrees(30).
                    setFeatureSubsetStrategy("auto").
                    setLabelCol("label").
                    setFeaturesCol("features")

rf_classifier_2 = rfc_3db5f9c1bb9c


rfc_3db5f9c1bb9c

In [118]:
val rf_model_2 = rf_classifier.fit(trainSet)
val rf_predictions_2 = rf_model.transform(testSet)

rf_model_2 = RandomForestClassificationModel (uid=rfc_4f10212ec276) with 20 trees
rf_predictions_2 = [features: vector, label: int ... 3 more fields]


[features: vector, label: int ... 3 more fields]

In [119]:
rf_evaluator.evaluate(rf_predictions_2)

0.7047052260718195

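Same result, but only because In [118] fits `rf_classifier` and transforms with `rf_model` (the first forest) instead of `rf_classifier_2` and `rf_model_2`. The output still shows uid `rfc_4f10212ec276` with 20 trees. The deeper forest was therefore never trained, and this comparison is inconclusive.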
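The sketch below is a version of the cell that actually trains and applies the second forest (`maxDepth = 4`, 30 trees). To keep it self-contained, it runs on hypothetical toy rows and evaluates on those same rows. In the notebook, fit on `trainSet`, transform `testSet` and evaluate with `rf_evaluator`. No result is reported here because this variant was not part of the original run:

```scala
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

// Reuses the notebook's session if one already exists.
val spark = SparkSession.builder().master("local[*]").appName("rf-depth-4").getOrCreate()

// Toy (features, label) rows standing in for trainSet / testSet.
val toy = spark.createDataFrame(Seq(
  (Vectors.dense(0.0, 1.0), 0), (Vectors.dense(0.1, 0.9), 0),
  (Vectors.dense(0.2, 1.1), 0), (Vectors.dense(1.0, 0.0), 1),
  (Vectors.dense(0.9, 0.2), 1), (Vectors.dense(1.1, 0.1), 1)
)).toDF("features", "label")

// Second configuration, pointed at the columns the split datasets actually contain.
val rf_classifier_2 = new RandomForestClassifier()
  .setImpurity("gini")
  .setMaxDepth(4)
  .setNumTrees(30)
  .setFeatureSubsetStrategy("auto")
  .setLabelCol("label")
  .setFeaturesCol("features")

// Fit and transform with the new estimator and the new model.
val rf_model_2 = rf_classifier_2.fit(toy)
val rf_predictions_2 = rf_model_2.transform(toy)

val auc_2 = new BinaryClassificationEvaluator()
  .setLabelCol("label")
  .setMetricName("areaUnderROC")
  .evaluate(rf_predictions_2)
```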

## Evaluation and Discussion

Judging the three classifiers by their area under ROC, the Random Forest classifier performs best overall (about 0.70). The Logistic Regression model (0.50) and the Decision Tree Classifier (about 0.50) perform about the same, which is no better than random guessing.

We expected the Random Forest to beat the Decision Tree Classifier. A Random Forest trains many decision trees, each on a bootstrap sample of the data with a random subset of features, and combines their votes. The assumption is that the combined vote predicts the final result more accurately than any single tree. Random Forests also mitigate the overfitting problem of single decision trees. As an ensemble, they combine several low-bias, high-variance models, and aggregating them reduces the variance of the final prediction.
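Combining predictions by majority vote can be sketched in plain Scala. This is the idea described in this paragraph, and the one proposed for our three models in item ii) below. The helper `majorityVote` and the prediction vectors are hypothetical, for illustration only:

```scala
// Hard majority vote over the 0/1 predictions of several classifiers
// for the same example; with an odd number of voters there are no ties.
def majorityVote(predictions: Seq[Int]): Int = {
  require(predictions.nonEmpty && predictions.size % 2 == 1, "use an odd number of voters")
  if (predictions.count(_ == 1) * 2 > predictions.size) 1 else 0
}

// Hypothetical per-example predictions from three models.
val perModel = Seq(
  Seq(0, 1, 1, 0),  // logistic regression
  Seq(0, 0, 1, 1),  // decision tree
  Seq(1, 1, 1, 0)   // random forest
)

// Transpose so each inner Seq holds the three votes for one example.
val ensemble = perModel.transpose.map(majorityVote)   // Seq(0, 1, 1, 0)
```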

Regarding Logistic Regression, it generally comes in handy when we are dealing with sparse data, and because it is a linear model it is harder for it to overfit. However, on problems with a small amount of data where sparsity (or noise) is not an issue, it is generally expected to be outperformed by the Random Forest classifier.

We can try several methods to either improve the performance of the individual classifiers or combine them:

i) K-fold cross-validation can be applied to each classifier to obtain a more reliable estimate of its performance and to select its hyperparameters.  
ii) We can build an ensemble by combining our three classifiers and taking a majority vote; since we have an odd number of classifiers, a binary vote can never tie.  
iii) With Adaptive Boosting (AdaBoost) we can iteratively train new classifiers that focus on the examples misclassified by the previous ones. Spark ML does not provide AdaBoost, but its `GBTClassifier` implements the related gradient-boosting approach.  
iv) We can tune the hyperparameters of each classifier further to find their optimal settings for this dataset.  
v) Further exploration of the input data, and techniques such as Principal Component Analysis (PCA), may also improve the classifiers' performance.  
vi) Adding more data can also help improve the performance of the models.
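Items i) and iv) map directly onto Spark ML's `CrossValidator` and `ParamGridBuilder`. The sketch below runs on hypothetical toy data. In the notebook, the estimator would be `rf_classifier`, the evaluator `ROC_Evaluator` and the data `trainSet`. The grid values are illustrative, not tuned recommendations:

```scala
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.sql.SparkSession

// Reuses the notebook's session if one already exists.
val spark = SparkSession.builder().master("local[*]").appName("rf-cv").getOrCreate()

// Toy stand-in for trainSet: the label is recoverable from the first feature.
val toyTrain = spark.createDataFrame((0 until 40).map { i =>
  val y = i % 2
  (Vectors.dense(y + (i % 5) * 0.1, (i % 7) * 0.1), y)
}).toDF("features", "label")

val rf = new RandomForestClassifier().setLabelCol("label").setFeaturesCol("features")

// Illustrative grid: 2 depths x 2 forest sizes = 4 candidate models.
val paramGrid = new ParamGridBuilder()
  .addGrid(rf.maxDepth, Array(3, 5))
  .addGrid(rf.numTrees, Array(20, 50))
  .build()

val cv = new CrossValidator()
  .setEstimator(rf)
  .setEvaluator(new BinaryClassificationEvaluator().setMetricName("areaUnderROC"))
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)   // k = 3 folds
  .setSeed(42)

val cvModel = cv.fit(toyTrain)
// Mean area under ROC across the folds, one entry per grid point.
val meanAucs = cvModel.avgMetrics
```

`cvModel.bestModel` holds the model refit on the whole training set with the best parameters. Evaluating it once on `testSet` keeps the test set out of the tuning loop.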