# Machine Learning With Spark ML
In this lab assignment, you will complete a project by going through the following steps:
1. Get the data.
2. Discover the data to gain insights.
3. Prepare the data for Machine Learning algorithms.
4. Select a model and train it.
5. Fine-tune your model.
6. Present your solution.

As a dataset, we use the California Housing Prices dataset from the StatLib repository. This dataset is based on data from the 1990 California census and has the following columns:
1. `longitude`: a measure of how far west a house is (a lower, i.e. more negative, value is farther west)
2. `latitude`: a measure of how far north a house is (a higher value is farther north)
3. `housing_median_age`: median age of a house within a block (a lower number is a newer building)
4. `total_rooms`: total number of rooms within a block
5. `total_bedrooms`: total number of bedrooms within a block
6. `population`: total number of people residing within a block
7. `households`: total number of households, a group of people residing within a home unit, for a block
8. `median_income`: median income for households within a block of houses
9. `median_house_value`: median house value for households within a block
10. `ocean_proximity`: location of the house with respect to the ocean/sea

---
# 1. Get the data
Let's start the lab by loading the dataset. You can find the dataset at `data/housing.csv`. To infer the column types automatically when reading the file, set the `inferSchema` option to `true`. Also enable the `header` option so that the column names are read from the file's first line.

In [48]:
// TODO: Replace <FILL IN> with appropriate code

val housing = spark.read.options(Map("inferSchema"->"true","delimiter"->",","header"->"true"))
  .csv("data/housing.csv")


housing: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 8 more fields]
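To get a feel for what `inferSchema` does, here is a simplified plain-Scala sketch. It is not Spark's actual algorithm, and the names `SchemaInferenceSketch` and `inferType` are hypothetical: a column gets the narrowest type that every non-empty value parses as, and empty cells are treated as missing rather than as strings. This is consistent with `total_bedrooms`, which contains blank cells, still being read as a nullable `double`.

```scala
// Simplified sketch of per-column type inference (hypothetical names,
// not Spark's implementation). Empty cells count as missing values.
object SchemaInferenceSketch {
  sealed trait ColType
  case object IntType extends ColType
  case object DoubleType extends ColType
  case object StringType extends ColType

  def inferType(values: Seq[String]): ColType = {
    // Ignore empty cells: they become nulls and do not decide the type.
    val present = values.filter(_.nonEmpty)
    if (present.forall(_.toIntOption.isDefined)) IntType
    else if (present.forall(_.toDoubleOption.isDefined)) DoubleType
    else StringType
  }
}
```

Without `inferSchema`, Spark would read every column as a string, and each numeric column would need an explicit cast before any of the statistics below could be computed.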


---
# 2. Discover the data to gain insights
Now it is time to take a look at the data. In this step, we examine it in a few different ways:
* See the schema and dimension of the dataset
* Look at the data itself
* Statistical summary of the attributes
* Breakdown of the data by the categorical attribute variable
* Find the correlation among different attributes
* Make new attributes by combining existing attributes

## 2.1. Schema and dimension
Print the schema of the dataset

In [49]:
// TODO: Replace <FILL IN> with appropriate code

housing.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



Print the number of records in the dataset.

In [50]:
// TODO: Replace <FILL IN> with appropriate code

housing.count()

res37: Long = 20640


## 2.2. Look at the data
Print the first five records of the dataset.

In [51]:
// TODO: Replace <FILL IN> with appropriate code

housing.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

Print the records with a population greater than 10000. (To print only how many such records there are, call `.count()` instead of `.show()`.)

In [52]:
// TODO: Replace <FILL IN> with appropriate code

housing.where("population > 10000").show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -121.92|   37.53|               7.0|    28258.0|        3864.0|   12203.0|    3701.0|       8.4045|          451100.0|      <1H OCEAN|
|  -117.78|   34.03|               8.0|    32054.0|        5290.0|   15507.0|    5050.0|       6.0191|          253900.0|      <1H OCEAN|
|  -117.87|   34.04|               7.0|    27700.0|        4179.0|   15037.0|    4072.0|       6.6288|          339700.0|      <1H OCEAN|
|  -117.88|   33.96|              16.0|    19059.0|        3079.0|   10988.0|    3061.0|       5.5469|          265200.0|      <1H OCEAN|
|  -118.78|   34.16|              

## 2.3. Statistical summary
Print a summary of the table statistics for the attributes `housing_median_age`, `total_rooms`, `median_house_value`, and `population`. You can use the `describe` command.

In [53]:
// TODO: Replace <FILL IN> with appropriate code

housing.describe("housing_median_age", "total_rooms", "median_house_value", "population").show()

+-------+------------------+------------------+------------------+------------------+
|summary|housing_median_age|       total_rooms|median_house_value|        population|
+-------+------------------+------------------+------------------+------------------+
|  count|             20640|             20640|             20640|             20640|
|   mean|28.639486434108527|2635.7630813953488|206855.81690891474|1425.4767441860465|
| stddev| 12.58555761211163|2181.6152515827944|115395.61587441359|  1132.46212176534|
|    min|               1.0|               2.0|           14999.0|               3.0|
|    max|              52.0|           39320.0|          500001.0|           35682.0|
+-------+------------------+------------------+------------------+------------------+
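The statistics that `describe` prints can be reproduced for a single column in a few lines of plain Scala. This is a sketch for intuition only (`DescribeSketch` is a hypothetical name). It uses the sample standard deviation with an `n - 1` denominator, which is what Spark's `stddev` aggregate computes.

```scala
// Plain-Scala sketch of the per-column statistics reported by `describe`.
object DescribeSketch {
  final case class Summary(count: Long, mean: Double, stddev: Double, min: Double, max: Double)

  def describe(xs: Seq[Double]): Summary = {
    require(xs.size >= 2, "the sample standard deviation needs at least two values")
    val n = xs.size
    val mean = xs.sum / n
    // Sample variance: divide the squared deviations by (n - 1).
    val variance = xs.map(x => (x - mean) * (x - mean)).sum / (n - 1)
    Summary(n.toLong, mean, math.sqrt(variance), xs.min, xs.max)
  }
}
```

Note that Spark's `count` row counts only non-null values, so a column with missing entries, such as `total_bedrooms`, would report fewer than 20640.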



Print the maximum age (`housing_median_age`), the minimum number of rooms (`total_rooms`), and the average of house values (`median_house_value`).

In [54]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.sql.functions._

housing.select(max("housing_median_age"), min("total_rooms"), avg("median_house_value")).show()

+-----------------------+----------------+-----------------------+
|max(housing_median_age)|min(total_rooms)|avg(median_house_value)|
+-----------------------+----------------+-----------------------+
|                   52.0|             2.0|     206855.81690891474|
+-----------------------+----------------+-----------------------+



import org.apache.spark.sql.functions._


## 2.4. Break down the data by the categorical attribute
Print the number of houses in different areas (`ocean_proximity`), and sort them in descending order.

In [55]:
// TODO: Replace <FILL IN> with appropriate code

housing.groupBy("ocean_proximity").count().sort(col("count").desc).show()

+---------------+-----+
|ocean_proximity|count|
+---------------+-----+
|      <1H OCEAN| 9136|
|         INLAND| 6551|
|     NEAR OCEAN| 2658|
|       NEAR BAY| 2290|
|         ISLAND|    5|
+---------------+-----+



Print the average value of the houses (`median_house_value`) in different areas (`ocean_proximity`), and name the new column `avg_value` when printing it.

In [116]:
// TODO: Replace <FILL IN> with appropriate code

housing.groupBy("ocean_proximity").avg("median_house_value").withColumnRenamed("avg(median_house_value)", "avg_value").show()

+---------------+------------------+
|ocean_proximity|         avg_value|
+---------------+------------------+
|         ISLAND|          380440.0|
|     NEAR OCEAN|249433.97742663656|
|       NEAR BAY|259212.31179039303|
|      <1H OCEAN|240084.28546409807|
|         INLAND|124805.39200122119|
+---------------+------------------+



Rewrite the above query in SQL.

In [117]:
// TODO: Replace <FILL IN> with appropriate code

housing.createOrReplaceTempView("df")
spark.sql("SELECT ocean_proximity, AVG(median_house_value) AS avg_value FROM df GROUP BY ocean_proximity").show()


+---------------+------------------+
|ocean_proximity|         avg_value|
+---------------+------------------+
|         ISLAND|          380440.0|
|     NEAR OCEAN|249433.97742663656|
|       NEAR BAY|259212.31179039303|
|      <1H OCEAN|240084.28546409807|
|         INLAND|124805.39200122119|
+---------------+------------------+
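The DataFrame version and the SQL version compute the same grouped mean. As a plain-Scala sketch of that computation over `(ocean_proximity, median_house_value)` pairs (the object and function names are hypothetical):

```scala
// Plain-Scala sketch of GROUP BY key / AVG(value).
object GroupAvgSketch {
  def avgBy(rows: Seq[(String, Double)]): Map[String, Double] =
    rows.groupBy(_._1).map { case (key, group) =>
      // Mean of the values that share this key.
      key -> group.map(_._2).sum / group.size
    }
}
```

Neither Spark query sorts its output, so the group order shown above is not guaranteed. Append `ORDER BY avg_value DESC` to the SQL query, or `.orderBy(desc("avg_value"))` to the DataFrame version, for a stable order.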



## 2.5. Correlation among attributes
Print the correlation among the attributes `housing_median_age`, `total_rooms`, `median_house_value`, and `population`. To do so, first you need to put these attributes into one vector. Then, compute the standard correlation coefficient (Pearson) between every pair of attributes in this new vector. To make a vector of these attributes, you can use the `VectorAssembler` Transformer.

In [118]:
import org.apache.spark.ml.feature.VectorAssembler

val va = new VectorAssembler().setInputCols(Array("housing_median_age", "total_rooms", "median_house_value", "population")).setOutputCol("features")

val housingAttrs = va.transform(housing)

housingAttrs.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|            features|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+--------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|[41.0,880.0,45260...|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|[21.0,7099.0,3585...|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|[52.0,1467.0,3521...|
|  -122.25|   37.85|              52.0|     12

import org.apache.spark.ml.feature.VectorAssembler
va: org.apache.spark.ml.feature.VectorAssembler = VectorAssembler: uid=vecAssembler_18af9496b8f3, handleInvalid=error, numInputCols=4
housingAttrs: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 9 more fields]


In [119]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.linalg.Matrix
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.sql.Row
val Row(coeff: Matrix) = Correlation.corr(housingAttrs, "features").head
println(s"The standard correlation coefficient:\n ${coeff}")

The standard correlation coefficient:
 1.0                   -0.3612622012223152  0.10562341249321029    -0.29624423977353675   
-0.3612622012223152   1.0                  0.13415311380656375    0.8571259728659744     
0.10562341249321029   0.13415311380656375  1.0                    -0.024649678888894997  
-0.29624423977353675  0.8571259728659744   -0.024649678888894997  1.0                    


import org.apache.spark.ml.linalg.Matrix
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.sql.Row
coeff: org.apache.spark.ml.linalg.Matrix =
1.0                   -0.3612622012223152  0.10562341249321029    -0.29624423977353675
-0.3612622012223152   1.0                  0.13415311380656375    0.8571259728659744
0.10562341249321029   0.13415311380656375  1.0                    -0.024649678888894997
-0.29624423977353675  0.8571259728659744   -0.024649678888894997  1.0
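Each entry of the matrix is Pearson's coefficient r = cov(X, Y) / (sd(X) · sd(Y)) for one pair of attributes, in the order they were passed to the `VectorAssembler` (`housing_median_age`, `total_rooms`, `median_house_value`, `population`). For example, the 0.857 entry indicates that blocks with more rooms tend to have larger populations. A plain-Scala sketch of the coefficient for two columns (hypothetical names, no Spark involved):

```scala
// Plain-Scala sketch of Pearson's correlation coefficient.
object PearsonSketch {
  def pearson(xs: Seq[Double], ys: Seq[Double]): Double = {
    require(xs.size == ys.size && xs.size >= 2, "need two equally long columns")
    val n = xs.size
    val mx = xs.sum / n
    val my = ys.sum / n
    // Sum of co-deviations; the 1/n factors cancel in the ratio.
    val cov = xs.zip(ys).map { case (x, y) => (x - mx) * (y - my) }.sum
    val sx = math.sqrt(xs.map(x => (x - mx) * (x - mx)).sum)
    val sy = math.sqrt(ys.map(y => (y - my) * (y - my)).sum)
    cov / (sx * sy)
  }
}
```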


## 2.6. Combine and make new attributes
Now, let's try out various attribute combinations. The total number of rooms in a block is not very useful if we don't know how many households there are; what we really want is the number of rooms per household. Similarly, the total number of bedrooms by itself is not very useful, and we want to compare it to the number of rooms. The population per household also seems like an interesting attribute combination to look at. To explore these, add the following three columns to the dataset and call the new dataset `housingExtra`.
```
rooms_per_household = total_rooms / households
bedrooms_per_room = total_bedrooms / total_rooms
population_per_household = population / households
```

In [120]:
// TODO: Replace <FILL IN> with appropriate code

val housingCol1 = housing.withColumn("rooms_per_household", col("total_rooms") / col("households"))
val housingCol2 = housingCol1.withColumn("bedrooms_per_room", col("total_bedrooms") / col("total_rooms"))
val housingExtra = housingCol2.withColumn("population_per_household", col("population") / col("households"))

housingExtra.select("rooms_per_household", "bedrooms_per_room", "population_per_household").show(5)

+-------------------+-------------------+------------------------+
|rooms_per_household|  bedrooms_per_room|population_per_household|
+-------------------+-------------------+------------------------+
|  6.984126984126984|0.14659090909090908|      2.5555555555555554|
|  6.238137082601054|0.15579659106916466|       2.109841827768014|
|  8.288135593220339|0.12951601908657123|      2.8022598870056497|
| 5.8173515981735155|0.18445839874411302|       2.547945205479452|
|  6.281853281853282| 0.1720958819913952|      2.1814671814671813|
+-------------------+-------------------+------------------------+
only showing top 5 rows



housingCol1: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 9 more fields]
housingCol2: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 10 more fields]
housingExtra: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 11 more fields]
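The new columns are computed row by row, and in Spark SQL a division with a null operand yields null. Modeling a nullable column as `Option[Double]` gives a small plain-Scala sketch of that behavior (the names are hypothetical). It also explains why `bedrooms_per_room` ends up with missing values wherever `total_bedrooms` is missing. The sketch also returns `None` for a zero divisor, which is assumed here to mirror Spark's non-ANSI division.

```scala
// Plain-Scala sketch of null propagation in a derived ratio column.
object RatioSketch {
  def ratio(num: Option[Double], den: Option[Double]): Option[Double] =
    for {
      n <- num
      d <- den
      if d != 0.0 // assumption: treat division by zero as a missing result
    } yield n / d
}
```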


---
# 3. Prepare the data for Machine Learning algorithms
Before going through the Machine Learning steps, let's first rename the label column from `median_house_value` to `label`.

In [121]:
val renamedHousing = housingExtra.withColumnRenamed("median_house_value", "label")

renamedHousing.show(5)


+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|   label|ocean_proximity|rooms_per_household|  bedrooms_per_room|population_per_household|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|452600.0|       NEAR BAY|  6.984126984126984|0.14659090909090908|      2.5555555555555554|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|358500.0|       NEAR BAY|  6.238137082601054|0.15579659106916466|       2.109841827768014|
|  -122.24|   37.85|              5

renamedHousing: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 11 more fields]


Now, we want to separate the numerical attributes from the categorical attribute (`ocean_proximity`) and keep their column names in two different lists. Moreover, since we don't want to apply the same transformations to the predictors (features) and the label, we should remove the label attribute from the list of predictors.

In [122]:
// label columns
val colLabel = "label"

// categorical columns
val colCat = "ocean_proximity"

// numerical columns
val colNum = renamedHousing.columns.filter(_ != colLabel).filter(_ != colCat)

colLabel: String = label
colCat: String = ocean_proximity
colNum: Array[String] = Array(longitude, latitude, housing_median_age, total_rooms, total_bedrooms, population, households, median_income, rooms_per_household, bedrooms_per_room, population_per_household)


## 3.1. Prepare continuous attributes
### Data cleaning
Most Machine Learning algorithms cannot work with missing features, so we should take care of them. As a first step, let's find the numerical columns that have missing values. To do so, we can print the name of each continuous attribute in `colNum` that contains at least one missing (null or NaN) value.

In [123]:
// colNum already excludes the label and ocean_proximity
for (c <- colNum) {
    // rows where column c is missing, i.e. null or NaN
    val housingWithMissingValues = renamedHousing.filter(col(c).isNull || col(c).isNaN)
    if (housingWithMissingValues.count() > 0) {
        println(s"$c")
    }
}


total_bedrooms
bedrooms_per_room


As we observed above, the `total_bedrooms` and `bedrooms_per_room` attributes have some missing values. One way to handle them is the `Imputer` Estimator, which completes missing values using either the mean or the median of the column in which they are located. To use it, create an `Imputer` instance and specify that each attribute's missing values should be replaced with the median of that attribute.

In [124]:
import org.apache.spark.ml.feature.Imputer

val imputer = new Imputer().setStrategy("median").setInputCols(Array("total_bedrooms", "bedrooms_per_room")).setOutputCols(Array("total_bedrooms", "bedrooms_per_room"))
                       
val imputedHousing = imputer.fit(renamedHousing).transform(renamedHousing)

imputedHousing.select("total_bedrooms", "bedrooms_per_room").show(5)

+--------------+-------------------+
|total_bedrooms|  bedrooms_per_room|
+--------------+-------------------+
|         129.0|0.14659090909090908|
|        1106.0|0.15579659106916466|
|         190.0|0.12951601908657123|
|         235.0|0.18445839874411302|
|         280.0| 0.1720958819913952|
+--------------+-------------------+
only showing top 5 rows



import org.apache.spark.ml.feature.Imputer
imputer: org.apache.spark.ml.feature.Imputer = imputer_3b1ce317d551
imputedHousing: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 11 more fields]
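Conceptually, the median strategy fits one value per column (the median of its non-missing entries) and then substitutes it for every missing entry. The plain-Scala sketch below (hypothetical names) uses the exact median; Spark's `Imputer` estimates the median approximately, so on large data its fill value may differ slightly. As in Spark, both null (here `None`) and `NaN` count as missing.

```scala
// Plain-Scala sketch of median imputation for one column.
object ImputeSketch {
  // Exact median: the middle value, or the mean of the two middle values.
  def median(xs: Seq[Double]): Double = {
    val sorted = xs.sorted
    val n = sorted.size
    require(n > 0, "need at least one non-missing value")
    if (n % 2 == 1) sorted(n / 2)
    else (sorted(n / 2 - 1) + sorted(n / 2)) / 2.0
  }

  // "Fit" on the observed values, then "transform" by filling the gaps.
  def imputeMedian(col: Seq[Option[Double]]): Seq[Double] = {
    val observed = col.flatten.filterNot(_.isNaN)
    val m = median(observed)
    col.map {
      case Some(v) if !v.isNaN => v
      case _                   => m
    }
  }
}
```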


### Scaling
One of the most important transformations you need to apply to your data is feature scaling. With few exceptions, Machine Learning algorithms don't perform well when the input numerical attributes have very different scales. This is the case for the housing data: the total number of rooms ranges from 2 to 39,320, while the median incomes only range from about 0 to 15. Note that scaling the label attribute is generally not required.

One way to give all attributes the same scale is standardization: from each value we first subtract the mean (so standardized values have zero mean), and then divide by the standard deviation so that the resulting distribution has unit variance. To do this, we can use the `StandardScaler` Estimator. Note that by default `StandardScaler` only divides by the standard deviation (`withStd = true`, `withMean = false`); call `setWithMean(true)` if you also want the features centered. To use `StandardScaler`, we again need to combine all the numerical attributes into one big feature vector using `VectorAssembler`, and then call `StandardScaler` on that vector.

In [125]:
import org.apache.spark.ml.feature.{VectorAssembler, StandardScaler}

val va = new VectorAssembler().setInputCols(colNum).setOutputCol("vector_features")
val featuredHousing = va.transform(imputedHousing)

val scaler = new StandardScaler().setInputCol("vector_features").setOutputCol("scaler_features")
val scaledHousing = scaler.fit(featuredHousing).transform(featuredHousing)

scaledHousing.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+--------------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|   label|ocean_proximity|rooms_per_household|  bedrooms_per_room|population_per_household|     vector_features|     scaler_features|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+--------------------+--------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|452600.0|       NEAR BAY|  6.984126984126984|0.14659090909090908|      2.5555555555555554|[-122.23,37.88,41...|[-61.007269596069...|
|  -122.22|   37.86|              21.0|     7099.0|        1

import org.apache.spark.ml.feature.{VectorAssembler, StandardScaler}
va: org.apache.spark.ml.feature.VectorAssembler = VectorAssembler: uid=vecAssembler_9f3de830eaf4, handleInvalid=error, numInputCols=11
featuredHousing: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 12 more fields]
scaler: org.apache.spark.ml.feature.StandardScaler = stdScal_3660716dc1c7
scaledHousing: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 13 more fields]
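The plain-Scala sketch below (hypothetical names) applies the same two switches as `StandardScaler`, `withMean` and `withStd`, with Spark's default values, and uses the corrected sample standard deviation. Because `withMean` is off by default, the scaled values above are not centered: a longitude of -122.23 divided by a standard deviation of roughly 2 gives the -61.0 seen in `scaler_features`.

```scala
// Plain-Scala sketch of standardization for one column.
object StandardizeSketch {
  def standardize(xs: Seq[Double], withMean: Boolean = false, withStd: Boolean = true): Seq[Double] = {
    require(xs.size >= 2, "need at least two values for a sample standard deviation")
    val n = xs.size
    val mean = xs.sum / n
    // Corrected sample standard deviation (n - 1 denominator).
    val std = math.sqrt(xs.map(x => (x - mean) * (x - mean)).sum / (n - 1))
    xs.map { x =>
      val centered = if (withMean) x - mean else x
      // This sketch simply skips scaling for a constant column.
      if (withStd && std != 0.0) centered / std else centered
    }
  }
}
```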


## 3.2. Prepare categorical attributes
After imputing and scaling the continuous attributes, we should take care of the categorical attribute. Let's first print the distinct values of the categorical attribute `ocean_proximity`.

In [126]:
renamedHousing.select("ocean_proximity").distinct().show()

+---------------+
|ocean_proximity|
+---------------+
|         ISLAND|
|     NEAR OCEAN|
|       NEAR BAY|
|      <1H OCEAN|
|         INLAND|
+---------------+



### String indexer
Most Machine Learning algorithms prefer to work with numbers. So let's convert the categorical attribute `ocean_proximity` to numbers. To do so, we can use the `StringIndexer` that encodes a string column of labels to a column of label indices. The indices are in [0, numLabels), ordered by label frequencies, so the most frequent label gets index 0.

In [127]:
import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer().setInputCol(colCat).setOutputCol("ocean_proximity_numbers")
val idxHousing = indexer.fit(renamedHousing).transform(renamedHousing)

idxHousing.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+-----------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|   label|ocean_proximity|rooms_per_household|  bedrooms_per_room|population_per_household|ocean_proximity_numbers|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+-----------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|452600.0|       NEAR BAY|  6.984126984126984|0.14659090909090908|      2.5555555555555554|                    3.0|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|358500.0|       NEAR BAY|  6.2

import org.apache.spark.ml.feature.StringIndexer
indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_311662372b91
idxHousing: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 12 more fields]


Now we can use this numerical data in any Machine Learning algorithm. You can look at the mapping that this encoder has learned using the `labels` method: "<1H OCEAN" is mapped to 0, "INLAND" is mapped to 1, etc.

In [128]:
indexer.fit(renamedHousing).labels

res113: Array[String] = Array(<1H OCEAN, INLAND, NEAR OCEAN, NEAR BAY, ISLAND)
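The default ordering (`stringOrderType = "frequencyDesc"`) can be sketched in plain Scala: count each label, sort by descending count, and break ties alphabetically, as the Spark documentation describes. The names below are hypothetical. Applied to the counts from Section 2.4 (9136, 6551, 2658, 2290, 5), this ordering yields exactly the `labels` array above.

```scala
// Plain-Scala sketch of StringIndexer's frequency-based label ordering.
object StringIndexerSketch {
  def fitLabels(values: Seq[String]): Array[String] =
    values.groupBy(identity).toSeq
      .map { case (label, occurrences) => (label, occurrences.size) }
      // Most frequent first; equal counts are ordered alphabetically.
      .sortBy { case (label, count) => (-count, label) }
      .map(_._1)
      .toArray

  // Each label's index is its position in the ordered array.
  def index(labels: Array[String]): Map[String, Double] =
    labels.zipWithIndex.map { case (label, i) => label -> i.toDouble }.toMap
}
```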


### One-hot encoding
Now, convert the label indices built in the last step into one-hot vectors. To do this, you can use the `OneHotEncoder` Estimator (called `OneHotEncoderEstimator` in Spark 2.3 and 2.4).

In [129]:
import org.apache.spark.ml.feature.OneHotEncoder

val encoder = new OneHotEncoder().setInputCol("ocean_proximity_numbers").setOutputCol("ocean_proximity_vectors")

val ohHousing = encoder.fit(idxHousing).transform(idxHousing)

ohHousing.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+-----------------------+-----------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|   label|ocean_proximity|rooms_per_household|  bedrooms_per_room|population_per_household|ocean_proximity_numbers|ocean_proximity_vectors|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+-----------------------+-----------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|452600.0|       NEAR BAY|  6.984126984126984|0.14659090909090908|      2.5555555555555554|                    3.0|          (4,[3],[1.0])|
|  -122.22|   37.86|              21

import org.apache.spark.ml.feature.OneHotEncoder
encoder: org.apache.spark.ml.feature.OneHotEncoder = oneHotEncoder_7e0c80bd377d
ohHousing: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 13 more fields]
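The output `(4,[3],[1.0])` is a sparse vector of size 4 with a 1.0 at position 3, i.e. `NEAR BAY` (index 3). The size is 4 rather than 5 because `OneHotEncoder` drops the last category by default (`dropLast = true`), so `ISLAND` (index 4) is encoded as all zeros. A dense plain-Scala sketch of that encoding (hypothetical names):

```scala
// Plain-Scala sketch of one-hot encoding with an optional dropped last category.
object OneHotSketch {
  def oneHot(index: Int, numCategories: Int, dropLast: Boolean = true): Vector[Double] = {
    // Dropping the last category avoids a redundant, linearly dependent column.
    val size = if (dropLast) numCategories - 1 else numCategories
    Vector.tabulate(size)(i => if (i == index) 1.0 else 0.0)
  }
}
```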


---
# 4. Pipeline
As you can see, there are many data transformation steps that need to be executed in the right order. For example, you applied the `Imputer`, `VectorAssembler`, and `StandardScaler` one after another. Instead, we can use the `Pipeline` class to define a sequence of Transformers and Estimators and run them in order. A `Pipeline` is itself an `Estimator`: its `fit()` method produces a `PipelineModel`, which is a `Transformer`.

Now, let's create a pipeline called `numPipeline` that chains the numerical stages you built above (`imputer`, `va`, and `scaler`) in that order, and a pipeline called `catPipeline` that chains the categorical stages (`indexer` and `encoder`). Then, combine `numPipeline` and `catPipeline` into one pipeline.

In [130]:
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import scala.collection.mutable

val numStage = new mutable.ArrayBuffer[PipelineStage]()
numStage += imputer
numStage += va
numStage += scaler

val catStage = new mutable.ArrayBuffer[PipelineStage]()
catStage += indexer
catStage += encoder

val numPipeline = new Pipeline().setStages(numStage.toArray)
val catPipeline = new Pipeline().setStages(catStage.toArray)

val pipeline = new Pipeline().setStages(Array(numPipeline, catPipeline))
val newHousing = pipeline.fit(renamedHousing).transform(renamedHousing)

newHousing.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+--------------------+--------------------+-----------------------+-----------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|   label|ocean_proximity|rooms_per_household|  bedrooms_per_room|population_per_household|     vector_features|     scaler_features|ocean_proximity_numbers|ocean_proximity_vectors|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+--------------------+--------------------+-----------------------+-----------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|452600.0|       NEAR BAY|  6.984126984126984|0.14

import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import scala.collection.mutable
numStage: scala.collection.mutable.ArrayBuffer[org.apache.spark.ml.PipelineStage] = ArrayBuffer(imputer_3b1ce317d551, VectorAssembler: uid=vecAssembler_9f3de830eaf4, handleInvalid=error, numInputCols=11, stdScal_3660716dc1c7)
catStage: scala.collection.mutable.ArrayBuffer[org.apache.spark.ml.PipelineStage] = ArrayBuffer(strIdx_311662372b91, oneHotEncoder_7e0c80bd377d)
numPipeline: org.apache.spark.ml.Pipeline = pipeline_b6aea2632597
catPipeline: org.apache.spark.ml.Pipeline = pipeline_c356f6095feb
pipeline: org.apache.spark.ml.Pipeline = pipeline_77999c9bc5a2
newHousing: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 15 more fields]
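Why can a `Pipeline` contain other pipelines? Because it is itself an Estimator, and fitting it fits each stage on the output of the stages before it. The toy model below sketches that contract in plain Scala over a single numeric column. The `Estimator`, `Transformer`, `Pipeline`, and `PipelineModel` types here are simplified stand-ins, not Spark's classes, and every stage is an Estimator for brevity.

```scala
// Toy model of the Estimator/Transformer contract (hypothetical types).
object PipelineSketch {
  type Data = Seq[Double]

  trait Transformer { def transform(data: Data): Data }
  trait Estimator { def fit(data: Data): Transformer }

  // Learns the mean during fit, subtracts it during transform.
  object MeanCenterer extends Estimator {
    def fit(data: Data): Transformer = {
      val mean = data.sum / data.size
      new Transformer { def transform(d: Data): Data = d.map(_ - mean) }
    }
  }

  // Learns the largest absolute value during fit, divides by it during transform.
  object MaxAbsScaler extends Estimator {
    def fit(data: Data): Transformer = {
      val m = data.map(x => math.abs(x)).max
      new Transformer { def transform(d: Data): Data = d.map(_ / m) }
    }
  }

  final case class PipelineModel(stages: Seq[Transformer]) extends Transformer {
    def transform(data: Data): Data = stages.foldLeft(data)((d, t) => t.transform(d))
  }

  final case class Pipeline(stages: Seq[Estimator]) extends Estimator {
    def fit(data: Data): PipelineModel = {
      // Fit each stage on the data produced by the already-fitted earlier stages.
      val (fitted, _) = stages.foldLeft((Vector.empty[Transformer], data)) {
        case ((acc, current), stage) =>
          val model = stage.fit(current)
          (acc :+ model, model.transform(current))
      }
      PipelineModel(fitted)
    }
  }
}
```

Because `Pipeline` extends `Estimator`, a nested `Pipeline(Seq(Pipeline(Seq(MeanCenterer)), MaxAbsScaler))` fits to the same model as the flat version, which is the property the `numPipeline`/`catPipeline` nesting relies on.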


Now, use `VectorAssembler` to put all attributes of the final dataset `newHousing` into a big vector, and call the new column `features`.

In [131]:
val va2 = new VectorAssembler()
  .setInputCols(Array("scaler_features", "ocean_proximity_vectors"))
  .setOutputCol("features")
val dataset = va2.transform(newHousing).select("features", "label")

dataset.show(5, false)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|features                                                                                                                                                                                                                            |label   |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|[-61.00726959606955,17.734477624640412,3.2577023016083064,0.40337085073160667,0.30758821710917267,0.2843362208866199,0.3295584480852433,4.382095394195227,2.8228125480951665,2.5405867237343416,0.24605655309533123,0.0,0.0,0.0,1.0]|452600.0|
|[-61.002278409814444,17.725114120086744

va2: org.apache.spark.ml.feature.VectorAssembler = VectorAssembler: uid=vecAssembler_a1c4b9e2e9f9, handleInvalid=error, numInputCols=2
dataset: org.apache.spark.sql.DataFrame = [features: vector, label: double]


---
# 5. Make a model
Here we are going to build four different regression models:
* Linear regression
* Decision tree regression
* Random forest regression
* Gradient-boosted tree regression

But before feeding the data to a Machine Learning model, let's first split it into a training set (`trainSet`) with roughly 80% of the data and a test set (`testSet`) with the remaining 20%.

In [132]:
val Array(trainSet, testSet) = dataset.randomSplit(Array(0.8, 0.2))

print("Training set:\n")
trainSet.show(5)
print("Test set:\n")
testSet.show(5)


Training set:
+--------------------+--------+
|            features|   label|
+--------------------+--------+
|[-62.065401082150...| 94600.0|
|[-62.040445150874...| 85800.0|
|[-62.040445150874...|103600.0|
|[-62.025471592109...| 79000.0|
|[-62.020480405854...|111400.0|
+--------------------+--------+
only showing top 5 rows

Test set:
+--------------------+-------+
|            features|  label|
+--------------------+-------+
|[-62.005506847089...|50800.0|
|[-62.000515660834...|78300.0|
|[-61.985542102068...|90100.0|
|[-61.975559729558...|82800.0|
|[-61.975559729558...|75500.0|
+--------------------+-------+
only showing top 5 rows



trainSet: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [features: vector, label: double]
testSet: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [features: vector, label: double]
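`randomSplit` assigns each row independently, so the 80/20 proportions are only approximate and, without a seed, the split changes on every run. For a reproducible split, pass a seed, e.g. `dataset.randomSplit(Array(0.8, 0.2), seed = 42)`. The idea in plain Scala, as a sketch with hypothetical names:

```scala
import scala.util.Random

// Plain-Scala sketch of a seeded, row-by-row random train/test split.
object RandomSplitSketch {
  def split[A](rows: Seq[A], trainFraction: Double, seed: Long): (Seq[A], Seq[A]) = {
    val rng = new Random(seed)
    // Each row goes to the training side with probability trainFraction.
    rows.partition(_ => rng.nextDouble() < trainFraction)
  }
}
```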


## 5.1. Linear regression model
Now, train a linear regression model using the `LinearRegression` class. Then print the model's coefficients and intercept, as well as its RMSE on the training set, which is available from the model's `summary`.

In [133]:
import org.apache.spark.ml.regression.LinearRegression

// train the model
val lr = new LinearRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)
val lrModel = lr.fit(trainSet)
val trainingSummary = lrModel.summary

println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")

Coefficients: [-15112.272375722867,-12294.872144526882,15477.233450777627,8882.058802437119,12594.856054597652,-29500.057075588003,13462.317363767472,83431.98553164786,2193.4113824324363,18475.601698807754,-1206.8558379758672,17501.316799348147,-38656.50933481211,29848.62575240823,20279.410570120857] Intercept: -776532.9454630188
RMSE: 69311.88435607517


import org.apache.spark.ml.regression.LinearRegression
lr: org.apache.spark.ml.regression.LinearRegression = linReg_8bc9d6c61136
lrModel: org.apache.spark.ml.regression.LinearRegressionModel = LinearRegressionModel: uid=linReg_8bc9d6c61136, numFeatures=15
trainingSummary: org.apache.spark.ml.regression.LinearRegressionTrainingSummary = org.apache.spark.ml.regression.LinearRegressionTrainingSummary@3a348789
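`setRegParam(0.3)` and `setElasticNetParam(0.8)` configure an elastic-net penalty. As the Spark ML documentation describes it, `regParam` is the overall strength λ and `elasticNetParam` is the mixing weight α between an L1 term (α = 1, lasso) and an L2 term (α = 0, ridge), giving λ(α‖w‖₁ + (1 − α)/2 ‖w‖₂²). A small plain-Scala sketch of that penalty; note that Spark applies it to coefficients in standardized feature space by default (`standardization = true`), which this function ignores:

```scala
// Elastic-net penalty: regParam * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2)
def elasticNetPenalty(w: Seq[Double], regParam: Double, elasticNetParam: Double): Double = {
  val l1 = w.map(x => math.abs(x)).sum   // ||w||_1
  val l2Squared = w.map(x => x * x).sum  // ||w||_2^2
  regParam * (elasticNetParam * l1 + (1 - elasticNetParam) / 2 * l2Squared)
}

// With alpha = 0.8 the L1 part dominates, and L1 can push coefficients exactly to 0.0.
val penalty = elasticNetPenalty(Seq(1.0, -2.0), regParam = 0.3, elasticNetParam = 0.8)
println(penalty)
```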


Now, use `RegressionEvaluator` to measure the root-mean-square error (RMSE) of the model on the test dataset.

In [134]:
import org.apache.spark.ml.evaluation.RegressionEvaluator

// make predictions on the test data
val predictions = lrModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

// select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator().setPredictionCol("prediction").setLabelCol("label")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

+------------------+-------+--------------------+
|        prediction|  label|            features|
+------------------+-------+--------------------+
| 234584.6836787687|50800.0|[-62.005506847089...|
|154029.62481895753|78300.0|[-62.000515660834...|
|214358.64912636147|90100.0|[-61.985542102068...|
|185007.48090691422|82800.0|[-61.975559729558...|
|184581.41161690396|75500.0|[-61.975559729558...|
+------------------+-------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 67921.84355516791


import org.apache.spark.ml.evaluation.RegressionEvaluator
predictions: org.apache.spark.sql.DataFrame = [features: vector, label: double ... 1 more field]
evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = RegressionEvaluator: uid=regEval_5bd919438a8a, metricName=rmse, throughOrigin=false
rmse: Double = 67921.84355516791
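`RegressionEvaluator` uses `metricName = "rmse"` by default, as the printed `metricName=rmse` confirms. The metric is the square root of the mean squared difference between prediction and label; a plain-Scala sketch of that formula (`rmseOf` is an illustrative name, not part of Spark):

```scala
// Root-mean-square error between predictions and true labels.
def rmseOf(predictions: Seq[Double], labels: Seq[Double]): Double = {
  require(predictions.nonEmpty && predictions.size == labels.size)
  val mse = predictions.zip(labels).map { case (p, y) => (p - y) * (p - y) }.sum / predictions.size
  math.sqrt(mse)
}

println(rmseOf(Seq(1.0, 2.0, 3.0), Seq(1.0, 2.0, 5.0)))
```

Because RMSE is expressed in the label's units, the test RMSE of about 67,900 above means the linear model's predictions are roughly tens of thousands of dollars away from the true median house values.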


## 5.2. Decision tree regression
Repeat what you did for the linear regression model to build a decision tree model: use `DecisionTreeRegressor` to train the model and then measure its RMSE on the test dataset.

In [135]:
import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.ml.evaluation.RegressionEvaluator

val dt = new DecisionTreeRegressor()

// train the model
val dtModel = dt.fit(trainSet)

// make predictions on the test data
val predictions = dtModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

// select (prediction, true label) and compute test error
val evaluator = new RegressionEvaluator().setPredictionCol("prediction").setLabelCol("label")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

+------------------+-------+--------------------+
|        prediction|  label|            features|
+------------------+-------+--------------------+
| 185486.2138836773|50800.0|[-62.005506847089...|
|131441.91176470587|78300.0|[-62.000515660834...|
| 223152.2995951417|90100.0|[-61.985542102068...|
|167739.93754152823|82800.0|[-61.975559729558...|
|131441.91176470587|75500.0|[-61.975559729558...|
+------------------+-------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 67394.69860976153


import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.ml.evaluation.RegressionEvaluator
dt: org.apache.spark.ml.regression.DecisionTreeRegressor = dtr_4eab0a47b71a
dtModel: org.apache.spark.ml.regression.DecisionTreeRegressionModel = DecisionTreeRegressionModel: uid=dtr_4eab0a47b71a, depth=5, numNodes=63, numFeatures=15
predictions: org.apache.spark.sql.DataFrame = [features: vector, label: double ... 1 more field]
evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = RegressionEvaluator: uid=regEval_0bf58d95a920, metricName=rmse, throughOrigin=false
rmse: Double = 67394.69860976153
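The model cells in this section repeat the same predict-then-evaluate code. One way to factor it out is a small helper that accepts any fitted `Transformer` (every Spark ML model is one). A minimal sketch, assuming a toy dataset in place of `trainSet`/`testSet`; `testRmse` is a hypothetical name:

```scala
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().master("local[1]").appName("rmse-helper").getOrCreate()

// Hypothetical helper: predict on `data` with any fitted model and return the RMSE.
def testRmse(model: Transformer, data: DataFrame): Double =
  new RegressionEvaluator()
    .setLabelCol("label")
    .setPredictionCol("prediction")
    .setMetricName("rmse")
    .evaluate(model.transform(data))

// Toy data: label = 2 * x, with the single feature packed into a vector column.
val toyData = spark.createDataFrame(
  (1 to 20).map(i => (Vectors.dense(i.toDouble), 2.0 * i))
).toDF("features", "label")

// Evaluated on the training data only to keep the sketch short.
val toyTree = new DecisionTreeRegressor().fit(toyData)
val toyRmse = testRmse(toyTree, toyData)
println(s"RMSE = $toyRmse")
```

With the helper, each model cell reduces to fitting the estimator and calling `testRmse(model, testSet)`.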


## 5.3. Random forest regression
Next, let's measure the test error of a random forest model. You can use `RandomForestRegressor` to build it.

In [136]:
import org.apache.spark.ml.regression.RandomForestRegressor
import org.apache.spark.ml.evaluation.RegressionEvaluator

val rf = new RandomForestRegressor()

// train the model
val rfModel = rf.fit(trainSet)

// make predictions on the test data
val predictions = rfModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

// select (prediction, true label) and compute test error
val evaluator = new RegressionEvaluator().setPredictionCol("prediction").setLabelCol("label")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")


+------------------+-------+--------------------+
|        prediction|  label|            features|
+------------------+-------+--------------------+
|197122.26999582426|50800.0|[-62.005506847089...|
|167828.71980701765|78300.0|[-62.000515660834...|
| 195185.6240832696|90100.0|[-61.985542102068...|
|174966.65163700673|82800.0|[-61.975559729558...|
|171108.58542867747|75500.0|[-61.975559729558...|
+------------------+-------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 64461.80374028058


import org.apache.spark.ml.regression.RandomForestRegressor
import org.apache.spark.ml.evaluation.RegressionEvaluator
rf: org.apache.spark.ml.regression.RandomForestRegressor = rfr_a78e4c7ab451
rfModel: org.apache.spark.ml.regression.RandomForestRegressionModel = RandomForestRegressionModel: uid=rfr_a78e4c7ab451, numTrees=20, numFeatures=15
predictions: org.apache.spark.sql.DataFrame = [features: vector, label: double ... 1 more field]
evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = RegressionEvaluator: uid=regEval_cab9fcecd39a, metricName=rmse, throughOrigin=false
rmse: Double = 64461.80374028058


## 5.4. Gradient-boosted tree regression
Finally, we build a gradient-boosted tree regression model and measure its RMSE on the test data. Use `GBTRegressor` to build the model.

In [137]:
import org.apache.spark.ml.regression.GBTRegressor
import org.apache.spark.ml.evaluation.RegressionEvaluator

val gb = new GBTRegressor()

// train the model
val gbModel = gb.fit(trainSet)

// make predictions on the test data
val predictions = gbModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

// select (prediction, true label) and compute test error
val evaluator = new RegressionEvaluator().setPredictionCol("prediction").setLabelCol("label")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

+------------------+-------+--------------------+
|        prediction|  label|            features|
+------------------+-------+--------------------+
|141460.23546996404|50800.0|[-62.005506847089...|
|  80395.8953092036|78300.0|[-62.000515660834...|
| 156543.1587623532|90100.0|[-61.985542102068...|
|104874.28899995565|82800.0|[-61.975559729558...|
| 75271.62971947192|75500.0|[-61.975559729558...|
+------------------+-------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 54296.22452327231


import org.apache.spark.ml.regression.GBTRegressor
import org.apache.spark.ml.evaluation.RegressionEvaluator
gb: org.apache.spark.ml.regression.GBTRegressor = gbtr_fb433fe4cb9f
gbModel: org.apache.spark.ml.regression.GBTRegressionModel = GBTRegressionModel: uid=gbtr_fb433fe4cb9f, numTrees=20, numFeatures=15
predictions: org.apache.spark.sql.DataFrame = [features: vector, label: double ... 1 more field]
evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = RegressionEvaluator: uid=regEval_52993bfa7b96, metricName=rmse, throughOrigin=false
rmse: Double = 54296.22452327231
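Unlike a random forest, which averages independently trained trees, gradient boosting trains trees one after another: each new tree is fitted to the residuals (the negative gradient of squared loss) of the ensemble so far and added with a small weight. In Spark these knobs are `maxIter` (20 by default, matching `numTrees=20` in the model summary above) and `stepSize` (0.1 by default). The plain-Scala toy below illustrates that loop with one-split "stumps" on a single feature; starting from the mean label is a simplification, not Spark's exact initialization:

```scala
// A one-split regression stump on a single feature.
final case class Stump(threshold: Double, left: Double, right: Double) {
  def predict(x: Double): Double = if (x <= threshold) left else right
}

def mean(v: Seq[Double]): Double = v.sum / v.size

// Fit the stump whose split minimizes the squared error of `targets`.
def fitStump(xs: Seq[Double], targets: Seq[Double]): Stump = {
  require(xs.distinct.size >= 2, "need at least two distinct feature values")
  val candidates = xs.distinct.sorted.init // splitting at the max would leave the right side empty
  candidates.map { t =>
    val (l, r) = xs.zip(targets).partition { case (x, _) => x <= t }
    val lm = mean(l.map(_._2))
    val rm = mean(r.map(_._2))
    val sse = l.map { case (_, y) => (y - lm) * (y - lm) }.sum +
      r.map { case (_, y) => (y - rm) * (y - rm) }.sum
    (Stump(t, lm, rm), sse)
  }.minBy(_._2)._1
}

def gradientBoost(xs: Seq[Double], ys: Seq[Double], numIters: Int, stepSize: Double): Double => Double = {
  val base = mean(ys) // simplification: start from the mean label
  var stumps = Vector.empty[Stump]
  def current(x: Double): Double = base + stepSize * stumps.map(_.predict(x)).sum
  for (_ <- 1 to numIters) {
    // For squared loss, the negative gradient at each point is the residual y - F(x).
    val residuals = xs.zip(ys).map { case (x, y) => y - current(x) }
    stumps = stumps :+ fitStump(xs, residuals)
  }
  val trained = stumps
  (x: Double) => base + stepSize * trained.map(_.predict(x)).sum
}

val xs = (1 to 10).map(_.toDouble)
val ys = xs.map(x => if (x <= 5) 1.0 else 3.0)
val boosted = gradientBoost(xs, ys, numIters = 20, stepSize = 0.1)
val trainMse = xs.zip(ys).map { case (x, y) => (boosted(x) - y) * (boosted(x) - y) }.sum / xs.size
println(s"training MSE after 20 rounds = $trainMse")
```

Here the mean-only prediction has a training MSE of 1.0; each round removes 10% of the remaining residual, so more rounds (or a larger step size) fit the training data more closely.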


---
# 6. Hyperparameter tuning
An important task in Machine Learning is model selection, or using data to find the best model or parameters for a given task. This is also called tuning. Tuning may be done for individual Estimators such as LinearRegression, or for entire Pipelines which include multiple algorithms, featurization, and other steps. Users can tune an entire Pipeline at once, rather than tuning each element in the Pipeline separately. MLlib supports model selection tools such as `CrossValidator`. These tools require the following items:
* Estimator: algorithm or Pipeline to tune (`setEstimator`)
* Set of ParamMaps: parameters to choose from, sometimes called a "parameter grid" to search over (`setEstimatorParamMaps`)
* Evaluator: metric to measure how well a fitted Model does on held-out test data (`setEvaluator`)

`CrossValidator` begins by splitting the dataset into a set of folds, which are used as separate training and test datasets. For example, with `k=3` folds, `CrossValidator` generates 3 (training, test) dataset pairs, each of which uses 2/3 of the data for training and 1/3 for testing. To evaluate a particular `ParamMap`, `CrossValidator` computes the average evaluation metric for the 3 models produced by fitting the Estimator on the 3 different (training, test) dataset pairs. After identifying the best `ParamMap`, `CrossValidator` finally re-fits the Estimator using the best `ParamMap` and the entire dataset.
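The fold logic just described can be sketched in plain Scala. This version assigns rows to folds round-robin by position to keep it deterministic; Spark's `CrossValidator` assigns rows to folds at random (it accepts a `seed`), so treat this only as an illustration of the (training, test) pairs it produces:

```scala
// Build k (training, test) pairs; each element lands in exactly one test fold.
def kFoldPairs[A](data: Seq[A], k: Int): Seq[(Seq[A], Seq[A])] = {
  require(k >= 2 && data.size >= k)
  val indexed = data.zipWithIndex
  (0 until k).map { fold =>
    // Round-robin assignment: element i belongs to test fold i % k.
    val (test, train) = indexed.partition { case (_, i) => i % k == fold }
    (train.map(_._1), test.map(_._1))
  }
}

val folds = kFoldPairs(1 to 9, 3)
folds.foreach { case (train, test) => println(s"train=$train test=$test") }
```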

Below, use `CrossValidator` to select the best random forest model. To do so, you need to define a grid of parameters. Let's search over different numbers of trees (1, 5, and 10) and different maximum tree depths (5, 10, and 15).

In [138]:
import org.apache.spark.ml.tuning.ParamGridBuilder
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.tuning.CrossValidator

val paramGrid = new ParamGridBuilder()
  .addGrid(rf.numTrees, Array(1, 5, 10))
  .addGrid(rf.maxDepth, Array(5, 10, 15))
  .build()

val evaluator = new RegressionEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("rmse")

val cv = new CrossValidator()
    .setEstimator(rf)
    .setEvaluator(evaluator)
    .setEstimatorParamMaps(paramGrid)
    .setNumFolds(3)

val cvModel = cv.fit(trainSet)

val predictions = cvModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

+------------------+-------+--------------------+
|        prediction|  label|            features|
+------------------+-------+--------------------+
|          211692.1|50800.0|[-62.005506847089...|
|127840.17647058824|78300.0|[-62.000515660834...|
|131438.18181818182|90100.0|[-61.985542102068...|
| 108926.3888888889|82800.0|[-61.975559729558...|
| 80096.66666666666|75500.0|[-61.975559729558...|
+------------------+-------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 51595.491067216375


import org.apache.spark.ml.tuning.ParamGridBuilder
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.tuning.CrossValidator
paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	rfr_a78e4c7ab451-maxDepth: 5,
	rfr_a78e4c7ab451-numTrees: 1
}, {
	rfr_a78e4c7ab451-maxDepth: 5,
	rfr_a78e4c7ab451-numTrees: 5
}, {
	rfr_a78e4c7ab451-maxDepth: 5,
	rfr_a78e4c7ab451-numTrees: 10
}, {
	rfr_a78e4c7ab451-maxDepth: 10,
	rfr_a78e4c7ab451-numTrees: 1
}, {
	rfr_a78e4c7ab451-maxDepth: 10,
	rfr_a78e4c7ab451-numTrees: 5
}, {
	rfr_a78e4c7ab451-maxDepth: 10,
	rfr_a78e4c7ab451-numTrees: 10
}, {
	rfr_a78e4c7ab451-maxDepth: 15,
	rfr_a78e4c7ab451-numTrees: 1
}, {
	rfr_a78e4c7ab451-maxDepth: 15,
	rfr_a78e4c7ab451-numTrees: 5
}, {
	rfr_a78e4c7ab451-maxDepth: 15,
	rfr_a78e...
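The cell above reports only the test RMSE of the refitted model. `CrossValidatorModel` also exposes `avgMetrics` (the mean cross-validated metric for each `ParamMap`, in grid order) and `bestModel`. A sketch of printing both, assuming a toy dataset and a smaller grid than the one above; all `toy*` names are illustrative:

```scala
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("cv-inspect").getOrCreate()

// Toy data: label = x^2 on 30 points.
val toyData = spark.createDataFrame(
  (1 to 30).map(i => (Vectors.dense(i.toDouble), (i * i).toDouble))
).toDF("features", "label")

val toyRf = new RandomForestRegressor().setSeed(7L)
val toyGrid = new ParamGridBuilder()
  .addGrid(toyRf.numTrees, Array(1, 5))
  .addGrid(toyRf.maxDepth, Array(2, 4))
  .build()

val toyCv = new CrossValidator()
  .setEstimator(toyRf)
  .setEvaluator(new RegressionEvaluator().setMetricName("rmse"))
  .setEstimatorParamMaps(toyGrid)
  .setNumFolds(3)
  .setSeed(7L)

val toyCvModel = toyCv.fit(toyData)

// avgMetrics(i) is the mean RMSE across folds for toyGrid(i); for RMSE, lower is better.
toyGrid.zip(toyCvModel.avgMetrics).foreach { case (params, metric) =>
  val desc = params.toSeq.map(p => s"${p.param.name}=${p.value}").mkString(", ")
  println(s"$desc -> $metric")
}

val best = toyCvModel.bestModel.asInstanceOf[RandomForestRegressionModel]
println(s"best: numTrees=${best.trees.length}, maxDepth=${best.getMaxDepth}")
```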


---
# 7. An End-to-End Classification Test
As the last step, you are given a dataset called `data/ccdefault.csv`, which records defaults of credit card clients. It has 30,000 cases and 24 different attributes. More details about the dataset are available in `data/ccdefault.txt`. In this task you should build three models, compare their results, and identify the best solution. Here are the suggested steps:
1. Load the data.
2. Carry out some exploratory analyses (e.g., how various features and the target variable are distributed).
3. Train a model to predict the target variable (risk of `default`).
  - Employ three different models (logistic regression, decision tree, and random forest).
  - Compare the models' performances (e.g., AUC).
  - Defend your choice of the best model (e.g., what are the strengths and weaknesses of each of these models?).
4. What more would you do with this data? Anything to help you devise a better solution?
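AUC (area under the ROC curve), suggested above for comparing models, equals the probability that a randomly chosen defaulter receives a higher score than a randomly chosen non-defaulter, with ties counting one half. A plain-Scala sketch of that definition; Spark's `BinaryClassificationEvaluator` computes the area from ROC thresholds instead (and may bin scores on large data), which should agree with this value when no binning occurs:

```scala
// Pairwise (Mann-Whitney) form of AUC: fraction of (positive, negative) pairs ranked correctly.
def auc(scores: Seq[Double], labels: Seq[Int]): Double = {
  val pos = scores.zip(labels).collect { case (s, 1) => s }
  val neg = scores.zip(labels).collect { case (s, 0) => s }
  require(pos.nonEmpty && neg.nonEmpty, "need at least one example of each class")
  val credit = for (p <- pos; n <- neg) yield if (p > n) 1.0 else if (p == n) 0.5 else 0.0
  credit.sum / (pos.size * neg.size)
}

println(auc(Seq(0.9, 0.8, 0.3, 0.1), Seq(1, 0, 1, 0)))
```

An AUC of 0.5 corresponds to random ranking, which makes it a more informative comparison than accuracy when one class dominates.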

In [16]:
val credit = spark.read.options(Map("inferSchema"->"true","delimiter"->",","header"->"true"))
  .csv("data/ccdefault.csv")
credit.show(5)

+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+
| ID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|DEFAULT|
+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+
|  1|    20000|  2|        2|       1| 24|    2|    2|   -1|   -1|   -2|   -2|     3913|     3102|      689|        0|        0|        0|       0|     689|       0|       0|       0|       0|      1|
|  2|   120000|  2|        2|       2| 26|   -1|    2|    0|    0|    0|    2|     2682|     1725|     2682|     3272|     3455|     3261|       0|    1000|    1000|    1000|       0|    2000|    

credit: org.apache.spark.sql.DataFrame = [ID: int, LIMIT_BAL: int ... 23 more fields]


In [13]:
//import org.apache.spark.mllib.stat.Statistics
//val creditCard = spark.read.options(Map("inferSchema"->"true","delimiter"->",","header"->"true"))
  //.csv("data/ccdefault.csv")
//val rddFromFile = spark.sparkContext.textFile("data/ccdefault.csv")

//val testResult = Statistics.kolmogorovSmirnovTest(rddFromFile, "norm", 0, 1)
//println(testResult)

import org.apache.spark.mllib.stat.Statistics
rddFromFile: org.apache.spark.rdd.RDD[String] = data/ccdefault.csv MapPartitionsRDD[47] at textFile at <console>:28


In [17]:
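println(credit.count())
println(credit.columns.mkString(", "))


30000
ID, LIMIT_BAL, SEX, EDUCATION, MARRIAGE, AGE, PAY_0, PAY_2, PAY_3, PAY_4, PAY_5, PAY_6, BILL_AMT1, BILL_AMT2, BILL_AMT3, BILL_AMT4, BILL_AMT5, BILL_AMT6, PAY_AMT1, PAY_AMT2, PAY_AMT3, PAY_AMT4, PAY_AMT5, PAY_AMT6, DEFAULT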


In [18]:
credit.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- LIMIT_BAL: integer (nullable = true)
 |-- SEX: integer (nullable = true)
 |-- EDUCATION: integer (nullable = true)
 |-- MARRIAGE: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- PAY_0: integer (nullable = true)
 |-- PAY_2: integer (nullable = true)
 |-- PAY_3: integer (nullable = true)
 |-- PAY_4: integer (nullable = true)
 |-- PAY_5: integer (nullable = true)
 |-- PAY_6: integer (nullable = true)
 |-- BILL_AMT1: integer (nullable = true)
 |-- BILL_AMT2: integer (nullable = true)
 |-- BILL_AMT3: integer (nullable = true)
 |-- BILL_AMT4: integer (nullable = true)
 |-- BILL_AMT5: integer (nullable = true)
 |-- BILL_AMT6: integer (nullable = true)
 |-- PAY_AMT1: integer (nullable = true)
 |-- PAY_AMT2: integer (nullable = true)
 |-- PAY_AMT3: integer (nullable = true)
 |-- PAY_AMT4: integer (nullable = true)
 |-- PAY_AMT5: integer (nullable = true)
 |-- PAY_AMT6: integer (nullable = true)
 |-- DEFAULT: integer (nullable = tru

In [19]:
credit.select(col("DEFAULT")).describe().show()

+-------+-------------------+
|summary|            DEFAULT|
+-------+-------------------+
|  count|              30000|
|   mean|             0.2212|
| stddev|0.41506180569093254|
|    min|                  0|
|    max|                  1|
+-------+-------------------+



In [20]:
credit.groupBy(col("DEFAULT")).count().show()
print(s"Total number of records in the dataset = ${credit.count()}")

+-------+-----+
|DEFAULT|count|
+-------+-----+
|      1| 6636|
|      0|23364|
+-------+-----+

Total number of records in the dataset = 30000
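The counts above show an imbalanced target: 6,636 defaults against 23,364 non-defaults. Two quick consequences can be computed from those counts. A model that always predicts 0 is already about 77.9% accurate, which is why accuracy alone is a poor yardstick here. And if one chose to reweight the classes, a common "balanced" scheme gives each class the weight n / (k · n_c); Spark's `LogisticRegression` can consume such per-row weights through `setWeightCol`. The sketch below covers only the arithmetic, and its names are hypothetical:

```scala
// Class counts taken from the groupBy output above.
val counts = Map(1 -> 6636L, 0 -> 23364L)
val total = counts.values.sum

// Accuracy of always predicting the majority class (0 = no default).
val majorityBaseline = counts.values.max.toDouble / total

// "Balanced" weights: total / (numClasses * classCount); the rarer class gets the larger weight.
val balancedWeights = counts.map { case (cls, n) => cls -> total.toDouble / (counts.size * n) }

println(s"majority baseline accuracy = $majorityBaseline")
println(s"balanced weights = $balancedWeights")
```

With these weights both classes contribute the same total weight (15,000 each), so the rare default class is no longer drowned out during training.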

In [21]:
val credit_dataset = credit.withColumnRenamed("DEFAULT", "label").drop("ID")

credit_dataset.show(5)

+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-----+
|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|label|
+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-----+
|    20000|  2|        2|       1| 24|    2|    2|   -1|   -1|   -2|   -2|     3913|     3102|      689|        0|        0|        0|       0|     689|       0|       0|       0|       0|    1|
|   120000|  2|        2|       2| 26|   -1|    2|    0|    0|    0|    2|     2682|     1725|     2682|     3272|     3455|     3261|       0|    1000|    1000|    1000|       0|    2000|    1|
|    90000|  2|        2|

credit_dataset: org.apache.spark.sql.DataFrame = [LIMIT_BAL: int, SEX: int ... 22 more fields]


In [65]:
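// numeric feature columns: everything except the label and the three categorical columns
val colNum = credit_dataset.columns.filterNot(Set("label", "SEX", "EDUCATION", "MARRIAGE"))
println(colNum.mkString(", "))

LIMIT_BAL, AGE, PAY_0, PAY_2, PAY_3, PAY_4, PAY_5, PAY_6, BILL_AMT1, BILL_AMT2, BILL_AMT3, BILL_AMT4, BILL_AMT5, BILL_AMT6, PAY_AMT1, PAY_AMT2, PAY_AMT3, PAY_AMT4, PAY_AMT5, PAY_AMT6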

colNum: Array[String] = Array(LIMIT_BAL, AGE, PAY_0, PAY_2, PAY_3, PAY_4, PAY_5, PAY_6, BILL_AMT1, BILL_AMT2, BILL_AMT3, BILL_AMT4, BILL_AMT5, BILL_AMT6, PAY_AMT1, PAY_AMT2, PAY_AMT3, PAY_AMT4, PAY_AMT5, PAY_AMT6)


In [66]:
for (c <- colNum) {
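    // count values that are null OR empty strings (with `and`, a value would have to be both, so the count is always 0)
    val creditDatasetMissingValues = credit_dataset.filter(col(c).isNull or (col(c) <=> lit(""))).count()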
    println(s"Column $c has $creditDatasetMissingValues null values")
}

Column LIMIT_BAL has 0 null values
Column AGE has 0 null values
Column PAY_0 has 0 null values
Column PAY_2 has 0 null values
Column PAY_3 has 0 null values
Column PAY_4 has 0 null values
Column PAY_5 has 0 null values
Column PAY_6 has 0 null values
Column BILL_AMT1 has 0 null values
Column BILL_AMT2 has 0 null values
Column BILL_AMT3 has 0 null values
Column BILL_AMT4 has 0 null values
Column BILL_AMT5 has 0 null values
Column BILL_AMT6 has 0 null values
Column PAY_AMT1 has 0 null values
Column PAY_AMT2 has 0 null values
Column PAY_AMT3 has 0 null values
Column PAY_AMT4 has 0 null values
Column PAY_AMT5 has 0 null values
Column PAY_AMT6 has 0 null values
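The loop above launches one Spark job per column. An alternative is to count the nulls of every column in a single aggregation with `count(when(...))`: `when` yields null for rows that do not match, and `count` skips nulls. A minimal sketch, assuming a toy DataFrame in place of `credit_dataset`:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, when}

val spark = SparkSession.builder().master("local[1]").appName("null-counts").getOrCreate()
import spark.implicits._

// Toy stand-in for credit_dataset: column AGE has one missing value.
val toyCredit = Seq(
  (Some(20000), Some(24)),
  (Some(120000), None),
  (Some(90000), Some(34))
).toDF("LIMIT_BAL", "AGE")

// One aggregate expression per column: the number of rows where that column is null.
val nullCounts = toyCredit.select(
  toyCredit.columns.map(c => count(when(col(c).isNull, 1)).alias(c)): _*
)
nullCounts.show()
```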


In [67]:
import org.apache.spark.ml.feature.VectorAssembler

val va3 = new VectorAssembler()
    .setInputCols(colNum)
    .setOutputCol("vector_features")

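// `ohCredit` is not defined in the cells shown here; judging by the output columns
// (SEX_vectors, EDUCATION_vectors, MARRIAGE_vectors) it is the one-hot-encoded credit_dataset.
val featuredCredit = va3.transform(ohCredit)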

featuredCredit.show(5)

+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-----+-------------+-----------------+----------------+--------------------+
|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|label|  SEX_vectors|EDUCATION_vectors|MARRIAGE_vectors|     vector_features|
+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-----+-------------+-----------------+----------------+--------------------+
|    20000|  2|        2|       1| 24|    2|    2|   -1|   -1|   -2|   -2|     3913|     3102|      689|        0|        0|        0|       0|     689|       0|       0|       0|       0|    1|    (2,[],[

import org.apache.spark.ml.feature.VectorAssembler
va3: org.apache.spark.ml.feature.VectorAssembler = VectorAssembler: uid=vecAssembler_b34173c5e1cb, handleInvalid=error, numInputCols=20
featuredCredit: org.apache.spark.sql.DataFrame = [LIMIT_BAL: int, SEX: int ... 26 more fields]


In [77]:
import org.apache.spark.ml.feature.StandardScaler

val scaler = new StandardScaler()
  .setInputCol("vector_features")
  .setOutputCol("scaled_features")
  .setWithStd(true)
  .setWithMean(false)

// Compute summary statistics by fitting the StandardScaler.
val scalerModel = scaler.fit(featuredCredit)

// Normalize each feature to have unit standard deviation.
val scaledData = scalerModel.transform(featuredCredit)
scaledData.show(5)


+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-----+-------------+-----------------+----------------+--------------------+--------------------+
|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|label|  SEX_vectors|EDUCATION_vectors|MARRIAGE_vectors|     vector_features|     scaled_features|
+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-----+-------------+-----------------+----------------+--------------------+--------------------+
|    20000|  2|        2|       1| 24|    2|    2|   -1|   -1|   -2|   -2|     3913|     3102|      689|        0|        0|        0|       0

import org.apache.spark.ml.feature.StandardScaler
scaler: org.apache.spark.ml.feature.StandardScaler = stdScal_26609e398b11
scalerModel: org.apache.spark.ml.feature.StandardScalerModel = StandardScalerModel: uid=stdScal_26609e398b11, numFeatures=20, withMean=false, withStd=true
scaledData: org.apache.spark.sql.DataFrame = [LIMIT_BAL: int, SEX: int ... 27 more fields]
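With `withMean = false` and `withStd = true`, `StandardScaler` does not center the data; it only divides each feature by its standard deviation, which the Spark documentation describes as the corrected (n − 1) sample standard deviation. Skipping centering keeps sparse vectors sparse. A plain-Scala sketch of the per-feature transformation (the function name is hypothetical):

```scala
// Scale one feature column to unit sample standard deviation without centering it.
def scaleToUnitStd(values: Seq[Double]): Seq[Double] = {
  require(values.size >= 2, "need at least two values for a sample standard deviation")
  val mean = values.sum / values.size
  val sampleVar = values.map(v => (v - mean) * (v - mean)).sum / (values.size - 1)
  val std = math.sqrt(sampleVar)
  // Guard against division by zero for a constant feature (this sketch returns 0.0).
  if (std == 0.0) values.map(_ => 0.0) else values.map(_ / std)
}

println(scaleToUnitStd(Seq(2.0, 4.0, 6.0)))
```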


In [86]:
import org.apache.spark.ml.feature.OneHotEncoder

val encoder = new OneHotEncoder()
  .setInputCols(Array("SEX", "EDUCATION", "MARRIAGE"))
  .setOutputCols(Array("SEX_vectors","EDUCATION_vectors","MARRIAGE_vectors"))
val model = encoder.fit(credit_dataset)

val encoded = model.transform(credit_dataset)

encoded.show(5)

+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-----+-------------+-----------------+----------------+
|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|label|  SEX_vectors|EDUCATION_vectors|MARRIAGE_vectors|
+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-----+-------------+-----------------+----------------+
|    20000|  2|        2|       1| 24|    2|    2|   -1|   -1|   -2|   -2|     3913|     3102|      689|        0|        0|        0|       0|     689|       0|       0|       0|       0|    1|    (2,[],[])|    (6,[2],[1.0])|   (3,[1],[1.0])|
|   120000|  2|        2

import org.apache.spark.ml.feature.OneHotEncoder
encoder: org.apache.spark.ml.feature.OneHotEncoder = oneHotEncoder_640cbd5f8cdc
model: org.apache.spark.ml.feature.OneHotEncoderModel = OneHotEncoderModel: uid=oneHotEncoder_640cbd5f8cdc, dropLast=true, handleInvalid=error, numInputCols=3, numOutputCols=3
encoded: org.apache.spark.sql.DataFrame = [LIMIT_BAL: int, SEX: int ... 25 more fields]
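The encoded vectors in the output follow from two `OneHotEncoder` behaviors: when no column metadata is present, the number of categories is taken as the largest value seen plus one, and `dropLast = true` (the default, shown in the model summary) removes the last category, which is then encoded as the all-zeros vector. That is why `SEX = 2` becomes `(2,[],[])`, `EDUCATION = 2` becomes `(6,[2],[1.0])` and `MARRIAGE = 1` becomes `(3,[1],[1.0])`. A plain-Scala sketch of that mapping using dense vectors (Spark produces sparse ones):

```scala
// Dense one-hot encoding of a non-negative integer category.
def oneHot(value: Int, numCategories: Int, dropLast: Boolean = true): Vector[Double] = {
  require(value >= 0 && value < numCategories, s"value $value out of range")
  val size = if (dropLast) numCategories - 1 else numCategories
  // With dropLast, the last category has no slot and maps to all zeros.
  Vector.tabulate(size)(i => if (i == value) 1.0 else 0.0)
}

println(oneHot(2, numCategories = 3)) // SEX = 2, categories 0..2
println(oneHot(2, numCategories = 7)) // EDUCATION = 2, categories 0..6
println(oneHot(1, numCategories = 4)) // MARRIAGE = 1, categories 0..3
```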


In [89]:
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import scala.collection.mutable

val pipeline = new Pipeline().setStages(Array(va3, scaler, encoder))
val newCredit = pipeline.fit(credit_dataset).transform(credit_dataset)

newCredit.show(5)



+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-----+--------------------+--------------------+-------------+-----------------+----------------+
|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|label|     vector_features|     scaled_features|  SEX_vectors|EDUCATION_vectors|MARRIAGE_vectors|
+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-----+--------------------+--------------------+-------------+-----------------+----------------+
|    20000|  2|        2|       1| 24|    2|    2|   -1|   -1|   -2|   -2|     3913|     3102|      689|        0|        0|        0|       0

import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import scala.collection.mutable
pipeline: org.apache.spark.ml.Pipeline = pipeline_d2f1a9685fb2
newCredit: org.apache.spark.sql.DataFrame = [LIMIT_BAL: int, SEX: int ... 27 more fields]


In [91]:
val va4 = new VectorAssembler()
  .setInputCols(Array("scaled_features", "SEX_vectors", "EDUCATION_vectors", "MARRIAGE_vectors"))
  .setOutputCol("features")
val dataCredit = va4.transform(newCredit).select("features", "label")

dataCredit.show(5, false)


+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|features                                                                                                                                                                                                                                                                                                                                                                                     |label|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

va4: org.apache.spark.ml.feature.VectorAssembler = VectorAssembler: uid=vecAssembler_194cffe6ae95, handleInvalid=error, numInputCols=4
dataCredit: org.apache.spark.sql.DataFrame = [features: vector, label: int]
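In the cells above, the scaler and the encoder are fitted on the full `credit_dataset` before `randomSplit`, so statistics from rows that end up in the test split (the standard deviations and the category counts) feed into preprocessing. A common alternative is to split the raw data first and put every stage, including the final assembler and a classifier, into one `Pipeline` that is fitted on the training split only. A minimal sketch, assuming toy training and test DataFrames with a reduced set of the credit columns; the `toy*` names are illustrative:

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{OneHotEncoder, StandardScaler, VectorAssembler}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("credit-pipeline").getOrCreate()
import spark.implicits._

// Toy stand-ins for the training and test splits (a subset of the credit columns).
val trainToy = Seq(
  (20000, 24, 2, 2, 1), (120000, 26, 2, 2, 1), (90000, 34, 2, 1, 0), (50000, 37, 1, 2, 0),
  (50000, 57, 1, 1, 0), (50000, 37, 2, 3, 1), (500000, 29, 1, 1, 0), (100000, 23, 2, 2, 1)
).toDF("LIMIT_BAL", "AGE", "SEX", "EDUCATION", "label")
val testToy = Seq((80000, 41, 1, 2, 0), (30000, 22, 2, 3, 1))
  .toDF("LIMIT_BAL", "AGE", "SEX", "EDUCATION", "label")

val toyNumAssembler = new VectorAssembler()
  .setInputCols(Array("LIMIT_BAL", "AGE"))
  .setOutputCol("vector_features")
val toyScaler = new StandardScaler()
  .setInputCol("vector_features")
  .setOutputCol("scaled_features")
  .setWithStd(true)
  .setWithMean(false)
val toyEncoder = new OneHotEncoder()
  .setInputCols(Array("SEX", "EDUCATION"))
  .setOutputCols(Array("SEX_vectors", "EDUCATION_vectors"))
  .setHandleInvalid("keep") // tolerate category values that only appear in the test split
val toyFinalAssembler = new VectorAssembler()
  .setInputCols(Array("scaled_features", "SEX_vectors", "EDUCATION_vectors"))
  .setOutputCol("features")
val toyClassifier = new LogisticRegression().setMaxIter(50)

val toyPipeline = new Pipeline()
  .setStages(Array(toyNumAssembler, toyScaler, toyEncoder, toyFinalAssembler, toyClassifier))

// Every estimator in the pipeline (scaler, encoder, classifier) is fitted on training rows only.
val toyPipelineModel = toyPipeline.fit(trainToy)
val testPredictions = toyPipelineModel.transform(testToy)
testPredictions.select("features", "label", "probability", "prediction").show(false)
```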


In [93]:
val Array(trainSet, testSet) = dataCredit.randomSplit(Array(0.8, 0.2))

print("Training set:\n")
trainSet.show(5)
print("Test set:\n")
testSet.show(5)

Training set:
+--------------------+-----+
|            features|label|
+--------------------+-----+
|(31,[0,1,2,3,4,5,...|    0|
|(31,[0,1,2,3,4,5,...|    1|
|(31,[0,1,2,3,4,5,...|    0|
|(31,[0,1,2,3,4,5,...|    1|
|(31,[0,1,2,3,4,5,...|    0|
+--------------------+-----+
only showing top 5 rows

Test set:
+--------------------+-----+
|            features|label|
+--------------------+-----+
|(31,[0,1,2,3,4,5,...|    0|
|(31,[0,1,2,3,4,5,...|    0|
|(31,[0,1,2,3,4,5,...|    1|
|(31,[0,1,2,3,4,5,...|    1|
|(31,[0,1,2,3,4,5,...|    0|
+--------------------+-----+
only showing top 5 rows



trainSet: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [features: vector, label: int]
testSet: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [features: vector, label: int]


In [97]:
import org.apache.spark.ml.regression.LinearRegression

// train the model
val lr = new LinearRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)
val lrModel = lr.fit(trainSet)
val trainingSummary = lrModel.summary

println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")

Coefficients: [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0] Intercept: 0.2216507777220271
RMSE: 0.41535732864281316


import org.apache.spark.ml.regression.LinearRegression
lr: org.apache.spark.ml.regression.LinearRegression = linReg_8531fd0c15a7
lrModel: org.apache.spark.ml.regression.LinearRegressionModel = LinearRegressionModel: uid=linReg_8531fd0c15a7, numFeatures=31
trainingSummary: org.apache.spark.ml.regression.LinearRegressionTrainingSummary = org.apache.spark.ml.regression.LinearRegressionTrainingSummary@6c191784
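Step 3 of the task asks for logistic regression and a comparison by AUC, but the cell above fits a `LinearRegression` to the 0/1 `label`. With `regParam = 0.3` and `elasticNetParam = 0.8`, the L1-heavy penalty appears to have shrunk every coefficient to 0.0, so the model predicts a constant (the intercept ≈ 0.2217, close to the 22% default rate shown earlier). A minimal sketch of the classification setup instead, using `LogisticRegression` and `BinaryClassificationEvaluator` (whose default metric is `areaUnderROC`), on a toy dataset; the regularization values are illustrative assumptions, not tuned settings:

```scala
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("logreg-auc").getOrCreate()

// Toy data: label tends to be 1 when the single feature is large, with some overlap.
val toyCredit = spark.createDataFrame(Seq(
  (Vectors.dense(0.1), 0.0), (Vectors.dense(0.4), 0.0), (Vectors.dense(0.5), 1.0),
  (Vectors.dense(0.9), 0.0), (Vectors.dense(1.2), 1.0), (Vectors.dense(1.5), 1.0),
  (Vectors.dense(0.2), 0.0), (Vectors.dense(1.8), 1.0)
)).toDF("features", "label")

val logReg = new LogisticRegression()
  .setMaxIter(100)
  .setRegParam(0.01)       // illustrative; tune with CrossValidator in practice
  .setElasticNetParam(0.0) // pure L2 penalty

val logRegModel = logReg.fit(toyCredit)
val scored = logRegModel.transform(toyCredit) // adds rawPrediction, probability, prediction

val aucEvaluator = new BinaryClassificationEvaluator()
  .setLabelCol("label")
  .setRawPredictionCol("rawPrediction")
  .setMetricName("areaUnderROC")
val toyAuc = aucEvaluator.evaluate(scored)
println(s"AUC = $toyAuc")
```

`DecisionTreeClassifier` and `RandomForestClassifier` also produce a `rawPrediction` column, so the same evaluator can compare all three models requested in the task on `testSet`.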


In [100]:
import org.apache.spark.ml.evaluation.RegressionEvaluator

// make predictions on the test data
val predictions = lrModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

// select (prediction, true label) and compute test error.
val ccevaluator = new RegressionEvaluator().setPredictionCol("prediction").setLabelCol("label")
val ccrmse = ccevaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $ccrmse")


+------------------+-----+--------------------+
|        prediction|label|            features|
+------------------+-----+--------------------+
|0.2216507777220271|    0|(31,[0,1,2,3,4,5,...|
|0.2216507777220271|    0|(31,[0,1,2,3,4,5,...|
|0.2216507777220271|    1|(31,[0,1,2,3,4,5,...|
|0.2216507777220271|    1|(31,[0,1,2,3,4,5,...|
|0.2216507777220271|    0|(31,[0,1,2,3,4,5,...|
+------------------+-----+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 0.41386507215382207


import org.apache.spark.ml.evaluation.RegressionEvaluator
predictions: org.apache.spark.sql.DataFrame = [features: vector, label: int ... 1 more field]
ccevaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = RegressionEvaluator: uid=regEval_4fb7cf74849f, metricName=rmse, throughOrigin=false
ccrmse: Double = 0.41386507215382207


In [104]:
import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.ml.evaluation.RegressionEvaluator

val ccdt = new DecisionTreeRegressor()

// train the model
val ccdtModel = ccdt.fit(trainSet)

// make predictions on the test data
val predictions = ccdtModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

// select (prediction, true label) and compute test error
val evaluator = new RegressionEvaluator().setPredictionCol("prediction").setLabelCol("label")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

+-------------------+-----+--------------------+
|         prediction|label|            features|
+-------------------+-----+--------------------+
|0.09090007930214115|    0|(31,[0,1,2,3,4,5,...|
|0.09090007930214115|    0|(31,[0,1,2,3,4,5,...|
| 0.5789473684210527|    1|(31,[0,1,2,3,4,5,...|
| 0.8252688172043011|    1|(31,[0,1,2,3,4,5,...|
| 0.1837335749886724|    0|(31,[0,1,2,3,4,5,...|
+-------------------+-----+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 0.364564852236988


import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.ml.evaluation.RegressionEvaluator
ccdt: org.apache.spark.ml.regression.DecisionTreeRegressor = dtr_c8ef772cc37d
ccdtModel: org.apache.spark.ml.regression.DecisionTreeRegressionModel = DecisionTreeRegressionModel: uid=dtr_c8ef772cc37d, depth=5, numNodes=63, numFeatures=31
predictions: org.apache.spark.sql.DataFrame = [features: vector, label: int ... 1 more field]
evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = RegressionEvaluator: uid=regEval_5fe56fe0442d, metricName=rmse, throughOrigin=false
rmse: Double = 0.364564852236988
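
A regression tree is grown by choosing, at each node, the split that most reduces variance impurity (the impurity Spark's `DecisionTreeRegressor` uses for regression), until `maxDepth` is reached. Minimizing the total squared error of the two children is equivalent to maximizing that variance reduction. The sketch below searches one node on one feature, assuming exact thresholds over the sorted feature values; Spark instead discretizes continuous features into at most `maxBins` candidate thresholds, so treat this as a simplification. `bestSplit`, `sse` and the toy data are hypothetical names for illustration.

```scala
object VarianceSplitSketch {
  // Sum of squared deviations from the mean (n * variance) of a group of labels.
  def sse(ys: Seq[Double]): Double =
    if (ys.isEmpty) 0.0
    else {
      val mean = ys.sum / ys.length
      ys.map(y => (y - mean) * (y - mean)).sum
    }

  // Try every threshold t on one feature (left: x <= t, right: x > t) and return
  // the threshold whose children have the lowest total SSE, paired with that SSE.
  // Returns None when no split separates the data (all feature values equal).
  def bestSplit(xs: Seq[Double], ys: Seq[Double]): Option[(Double, Double)] = {
    require(xs.length == ys.length, "xs and ys must have the same length")
    val data = xs.zip(ys)
    // the largest value would leave the right child empty, so it is not a candidate
    val candidates = xs.distinct.sorted.dropRight(1)
    if (candidates.isEmpty) None
    else Some(
      candidates.map { t =>
        val (left, right) = data.partition { case (x, _) => x <= t }
        (t, sse(left.map(_._2)) + sse(right.map(_._2)))
      }.minBy(_._2)
    )
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical toy data: the label jumps once the feature exceeds 2.0
    val xs = Seq(1.0, 2.0, 3.0, 4.0)
    val ys = Seq(0.0, 0.0, 1.0, 1.0)
    println(bestSplit(xs, ys)) // prints Some((2.0,0.0))
  }
}
```

A full tree repeats this search over every feature at every node and recurses into both children, which is why limiting `maxDepth` is the main lever against overfitting.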


In [106]:
import org.apache.spark.ml.regression.RandomForestRegressor
import org.apache.spark.ml.evaluation.RegressionEvaluator

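// RandomForestRegressor defaults: numTrees = 20 and maxDepth = 5 per tree,
// reading the "features" and "label" columns
val ccrf = new RandomForestRegressor()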

// train the model
val ccrfModel = ccrf.fit(trainSet)

// make predictions on the test data
val predictions = ccrfModel.transform(testSet)
predictions.select("prediction", "label", "features").show(10)

// select (prediction, true label) and compute test error
val evaluator = new RegressionEvaluator().setPredictionCol("prediction").setLabelCol("label")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

+-------------------+-----+--------------------+
|         prediction|label|            features|
+-------------------+-----+--------------------+
|0.35120903583597685|    0|(31,[0,1,2,3,4,5,...|
| 0.1482163190580732|    0|(31,[0,1,2,3,4,5,...|
| 0.5509490735731037|    1|(31,[0,1,2,3,4,5,...|
|  0.733082563860555|    1|(31,[0,1,2,3,4,5,...|
| 0.2050000424953527|    0|(31,[0,1,2,3,4,5,...|
| 0.1547095197382371|    0|(31,[0,1,2,3,4,5,...|
| 0.2823584261835262|    0|(31,[0,1,2,3,4,5,...|
| 0.7728504315413849|    0|(31,[0,1,2,3,4,5,...|
| 0.7613142314171062|    1|(31,[0,1,2,3,4,5,...|
|0.17025822809067265|    0|(31,[0,1,2,3,4,5,...|
+-------------------+-----+--------------------+
only showing top 10 rows

Root Mean Squared Error (RMSE) on test data = 0.36276734017697326


import org.apache.spark.ml.regression.RandomForestRegressor
import org.apache.spark.ml.evaluation.RegressionEvaluator
ccrf: org.apache.spark.ml.regression.RandomForestRegressor = rfr_0154f66d2c41
ccrfModel: org.apache.spark.ml.regression.RandomForestRegressionModel = RandomForestRegressionModel: uid=rfr_0154f66d2c41, numTrees=20, numFeatures=31
predictions: org.apache.spark.sql.DataFrame = [features: vector, label: int ... 1 more field]
evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = RegressionEvaluator: uid=regEval_09462fcfae25, metricName=rmse, throughOrigin=false
rmse: Double = 0.36276734017697326
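
A random forest averages the predictions of many trees, each trained on a bootstrap resample of the training rows (20 trees by default, matching `numTrees=20` in the echo); Spark also samples a random subset of features at each node. Here the forest's test RMSE (0.36276734017697326) is only slightly lower than the single tree's (0.364564852236988). The sketch below shows the bagging part only, assuming depth-1 trees (stumps) on a single feature, so it omits per-node feature sampling and deeper trees. `Stump`, `fitStump`, `fitForest`, `predict` and the toy data are hypothetical names for illustration, not Spark API.

```scala
import scala.util.Random

object BaggingSketch {
  // A depth-1 regression tree: predicts leftMean when x <= threshold, else rightMean.
  final case class Stump(threshold: Double, leftMean: Double, rightMean: Double) {
    def predict(x: Double): Double = if (x <= threshold) leftMean else rightMean
  }

  private def mean(ys: Seq[Double]): Double = ys.sum / ys.length

  private def sse(ys: Seq[Double]): Double = {
    val m = mean(ys)
    ys.map(y => (y - m) * (y - m)).sum
  }

  // Fit a stump by picking the threshold with the lowest total squared error.
  // Falls back to a constant (mean) prediction when every x is identical.
  def fitStump(data: Seq[(Double, Double)]): Stump = {
    val candidates = data.map(_._1).distinct.sorted.dropRight(1)
    if (candidates.isEmpty) {
      val m = mean(data.map(_._2))
      Stump(Double.PositiveInfinity, m, m)
    } else {
      val t = candidates.minBy { c =>
        val (l, r) = data.partition(_._1 <= c)
        sse(l.map(_._2)) + sse(r.map(_._2))
      }
      val (l, r) = data.partition(_._1 <= t)
      Stump(t, mean(l.map(_._2)), mean(r.map(_._2)))
    }
  }

  // Bagging: fit each stump on a bootstrap sample drawn with replacement,
  // the same size as the original data.
  def fitForest(data: Seq[(Double, Double)], numTrees: Int, seed: Long): Seq[Stump] = {
    val rng = new Random(seed)
    Seq.fill(numTrees) {
      val bootstrap = Seq.fill(data.length)(data(rng.nextInt(data.length)))
      fitStump(bootstrap)
    }
  }

  // The forest's prediction is the plain average of its trees' predictions.
  def predict(forest: Seq[Stump], x: Double): Double =
    forest.map(_.predict(x)).sum / forest.length

  def main(args: Array[String]): Unit = {
    // Hypothetical toy data, not the lab dataset
    val data = Seq(1.0 -> 0.0, 2.0 -> 0.0, 3.0 -> 1.0, 4.0 -> 1.0, 5.0 -> 1.0)
    val forest = fitForest(data, numTrees = 20, seed = 42L)
    println(f"prediction at x = 1.5: ${predict(forest, 1.5)}%.3f")
    println(f"prediction at x = 4.5: ${predict(forest, 4.5)}%.3f")
  }
}
```

Because each tree sees a slightly different resample, their errors are partly independent, and averaging them tends to smooth out the variance of any single tree.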
