# Machine Learning With Spark ML
In this lab assignment, you will complete a project by going through the following steps:
1. Get the data.
2. Discover the data to gain insights.
3. Prepare the data for Machine Learning algorithms.
4. Select a model and train it.
5. Fine-tune your model.
6. Present your solution.

As a dataset, we use the California Housing Prices dataset from the StatLib repository. This dataset was based on data from the 1990 California census. The dataset has the following columns:
1. `longitude`: a measure of how far west a house is (a higher value is farther west)
2. `latitude`: a measure of how far north a house is (a higher value is farther north)
3. `housing_median_age`: median age of a house within a block (a lower number is a newer building)
4. `total_rooms`: total number of rooms within a block
5. `total_bedrooms`: total number of bedrooms within a block
6. `population`: total number of people residing within a block
7. `households`: total number of households, a group of people residing within a home unit, for a block
8. `median_income`: median income for households within a block of houses
9. `median_house_value`: median house value for households within a block
10. `ocean_proximity`: location of the house with respect to the ocean/sea

---

In [2]:
spark.version

res0: String = 2.4.4


### Import all relevant packages

In [1]:
import org.apache.spark.sql.functions._
import org.apache.spark.ml.util._

import org.apache.spark.ml.linalg.{Vector, Vectors, Matrix, Matrices}
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.sql.Row
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.{Imputer, VectorAssembler, StandardScaler, StringIndexer, OneHotEncoderEstimator}
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.classification.{DecisionTreeClassifier, RandomForestClassifier, LogisticRegression}
import org.apache.spark.ml.regression.{LinearRegression, DecisionTreeRegressor, RandomForestRegressor, GBTRegressor}
import org.apache.spark.ml.evaluation.{RegressionEvaluator, BinaryClassificationEvaluator}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.{ParamMap, Param, Params}
import org.apache.spark.sql.types.{StructType}
import org.apache.spark.sql.functions.{col, udf}

Intitializing Scala interpreter ...

Spark Web UI available at http://130.229.191.72:4040
SparkContext available as 'sc' (version = 2.4.4, master = local[*], app id = local-1574686600672)
SparkSession available as 'spark'



# 1. Get the data
Let's start the lab by loading the dataset. You can find the dataset at `data/housing.csv`. To infer column types automatically when reading the file, set `inferSchema` to true. Moreover, enable the `header` option to read the column names from the file.

In [3]:
val housing = spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("data/housing.csv")

housing: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 8 more fields]


---
# 2. Discover the data to gain insights
Now it is time to take a look at the data. In this step we are going to look at the data in a few different ways:
* See the schema and dimension of the dataset
* Look at the data itself
* Statistical summary of the attributes
* Breakdown of the data by the categorical attribute variable
* Find the correlation among different attributes
* Make new attributes by combining existing attributes

## 2.1. Schema and dimension
Print the schema of the dataset.

In [4]:
housing.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



Print the number of records in the dataset.

In [5]:
housing.count

res2: Long = 20640


## 2.2. Look at the data
Print the first five records of the dataset.

In [6]:
housing.show(5, false)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|-122.23  |37.88   |41.0              |880.0      |129.0         |322.0     |126.0     |8.3252       |452600.0          |NEAR BAY       |
|-122.22  |37.86   |21.0              |7099.0     |1106.0        |2401.0    |1138.0    |8.3014       |358500.0          |NEAR BAY       |
|-122.24  |37.85   |52.0              |1467.0     |190.0         |496.0     |177.0     |7.2574       |352100.0          |NEAR BAY       |
|-122.25  |37.85   |52.0              |1274.0     |235.0         |558.0     |219.0     |5.6431       |341300.0          |NEAR BAY       |
|-122.25  |37.85   |52.0          

Print the number of records with population more than 10000.

In [7]:
housing.filter($"population" > 10000).count

res4: Long = 23


## 2.3. Statistical summary
Print a summary of the table statistics for the attributes `housing_median_age`, `total_rooms`, `median_house_value`, and `population`. You can use the `describe` command.

In [8]:
housing.describe("housing_median_age", "total_rooms", "median_house_value", "population").show()

+-------+------------------+------------------+------------------+------------------+
|summary|housing_median_age|       total_rooms|median_house_value|        population|
+-------+------------------+------------------+------------------+------------------+
|  count|             20640|             20640|             20640|             20640|
|   mean|28.639486434108527|2635.7630813953488|206855.81690891474|1425.4767441860465|
| stddev| 12.58555761211163|2181.6152515827944|115395.61587441359|  1132.46212176534|
|    min|               1.0|               2.0|           14999.0|               3.0|
|    max|              52.0|           39320.0|          500001.0|           35682.0|
+-------+------------------+------------------+------------------+------------------+



Print the maximum age (`housing_median_age`), the minimum number of rooms (`total_rooms`), and the average of house values (`median_house_value`).

In [9]:
housing.agg(max("housing_median_age")).show()
housing.agg(min("total_rooms")).show()
housing.agg(avg("median_house_value")).show()

+-----------------------+
|max(housing_median_age)|
+-----------------------+
|                   52.0|
+-----------------------+

+----------------+
|min(total_rooms)|
+----------------+
|             2.0|
+----------------+

+-----------------------+
|avg(median_house_value)|
+-----------------------+
|     206855.81690891474|
+-----------------------+



## 2.4. Break down the data by categorical attribute
Print the number of houses in different areas (`ocean_proximity`), and sort them in descending order.

In [10]:
housing.groupBy("ocean_proximity")
    .count()
    .sort(desc("count"))
    .show()

+---------------+-----+
|ocean_proximity|count|
+---------------+-----+
|      <1H OCEAN| 9136|
|         INLAND| 6551|
|     NEAR OCEAN| 2658|
|       NEAR BAY| 2290|
|         ISLAND|    5|
+---------------+-----+



Print the average value of the houses (`median_house_value`) in different areas (`ocean_proximity`), and call the new column `avg_value` when printing it.

In [11]:
housing.groupBy("ocean_proximity")
    .agg(avg("median_house_value"))
    .withColumnRenamed("avg(median_house_value)",  "avg_value")
    .show()

+---------------+------------------+
|ocean_proximity|         avg_value|
+---------------+------------------+
|         ISLAND|          380440.0|
|     NEAR OCEAN|249433.97742663656|
|       NEAR BAY|259212.31179039303|
|      <1H OCEAN|240084.28546409807|
|         INLAND|124805.39200122119|
+---------------+------------------+



Rewrite the above query in SQL.

In [12]:
housing.createOrReplaceTempView("df")
spark.sql("SELECT ocean_proximity, avg(median_house_value) AS avg_value FROM df GROUP BY ocean_proximity").show()

+---------------+------------------+
|ocean_proximity|         avg_value|
+---------------+------------------+
|         ISLAND|          380440.0|
|     NEAR OCEAN|249433.97742663656|
|       NEAR BAY|259212.31179039303|
|      <1H OCEAN|240084.28546409807|
|         INLAND|124805.39200122119|
+---------------+------------------+



## 2.5. Correlation among attributes
Print the correlation among the attributes `housing_median_age`, `total_rooms`, `median_house_value`, and `population`. To do so, first you need to put these attributes into one vector. Then, compute the standard correlation coefficient (Pearson) between every pair of attributes in this new vector. To make a vector of these attributes, you can use the `VectorAssembler` Transformer.
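
For reference, the Pearson coefficient reported for each pair of attributes can be reproduced by hand on small samples. The following is a minimal sketch in plain Scala, not Spark's implementation; the names `PearsonSketch` and `pearson` are hypothetical and introduced only for illustration.

```scala
object PearsonSketch {
  // Pearson correlation of two equally long, non-constant samples:
  // covariance divided by the product of the standard deviations.
  def pearson(xs: Seq[Double], ys: Seq[Double]): Double = {
    require(xs.size == ys.size && xs.nonEmpty, "samples must be non-empty and equally long")
    val meanX = xs.sum / xs.size
    val meanY = ys.sum / ys.size
    // Sum of products of deviations from the means
    val cov = xs.zip(ys).map { case (x, y) => (x - meanX) * (y - meanY) }.sum
    // Sums of squared deviations (the 1/n factors cancel in the ratio)
    val varX = xs.map(x => math.pow(x - meanX, 2)).sum
    val varY = ys.map(y => math.pow(y - meanY, 2)).sum
    cov / math.sqrt(varX * varY)
  }
}
```

For instance, `PearsonSketch.pearson(Seq(1.0, 2.0, 3.0), Seq(2.0, 4.0, 6.0))` evaluates to 1.0, because the second sample is an exact positive multiple of the first.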

In [13]:
val va = new VectorAssembler()
    .setInputCols(Array("housing_median_age", "total_rooms", "median_house_value", "population"))
    .setOutputCol("features")

val housingAttrs = va.transform(housing)

housingAttrs.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|            features|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+--------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|[41.0,880.0,45260...|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|[21.0,7099.0,3585...|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|[52.0,1467.0,3521...|
|  -122.25|   37.85|              52.0|     12

va: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_a48a38ef29c6
housingAttrs: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 9 more fields]


In [14]:
val Row(coeff: Matrix) = Correlation.corr(housingAttrs, "features").head

println(s"The standard correlation coefficient:\n ${coeff}")

The standard correlation coefficient:
 1.0                   -0.36126220122231784  0.10562341249318154   -0.2962442397735293   
-0.36126220122231784  1.0                   0.13415311380654338   0.8571259728659772    
0.10562341249318154   0.13415311380654338   1.0                   -0.02464967888891235  
-0.2962442397735293   0.8571259728659772    -0.02464967888891235  1.0                   


coeff: org.apache.spark.ml.linalg.Matrix =
1.0                   -0.36126220122231784  0.10562341249318154   -0.2962442397735293
-0.36126220122231784  1.0                   0.13415311380654338   0.8571259728659772
0.10562341249318154   0.13415311380654338   1.0                   -0.02464967888891235
-0.2962442397735293   0.8571259728659772    -0.02464967888891235  1.0


## 2.6. Combine and make new attributes
Now, let's try out various attribute combinations. In the given dataset, the total number of rooms in a block is not very useful if we don't know how many households there are. What we really want is the number of rooms per household. Similarly, the total number of bedrooms by itself is not very useful, and we want to compare it to the number of rooms. The population per household also seems like an interesting attribute combination to look at. To do so, add the three new columns to the dataset as below. We will call the new dataset `housingExtra`.
```
rooms_per_household = total_rooms / households
bedrooms_per_room = total_bedrooms / total_rooms
population_per_household = population / households
```

In [15]:
val housingCol1 = housing.withColumn("rooms_per_household", expr("total_rooms/households"))
val housingCol2 = housingCol1.withColumn("bedrooms_per_room", expr("total_bedrooms/total_rooms"))
val housingExtra = housingCol2.withColumn("population_per_household", expr("population/households"))

housingExtra.select("rooms_per_household", "bedrooms_per_room", "population_per_household").show(5)

+-------------------+-------------------+------------------------+
|rooms_per_household|  bedrooms_per_room|population_per_household|
+-------------------+-------------------+------------------------+
|  6.984126984126984|0.14659090909090908|      2.5555555555555554|
|  6.238137082601054|0.15579659106916466|       2.109841827768014|
|  8.288135593220339|0.12951601908657123|      2.8022598870056497|
| 5.8173515981735155|0.18445839874411302|       2.547945205479452|
|  6.281853281853282| 0.1720958819913952|      2.1814671814671813|
+-------------------+-------------------+------------------------+
only showing top 5 rows



housingCol1: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 9 more fields]
housingCol2: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 10 more fields]
housingExtra: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 11 more fields]


---
# 3. Prepare the data for Machine Learning algorithms
Before going through the Machine Learning steps, let's first rename the label column from `median_house_value` to `label`.

In [16]:
val renamedHousing = housingExtra.withColumnRenamed("median_house_value", "label")

renamedHousing: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 11 more fields]


Now, we want to separate the numerical attributes from the categorical attribute (`ocean_proximity`) and keep their column names in two different lists. Moreover, since we don't want to apply the same transformations to the predictors (features) and the label, we should remove the label attribute from the list of predictors.

In [17]:
// Label column
val colLabel = "label"

// Categorical columns
val colCat = "ocean_proximity"

// Numerical columns
val colNum = renamedHousing.columns.filter(_ != colLabel).filter(_ != colCat)

colLabel: String = label
colCat: String = ocean_proximity
colNum: Array[String] = Array(longitude, latitude, housing_median_age, total_rooms, total_bedrooms, population, households, median_income, rooms_per_household, bedrooms_per_room, population_per_household)


## 3.1. Prepare continuous attributes
### Data cleaning
Most Machine Learning algorithms cannot work with missing features, so we should take care of them. As a first step, let's find the columns with missing values in the numerical attributes. To do so, we can print the number of missing values of each continuous attribute listed in `colNum`.

In [18]:
for (c <- colNum) {
    val missingCount = renamedHousing.filter(renamedHousing(c).isNull || 
            renamedHousing(c) === "" || renamedHousing(c).isNaN).count() 
    printf("Missing values for " + c + " : " + missingCount + "\n")
}

Missing values for longitude : 0
Missing values for latitude : 0
Missing values for housing_median_age : 0
Missing values for total_rooms : 0
Missing values for total_bedrooms : 207
Missing values for population : 0
Missing values for households : 0
Missing values for median_income : 0
Missing values for rooms_per_household : 0
Missing values for bedrooms_per_room : 207
Missing values for population_per_household : 0


As we observed above, the `total_bedrooms` and `bedrooms_per_room` attributes have some missing values. One way to take care of missing values is to use the `Imputer` Estimator, which completes missing values in a dataset using either the mean or the median of the columns in which the missing values are located. To use it, you need to create an `Imputer` instance, specifying that you want to replace each attribute's missing values with the "median" of that attribute.
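
To illustrate the idea of median imputation on a single column, here is a minimal sketch in plain Scala. It is not how Spark implements `Imputer` (Spark computes the median approximately on distributed data); the names `ImputeSketch`, `median`, and `imputeMedian` are hypothetical, and the sketch assumes missing values are modelled as `None`.

```scala
object ImputeSketch {
  // Median of the observed values. For an even count this takes the
  // lower of the two middle elements, which is a simplification.
  def median(values: Seq[Double]): Double = {
    require(values.nonEmpty, "need at least one observed value")
    val sorted = values.sorted
    sorted((sorted.size - 1) / 2)
  }

  // Replace every missing entry with the median of the observed entries.
  def imputeMedian(column: Seq[Option[Double]]): Seq[Double] = {
    val fill = median(column.flatten)
    column.map(_.getOrElse(fill))
  }
}
```

With the column `Seq(Some(1.0), None, Some(3.0), Some(5.0))`, the observed median is 3.0, so the missing entry is filled with 3.0.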

In [19]:
val imputer = new Imputer()
    .setInputCols(Array("total_bedrooms", "bedrooms_per_room"))
    .setOutputCols(Array("total_bedrooms", "bedrooms_per_room"))    
    .setStrategy("median")

val imputedHousing = imputer
    .fit(renamedHousing.select(colNum.head, colNum.tail: _*))
    .transform(renamedHousing.select(colNum.head, colNum.tail: _*))

imputedHousing.select("total_bedrooms", "bedrooms_per_room").show(5)

+--------------+-------------------+
|total_bedrooms|  bedrooms_per_room|
+--------------+-------------------+
|         129.0|0.14659090909090908|
|        1106.0|0.15579659106916466|
|         190.0|0.12951601908657123|
|         235.0|0.18445839874411302|
|         280.0| 0.1720958819913952|
+--------------+-------------------+
only showing top 5 rows



imputer: org.apache.spark.ml.feature.Imputer = imputer_2aa1739408dc
imputedHousing: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 9 more fields]


In [20]:
// Sanity check
for (c <- imputedHousing.columns) {
    printf(c + " Missing values: ")
    val missingCount = imputedHousing.filter(imputedHousing(c).isNull || 
            imputedHousing(c) === "" || imputedHousing(c).isNaN).count() 
    println(missingCount)
}

longitude Missing values: 0
latitude Missing values: 0
housing_median_age Missing values: 0
total_rooms Missing values: 0
total_bedrooms Missing values: 0
population Missing values: 0
households Missing values: 0
median_income Missing values: 0
rooms_per_household Missing values: 0
bedrooms_per_room Missing values: 0
population_per_household Missing values: 0


### Scaling
One of the most important transformations you need to apply to your data is feature scaling. With few exceptions, Machine Learning algorithms don't perform well when the input numerical attributes have very different scales. This is the case for the housing data: the total number of rooms ranges from 2 to 39,320, while the median incomes only range from 0 to 15. Note that scaling the label attributes is generally not required.

One way to get all attributes to have the same scale is to use standardization. Standardization first subtracts the mean from each value (so standardized values always have a zero mean), and then divides by the standard deviation so that the resulting distribution has unit variance. To do this, we can use the `StandardScaler` Estimator. Note that, by default, Spark's `StandardScaler` only divides by the standard deviation (`withStd` is true, `withMean` is false), which is why the scaled values below are not centered around zero; call `setWithMean(true)` to also subtract the mean. To use `StandardScaler`, we again need to convert all the numerical attributes into a big vector of features using `VectorAssembler`, and then call `StandardScaler` on that vector.
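
The arithmetic can be sketched for a single attribute in plain Scala. This is only an illustration, not Spark's code: the names `StandardizeSketch`, `stddev`, and `standardize` are hypothetical, the `withMean` flag is modelled on the `StandardScaler` parameter of the same name, and the sketch assumes the sample (n - 1) standard deviation.

```scala
object StandardizeSketch {
  // Sample standard deviation (divides by n - 1); needs at least two values.
  def stddev(xs: Seq[Double]): Double = {
    require(xs.size > 1, "need at least two values")
    val mean = xs.sum / xs.size
    math.sqrt(xs.map(x => math.pow(x - mean, 2)).sum / (xs.size - 1))
  }

  // Divide by the standard deviation; subtract the mean first only if asked to.
  def standardize(xs: Seq[Double], withMean: Boolean): Seq[Double] = {
    val shift = if (withMean) xs.sum / xs.size else 0.0
    val sd = stddev(xs)
    xs.map(x => (x - shift) / sd)
  }
}
```

For the values 2, 4, 6 the mean is 4 and the sample standard deviation is 2, so `standardize(Seq(2.0, 4.0, 6.0), withMean = true)` gives -1, 0, 1, while `withMean = false` gives 1, 2, 3: same spread, but not centered.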

In [21]:
val va = new VectorAssembler()
    .setInputCols(colNum)
    .setOutputCol("features")

val featuredHousing = va.transform(imputedHousing)

val scaler = new StandardScaler()
    .setInputCol("features")
    .setOutputCol("scaled")

val scaledHousing = scaler
    .fit(featuredHousing)
    .transform(featuredHousing)

scaledHousing.select("scaled").show(5, false)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|scaled                                                                                                                                                                                                              |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[-61.00726959606955,17.734477624640412,3.2577023016083064,0.40337085073160667,0.30758821710917267,0.2843362208866199,0.3295584480852433,4.382095394195227,2.8228125480951665,2.5405867237343416,0.24605655309533123]|
|[-61.002278409814444,17.725114120086744,1.668579227653035,3.2540109878905406,2.637151690873992,2.1201592122632746,2.9764882057222772,4.3695

va: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_db9e3cdf4f84
featuredHousing: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 10 more fields]
scaler: org.apache.spark.ml.feature.StandardScaler = stdScal_8baab90f410a
scaledHousing: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 11 more fields]


## 3.2. Prepare categorical attributes
After imputing and scaling the continuous attributes, we should take care of the categorical attributes. Let's first print the distinct values of the categorical attribute `ocean_proximity` and how often each occurs.

In [22]:
renamedHousing.groupBy("ocean_proximity")
    .count()
    .sort(desc("count"))
    .show()

+---------------+-----+
|ocean_proximity|count|
+---------------+-----+
|      <1H OCEAN| 9136|
|         INLAND| 6551|
|     NEAR OCEAN| 2658|
|       NEAR BAY| 2290|
|         ISLAND|    5|
+---------------+-----+



### String indexer
Most Machine Learning algorithms prefer to work with numbers. So let's convert the categorical attribute `ocean_proximity` to numbers. To do so, we can use the `StringIndexer` that encodes a string column of labels to a column of label indices. The indices are in [0, numLabels), ordered by label frequencies, so the most frequent label gets index 0.
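
The frequency-based ordering can be sketched in plain Scala as follows. This is an illustration rather than Spark's implementation; `IndexSketch` and `frequencyIndex` are hypothetical names, and breaking ties alphabetically is an assumption made here to keep the result deterministic.

```scala
object IndexSketch {
  // Map each distinct label to an index, most frequent label first.
  // Ties are broken alphabetically (an assumption of this sketch).
  def frequencyIndex(labels: Seq[String]): Map[String, Int] =
    labels.groupBy(identity).toSeq
      .sortBy { case (label, occurrences) => (-occurrences.size, label) }
      .map { case (label, _) => label }
      .zipWithIndex
      .toMap
}
```

For the labels `Seq("b", "a", "b", "c", "b", "a")`, the label `b` occurs three times, `a` twice, and `c` once, so the resulting mapping is `b -> 0`, `a -> 1`, `c -> 2`.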

In [23]:
val indexer = new StringIndexer()
    .setInputCol("ocean_proximity")
    .setOutputCol("ocean_proximity_ints")

val idxHousing = indexer
    .fit(renamedHousing)
    .transform(renamedHousing)

idxHousing.select("ocean_proximity_ints").show(5, false)

+--------------------+
|ocean_proximity_ints|
+--------------------+
|3.0                 |
|3.0                 |
|3.0                 |
|3.0                 |
|3.0                 |
+--------------------+
only showing top 5 rows



indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_176b0dc2cc57
idxHousing: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 12 more fields]


Now we can use this numerical data in any Machine Learning algorithm. You can look at the mapping that this encoder has learned using the `labels` method: "<1H OCEAN" is mapped to 0, "INLAND" is mapped to 1, etc.

In [24]:
indexer.fit(renamedHousing).labels

res19: Array[String] = Array(<1H OCEAN, INLAND, NEAR OCEAN, NEAR BAY, ISLAND)


### One-hot encoding
Now, convert the label indices built in the last step into one-hot vectors. To do this, you can take advantage of the `OneHotEncoderEstimator` Estimator.
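
As a rough picture of what the encoder produces, the sketch below builds a dense one-hot vector in plain Scala. It is not Spark's implementation (Spark returns sparse vectors), and `OneHotSketch` and `oneHot` are hypothetical names. The `dropLast` flag is modelled on the encoder parameter of the same name: when it is set, the last category is represented by an all-zero vector, so five categories need only four positions.

```scala
object OneHotSketch {
  // Dense one-hot encoding of `index` among `numCategories` categories.
  // With dropLast, the vector has numCategories - 1 positions and the
  // last category is encoded as all zeros.
  def oneHot(index: Int, numCategories: Int, dropLast: Boolean = true): Seq[Double] = {
    require(index >= 0 && index < numCategories, "index out of range")
    val size = if (dropLast) numCategories - 1 else numCategories
    Seq.tabulate(size)(i => if (i == index) 1.0 else 0.0)
  }
}
```

With five categories, `oneHot(3, 5)` yields `0.0, 0.0, 0.0, 1.0`, and `oneHot(4, 5)` yields four zeros.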

In [25]:
val encoder = new OneHotEncoderEstimator()
    .setInputCols(Array("ocean_proximity_ints"))
    .setOutputCols(Array("ocean_proximity_one_hot"))

val ohHousing = encoder
    .fit(idxHousing)
    .transform(idxHousing)

ohHousing.select("ocean_proximity", "ocean_proximity_ints", "ocean_proximity_one_hot").show(5)

+---------------+--------------------+-----------------------+
|ocean_proximity|ocean_proximity_ints|ocean_proximity_one_hot|
+---------------+--------------------+-----------------------+
|       NEAR BAY|                 3.0|          (4,[3],[1.0])|
|       NEAR BAY|                 3.0|          (4,[3],[1.0])|
|       NEAR BAY|                 3.0|          (4,[3],[1.0])|
|       NEAR BAY|                 3.0|          (4,[3],[1.0])|
|       NEAR BAY|                 3.0|          (4,[3],[1.0])|
+---------------+--------------------+-----------------------+
only showing top 5 rows



encoder: org.apache.spark.ml.feature.OneHotEncoderEstimator = oneHotEncoder_2a89a544bf5c
ohHousing: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 13 more fields]


---
# 4. Pipeline
As you can see, there are many data transformation steps that need to be executed in the right order. For example, you called the `Imputer`, `VectorAssembler`, and `StandardScaler` one after another, in that order. Instead of chaining them by hand, we can use the `Pipeline` class to define a sequence of Transformers/Estimators and run them in order. A `Pipeline` is an `Estimator`; thus, after a Pipeline's `fit()` method runs, it produces a `PipelineModel`, which is a `Transformer`.
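
The Estimator/Transformer relationship can be illustrated with a toy model in plain Scala. Everything in this sketch is hypothetical (`PipelineSketch`, `MiniEstimator`, `MiniTransformer`, `Centerer`, `MaxScaler`, `fitPipeline`) and is far simpler than Spark's classes; it only shows that fitting a pipeline fits each stage on the output of the previously fitted stages and returns a single transformer that applies them in order.

```scala
object PipelineSketch {
  type Data = Seq[Double]

  trait MiniTransformer { def transform(data: Data): Data }
  trait MiniEstimator { def fit(data: Data): MiniTransformer }

  // Learns the mean and produces a transformer that subtracts it.
  object Centerer extends MiniEstimator {
    def fit(data: Data): MiniTransformer = {
      val mean = data.sum / data.size
      new MiniTransformer { def transform(d: Data): Data = d.map(_ - mean) }
    }
  }

  // Learns the maximum and produces a transformer that divides by it.
  object MaxScaler extends MiniEstimator {
    def fit(data: Data): MiniTransformer = {
      val max = data.max
      new MiniTransformer { def transform(d: Data): Data = d.map(_ / max) }
    }
  }

  // Fit each stage on the data as transformed by the earlier stages,
  // then return one transformer that chains the fitted stages.
  def fitPipeline(stages: Seq[MiniEstimator], data: Data): MiniTransformer = {
    val (fitted, _) = stages.foldLeft((Seq.empty[MiniTransformer], data)) {
      case ((models, current), stage) =>
        val model = stage.fit(current)
        (models :+ model, model.transform(current))
    }
    new MiniTransformer {
      def transform(d: Data): Data = fitted.foldLeft(d)((acc, m) => m.transform(acc))
    }
  }
}
```

Fitting `Seq(Centerer, MaxScaler)` on `Seq(1.0, 2.0, 3.0)` learns a mean of 2 and, after centering, a maximum of 1; the fitted model then maps a new value 5.0 to 3.0 using those learned statistics.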

Now, let's create a pipeline called `numPipeline` to call the numerical transformers you built above (`imputer`, `va`, and `scaler`) in the right order from left to right, as well as a pipeline called `catPipeline` to call the categorical transformers (`indexer` and `encoder`). Then, put these two pipelines `numPipeline` and `catPipeline` into one pipeline.

In [26]:
val numPipeline = new Pipeline()
    .setStages(Array(imputer, va, scaler))

val catPipeline = new Pipeline()
    .setStages(Array(indexer, encoder))

val pipeline = new Pipeline()
    .setStages(Array(numPipeline, catPipeline))

val newHousing = pipeline
    .fit(renamedHousing)
    .transform(renamedHousing)

newHousing.show(1)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+--------------------+--------------------+--------------------+-----------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|   label|ocean_proximity|rooms_per_household|  bedrooms_per_room|population_per_household|            features|              scaled|ocean_proximity_ints|ocean_proximity_one_hot|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+--------------------+--------------------+--------------------+-----------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|452600.0|       NEAR BAY|  6.984126984126984|0.14659090909

numPipeline: org.apache.spark.ml.Pipeline = pipeline_a510eb4e800b
catPipeline: org.apache.spark.ml.Pipeline = pipeline_499686bccc5a
pipeline: org.apache.spark.ml.Pipeline = pipeline_6a792b1ea8da
newHousing: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 15 more fields]


Now, use `VectorAssembler` to put all attributes of the final dataset `newHousing` into a big vector, and call the new column `features`.

In [27]:
val finalHousing = newHousing.drop("features")

val va2 = new VectorAssembler()
    .setInputCols(Array("scaled", "ocean_proximity_one_hot"))
    .setOutputCol("features")

val dataset = va2
    .transform(finalHousing)
    .select("features", "label")

dataset.show(1, false)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|features                                                                                                                                                                                                                            |label   |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|[-61.00726959606955,17.734477624640412,3.2577023016083064,0.40337085073160667,0.30758821710917267,0.2843362208866199,0.3295584480852433,4.382095394195227,2.8228125480951665,2.5405867237343416,0.24605655309533123,0.0,0.0,0.0,1.0]|452600.0|
+---------------------------------------

finalHousing: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 14 more fields]
va2: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_7a3a22563892
dataset: org.apache.spark.sql.DataFrame = [features: vector, label: double]


---
# 5. Make a model
Here we are going to make four different regression models:
* Linear regression model
* Decision tree regression
* Random forest regression
* Gradient-boosted tree regression

But before training a Machine Learning model, let's first split the data into a training dataset (`trainSet`) with 80% of the data and a test dataset (`testSet`) with the remaining 20%.
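
A random split of this kind assigns each row to one side independently, so the 80/20 proportions are only approximate, and the result changes between runs unless the random seed is fixed. The plain-Scala sketch below illustrates both points; it is not Spark's implementation, and `SplitSketch`, `randomSplit`, `trainFraction`, and `seed` are hypothetical names used here for illustration.

```scala
object SplitSketch {
  // Send each row to the training side with probability trainFraction.
  // The same seed always reproduces the same split.
  def randomSplit[A](rows: Seq[A], trainFraction: Double, seed: Long): (Seq[A], Seq[A]) = {
    require(trainFraction >= 0.0 && trainFraction <= 1.0, "fraction must be in [0, 1]")
    val rng = new scala.util.Random(seed)
    rows.partition(_ => rng.nextDouble() < trainFraction)
  }
}
```

Calling `SplitSketch.randomSplit((1 to 1000).toSeq, 0.8, 42L)` twice returns identical parts, and the two parts together always contain all 1000 rows, although the training part will rarely contain exactly 800 of them.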

In [28]:
val Array(trainSet, testSet) = dataset.randomSplit(Array(0.8, 0.2))

trainSet.show(3)
testSet.show(3)

+--------------------+--------+
|            features|   label|
+--------------------+--------+
|[-62.065401082150...| 94600.0|
|[-62.040445150874...| 85800.0|
|[-62.020480405854...|111400.0|
+--------------------+--------+
only showing top 3 rows

+--------------------+--------+
|            features|   label|
+--------------------+--------+
|[-62.040445150874...|103600.0|
|[-62.025471592109...| 79000.0|
|[-62.005506847089...|106700.0|
+--------------------+--------+
only showing top 3 rows



trainSet: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [features: vector, label: double]
testSet: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [features: vector, label: double]


## 5.1. Linear regression model
Now, train a Linear Regression model using the `LinearRegression` class. Then, print the coefficients and intercept of the model, as well as the summary of the model over the training set by calling the `summary` method.

In [29]:
// Train the model
val lr = new LinearRegression()
    .setFeaturesCol("features")
    .setLabelCol("label")
    .setSolver("normal")
    .setMaxIter(10)

val lrModel = lr.fit(trainSet)

val trainingSummary = lrModel.summary

println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")

Coefficients: [-54456.50752335199,-54645.695174988425,14019.701483467456,7785.422929937331,951.870875408345,-45951.7936531568,42995.40213305872,78308.04124420747,6825.410797460786,16778.07279655265,745.5910782686559,-177199.48660577965,-212530.29078463395,-171360.93838384398,-181560.7310198749] Intercept: -2220759.9126649867
RMSE: 67703.22767684447


lr: org.apache.spark.ml.regression.LinearRegression = linReg_0e7f5cc9d457
lrModel: org.apache.spark.ml.regression.LinearRegressionModel = linReg_0e7f5cc9d457
trainingSummary: org.apache.spark.ml.regression.LinearRegressionTrainingSummary = org.apache.spark.ml.regression.LinearRegressionTrainingSummary@6aa44421


Now, use `RegressionEvaluator` to measure the root-mean-square error (RMSE) of the model on the test dataset.
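
As a reminder of what the metric measures, RMSE is the square root of the mean of the squared differences between predictions and labels. The plain-Scala sketch below computes it on small in-memory samples; it is an illustration only, and `RmseSketch` and `rmse` are hypothetical names.

```scala
object RmseSketch {
  // Square root of the mean squared difference between predictions and labels.
  def rmse(predictions: Seq[Double], labels: Seq[Double]): Double = {
    require(predictions.size == labels.size && predictions.nonEmpty,
      "inputs must be non-empty and equally long")
    val squaredErrors = predictions.zip(labels).map { case (p, y) => math.pow(p - y, 2) }
    math.sqrt(squaredErrors.sum / squaredErrors.size)
  }
}
```

For predictions `Seq(3.0, 1.0)` and labels `Seq(1.0, 3.0)`, both errors have magnitude 2, so the RMSE is 2.0. Because the errors are squared before averaging, large individual errors weigh more heavily than small ones.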

In [30]:
// Make predictions on the test data
val predictions = lrModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
    .setMetricName("rmse")
    .setPredictionCol("prediction")
    .setLabelCol("label")

val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

+------------------+--------+--------------------+
|        prediction|   label|            features|
+------------------+--------+--------------------+
|145474.21404122608|103600.0|[-62.040445150874...|
|181358.86126578646| 79000.0|[-62.025471592109...|
| 211206.8264460913|106700.0|[-62.005506847089...|
|194893.46144828526| 90100.0|[-61.985542102068...|
| 147764.6627887222| 70000.0|[-61.985542102068...|
+------------------+--------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 68668.7637620722


predictions: org.apache.spark.sql.DataFrame = [features: vector, label: double ... 1 more field]
evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_f742a8166585
rmse: Double = 68668.7637620722
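
To make the metric concrete, here is a minimal plain-Scala sketch of the RMSE computation, applied to the five (prediction, label) rows displayed above. The helper name `rmseOf` is an assumption made for illustration and is not part of Spark. Because it only covers five rows, its value differs from the RMSE over the whole test set.

```scala
// Root-mean-square error over (prediction, label) pairs (plain Scala, no Spark).
def rmseOf(pairs: Seq[(Double, Double)]): Double = {
  require(pairs.nonEmpty, "need at least one (prediction, label) pair")
  val squaredErrors = pairs.map { case (prediction, label) => math.pow(prediction - label, 2) }
  math.sqrt(squaredErrors.sum / pairs.size)
}

// The five (prediction, label) rows displayed above.
val shownRows = Seq(
  (145474.21404122608, 103600.0),
  (181358.86126578646, 79000.0),
  (211206.8264460913, 106700.0),
  (194893.46144828526, 90100.0),
  (147764.6627887222, 70000.0)
)
val sampleRmse = rmseOf(shownRows)
println(f"RMSE over the five displayed rows = $sampleRmse%.2f")
```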


## 5.2. Decision tree regression
Repeat what you did for the linear regression model to build a decision tree model. Use the `DecisionTreeRegressor` to build the model and then measure its RMSE on the test dataset.

In [31]:
val dt = new DecisionTreeRegressor()
  .setLabelCol("label")
  .setFeaturesCol("features")

// Train the model
val dtModel = dt.fit(trainSet)

// Make predictions on the test data
val predictions = dtModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute test error
val evaluator = new RegressionEvaluator()
    .setMetricName("rmse")
    .setPredictionCol("prediction")
    .setLabelCol("label")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

+------------------+--------+--------------------+
|        prediction|   label|            features|
+------------------+--------+--------------------+
| 172707.9489051095|103600.0|[-62.040445150874...|
|143072.61713173264| 79000.0|[-62.025471592109...|
| 172707.9489051095|106700.0|[-62.005506847089...|
|196663.00598476606| 90100.0|[-61.985542102068...|
|153808.83534136545| 70000.0|[-61.985542102068...|
+------------------+--------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 68379.42891496226


dt: org.apache.spark.ml.regression.DecisionTreeRegressor = dtr_e2fb403b9b88
dtModel: org.apache.spark.ml.regression.DecisionTreeRegressionModel = DecisionTreeRegressionModel (uid=dtr_e2fb403b9b88) of depth 5 with 63 nodes
predictions: org.apache.spark.sql.DataFrame = [features: vector, label: double ... 1 more field]
evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_80be96b2e4fa
rmse: Double = 68379.42891496226


## 5.3. Random forest regression
Now, let's measure the test error of a random forest model. You can use the `RandomForestRegressor` to build it.

In [32]:
val rf = new RandomForestRegressor()
    .setMaxDepth(10)
    .setNumTrees(25)
    .setLabelCol("label")
    .setFeaturesCol("features")
    .setFeatureSubsetStrategy("auto")

// Train the model
val rfModel = rf.fit(trainSet)

// Make predictions on the test data
val predictions = rfModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute test error
val evaluator = new RegressionEvaluator()
    .setMetricName("rmse")
    .setPredictionCol("prediction")
    .setLabelCol("label")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

+------------------+--------+--------------------+
|        prediction|   label|            features|
+------------------+--------+--------------------+
|126767.46846147433|103600.0|[-62.040445150874...|
| 97010.35444230502| 79000.0|[-62.025471592109...|
|132536.23018878163|106700.0|[-62.005506847089...|
|185218.23203295693| 90100.0|[-61.985542102068...|
|  88185.1643612136| 70000.0|[-61.985542102068...|
+------------------+--------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 54003.24561722442


rf: org.apache.spark.ml.regression.RandomForestRegressor = rfr_143f81a4d873
rfModel: org.apache.spark.ml.regression.RandomForestRegressionModel = RandomForestRegressionModel (uid=rfr_143f81a4d873) with 25 trees
predictions: org.apache.spark.sql.DataFrame = [features: vector, label: double ... 1 more field]
evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_e0f026d576c9
rmse: Double = 54003.24561722442


## 5.4. Gradient-boosted tree regression
Finally, we want to build a gradient-boosted tree regression model and measure its RMSE on the test data. Use the `GBTRegressor` to build the model.

In [33]:
val gb = new GBTRegressor()
    .setLabelCol("label")
    .setFeaturesCol("features")
    .setMaxIter(10)

// Train the model
val gbModel = gb.fit(trainSet)

// Make predictions on the test data
val predictions = gbModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute test error
val evaluator = new RegressionEvaluator()
    .setMetricName("rmse")
    .setPredictionCol("prediction")
    .setLabelCol("label")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

+------------------+--------+--------------------+
|        prediction|   label|            features|
+------------------+--------+--------------------+
| 128283.8531435173|103600.0|[-62.040445150874...|
| 99700.62360043684| 79000.0|[-62.025471592109...|
| 128283.8531435173|106700.0|[-62.005506847089...|
|121588.88118708531| 90100.0|[-61.985542102068...|
| 96931.13282731757| 70000.0|[-61.985542102068...|
+------------------+--------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 60423.81688813691


gb: org.apache.spark.ml.regression.GBTRegressor = gbtr_6031fe610ab4
gbModel: org.apache.spark.ml.regression.GBTRegressionModel = GBTRegressionModel (uid=gbtr_6031fe610ab4) with 10 trees
predictions: org.apache.spark.sql.DataFrame = [features: vector, label: double ... 1 more field]
evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_2a6e27ab523b
rmse: Double = 60423.81688813691


---
# 6. Hyperparameter tuning
An important task in Machine Learning is model selection, or using data to find the best model or parameters for a given task. This is also called tuning. Tuning may be done for individual Estimators such as LinearRegression, or for entire Pipelines which include multiple algorithms, featurization, and other steps. Users can tune an entire Pipeline at once, rather than tuning each element in the Pipeline separately. MLlib supports model selection tools, such as `CrossValidator`. These tools require the following items:
* Estimator: algorithm or Pipeline to tune (`setEstimator`)
* Set of ParamMaps: parameters to choose from, sometimes called a "parameter grid" to search over (`setEstimatorParamMaps`)
* Evaluator: metric to measure how well a fitted Model does on held-out test data (`setEvaluator`)

`CrossValidator` begins by splitting the dataset into a set of folds, which are used as separate training and test datasets. For example with `k=3` folds, `CrossValidator` will generate 3 (training, test) dataset pairs, each of which uses 2/3 of the data for training and 1/3 for testing. To evaluate a particular `ParamMap`, `CrossValidator` computes the average evaluation metric for the 3 Models produced by fitting the Estimator on the 3 different (training, test) dataset pairs. After identifying the best `ParamMap`, `CrossValidator` finally re-fits the Estimator using the best ParamMap and the entire dataset.
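
The fold logic described above can be sketched in plain Scala. This is a simplified illustration under stated assumptions: the object `KFoldSketch` and the `fitAndScore` callback are hypothetical names, and the folds are assigned by index, whereas `CrossValidator` assigns rows to folds randomly.

```scala
object KFoldSketch {
  // Builds k (training, validation) pairs; fold f validates on every k-th element starting at f.
  def folds[A](data: Seq[A], k: Int): Seq[(Seq[A], Seq[A])] = {
    val indexed = data.zipWithIndex
    (0 until k).map { f =>
      val (validation, training) = indexed.partition { case (_, idx) => idx % k == f }
      (training.map(_._1), validation.map(_._1))
    }
  }

  // Averages a metric over the k pairs. `fitAndScore` is a hypothetical stand-in for
  // "fit the Estimator with one ParamMap on the training part, evaluate on the validation part".
  def averageMetric[A](data: Seq[A], k: Int)(fitAndScore: (Seq[A], Seq[A]) => Double): Double = {
    val scores = folds(data, k).map { case (train, valid) => fitAndScore(train, valid) }
    scores.sum / k
  }
}

// Toy data and a toy "model" that predicts the training mean, scored by RMSE.
val values = (1 to 9).map(_.toDouble)
val avgRmse = KFoldSketch.averageMetric(values, 3) { (train, valid) =>
  val mean = train.sum / train.size
  math.sqrt(valid.map(v => math.pow(v - mean, 2)).sum / valid.size)
}
println(s"average RMSE over 3 folds = $avgRmse")
```

With `k = 3` and nine values, each pair trains on six values and validates on three, which mirrors the 2/3 and 1/3 split described above. The average would be computed once per `ParamMap`, and the `ParamMap` with the best average would be selected.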

Below, use the `CrossValidator` to select the best Random Forest model. To do so, you need to define a grid of parameters. Let's say we want to search over different maximum tree depths (1, 5, and 10) and different numbers of trees (5, 10, and 15).

In [34]:
val paramGrid = new ParamGridBuilder()
    .addGrid(rfModel.maxDepth, Array(1, 5, 10))
    .addGrid(rfModel.numTrees, Array(5, 10, 15))
    .build()

val evaluator = new RegressionEvaluator()
    .setLabelCol("label")
    .setPredictionCol("prediction")
    .setMetricName("rmse")

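// rfModel is an already fitted model (a Transformer, not an Estimator), so it is wrapped in a
// Pipeline to satisfy setEstimator; as a result, cross-validation does not retrain the forest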
val pipeline = new Pipeline().setStages(Array(rfModel)) 

val cv = new CrossValidator()
    .setEstimator(pipeline)
    .setEstimatorParamMaps(paramGrid)
    .setEvaluator(evaluator)
    .setNumFolds(3)
val cvModel = cv.fit(trainSet)

val predictions = cvModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

+------------------+--------+--------------------+
|        prediction|   label|            features|
+------------------+--------+--------------------+
| 211279.1141024572|103600.0|[-62.040445150874...|
|161683.92407050836| 79000.0|[-62.025471592109...|
|220893.71698130268|106700.0|[-62.005506847089...|
| 308697.0533882615| 90100.0|[-61.985542102068...|
|  146975.273935356| 70000.0|[-61.985542102068...|
+------------------+--------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 160076.7025258468


paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	rfr_143f81a4d873-maxDepth: 1,
	rfr_143f81a4d873-numTrees: 5
}, {
	rfr_143f81a4d873-maxDepth: 1,
	rfr_143f81a4d873-numTrees: 10
}, {
	rfr_143f81a4d873-maxDepth: 1,
	rfr_143f81a4d873-numTrees: 15
}, {
	rfr_143f81a4d873-maxDepth: 5,
	rfr_143f81a4d873-numTrees: 5
}, {
	rfr_143f81a4d873-maxDepth: 5,
	rfr_143f81a4d873-numTrees: 10
}, {
	rfr_143f81a4d873-maxDepth: 5,
	rfr_143f81a4d873-numTrees: 15
}, {
	rfr_143f81a4d873-maxDepth: 10,
	rfr_143f81a4d873-numTrees: 5
}, {
	rfr_143f81a4d873-maxDepth: 10,
	rfr_143f81a4d873-numTrees: 10
}, {
	rfr_143f81a4d873-maxDepth: 10,
	rfr_143f81a4d873-numTrees: 15
})
evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_8032232b8a6f
pipeline: org.apache.spark.ml.Pipeline = pipelin...
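
The test RMSE above is far worse than the one obtained by the untuned forest in section 5.3. A quick arithmetic check, sketched below in plain Scala with values copied from the two prediction tables, suggests why: every prediction here is the section 5.3 prediction multiplied by 25/15. This is consistent with the 25 already fitted trees being averaged as if the forest had 15 trees, the largest `numTrees` value in the grid, rather than with a newly trained forest. A likely remedy (an assumption, not verified in this notebook) is to pass the unfitted `rf` estimator to `setEstimator` and to build the grid from `rf.maxDepth` and `rf.numTrees`.

```scala
// Predictions copied from the first three rows of the tables in sections 5.3 and 6.
val forestPredictions = Seq(126767.46846147433, 97010.35444230502, 132536.23018878163)
val cvPredictions = Seq(211279.1141024572, 161683.92407050836, 220893.71698130268)

// Ratio of each cross-validated prediction to the original forest prediction.
val ratios = cvPredictions.zip(forestPredictions).map { case (cv, forest) => cv / forest }

// 25 trees were fitted in section 5.3; the grid contains numTrees = 15.
val expectedRatio = 25.0 / 15.0
ratios.foreach(r => println(f"ratio = $r%.6f, 25/15 = $expectedRatio%.6f"))
```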

---
# 7. Custom transformer
At the end of part two, we added extra columns to the `housing` dataset. Here, we are going to implement a Transformer to do the same task. The Transformer should take the names of two input columns, `inputCol1` and `inputCol2`, as well as the name of the output column, `outputCol`. It then computes `inputCol1` divided by `inputCol2` and adds the result as a new column to the dataset. The details of implementing a custom Transformer are explained [here](https://www.oreilly.com/learning/extend-spark-ml-for-your-own-modeltransformer-types). Please read it before starting the implementation.

First, define the given parameters of the Transformer and implement a method to validate their schemas (`StructType`).

In [35]:
val housingCol1 = housing.withColumn("rooms_per_household", expr("total_rooms/households"))
val housingCol2 = housingCol1.withColumn("bedrooms_per_room", expr("total_bedrooms/total_rooms"))
val housingExtra = housingCol2.withColumn("population_per_household", expr("population/households"))

housingExtra.select("rooms_per_household", "bedrooms_per_room", "population_per_household").show(5)

+-------------------+-------------------+------------------------+
|rooms_per_household|  bedrooms_per_room|population_per_household|
+-------------------+-------------------+------------------------+
|  6.984126984126984|0.14659090909090908|      2.5555555555555554|
|  6.238137082601054|0.15579659106916466|       2.109841827768014|
|  8.288135593220339|0.12951601908657123|      2.8022598870056497|
| 5.8173515981735155|0.18445839874411302|       2.547945205479452|
|  6.281853281853282| 0.1720958819913952|      2.1814671814671813|
+-------------------+-------------------+------------------------+
only showing top 5 rows



housingCol1: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 9 more fields]
housingCol2: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 10 more fields]
housingExtra: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 11 more fields]


In [36]:
import org.apache.spark.sql.types.{StructField, StructType, DoubleType}
import org.apache.spark.ml.param.{ParamMap, Param, Params}

trait MyParams extends Params {
    final val inputCol1 = new Param[String](this, "inputCol1", "The first input column")
    final val inputCol2 = new Param[String](this, "inputCol2", "The second input column")
    final val outputCol = new Param[String](this, "outputCol", "The output column")
    
    protected def validateAndTransformSchema(schema: StructType): StructType = {
        // Check that the input type of inputCol1 is a DoubleType
        val idx1 = schema.fieldIndex($(inputCol1))
        val field1 = schema.fields(idx1)

        if (field1.dataType != DoubleType) {
          throw new Exception(s"Input type ${field1.dataType} did not match input type DoubleType")
        }
        
        // Check that the input type of inputCol2 is a DoubleType
        val idx2 = schema.fieldIndex($(inputCol2))
        val field2 = schema.fields(idx2)
                       
        if (field2.dataType != DoubleType) {
          throw new Exception(s"Input type ${field2.dataType} did not match input type DoubleType")
        }
        
        // Add the return field
        schema.add(StructField($(outputCol), DoubleType, false))
    }
}


import org.apache.spark.sql.types.{StructField, StructType, DoubleType}
import org.apache.spark.ml.param.{ParamMap, Param, Params}
defined trait MyParams


Then, extend the class `Transformer`, implement its setter functions for the input and output columns, and call them `setInputCol1`, `setInputCol2`, and `setOutputCol`. Moreover, you need to override the methods `copy`, `transformSchema`, and `transform`. The details of what you need to cover in these methods are given [here](https://www.oreilly.com/learning/extend-spark-ml-for-your-own-modeltransformer-types).

In [37]:
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.ml.param.{ParamMap, Param, Params}
import org.apache.spark.sql.types.{StructField, StructType, DoubleType}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.{col, udf}

class MyTransformer(override val uid: String) extends Transformer with MyParams {
    def this() = this(Identifiable.randomUID("columnaddition"))
    
    def setInputCol1(value: String): this.type = set(inputCol1, value)
    def setInputCol2(value: String): this.type = set(inputCol2, value)
    def setOutputCol(value: String): this.type = set(outputCol, value)

    override def copy(extra: ParamMap): MyTransformer = {
        defaultCopy(extra)
    }
    
    override def transformSchema(schema: StructType): StructType = {
        validateAndTransformSchema(schema)
    }
    
    override def transform(dataset: Dataset[_]): DataFrame = {
        // Operate on the dataset passed in rather than on the global `housing` DataFrame
        dataset.withColumn($(outputCol), col($(inputCol1)) / col($(inputCol2)))
    }
}

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.ml.param.{ParamMap, Param, Params}
import org.apache.spark.sql.types.{StructField, StructType, DoubleType}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.{col, udf}
defined class MyTransformer


Now, create an instance of `MyTransformer`, set the input columns to `total_rooms` and `households` and the output column to `rooms_per_household`, and run it over the `housing` dataset.

In [38]:
val myTransformer = new MyTransformer()
    .setInputCol1("total_rooms")
    .setInputCol2("households")
    .setOutputCol("rooms_per_household")

val myDataset = myTransformer
    .transform(housing)
    .select("rooms_per_household")
    .show(5)

+-------------------+
|rooms_per_household|
+-------------------+
|  6.984126984126984|
|  6.238137082601054|
|  8.288135593220339|
| 5.8173515981735155|
|  6.281853281853282|
+-------------------+
only showing top 5 rows



myTransformer: MyTransformer = columnaddition_d5bb01307905
myDataset: Unit = ()


---
# 8. Custom estimator (predictor)
Now, it's time to implement your own linear regression with gradient descent algorithm as a brand new Estimator. The whole code of the Estimator is given to you, and you do not need to implement anything. It is just a sample that shows how to build a custom Estimator.

The gradient descent update for linear regression is:
$$
w_{i+1} = w_{i} - \alpha_{i} \sum\limits_{j=1}^n (w_i^\top x_j - y_j)x_j
$$

where $i$ is the iteration number of the gradient descent algorithm, and $j$ identifies the observation. Here, $w$ represents an array of weights that is the same size as the array of features and provides a weight for each of the features when finally computing the label prediction in the form:

$$
prediction = w^\top \cdot\ x
$$

where $w$ is the final array of weights computed by the gradient descent, $x$ is the array of features of the observation point and $prediction$ is the label we predict should be associated to this observation.

The given `Helper` object implements the helper methods:
* `dot`: computes the dot product of two vectors
* `dot_val`: multiplies a vector by a scalar
* `sumVectors`: adds two vectors element-wise
* `fillVector`: creates a vector of a given size and initializes every element with a given value

What you need to do is to implement the methods of the linear regression class `LR`, which are
* `calcRMSE`: computes the root-mean-square error of a given RDD of (label, prediction) tuples using the formula:
$$
rmse = \sqrt{\frac{\sum\limits_{i=1}^n (label_i - prediction_i)^2}{n}}
$$
* `gradientSummand`: computes the following formula:
$$
gs_{ij} = (w_i^\top x_j - y_j)x_j
$$
* `linregGradientDescent`: sums the gradient summands over all observations and applies the update rule given earlier:
$$
gradient = \sum\limits_{j=1}^n gs_{ij}
$$
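
Before moving to the Spark implementation, the formulas above can be traced on a small example. The following is a minimal plain-Scala sketch, assuming `Array[Double]` in place of Spark's `Vector` and a hypothetical toy dataset generated from $y = 2x_1 - 3x_2$. The names `GradientDescentSketch`, `fit`, and `toyData` are made up for illustration. The step size uses the decaying schedule $\alpha_i = \alpha / (n\sqrt{i+1})$ with zero-based $i$.

```scala
object GradientDescentSketch {
  type Vec = Array[Double]

  def dot(a: Vec, b: Vec): Double = a.zip(b).map { case (x, y) => x * y }.sum

  // gs_ij = (w_i^T x_j - y_j) x_j
  def gradientSummand(w: Vec, x: Vec, y: Double): Vec = {
    val residual = dot(w, x) - y
    x.map(_ * residual)
  }

  // Returns the final weights and the training RMSE measured before each update.
  def fit(data: Seq[(Vec, Double)], numIters: Int, alpha: Double = 1.0): (Vec, Array[Double]) = {
    val n = data.size
    var w: Vec = Array.fill(data.head._1.length)(0.0)
    val errors = new Array[Double](numIters)
    for (i <- 0 until numIters) {
      errors(i) = math.sqrt(data.map { case (x, y) => math.pow(dot(w, x) - y, 2) }.sum / n)
      // gradient = sum over all observations of the gradient summands
      val gradient = data
        .map { case (x, y) => gradientSummand(w, x, y) }
        .reduce((g1, g2) => g1.zip(g2).map { case (a, b) => a + b })
      val alphaI = alpha / (n * math.sqrt(i + 1.0))
      // w_{i+1} = w_i - alpha_i * gradient
      w = w.zip(gradient).map { case (wi, gi) => wi - alphaI * gi }
    }
    (w, errors)
  }
}

// Hypothetical toy data: labels follow y = 2 * x1 - 3 * x2 exactly.
val toyData = Seq(
  (Array(1.0, 0.0), 2.0),
  (Array(0.0, 1.0), -3.0),
  (Array(-1.0, 0.0), -2.0),
  (Array(0.0, -1.0), 3.0)
)
val (weights, errors) = GradientDescentSketch.fit(toyData, 50)
println(s"weights = ${weights.mkString(", ")}, final training RMSE = ${errors.last}")
```

On this toy data the weights move toward (2, -3) and the training error shrinks from one iteration to the next. With unscaled features the same step size can overshoot, which is one reason to standardize the features first.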

In [39]:
import org.apache.spark.ml.linalg.{Vector, Vectors, Matrices}

case class Instance(label: Double, features: Vector)

object Helper extends Serializable {
    def dot(v1: Vector, v2: Vector): Double = {
        val m = Matrices.dense(1, v1.size, v1.toArray)
        m.multiply(v2).values(0)
    }

    def dot_val(v: Vector, s: Double): Vector = {
        val baseArray = v.toArray.map(vi => vi * s)
        Vectors.dense(baseArray)
    }

    def sumVectors(v1: Vector, v2: Vector): Vector = {
        val baseArray = ((v1.toArray) zip (v2.toArray)).map { case (val1, val2) => val1 + val2 }
        Vectors.dense(baseArray)
    }

    def fillVector(size: Int, fillVal: Double): Vector = Vectors.dense(Array.fill[Double](size)(fillVal))
}

import org.apache.spark.ml.linalg.{Vector, Vectors, Matrices}
defined class Instance
defined object Helper


In [40]:
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.ml.linalg.{Vector, Vectors}

class LR() extends Serializable {
    def calcRMSE(labelsAndPreds: RDD[(Double, Double)]): Double = {
        val regressionMetrics = new RegressionMetrics(labelsAndPreds)
        regressionMetrics.rootMeanSquaredError
    }
  
    def gradientSummand(weights: Vector, lp: Instance): Vector = {
        // (w^T x - y) scales the feature vector of this observation
        val mult = Helper.dot(weights, lp.features) - lp.label
        Helper.dot_val(lp.features, mult)
    }
  
    def linregGradientDescent(trainData: RDD[Instance], numIters: Int): (Vector, Array[Double]) = {
        val n = trainData.count()
        val d = trainData.take(1)(0).features.size
        var w = Helper.fillVector(d, 0)
        val alpha = 1.0
        val errorTrain = Array.fill[Double](numIters)(0.0)

        for (i <- 0 until numIters) {
            val labelsAndPredsTrain = trainData
                .map(lp => (lp.label, Helper.dot(w, lp.features)))
            errorTrain(i) = calcRMSE(labelsAndPredsTrain)

            val gradient = trainData
                .map(lp => gradientSummand(w, lp))
                .reduce((v1, v2) => Helper.sumVectors(v1, v2))
            val alpha_i = alpha / (n * scala.math.sqrt(i + 1))
            val wAux = Helper.dot_val(gradient, (-1) * alpha_i)
            w = Helper.sumVectors(w, wAux)
        }
        (w, errorTrain)
    }
}

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.ml.linalg.{Vector, Vectors}
defined class LR


In [41]:
import org.apache.spark.ml.PredictionModel
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.{ParamMap, Param, Params}

abstract class MyLinearModel[FeaturesType, Model <: MyLinearModel[FeaturesType, Model]]
    extends PredictionModel[FeaturesType, Model] {
}

class MyLinearModelImpl(override val uid: String, val weights: Vector, val trainingError: Array[Double])
    extends MyLinearModel[Vector, MyLinearModelImpl] {

    override def copy(extra: ParamMap): MyLinearModelImpl = defaultCopy(extra)

    def predict(features: Vector): Double = {
        println("Predicting")
        val prediction = Helper.dot(weights, features)
        prediction
    }
}

import org.apache.spark.ml.PredictionModel
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.{ParamMap, Param, Params}
defined class MyLinearModel
defined class MyLinearModelImpl


In [42]:
import org.apache.spark.ml.Predictor
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.ml.param.{ParamMap, Param, Params}
import org.apache.spark.sql.Row

abstract class MyLinearRegression[
        FeaturesType,
        Learner <: MyLinearRegression[FeaturesType, Learner, Model],
        Model <: MyLinearModel[FeaturesType, Model]]
    extends Predictor[FeaturesType, Learner, Model] {
}

class MyLinearRegressionImpl(override val uid: String)
        extends MyLinearRegression[Vector, MyLinearRegressionImpl, MyLinearModelImpl] {
    def this() = this(Identifiable.randomUID("linReg"))

    override def copy(extra: ParamMap): MyLinearRegressionImpl = defaultCopy(extra)
  
    def train(dataset: Dataset[_]): MyLinearModelImpl = {
        println("Training")

        val numIters = 10

        val instances: RDD[Instance] = dataset.select(
            col($(labelCol)), col($(featuresCol))).rdd.map {
                case Row(label: Double, features: Vector) =>
                    Instance(label, features)
        }

        val (weights, trainingError) = new LR().linregGradientDescent(instances, numIters)

        new MyLinearModelImpl(uid, weights, trainingError)
    }
}

import org.apache.spark.ml.Predictor
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.ml.param.{ParamMap, Param, Params}
import org.apache.spark.sql.Row
defined class MyLinearRegression
defined class MyLinearRegressionImpl


In [43]:
val lr = new MyLinearRegressionImpl()
    .setLabelCol("label")
    .setFeaturesCol("features")

val model = lr.fit(trainSet)
val predictions = model.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
    .setLabelCol("label")
    .setPredictionCol("prediction")
    .setMetricName("rmse")

val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

Training


org.apache.spark.SparkDriverExecutionException:  Execution error

---
# 9. An End-to-End Classification Test
As the last step, you are given a dataset called `data/ccdefault.csv`. The dataset represents default of credit card clients. It has 30,000 cases and 24 different attributes. More details about the dataset are available in `data/ccdefault.txt`. In this task, you should build three models, compare their results, and decide which one is the best solution. Here are the suggested steps:
1. Load the data.
2. Carry out some exploratory analyses (e.g., how various features and the target variable are distributed).
3. Train a model to predict the target variable (risk of `default`).
  - Employ three different models (logistic regression, decision tree, and random forest).
  - Compare the models' performances (e.g., AUC).
  - Defend your choice of best model (e.g., what are the strengths and weaknesses of each of these models?).
4. What more would you do with this data? Anything to help you devise a better solution?

### Load the data set into a Spark dataframe and print its schema

In [2]:
val df = spark.read.format("csv")
    .option("header", "true")
    .option("inferschema", "true")
    .load("data/ccdefault.csv")

df.printSchema()

val c = df.count
println(s"The data set has $c data points.")

root
 |-- ID: integer (nullable = true)
 |-- LIMIT_BAL: integer (nullable = true)
 |-- SEX: integer (nullable = true)
 |-- EDUCATION: integer (nullable = true)
 |-- MARRIAGE: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- PAY_0: integer (nullable = true)
 |-- PAY_2: integer (nullable = true)
 |-- PAY_3: integer (nullable = true)
 |-- PAY_4: integer (nullable = true)
 |-- PAY_5: integer (nullable = true)
 |-- PAY_6: integer (nullable = true)
 |-- BILL_AMT1: integer (nullable = true)
 |-- BILL_AMT2: integer (nullable = true)
 |-- BILL_AMT3: integer (nullable = true)
 |-- BILL_AMT4: integer (nullable = true)
 |-- BILL_AMT5: integer (nullable = true)
 |-- BILL_AMT6: integer (nullable = true)
 |-- PAY_AMT1: integer (nullable = true)
 |-- PAY_AMT2: integer (nullable = true)
 |-- PAY_AMT3: integer (nullable = true)
 |-- PAY_AMT4: integer (nullable = true)
 |-- PAY_AMT5: integer (nullable = true)
 |-- PAY_AMT6: integer (nullable = true)
 |-- DEFAULT: integer (nullable = tru

df: org.apache.spark.sql.DataFrame = [ID: int, LIMIT_BAL: int ... 23 more fields]
c: Long = 30000


### Delete the ID variable

In [3]:
val dfDrop = df.drop("ID")

dfDrop: org.apache.spark.sql.DataFrame = [LIMIT_BAL: int, SEX: int ... 22 more fields]


### Describe the following numerical variables:

- The amount of given credit (`LIMIT_BAL`)
- The age (`AGE`)
- The bill_amt features
- The pay_amt features

In [6]:
dfDrop.describe("LIMIT_BAL","AGE").show()

df.describe("BILL_AMT1", "BILL_AMT2", "BILL_AMT3", "BILL_AMT4", "BILL_AMT5", "BILL_AMT6").show()

df.describe("PAY_AMT1", "PAY_AMT2", "PAY_AMT3", "PAY_AMT4", "PAY_AMT5", "PAY_AMT6").show()

+-------+------------------+-----------------+
|summary|         LIMIT_BAL|              AGE|
+-------+------------------+-----------------+
|  count|             30000|            30000|
|   mean|167484.32266666667|          35.4855|
| stddev|129747.66156720246|9.217904068090155|
|    min|             10000|               21|
|    max|           1000000|               79|
+-------+------------------+-----------------+

+-------+-----------------+-----------------+-----------------+------------------+-----------------+----------------+
|summary|        BILL_AMT1|        BILL_AMT2|        BILL_AMT3|         BILL_AMT4|        BILL_AMT5|       BILL_AMT6|
+-------+-----------------+-----------------+-----------------+------------------+-----------------+----------------+
|  count|            30000|            30000|            30000|             30000|            30000|           30000|
|   mean|       51223.3309|49179.07516666667|       47013.1548| 43262.94896666666|40311.40096666667|    

### Look into the distributions of the categorical features

 - Target
 - Sex
 - Education
 - Marriage
 - Pay_0 - Pay_6

In [4]:
val featuresToDescribe = Array("DEFAULT", "SEX", "EDUCATION", "MARRIAGE", "PAY_0", "PAY_2",
                               "PAY_3", "PAY_4", "PAY_5", "PAY_6")

for (feat <- featuresToDescribe) {
    dfDrop.groupBy(feat)
        .count()
        .sort(desc("count"))
        .show()
}

+-------+-----+
|DEFAULT|count|
+-------+-----+
|      0|23364|
|      1| 6636|
+-------+-----+

+---+-----+
|SEX|count|
+---+-----+
|  2|18112|
|  1|11888|
+---+-----+

+---------+-----+
|EDUCATION|count|
+---------+-----+
|        2|14030|
|        1|10585|
|        3| 4917|
|        5|  280|
|        4|  123|
|        6|   51|
|        0|   14|
+---------+-----+

+--------+-----+
|MARRIAGE|count|
+--------+-----+
|       2|15964|
|       1|13659|
|       3|  323|
|       0|   54|
+--------+-----+

+-----+-----+
|PAY_0|count|
+-----+-----+
|    0|14737|
|   -1| 5686|
|    1| 3688|
|   -2| 2759|
|    2| 2667|
|    3|  322|
|    4|   76|
|    5|   26|
|    8|   19|
|    6|   11|
|    7|    9|
+-----+-----+

+-----+-----+
|PAY_2|count|
+-----+-----+
|    0|15730|
|   -1| 6050|
|    2| 3927|
|   -2| 3782|
|    3|  326|
|    4|   99|
|    1|   28|
|    5|   25|
|    7|   20|
|    6|   12|
|    8|    1|
+-----+-----+

+-----+-----+
|PAY_3|count|
+-----+-----+
|    0|15764|
|   -1| 5938|
| 

featuresToDescribe: Array[String] = Array(DEFAULT, SEX, EDUCATION, MARRIAGE, PAY_0, PAY_2, PAY_3, PAY_4, PAY_5, PAY_6)


#### Analysis:

From the tables that are presented above we can derive the following results:

1. We have more instances from females (18112) than males (11888)

2. The majority of the participants graduated either from university (14030) or from graduate school (10585), whereas only 4917 participants have a high school education. The remaining 468 participants fall into other or undocumented categories.

3. Slightly more than half of the participants are single (15964), whereas 13659 are married and the remaining 377 have another or an undefined marital status.

4. `PAY_0` describes the repayment status in September. On the measurement scale of the repayment status, the largest group of participants (14737) paid without delay in September (measurement scale = 0), whereas only a small minority paid with a delay of 3 or more months. The remaining months (`PAY_2` to `PAY_6`, columns X7 to X11 in the dataset description) can be read in the same way.

5. Although in all 6 months the majority of the participants paid without delay (measurement scale = 0), the remaining values of the scale show no consistent pattern. This means that participants behave differently from month to month, which is reasonable given the other factors that can affect their lives. The only conclusion we can draw, as mentioned above, is that the majority paid without delay in all 6 months.

6. As we can see from the results above, we have a class imbalance, where the total number of instances of one of our classes (the one with positive labels = 1.0) is far less than the total number of instances of the other class (the one with negative labels = 0.0). More specifically, we have around 22% positive instances (label = 1.0) and 78% negative instances (label = 0.0). 
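
The class shares quoted in the last point can be recomputed from the `DEFAULT` counts shown above. The following plain-Scala sketch also reports the accuracy of a trivial classifier that always predicts the majority class. That baseline is an illustration added here, not part of the original analysis, but it hints at why a threshold-independent metric such as AUC is suggested for comparing the models.

```scala
// Class counts copied from the DEFAULT table above.
val classCounts = Map(0 -> 23364L, 1 -> 6636L)
val total = classCounts.values.sum

// Share of each class in the data set.
val shares = classCounts.map { case (label, count) => label -> count.toDouble / total }
shares.toSeq.sortBy(_._1).foreach { case (label, share) =>
  println(f"label $label: ${share * 100}%.2f%%")
}

// Accuracy of a trivial classifier that always predicts the majority class.
val majorityBaseline = shares.values.max
println(f"majority-class baseline accuracy = ${majorityBaseline * 100}%.2f%%")
```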

### Display the average limit balance for the different sexes, marital statuses and education levels.

In [8]:
val featuresToDescribe = Array("SEX", "EDUCATION", "MARRIAGE")

for (feat <- featuresToDescribe) {
    df.groupBy(feat)
        .agg(avg("LIMIT_BAL"))
        .show()
}

+---+------------------+
|SEX|    avg(LIMIT_BAL)|
+---+------------------+
|  1| 163519.8250336474|
|  2|170086.46201413427|
+---+------------------+

+---------+------------------+
|EDUCATION|    avg(LIMIT_BAL)|
+---------+------------------+
|        1|212956.06991025034|
|        6|148235.29411764705|
|        3|126550.27049013626|
|        5| 168164.2857142857|
|        4|220894.30894308942|
|        2| 147062.4376336422|
|        0|217142.85714285713|
+---------+------------------+

+--------+------------------+
|MARRIAGE|    avg(LIMIT_BAL)|
+--------+------------------+
|       1|182200.89318398127|
|       3| 98080.49535603715|
|       2|156413.66073665748|
|       0|132962.96296296295|
+--------+------------------+



featuresToDescribe: Array[String] = Array(SEX, EDUCATION, MARRIAGE)


### Check for correlations in the data

In [5]:
def find_cors(features: Array[String], df: DataFrame) = {
    val va = new VectorAssembler()
        .setInputCols(features)
        .setOutputCol("COMBINED_FEATURES")

    val attributes = va.transform(df)

    val Row(coeff: Matrix) = Correlation.corr(attributes, "COMBINED_FEATURES").head
    val matrixRows = coeff.rowIter.toSeq.map(_.toArray)
    val tempDf = spark.sparkContext.parallelize(matrixRows).toDF("Row")

    val numOfCols = matrixRows.head.length
    val dfCorrelation = (0 until numOfCols).foldLeft(tempDf)((tempDf, num) => 
        tempDf.withColumn("Col" + num, $"Row".getItem(num)))
      .drop("Row")

    println(s"The standard correlation coefficients:\n")
    dfCorrelation.show(false)
}

find_cors: (features: Array[String], df: org.apache.spark.sql.DataFrame)Unit


In [11]:
// Find correlations in the bill_amt* features

val featuresBillAMT = Array("BILL_AMT1", "BILL_AMT2", "BILL_AMT3", "BILL_AMT4", "BILL_AMT5", "BILL_AMT6")
val dfCorBillAMT = find_cors(featuresBillAMT, dfDrop)

// Find correlations in the pay_amt* features
val featuresPayAMT = Array("PAY_AMT1", "PAY_AMT2", "PAY_AMT3", "PAY_AMT4", "PAY_AMT5", "PAY_AMT6")
val dfCorPayAMT = find_cors(featuresPayAMT, dfDrop)

// Find correlations in the pay_* features
val featuresPay = Array("PAY_0", "PAY_2", "PAY_3", "PAY_4", "PAY_5", "PAY_6")
val dfCorPay = find_cors(featuresPay, dfDrop)

The standard correlation coefficients:

+------------------+------------------+------------------+------------------+------------------+------------------+
|Col0              |Col1              |Col2              |Col3              |Col4              |Col5              |
+------------------+------------------+------------------+------------------+------------------+------------------+
|1.0               |0.9514836727518164|0.8922785291271811|0.8602721890293089|0.8297786058329933|0.8026501885528523|
|0.9514836727518164|1.0               |0.9283262592714868|0.8924822912577247|0.8597783072714432|0.8315935591018226|
|0.8922785291271811|0.9283262592714868|1.0               |0.9239694565909823|0.8839096973620095|0.8533200905940505|
|0.8602721890293089|0.8924822912577247|0.9239694565909823|1.0               |0.9401344040880051|0.9009409547978421|
|0.8297786058329933|0.8597783072714432|0.8839096973620095|0.9401344040880051|1.0               |0.9461968070521906|
|0.8026501885528523|0.8315935591

featuresBillAMT: Array[String] = Array(BILL_AMT1, BILL_AMT2, BILL_AMT3, BILL_AMT4, BILL_AMT5, BILL_AMT6)
dfCorBillAMT: Unit = ()
featuresPayAMT: Array[String] = Array(PAY_AMT1, PAY_AMT2, PAY_AMT3, PAY_AMT4, PAY_AMT5, PAY_AMT6)
dfCorPayAMT: Unit = ()
featuresPay: Array[String] = Array(PAY_0, PAY_2, PAY_3, PAY_4, PAY_5, PAY_6)
dfCorPay: Unit = ()


### Analysis of the correlations

The correlation tables above lead to the following observations:

1. The attributes {"BILL_AMT1", ..., "BILL_AMT6"} represent the amount of the bill statement in each of the six months. The correlation between these attributes is high (about 0.95 for adjacent months) and decreases gradually as the months lie further apart (about 0.80 between "BILL_AMT1" and "BILL_AMT6"). This means that a user has similar expenses from one month to the next, although this behavior can still change noticeably over the full six-month period.

2. The attributes {"PAY_0", ..., "PAY_6"} represent the repayment status, i.e. the delay of the repayment, in each month. The correlation is sometimes high (~0.82) and in other cases low (~0.47). This is reasonable, as the payment delay can change considerably from one month to another because it is affected by external factors.

3. The attributes {"PAY_AMT1", ..., "PAY_AMT6"} represent the amount paid in each month. The correlation here is low (~0.2).
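To make the coefficients in the tables more tangible, here is a minimal sketch of the Pearson correlation in plain Scala. The function name `pearson` and the bill amounts are made up for illustration and are not taken from the data set; the sketch only shows why two series that move together closely score near 1 while series that drift apart score lower.

```scala
// Pearson correlation coefficient of two equally long series.
def pearson(xs: Seq[Double], ys: Seq[Double]): Double = {
  require(xs.length == ys.length && xs.nonEmpty, "series must be non-empty and of equal length")
  val meanX = xs.sum / xs.length
  val meanY = ys.sum / ys.length
  // Covariance numerator and the two sums of squared deviations (the 1/n factors cancel)
  val cov  = xs.zip(ys).map { case (x, y) => (x - meanX) * (y - meanY) }.sum
  val varX = xs.map(x => math.pow(x - meanX, 2)).sum
  val varY = ys.map(y => math.pow(y - meanY, 2)).sum
  cov / math.sqrt(varX * varY)
}

// Made-up bill amounts for five hypothetical customers
val month1 = Seq(100.0, 200.0, 300.0, 400.0, 500.0)
val month2 = Seq(110.0, 190.0, 320.0, 390.0, 510.0) // stays close to month1
val month6 = Seq(300.0, 100.0, 250.0, 500.0, 350.0) // has drifted further away

println(f"adjacent months: ${pearson(month1, month2)}%.3f")
println(f"distant months:  ${pearson(month1, month6)}%.3f")
```

The same pattern appears in the "BILL_AMT*" table, where the coefficient shrinks as the distance between the months grows.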

### Rename the target variable and drop the old target

In [6]:
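// Rename the target column "DEFAULT" to "label", the default label column name in Spark ML
val dfTarget = dfDrop.withColumnRenamed("DEFAULT", "label")

// After the rename no "DEFAULT" column is left, so this drop changes nothing; it is kept as a safeguard
val dfDropped = dfTarget.drop("DEFAULT")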

dfTarget: org.apache.spark.sql.DataFrame = [LIMIT_BAL: int, SEX: int ... 22 more fields]
dfDropped: org.apache.spark.sql.DataFrame = [LIMIT_BAL: int, SEX: int ... 22 more fields]


### Separate the label, feature, categorical and numerical columns

In [7]:
// Label columns
val colLabel = "label"

// Feature columns
val colFeat = dfDropped.columns.filter(_ != colLabel)

// Categorical columns
val colCat = Array("SEX", "EDUCATION", "MARRIAGE", "PAY_0", "PAY_2", "PAY_3",
                   "PAY_4", "PAY_5", "PAY_6")

// Numerical columns
val colNum = Array("LIMIT_BAL", "AGE", "BILL_AMT1", "BILL_AMT2", "BILL_AMT3",
                   "BILL_AMT4", "BILL_AMT5", "BILL_AMT6", "PAY_AMT1", "PAY_AMT2",
                   "PAY_AMT3", "PAY_AMT4", "PAY_AMT5", "PAY_AMT6")

colLabel: String = label
colFeat: Array[String] = Array(LIMIT_BAL, SEX, EDUCATION, MARRIAGE, AGE, PAY_0, PAY_2, PAY_3, PAY_4, PAY_5, PAY_6, BILL_AMT1, BILL_AMT2, BILL_AMT3, BILL_AMT4, BILL_AMT5, BILL_AMT6, PAY_AMT1, PAY_AMT2, PAY_AMT3, PAY_AMT4, PAY_AMT5, PAY_AMT6)
colCat: Array[String] = Array(SEX, EDUCATION, MARRIAGE, PAY_0, PAY_2, PAY_3, PAY_4, PAY_5, PAY_6)
colNum: Array[String] = Array(LIMIT_BAL, AGE, BILL_AMT1, BILL_AMT2, BILL_AMT3, BILL_AMT4, BILL_AMT5, BILL_AMT6, PAY_AMT1, PAY_AMT2, PAY_AMT3, PAY_AMT4, PAY_AMT5, PAY_AMT6)


### Make categorical features ready for one-hot encoding

In [8]:
// The one-hot encoder expects non-negative category indices,
// so remap the negative codes of the PAY_* columns: -1 becomes 10 and -2 becomes 11
val df01 = dfDropped.na.replace("PAY_0", Map(-1 -> 10))
val df02 = df01.na.replace("PAY_0", Map(-2 -> 11))
val df21 = df02.na.replace("PAY_2", Map(-1 -> 10))
val df22 = df21.na.replace("PAY_2", Map(-2 -> 11))
val df31 = df22.na.replace("PAY_3", Map(-1 -> 10))
val df32 = df31.na.replace("PAY_3", Map(-2 -> 11))
val df41 = df32.na.replace("PAY_4", Map(-1 -> 10))
val df42 = df41.na.replace("PAY_4", Map(-2 -> 11))
val df51 = df42.na.replace("PAY_5", Map(-1 -> 10))
val df52 = df51.na.replace("PAY_5", Map(-2 -> 11))
val df61 = df52.na.replace("PAY_6", Map(-1 -> 10))
val dfUpdatedCatCols = df61.na.replace("PAY_6", Map(-2 -> 11))

df01: org.apache.spark.sql.DataFrame = [LIMIT_BAL: int, SEX: int ... 22 more fields]
df02: org.apache.spark.sql.DataFrame = [LIMIT_BAL: int, SEX: int ... 22 more fields]
df21: org.apache.spark.sql.DataFrame = [LIMIT_BAL: int, SEX: int ... 22 more fields]
df22: org.apache.spark.sql.DataFrame = [LIMIT_BAL: int, SEX: int ... 22 more fields]
df31: org.apache.spark.sql.DataFrame = [LIMIT_BAL: int, SEX: int ... 22 more fields]
df32: org.apache.spark.sql.DataFrame = [LIMIT_BAL: int, SEX: int ... 22 more fields]
df41: org.apache.spark.sql.DataFrame = [LIMIT_BAL: int, SEX: int ... 22 more fields]
df42: org.apache.spark.sql.DataFrame = [LIMIT_BAL: int, SEX: int ... 22 more fields]
df51: org.apache.spark.sql.DataFrame = [LIMIT_BAL: int, SEX: int ... 22 more fields]
df52: org.apache.spark....
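The twelve replacements above all apply the same two-entry mapping. As a minimal sketch of that mapping in plain Scala, with a `Seq` standing in for one "PAY_*" column (the names `remap`, `remapCode` and `payStatus` are illustrative and do not appear in the notebook):

```scala
// Codes used in the notebook: -1 becomes 10 and -2 becomes 11
val remap: Map[Int, Int] = Map(-1 -> 10, -2 -> 11)

// Replace a code when it has an entry in the map, otherwise keep it unchanged
def remapCode(code: Int): Int = remap.getOrElse(code, code)

// Hypothetical repayment-status values for a handful of rows
val payStatus = Seq(-2, -1, 0, 1, 2, 8)
val cleaned = payStatus.map(remapCode)

println(cleaned)
```

Keeping the mapping in a single `Map` makes it easy to check that no negative code survives and that the new codes do not collide with existing ones.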

### Check for missing values

In [9]:
for (c <- colFeat) {
    // Count the rows where the column is null, an empty string or NaN
    val column = dfUpdatedCatCols(c)
    val missingCount = dfUpdatedCatCols.filter(column.isNull || column === "" || column.isNaN).count()
    println(s"Missing values for $c : $missingCount")
}

Missing values for LIMIT_BAL : 0
Missing values for SEX : 0
Missing values for EDUCATION : 0
Missing values for MARRIAGE : 0
Missing values for AGE : 0
Missing values for PAY_0 : 0
Missing values for PAY_2 : 0
Missing values for PAY_3 : 0
Missing values for PAY_4 : 0
Missing values for PAY_5 : 0
Missing values for PAY_6 : 0
Missing values for BILL_AMT1 : 0
Missing values for BILL_AMT2 : 0
Missing values for BILL_AMT3 : 0
Missing values for BILL_AMT4 : 0
Missing values for BILL_AMT5 : 0
Missing values for BILL_AMT6 : 0
Missing values for PAY_AMT1 : 0
Missing values for PAY_AMT2 : 0
Missing values for PAY_AMT3 : 0
Missing values for PAY_AMT4 : 0
Missing values for PAY_AMT5 : 0
Missing values for PAY_AMT6 : 0


### Scale the numerical variables

In [10]:
val va = new VectorAssembler()
    .setInputCols(colNum)
    .setOutputCol("FEATURES_TO_SCALE")

val dfToScale = va.transform(dfUpdatedCatCols)

val scaler = new StandardScaler()
    .setInputCol("FEATURES_TO_SCALE")
    .setOutputCol("SCALED_FEATURES")

val dfScaled = scaler
    .fit(dfToScale)
    .transform(dfToScale)

dfScaled.select("SCALED_FEATURES").show(1, false)

+--------------------------------------------------------------------------------------------------------------------------------------------+
|SCALED_FEATURES                                                                                                                             |
+--------------------------------------------------------------------------------------------------------------------------------------------+
|(14,[0,1,2,3,4,9],[0.15414535998894324,2.603628744963987,0.053139869207970765,0.0435834725779124,0.009935199510232218,0.029903384202815683])|
+--------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 1 row



va: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_b1ef1d317925
dfToScale: org.apache.spark.sql.DataFrame = [LIMIT_BAL: int, SEX: int ... 23 more fields]
scaler: org.apache.spark.ml.feature.StandardScaler = stdScal_cf5929c9c1fa
dfScaled: org.apache.spark.sql.DataFrame = [LIMIT_BAL: int, SEX: int ... 24 more fields]
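With its default settings, `StandardScaler` scales every feature to unit standard deviation but does not subtract the mean, which is why zero entries stay zero and the scaled vector above remains sparse. The following plain Scala sketch mimics that behaviour for a single column; `sampleStd` and `scaleToUnitStd` are hypothetical helper names, and mapping a constant column to zeros is a choice made for this sketch.

```scala
// Corrected sample standard deviation (divides by n - 1)
def sampleStd(xs: Seq[Double]): Double = {
  require(xs.length > 1, "need at least two values")
  val mean = xs.sum / xs.length
  math.sqrt(xs.map(x => math.pow(x - mean, 2)).sum / (xs.length - 1))
}

// Scale to unit standard deviation without centering, so zeros stay zeros
def scaleToUnitStd(xs: Seq[Double]): Seq[Double] = {
  val std = sampleStd(xs)
  if (std == 0.0) xs.map(_ => 0.0) else xs.map(_ / std)
}

val payments = Seq(0.0, 2.0, 4.0) // hypothetical values of one numerical column
println(scaleToUnitStd(payments))
```

If centered features are needed, the scaler can be configured with `setWithMean(true)`, at the cost of producing dense vectors.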


### One-hot encode the categorical features

In [11]:
val encoder = new OneHotEncoderEstimator()
  .setInputCols(colCat)
  .setOutputCols(colCat map (name => s"${name}_one_hot"))

val dfHotEncoded = encoder.
    fit(dfScaled)
    .transform(dfScaled)

dfHotEncoded.select("PAY_3_one_hot").show(10)

+---------------+
|  PAY_3_one_hot|
+---------------+
|(11,[10],[1.0])|
| (11,[0],[1.0])|
| (11,[0],[1.0])|
| (11,[0],[1.0])|
|(11,[10],[1.0])|
| (11,[0],[1.0])|
| (11,[0],[1.0])|
|(11,[10],[1.0])|
| (11,[2],[1.0])|
|     (11,[],[])|
+---------------+
only showing top 10 rows



encoder: org.apache.spark.ml.feature.OneHotEncoderEstimator = oneHotEncoder_7081bee6cbcf
dfHotEncoded: org.apache.spark.sql.DataFrame = [LIMIT_BAL: int, SEX: int ... 33 more fields]
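The output above can be read as follows: the largest code in "PAY_3" is 11, so the encoder assumes 12 categories, and because the last category is dropped by default each vector has 11 slots. Code 10 sets slot 10, while code 11 becomes the all-zero vector `(11,[],[])`. A minimal sketch of this encoding in plain Scala, assuming the hypothetical helper name `oneHotDropLast` and dense sequences instead of Spark's sparse vectors:

```scala
// One-hot encode a category index out of numCategories, dropping the last category:
// the result has numCategories - 1 slots and the last category becomes all zeros.
def oneHotDropLast(index: Int, numCategories: Int): Seq[Double] = {
  require(index >= 0 && index < numCategories, s"index $index out of range")
  Seq.tabulate(numCategories - 1)(i => if (i == index) 1.0 else 0.0)
}

// Codes 10 and 11 are the remapped values -1 and -2
println(oneHotDropLast(10, 12))
println(oneHotDropLast(11, 12))
```

Dropping one category avoids a redundant column, since the last category is already implied when all other slots are zero.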


### Create the final data frame

In [12]:
val va = new VectorAssembler()
    .setInputCols(Array("SCALED_FEATURES", "MARRIAGE_one_hot", "EDUCATION_one_hot", "SEX_one_hot",
                        "PAY_0_one_hot", "PAY_2_one_hot", "PAY_3_one_hot", "PAY_4_one_hot",
                        "PAY_5_one_hot", "PAY_6_one_hot"))
    .setOutputCol("features")

val dataset = va
    .transform(dfHotEncoded)
    .select("features", "label")

dataset.show(1, false)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|features                                                                                                                                                                              |label|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|(91,[0,1,2,3,4,9,15,19,27,38,57,68],[0.15414535998894324,2.603628744963987,0.053139869207970765,0.0435834725779124,0.009935199510232218,0.029903384202815683,1.0,1.0,1.0,1.0,1.0,1.0])|1    |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
only showing top 1 row



va: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_942fd5798645
dataset: org.apache.spark.sql.DataFrame = [features: vector, label: int]


### Split the data set into a training set (80%) and a test set (20%)

In [13]:
val Array(trainSet, testSet) = dataset.randomSplit(Array(0.8, 0.2))

trainSet.show(3)
testSet.show(3)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(91,[0,1,2,3,4,5,...|    0|
|(91,[0,1,2,3,4,5,...|    0|
|(91,[0,1,2,3,4,5,...|    0|
+--------------------+-----+
only showing top 3 rows

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(91,[0,1,2,3,4,5,...|    0|
|(91,[0,1,2,3,4,5,...|    0|
|(91,[0,1,2,3,4,5,...|    1|
+--------------------+-----+
only showing top 3 rows



trainSet: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [features: vector, label: int]
testSet: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [features: vector, label: int]
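The split above is random, so rerunning the cell produces different sets and slightly different metrics. `randomSplit` also has an overload that takes a seed as a second argument. The idea of a seeded split can be sketched in plain Scala as follows; the helper `split` and the stand-in data are assumptions made for illustration, not the way Spark implements it.

```scala
import scala.util.Random

// Assign each element independently to the training set (with probability
// trainFraction) or to the test set, using a seeded random generator.
def split[A](data: Seq[A], trainFraction: Double, seed: Long): (Seq[A], Seq[A]) = {
  val rng = new Random(seed)
  data.partition(_ => rng.nextDouble() < trainFraction)
}

val rows = (1 to 1000).toList // stand-in for the rows of the data set
val (trainA, testA) = split(rows, 0.8, seed = 42L)
val (trainB, testB) = split(rows, 0.8, seed = 42L)

println(s"train: ${trainA.size}, test: ${testA.size}, same split again: ${trainA == trainB}")
```

As with `randomSplit`, the 80/20 ratio is only met approximately, because every row is assigned on its own.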


### Helper function to measure other metrics: Accuracy, Recall, Precision, F1-score

In [50]:
def compute_metrics(predictions: DataFrame, str: String)={

    // Keep only the two columns we need and cache them, because several counts follow
    val lp = predictions.select("label", "prediction").cache()
    val count_total = lp.count()
    val correct = lp.filter($"label" === $"prediction").count()
    // Cells of the confusion matrix, with 1.0 as the positive class
    val TP = lp.filter($"prediction" === 1.0 && $"label" === $"prediction").count()
    val FN = lp.filter($"prediction" === 0.0 && $"label" =!= $"prediction").count()
    val FP = lp.filter($"prediction" === 1.0 && $"label" =!= $"prediction").count()
    lp.unpersist()

    print("\nFor the ")
    print(str)
    print(" classifier the following evaluation metrics were computed: ")
    val accuracy = correct.toDouble / count_total
    print("\nThe accuracy is: ")
    print(accuracy)

    // Precision and recall are NaN when no positives are predicted or present
    val precision = TP.toDouble / (TP + FP)
    print("\nThe precision is: ")
    print(precision)

    val recall = TP.toDouble / (TP + FN)
    print("\nThe recall is: ")
    print(recall)

    // Harmonic mean of precision and recall
    val f1_score = 2 * precision * recall / (precision + recall)
    print("\nThe F1 score is: ")
    print(f1_score)
}

compute_metrics: (predictions: org.apache.spark.sql.DataFrame, str: String)Unit


## Train various models

### Train a logistic regression model

In [51]:
// Instantiate the model
val lrModel = new LogisticRegression()
    .setMaxIter(50)
    .setFeaturesCol("features")
    .setLabelCol("label")

// Define the hyper-parameter grid
val paramGrid = new ParamGridBuilder()
    .addGrid(lrModel.regParam, Array(0.1, 0.05, 0.01, 0))
    .addGrid(lrModel.elasticNetParam, Array(0.1, 0.05, 0.01, 0))
    .build()

// The BinaryClassificationEvaluator evaluates on the Area Under the ROC-curve (AUC) metric
val evaluator = new BinaryClassificationEvaluator()
    
val cv = new CrossValidator()
    .setEstimator(lrModel)
    .setEstimatorParamMaps(paramGrid)
    .setEvaluator(evaluator)
    .setNumFolds(5)

// Perform cross-validation for model selection
val cvModel = cv.fit(trainSet)
// Predict the labels of the test set
val predictions = cvModel.transform(testSet)

print("The best logistic regression model has the following attributes:\n")
print(cvModel.bestModel.extractParamMap())

print("\nThe AUC of the best logistic regression model is: ")
print(evaluator.evaluate(predictions))

// Compute other metrics for the classifier
compute_metrics(predictions, "Logistic Regression")

The best logistic regression model has the following attributes:
{
	logreg_f7e5a1f3a840-aggregationDepth: 2,
	logreg_f7e5a1f3a840-elasticNetParam: 0.05,
	logreg_f7e5a1f3a840-family: auto,
	logreg_f7e5a1f3a840-featuresCol: features,
	logreg_f7e5a1f3a840-fitIntercept: true,
	logreg_f7e5a1f3a840-labelCol: label,
	logreg_f7e5a1f3a840-maxIter: 50,
	logreg_f7e5a1f3a840-predictionCol: prediction,
	logreg_f7e5a1f3a840-probabilityCol: probability,
	logreg_f7e5a1f3a840-rawPredictionCol: rawPrediction,
	logreg_f7e5a1f3a840-regParam: 0.01,
	logreg_f7e5a1f3a840-standardization: true,
	logreg_f7e5a1f3a840-threshold: 0.5,
	logreg_f7e5a1f3a840-tol: 1.0E-6
}
The AUC of the best logistic regression model is: 0.7719880518822813
For the Logistic Regression classifier the following evaluation metrics were computed: 
The accuracy is: 0.8274086378737542
The precision is: 0.7177541729893778
The recall is: 0.35671191553544496
The F1 score is: 0.47657430730478595

lrModel: org.apache.spark.ml.classification.LogisticRegression = logreg_f7e5a1f3a840
paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	logreg_f7e5a1f3a840-elasticNetParam: 0.1,
	logreg_f7e5a1f3a840-regParam: 0.1
}, {
	logreg_f7e5a1f3a840-elasticNetParam: 0.05,
	logreg_f7e5a1f3a840-regParam: 0.1
}, {
	logreg_f7e5a1f3a840-elasticNetParam: 0.01,
	logreg_f7e5a1f3a840-regParam: 0.1
}, {
	logreg_f7e5a1f3a840-elasticNetParam: 0.0,
	logreg_f7e5a1f3a840-regParam: 0.1
}, {
	logreg_f7e5a1f3a840-elasticNetParam: 0.1,
	logreg_f7e5a1f3a840-regParam: 0.05
}, {
	logreg_f7e5a1f3a840-elasticNetParam: 0.05,
	logreg_f7e5a1f3a840-regParam: 0.05
}, {
	logreg_f7e5a1f3a840-elasticNetParam: 0.01,
	logreg_f7e5a1f3a840-regParam: 0.05
}, {
	logreg_f7e5a1f3a840-elasticNetParam:...
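The parameter grid printed above is the Cartesian product of the two value lists. A small sketch in plain Scala shows how many candidates and model fits this implies; the variable names are illustrative, and the values are the ones passed to `addGrid` in the cell.

```scala
// The two value lists from the logistic regression grid
val regParams = Seq(0.1, 0.05, 0.01, 0.0)
val elasticNetParams = Seq(0.1, 0.05, 0.01, 0.0)
val numFolds = 5

// Every combination of the two lists forms one candidate setting
val grid = for {
  reg <- regParams
  elasticNet <- elasticNetParams
} yield (reg, elasticNet)

// Each candidate is trained once per fold
val totalFits = grid.size * numFolds

println(s"${grid.size} candidates, $totalFits model fits during cross-validation")
```

The cost grows multiplicatively with every additional parameter, and `CrossValidator` refits the best setting once more on the whole training set.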

### Train a decision tree classifier

In [54]:
// Instantiate the decision tree classifier
val dtModel = new DecisionTreeClassifier()
    .setLabelCol("label")
    .setFeaturesCol("features")

val paramGrid = new ParamGridBuilder()
    .addGrid(dtModel.impurity, Array("entropy", "gini"))
    .addGrid(dtModel.maxDepth, Array(4, 5, 6, 7, 8, 9, 10))
    .addGrid(dtModel.minInstancesPerNode, Array(1, 2))
    .build()

val evaluator = new BinaryClassificationEvaluator()
val cv = new CrossValidator()
    .setEstimator(dtModel)
    .setEstimatorParamMaps(paramGrid)
    .setEvaluator(evaluator)
    .setNumFolds(5)

// Perform cross-validation for model selection
val cvModel = cv.fit(trainSet)

// Predict the labels of the test set
val predictions = cvModel.transform(testSet)

print("The best decision model has the following attributes:\n")
print(cvModel.bestModel.extractParamMap())

print("\nThe AUC of the best decision tree model is: ")
print(evaluator.evaluate(predictions))

// Compute other metrics for the classifier
compute_metrics(predictions, "Decision Tree")

The best decision model has the following attributes:
{
	dtc_19d82a833a7d-cacheNodeIds: false,
	dtc_19d82a833a7d-checkpointInterval: 10,
	dtc_19d82a833a7d-featuresCol: features,
	dtc_19d82a833a7d-impurity: gini,
	dtc_19d82a833a7d-labelCol: label,
	dtc_19d82a833a7d-maxBins: 32,
	dtc_19d82a833a7d-maxDepth: 6,
	dtc_19d82a833a7d-maxMemoryInMB: 256,
	dtc_19d82a833a7d-minInfoGain: 0.0,
	dtc_19d82a833a7d-minInstancesPerNode: 2,
	dtc_19d82a833a7d-predictionCol: prediction,
	dtc_19d82a833a7d-probabilityCol: probability,
	dtc_19d82a833a7d-rawPredictionCol: rawPrediction,
	dtc_19d82a833a7d-seed: 159147643
}
The AUC of the best decision tree model is: 0.44689869163226875
For the Decision Tree classifier the following evaluation metrics were computed: 
The accuracy is: 0.8260797342192691
The precision is: 0.7110438729198184
The recall is: 0.35444947209653094
The F1 score is: 0.47307498741821846

dtModel: org.apache.spark.ml.classification.DecisionTreeClassifier = dtc_19d82a833a7d
paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	dtc_19d82a833a7d-impurity: entropy,
	dtc_19d82a833a7d-maxDepth: 4,
	dtc_19d82a833a7d-minInstancesPerNode: 1
}, {
	dtc_19d82a833a7d-impurity: entropy,
	dtc_19d82a833a7d-maxDepth: 5,
	dtc_19d82a833a7d-minInstancesPerNode: 1
}, {
	dtc_19d82a833a7d-impurity: entropy,
	dtc_19d82a833a7d-maxDepth: 6,
	dtc_19d82a833a7d-minInstancesPerNode: 1
}, {
	dtc_19d82a833a7d-impurity: entropy,
	dtc_19d82a833a7d-maxDepth: 7,
	dtc_19d82a833a7d-minInstancesPerNode: 1
}, {
	dtc_19d82a833a7d-impurity: entropy,
	dtc_19d82a833a7d-maxDepth: 8,
	dtc_19d82a833a7d-minInstancesPerNode: 1
}, {
	dtc_19d82a833a7d-impurity: entropy,
	dtc_19d82a833a7d...

### Train a random forest classifier

In [53]:
val rfModel = new RandomForestClassifier()
    .setNumTrees(50)
    .setLabelCol("label")
    .setFeaturesCol("features")

val paramGrid = new ParamGridBuilder()
    .addGrid(rfModel.featureSubsetStrategy, Array("auto", "all", "sqrt"))
    // Left out of the grid: .addGrid(rfModel.maxDepth, Array(4, 5, 6, 7, 8, 9, 10))
    .addGrid(rfModel.minInstancesPerNode, Array(1, 2, 3))
    .build()

val evaluator = new BinaryClassificationEvaluator()
val cv = new CrossValidator()
    .setEstimator(rfModel)
    .setEstimatorParamMaps(paramGrid)
    .setEvaluator(evaluator)
    .setNumFolds(5)

// Perform cross-validation for model selection
val cvModel = cv.fit(trainSet)

// Predict the labels of the test set
val predictions = cvModel.transform(testSet)

print("The best random forest model has the following attributes:\n")
print(cvModel.bestModel.extractParamMap())

print("\nThe AUC of the best random forest model is: ")
print(evaluator.evaluate(predictions))

// Compute other metrics for the classifier
compute_metrics(predictions, "Random Forest")

The best random forest model has the following attributes:
{
	rfc_cd628e6da4a1-cacheNodeIds: false,
	rfc_cd628e6da4a1-checkpointInterval: 10,
	rfc_cd628e6da4a1-featureSubsetStrategy: auto,
	rfc_cd628e6da4a1-featuresCol: features,
	rfc_cd628e6da4a1-impurity: gini,
	rfc_cd628e6da4a1-labelCol: label,
	rfc_cd628e6da4a1-maxBins: 32,
	rfc_cd628e6da4a1-maxDepth: 5,
	rfc_cd628e6da4a1-maxMemoryInMB: 256,
	rfc_cd628e6da4a1-minInfoGain: 0.0,
	rfc_cd628e6da4a1-minInstancesPerNode: 1,
	rfc_cd628e6da4a1-numTrees: 50,
	rfc_cd628e6da4a1-predictionCol: prediction,
	rfc_cd628e6da4a1-probabilityCol: probability,
	rfc_cd628e6da4a1-rawPredictionCol: rawPrediction,
	rfc_cd628e6da4a1-seed: 207336481,
	rfc_cd628e6da4a1-subsamplingRate: 1.0
}
The AUC of the best random forest model is: 0.7649028540654924
For the Random Forest classifier the following evaluation metrics were computed: 
The accuracy is: 0.8194352159468439
The precision is: 0.7414141414141414
The recall is: 0.27677224736048267
The F1 score is: 0.

rfModel: org.apache.spark.ml.classification.RandomForestClassifier = rfc_cd628e6da4a1
paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	rfc_cd628e6da4a1-featureSubsetStrategy: auto,
	rfc_cd628e6da4a1-minInstancesPerNode: 1
}, {
	rfc_cd628e6da4a1-featureSubsetStrategy: all,
	rfc_cd628e6da4a1-minInstancesPerNode: 1
}, {
	rfc_cd628e6da4a1-featureSubsetStrategy: sqrt,
	rfc_cd628e6da4a1-minInstancesPerNode: 1
}, {
	rfc_cd628e6da4a1-featureSubsetStrategy: auto,
	rfc_cd628e6da4a1-minInstancesPerNode: 2
}, {
	rfc_cd628e6da4a1-featureSubsetStrategy: all,
	rfc_cd628e6da4a1-minInstancesPerNode: 2
}, {
	rfc_cd628e6da4a1-featureSubsetStrategy: sqrt,
	rfc_cd628e6da4a1-minInstancesPerNode: 2
}, {
	rfc_cd628e6da4a1-featureSubsetStrategy: auto,
	rfc_cd628e6da4a1-minIn...

### Analysis

In this part, we built three different statistical models and trained and tested all of them on the "ccdefault.csv" dataset. The first model is Logistic Regression, which is an appropriate model for binary classification, since our dependent variable is binary/categorical. The logistic function is a sigmoid function, i.e. an S-shaped curve whose value increases monotonically from 0 to 1, so its output can be read as the probability of the positive class. After fine-tuning the model's hyper-parameters, we found that the AUC of the best logistic regression model was around 0.772. We also computed other metrics, namely accuracy, precision, recall and F1-score: <br>
The accuracy is: 0.8274 <br>
The precision is: 0.7178 <br>
The recall is: 0.3567 <br>
The F1 score is: 0.4766 <br>
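
As a small illustration of the logistic function mentioned above, the following sketch maps a linear score to a probability and then to a label. The names `sigmoid` and `classify` are hypothetical; the 0.5 threshold matches the `threshold` value printed for the best model.

```scala
// Logistic (sigmoid) function: maps any real number into the open interval (0, 1)
def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

// Turn a linear score into a class label using a probability threshold
def classify(z: Double, threshold: Double = 0.5): Double =
  if (sigmoid(z) > threshold) 1.0 else 0.0

Seq(-4.0, -1.0, 0.0, 1.0, 4.0).foreach { z =>
  println(f"z = $z%5.1f  sigmoid = ${sigmoid(z)}%.4f  label = ${classify(z)}")
}
```

Lowering the threshold would trade precision for recall, which matters here because the recall of all three models is low.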

The Decision Tree classifier represents the classification as a tree of decisions. Our dependent variable is categorical, and the decision tree performs binary recursive partitioning: we split the data and keep splitting every branch until the end of the tree is reached. This approach is called "divide and conquer", since the dataset is split into ever smaller subsets until the algorithm meets a stopping criterion or all the data in a subset are homogeneous. Our decision tree classifier gave an AUC of 0.45 for the best model. This value is suspicious: an AUC below 0.5 means that the scores rank the classes worse than random guessing, which does not match the other metrics of the same model. We thus suspect an error in how the AUC is computed for this model in Spark, possibly in the raw scores that the evaluator ranks. To compare the decision tree to the other models, we turn to the other evaluation metrics instead. The results are the following:<br>
The accuracy is: 0.8261<br>
The precision is: 0.7110<br>
The recall is: 0.3544<br>
The F1 score is: 0.4731<br>
Regarding accuracy, precision, recall and F1 score, the decision tree distinguishes the classes about as well as the logistic regression. Taking these metrics into consideration, the decision tree classifier does not underperform in comparison to the logistic regression and random forest classifiers, contrary to the conclusion that the AUC values alone would suggest. For some reason, we do not get a good AUC value in Spark.
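
The notion of a "homogeneous" subset can be made concrete with the two impurity measures tried in the parameter grid, gini and entropy. This is a minimal sketch in plain Scala with hypothetical class counts; the function names are illustrative and the code is not Spark's implementation.

```scala
// Gini impurity of a node, given the number of samples per class
def gini(counts: Seq[Long]): Double = {
  val total = counts.sum.toDouble
  require(total > 0, "node must contain at least one sample")
  1.0 - counts.map(c => math.pow(c / total, 2)).sum
}

// Entropy (in bits) of a node; empty classes contribute nothing
def entropy(counts: Seq[Long]): Double = {
  val total = counts.sum.toDouble
  require(total > 0, "node must contain at least one sample")
  counts.filter(_ > 0).map { c =>
    val p = c / total
    -p * math.log(p) / math.log(2)
  }.sum
}

val mixed = Seq(50L, 50L) // hypothetical node with both classes equally present
val pure  = Seq(100L, 0L) // hypothetical homogeneous node

println(s"mixed: gini = ${gini(mixed)}, entropy = ${entropy(mixed)}")
println(s"pure:  gini = ${gini(pure)}, entropy = ${entropy(pure)}")
```

A split is attractive when it moves the child nodes from the mixed case towards the pure case, and both measures reach zero for a homogeneous node.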

Turning now to the last model, we built a random forest classifier, which consists of many decision trees. The idea is to build a forest of largely uncorrelated trees whose combined result is usually more accurate than that of any individual decision tree. After fine-tuning the model's hyper-parameters, we found that the AUC of the best random forest model was 0.7649. We also computed other metrics, namely accuracy, precision, recall and F1-score:<br>
The accuracy is: 0.8194<br>
The precision is: 0.7414<br>
The recall is: 0.2768<br>
The F1 score is: 0.4031<br>

We now briefly explain what the AUC is and why it is a good metric for our classifiers. AUC-ROC, the Area Under the Receiver Operating Characteristic curve, is a metric for evaluating how well a statistical model distinguishes between classes. The ROC curve plots the true positive rate against the false positive rate over all decision thresholds, and the AUC summarizes how well the model separates the categories. If the AUC is close to 1, the model separates the classes very well. If the AUC is around 0.5, the classifier has no class separation capability, and if it is close to 0, the model ranks the classes the wrong way round. Let's compare the AUC scores of our models on this basis. The Logistic Regression has an AUC of ~0.77, which indicates good separability compared with the Decision Tree classifier, whose AUC of around 0.45 would mean a separation no better than random guessing. Furthermore, the Random Forest outperforms the Decision Tree with an AUC of 0.765, which is almost the same as the Logistic Regression's score.
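
One way to read the AUC is as the probability that a randomly chosen positive instance receives a higher score than a randomly chosen negative one. The sketch below computes it by comparing all pairs, which is only practical for tiny examples; the function `auc` and the scores are made up for illustration and this is not how Spark's evaluator works internally.

```scala
// AUC as the share of (positive, negative) pairs in which the positive
// instance has the higher score; ties count as half.
// scored: (score, label) pairs with label 1.0 or 0.0.
def auc(scored: Seq[(Double, Double)]): Double = {
  val positives = scored.collect { case (s, 1.0) => s }
  val negatives = scored.collect { case (s, 0.0) => s }
  require(positives.nonEmpty && negatives.nonEmpty, "need both classes")
  val wins = for {
    p <- positives
    n <- negatives
  } yield if (p > n) 1.0 else if (p == n) 0.5 else 0.0
  wins.sum / (positives.size.toLong * negatives.size)
}

// Hypothetical scores: positives ranked above negatives
val good = Seq((0.9, 1.0), (0.8, 1.0), (0.3, 0.0), (0.1, 0.0))
// The same labels with the ranking reversed
val inverted = good.map { case (s, l) => (-s, l) }
// A model that gives every instance the same score
val constant = good.map { case (_, l) => (0.5, l) }

println(s"good = ${auc(good)}, inverted = ${auc(inverted)}, constant = ${auc(constant)}")
```

The inverted case shows that values below 0.5 are possible in principle: they indicate scores that systematically rank negatives above positives.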

At this point, we turn to the other metrics that we calculated. We first briefly explain what accuracy, precision, recall and F1-score show, before comparing them across our three classifiers. Precision tells us how many of the predicted positives are actual positives, i.e. TP / (TP + FP). It is a good metric when the cost of a False Positive is high. Recall is the number of True Positives over the actual positives, i.e. TP / (TP + FN). We should use this metric when the cost of a False Negative is high; it is, for example, very useful in fraud detection. The F1-score is the harmonic mean of precision and recall and is a better measure when we want to find a balance between the two. Finally, accuracy is the number of correctly classified samples over the whole data.
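
The four metrics can be written down compactly from the cells of a confusion matrix. The sketch below uses a hypothetical case class `Confusion` with made-up counts that are not taken from the experiments above; returning 0 for an empty denominator is a choice made for this sketch.

```scala
// Confusion-matrix counts with 1.0 as the positive class
case class Confusion(tp: Long, fp: Long, tn: Long, fn: Long) {
  // Return 0.0 instead of NaN when a denominator is empty
  private def ratio(num: Long, den: Long): Double =
    if (den == 0) 0.0 else num.toDouble / den

  def accuracy: Double  = ratio(tp + tn, tp + fp + tn + fn)
  def precision: Double = ratio(tp, tp + fp)
  def recall: Double    = ratio(tp, tp + fn)
  // Harmonic mean of precision and recall
  def f1: Double = {
    val (p, r) = (precision, recall)
    if (p + r == 0.0) 0.0 else 2 * p * r / (p + r)
  }
}

// Hypothetical counts for illustration
val m = Confusion(tp = 30, fp = 10, tn = 140, fn = 20)
println(f"accuracy=${m.accuracy}%.3f precision=${m.precision}%.3f recall=${m.recall}%.3f f1=${m.f1}%.3f")
```

Because the F1-score is a harmonic mean, it stays close to the smaller of precision and recall, which is why a low recall pulls it down even when precision is high.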

Having explained all our metrics, we should now decide which metric is the best for evaluating our classifiers. The F1 score is the better measure if we seek a balance between precision and recall, and also if there is a class imbalance. Accuracy, in contrast, counts all correctly classified data points, so it is most useful when every class has roughly the same number of data points. That is the main reason why we prefer the F1 score over accuracy. Moreover, accuracy is used when the True Positives and True Negatives matter most, whereas the F1-score is preferred when the False Negatives and False Positives are crucial.
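
To see why accuracy can mislead on imbalanced data, consider a baseline that always predicts the negative class. The sketch below applies it to 100 hypothetical instances with the 22% / 78% class shares reported earlier; setting the F1-score to 0 when there are no true positives is a convention assumed here.

```scala
// Class shares reported above: about 22% positive and 78% negative
val positives = 22
val negatives = 78

// A baseline that always predicts the negative class
val tp = 0
val fp = 0
val tn = negatives
val fn = positives

val accuracy = (tp + tn).toDouble / (tp + tn + fp + fn)
val recall = tp.toDouble / (tp + fn)
// Precision is undefined (0 / 0) without positive predictions; F1 is taken as 0 here
val f1 = if (tp == 0) 0.0 else 2.0 * tp / (2.0 * tp + fp + fn)

println(s"accuracy = $accuracy, recall = $recall, F1 = $f1")
```

The baseline already reaches a high accuracy without finding a single defaulter, so accuracy values only slightly above that level should be read with care.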

To sum up, all three classifiers perform comparably well in terms of accuracy (roughly 0.82 to 0.83), although their recall stays low (0.28 to 0.36), and there is something wrong with the computation of the AUC value for the decision tree classifier. The other metrics that we calculated all give us good insight into how well the models work.