# Machine Learning With Spark ML
In this lab assignment, you will complete a project by going through the following steps:
1. Get the data.
2. Discover the data to gain insights.
3. Prepare the data for Machine Learning algorithms.
4. Select a model and train it.
5. Fine-tune your model.
6. Present your solution.


---
# 1. Get the data


In [1]:
//# 1. Get the data
//Let's start the lab by loading the dataset. The can find the dataset at `data/ccdefault.csv`. To infer column types automatically, when you are reading the file, you need to set `inferSchema` to true. Moreover enable the `header` option to read the columns' name from the file.
val ccdefault = spark.read.format("csv")
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.load("ccdefault.csv")

ccdefault = [ID: int, LIMIT_BAL: int ... 23 more fields]


[ID: int, LIMIT_BAL: int ... 23 more fields]

---
# 2. Discover the data to gain insights
Now it is time to take a look at the data. In this step we are going to take a look at the data a few different ways:
* See the schema and dimension of the dataset
* Look at the data itself
* Statistical summary of the attributes
* Breakdown of the data by the categorical attribute variable
* Find the correlation among different attributes
* Make new attributes by combining existing attributes

## 2.1. Schema and dimension
Print the schema of the dataset

In [3]:
//#2.1 Print the schema of the dataset
ccdefault.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- LIMIT_BAL: integer (nullable = true)
 |-- SEX: integer (nullable = true)
 |-- EDUCATION: integer (nullable = true)
 |-- MARRIAGE: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- PAY_0: integer (nullable = true)
 |-- PAY_2: integer (nullable = true)
 |-- PAY_3: integer (nullable = true)
 |-- PAY_4: integer (nullable = true)
 |-- PAY_5: integer (nullable = true)
 |-- PAY_6: integer (nullable = true)
 |-- BILL_AMT1: integer (nullable = true)
 |-- BILL_AMT2: integer (nullable = true)
 |-- BILL_AMT3: integer (nullable = true)
 |-- BILL_AMT4: integer (nullable = true)
 |-- BILL_AMT5: integer (nullable = true)
 |-- BILL_AMT6: integer (nullable = true)
 |-- PAY_AMT1: integer (nullable = true)
 |-- PAY_AMT2: integer (nullable = true)
 |-- PAY_AMT3: integer (nullable = true)
 |-- PAY_AMT4: integer (nullable = true)
 |-- PAY_AMT5: integer (nullable = true)
 |-- PAY_AMT6: integer (nullable = true)
 |-- DEFAULT: integer (nullable = tru

Print the number of records in the dataset.

In [4]:
//Print the number of records in the dataset.
ccdefault.count()

30000

## 2.2. Look at the data
Print the first five records of the dataset.

In [24]:
//Print the first five records of the dataset.
ccdefault.show(60)

+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+
| ID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|DEFAULT|
+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+
|  1|    20000|  2|        2|       1| 24|    2|    2|   -1|   -1|   -2|   -2|     3913|     3102|      689|        0|        0|        0|       0|     689|       0|       0|       0|       0|      1|
|  2|   120000|  2|        2|       2| 26|   -1|    2|    0|    0|    0|    2|     2682|     1725|     2682|     3272|     3455|     3261|       0|    1000|    1000|    1000|       0|    2000|    

Print the number of records with population more than 10000.

In [6]:
////Print the number of records with LIMIT_BAL more than 20000.
ccdefault.where("LIMIT_BAL > 20000").show()

+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+
| ID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|DEFAULT|
+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+
|  2|   120000|  2|        2|       2| 26|   -1|    2|    0|    0|    0|    2|     2682|     1725|     2682|     3272|     3455|     3261|       0|    1000|    1000|    1000|       0|    2000|      1|
|  3|    90000|  2|        2|       2| 34|    0|    0|    0|    0|    0|    0|    29239|    14027|    13559|    14331|    14948|    15549|    1518|    1500|    1000|    1000|    1000|    5000|    

## 2.3. Statistical summary
 You can use the `describe` command.

In [28]:
//Print a summary of the table statistics for the attributes PAY_0 TO PAY_6. You can use the `describe` command.
ccdefault.describe("PAY_0","PAY_2","PAY_3","PAY_4" , "PAY_5" , "PAY_6").show()

+-------+------------------+--------------------+------------------+--------------------+------------------+-----------------+
|summary|             PAY_0|               PAY_2|             PAY_3|               PAY_4|             PAY_5|            PAY_6|
+-------+------------------+--------------------+------------------+--------------------+------------------+-----------------+
|  count|             30000|               30000|             30000|               30000|             30000|            30000|
|   mean|           -0.0167|-0.13376666666666667|           -0.1662|-0.22066666666666668|           -0.2662|          -0.2911|
| stddev|1.1238015279973335|  1.1971859730345495|1.1968675684465686|  1.1691386224023357|1.1331874060027525|1.149987625607897|
|    min|                -2|                  -2|                -2|                  -2|                -2|               -2|
|    max|                 8|                   8|                 8|                   8|                 8|   

Print the maximum, minimum and average LIMIT_BAL

In [2]:
//Print the maximum, minimum and average of the LIMIT_BAL
import org.apache.spark.sql.functions._
ccdefault.select(max("LIMIT_BAL"), min("LIMIT_BAL"), avg("LIMIT_BAL")).show()

+--------------+--------------+------------------+
|max(LIMIT_BAL)|min(LIMIT_BAL)|    avg(LIMIT_BAL)|
+--------------+--------------+------------------+
|       1000000|         10000|167484.32266666667|
+--------------+--------------+------------------+



## 2.4. Breakdown the data by categorical data
Print the number defaulters, and sort them in descending order.

In [9]:
ccdefault.groupBy("DEFAULT").agg(count("DEFAULT") as "NUMBER_OF_DEFAULTERS").orderBy(desc("NUMBER_OF_DEFAULTERS")).show()


+-------+--------------------+
|DEFAULT|NUMBER_OF_DEFAULTERS|
+-------+--------------------+
|      0|               23364|
|      1|                6636|
+-------+--------------------+



// label columns
val colLabel = "label"

// categorical columns
val colCat = "DEFAULT"

// numerical columns
val colNum = renamedccdefault.columns.filter(_ != colLabel).filter(_ != colCat)

In [10]:
//Print the average LIMIT_BAL for different deaulters , and call the new column `avg_value` when print it.
ccdefault.groupBy("DEFAULT").agg(avg("LIMIT_BAL") as "avg_value" ).show()

+-------+------------------+
|DEFAULT|         avg_value|
+-------+------------------+
|      1|130109.65641952984|
|      0|178099.72607430234|
+-------+------------------+



Rewrite the above question in SQL.

In [3]:
//Rewrite the above question in SQL.
import org.apache.spark.sql.SparkSession

ccdefault.createOrReplaceTempView("df")
spark.sql("SELECT DEFAULT, avg(LIMIT_BAL) as avg_value FROM df GROUP BY DEFAULT ORDER BY avg_value DESC").show()

+-------+------------------+
|DEFAULT|         avg_value|
+-------+------------------+
|      0|178099.72607430234|
|      1|130109.65641952984|
+-------+------------------+



## 2.5. Correlation among attributes
Print the correlation among the attributes. To do so, first you need to put these attributes into one vector. Then, compute the standard correlation coefficient (Pearson) between every pair of attributes in this new vector. To make a vector of these attributes, you can use the `VectorAssembler` Transformer.

In [4]:
import org.apache.spark.ml.feature.VectorAssembler


val va = new VectorAssembler().setInputCols(Array("PAY_0","PAY_2","PAY_3","PAY_4" , "PAY_5" , "PAY_6")).setOutputCol("correlation_vector")

//put vector inside the cells of a column

val ccdefaultAttrs = va.transform(ccdefault)

ccdefaultAttrs.show(5)

//correlation
import org.apache.spark.ml.linalg.Matrix
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.sql.Row

val Row(coeff: Matrix) = Correlation.corr(ccdefaultAttrs, "correlation_vector").head

println(s"The standard correlation coefficient:\n ${coeff}")

+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+--------------------+
| ID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|DEFAULT|  correlation_vector|
+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+--------------------+
|  1|    20000|  2|        2|       1| 24|    2|    2|   -1|   -1|   -2|   -2|     3913|     3102|      689|        0|        0|        0|       0|     689|       0|       0|       0|       0|      1|[2.0,2.0,-1.0,-1....|
|  2|   120000|  2|        2|       2| 26|   -1|    2|    0|    0|    0|    2|     2682|     1725|     2682|    

va = vecAssembler_d68f76e5988d
ccdefaultAttrs = [ID: int, LIMIT_BAL: int ... 24 more fields]
coeff = 


1.0                 0.6721643825483117  0.5742450926204353  ... (6 total)
0.6721643825483117  1.0                 0.766551682934095   ...
0.5742450926204353  0.766551682934095   1.0                 ...
0.5388406268712332  0.6620671310239535  0.7773588733012698  ...
0.509426063665447   0.6227802453768725  0.6867745109947861  ...
0.4745530860641512  0.5755008617793068  0.6326835927184404  ...


---
## 3. Prepare the data for Machine Learning algorithms
Before going through the Machine Learning steps, let's first rename the label column from `DEFAULT` to `label`.

In [5]:
//Prepare the data for ML algorithms

val renamedccdefault = ccdefault.withColumnRenamed("DEFAULT", "label").toDF()

renamedccdefault.show()

+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-----+
| ID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|label|
+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-----+
|  1|    20000|  2|        2|       1| 24|    2|    2|   -1|   -1|   -2|   -2|     3913|     3102|      689|        0|        0|        0|       0|     689|       0|       0|       0|       0|    1|
|  2|   120000|  2|        2|       2| 26|   -1|    2|    0|    0|    0|    2|     2682|     1725|     2682|     3272|     3455|     3261|       0|    1000|    1000|    1000|       0|    2000|    1|
|  3|

renamedccdefault = [ID: int, LIMIT_BAL: int ... 23 more fields]


[ID: int, LIMIT_BAL: int ... 23 more fields]

In [6]:
val colNum = renamedccdefault.columns

colNum = Array(ID, LIMIT_BAL, SEX, EDUCATION, MARRIAGE, AGE, PAY_0, PAY_2, PAY_3, PAY_4, PAY_5, PAY_6, BILL_AMT1, BILL_AMT2, BILL_AMT3, BILL_AMT4, BILL_AMT5, BILL_AMT6, PAY_AMT1, PAY_AMT2, PAY_AMT3, PAY_AMT4, PAY_AMT5, PAY_AMT6, label)


Array(ID, LIMIT_BAL, SEX, EDUCATION, MARRIAGE, AGE, PAY_0, PAY_2, PAY_3, PAY_4, PAY_5, PAY_6, BILL_AMT1, BILL_AMT2, BILL_AMT3, BILL_AMT4, BILL_AMT5, BILL_AMT6, PAY_AMT1, PAY_AMT2, PAY_AMT3, PAY_AMT4, PAY_AMT5, PAY_AMT6, label)

## 3.1. Prepare continuse attributes
### Data cleaning
Most Machine Learning algorithms cannot work with missing features, so we should take care of them. As a first step, let's find the columns with missing values in the numerical attributes. To do so, we can print the number of missing values of each continues attributes, listed in `colNum`.

In [7]:
//Prepare Continuse attribut


for (c <- (colNum)) {
   
   
    
  println(renamedccdefault.filter(renamedccdefault(c).isNull || renamedccdefault(c) === "" || renamedccdefault(c).isNaN).count()) 
}

 println(renamedccdefault.where("BILL_AMT1==0 & BILL_AMT2==0 & BILL_AMT3==0").count()) 


0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
2008


In [2]:
//Remove 0 values
val renamedccdefault1 = renamedccdefault.drop("ID")
renamedccdefault1.show(50)




+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-----+
|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|label|
+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-----+
|    20000|  2|        2|       1| 24|    2|    2|   -1|   -1|   -2|   -2|     3913|     3102|      689|        0|        0|        0|       0|     689|       0|       0|       0|       0|    1|
|   120000|  2|        2|       2| 26|   -1|    2|    0|    0|    0|    2|     2682|     1725|     2682|     3272|     3455|     3261|       0|    1000|    1000|    1000|       0|    2000|    1|
|    90000|  2|        2|

renamedccdefault1 = [LIMIT_BAL: int, SEX: int ... 22 more fields]


[LIMIT_BAL: int, SEX: int ... 22 more fields]

### Scaling
One of the most important transformations you need to apply to your data is feature scaling. With few exceptions, Machine Learning algorithms don't perform well when the input numerical attributes have very different scales. This is the case for the housing data: the total number of rooms ranges from about 6 to 39,320, while the median incomes only range from 0 to 15. Note that scaling the label attribues is generally not required.

One way to get all attributes to have the same scale is to use standardization. In standardization, for each value, first it subtracts the mean value (so standardized values always have a zero mean), and then it divides by the variance so that the resulting distribution has unit variance. To do this, we can use the `StandardScaler` Estimator. To use `StandardScaler`, again we need to convert all the numerical attributes into a big vectore of features using `VectorAssembler`, and then call `StandardScaler` on that vactor.

In [3]:
//scaling
import org.apache.spark.ml.feature.{VectorAssembler, StandardScaler}

val va = new VectorAssembler().setInputCols(renamedccdefault1.columns)
.setOutputCol("features")
val featuredccdefault = va.transform(renamedccdefault1)

val scaler = new StandardScaler().setInputCol("features").setOutputCol("scaledFeatures").setWithStd(true).setWithMean(false)
val scaledccdefault = scaler.fit(featuredccdefault).transform(featuredccdefault)

scaledccdefault.show(200)

+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-----+--------------------+--------------------+
|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|label|            features|      scaledFeatures|
+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-----+--------------------+--------------------+
|    20000|  2|        2|       1| 24|    2|    2|   -1|   -1|   -2|   -2|     3913|     3102|      689|        0|        0|        0|       0|     689|       0|       0|       0|       0|    1|[20000.0,2.0,2.0,...|[0.15414535998894...|
|   120000|  2|        2|       2| 26|   -1|    2|  

va = vecAssembler_c9f4baaa9b25
featuredccdefault = [LIMIT_BAL: int, SEX: int ... 23 more fields]
scaler = stdScal_3b3f5e93d9df
scaledccdefault = [LIMIT_BAL: int, SEX: int ... 24 more fields]


[LIMIT_BAL: int, SEX: int ... 24 more fields]

---
# 4. Pipeline
As you can see, there are many data transformation steps that need to be executed in the right order. For example, you called the `Imputer`, `VectorAssembler`, and `StandardScaler` from left to right. However, we can use the `Pipeline` class to define a sequence of Transformers/Estimators, and run them in order. A `Pipeline` is an `Estimator`, thus, after a Pipeline's `fit()` method runs, it produces a `PipelineModel`, which is a `Transformer`.

Now, let's create a pipeline called `numPipeline` to call the numerical transformers you built above (`imputer`, `va`, and `scaler`) in the right order from left to right, as well as a pipeline called `catPipeline` to call the categorical transformers (`indexer` and `encoder`). Then, put these two pipelines `numPipeline` and `catPipeline` into one pipeline.

In [4]:
//Pipeline
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}

val pipeline = new Pipeline().setStages(Array(va, scaler))
val newCcdefault = pipeline.fit(renamedccdefault1).transform(renamedccdefault1)

newCcdefault.show(5)

+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-----+--------------------+--------------------+
|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|label|            features|      scaledFeatures|
+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-----+--------------------+--------------------+
|    20000|  2|        2|       1| 24|    2|    2|   -1|   -1|   -2|   -2|     3913|     3102|      689|        0|        0|        0|       0|     689|       0|       0|       0|       0|    1|[20000.0,2.0,2.0,...|[0.15414535998894...|
|   120000|  2|        2|       2| 26|   -1|    2|  

pipeline = pipeline_febad8c647a3
newCcdefault = [LIMIT_BAL: int, SEX: int ... 24 more fields]


[LIMIT_BAL: int, SEX: int ... 24 more fields]

Now, use `VectorAssembler` to put all attributes of the final dataset into a big vector, and call the new column `features`.

In [5]:
//Using vector assembler to put all attributes of the final dataset newHousing 
val va2 = new VectorAssembler().setInputCols(renamedccdefault1.columns).setOutputCol("features2")
val dataset = va2.transform(newCcdefault).select("features2", "label").withColumnRenamed("features2", "features")

dataset.show(50)
println(dataset.select($"label">0).count())

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[20000.0,2.0,2.0,...|    1|
|[120000.0,2.0,2.0...|    1|
|[90000.0,2.0,2.0,...|    0|
|[50000.0,2.0,2.0,...|    0|
|[50000.0,1.0,2.0,...|    0|
|[50000.0,1.0,1.0,...|    0|
|[500000.0,1.0,1.0...|    0|
|[100000.0,2.0,2.0...|    0|
|[140000.0,2.0,3.0...|    0|
|[20000.0,1.0,3.0,...|    0|
|[200000.0,2.0,3.0...|    0|
|[260000.0,2.0,1.0...|    0|
|[630000.0,2.0,2.0...|    0|
|[70000.0,1.0,2.0,...|    1|
|[250000.0,1.0,1.0...|    0|
|[50000.0,2.0,3.0,...|    0|
|[20000.0,1.0,1.0,...|    1|
|[320000.0,1.0,1.0...|    0|
|(24,[0,1,2,3,4,5,...|    0|
|(24,[0,1,2,3,4,5,...|    0|
|[130000.0,2.0,3.0...|    0|
|[120000.0,2.0,2.0...|    1|
|[70000.0,2.0,2.0,...|    1|
|[450000.0,2.0,1.0...|    1|
|[90000.0,1.0,1.0,...|    0|
|[50000.0,1.0,3.0,...|    0|
|[60000.0,1.0,1.0,...|    1|
|[50000.0,2.0,3.0,...|    0|
|[50000.0,2.0,3.0,...|    0|
|[50000.0,1.0,1.0,...|    0|
|[230000.0,2.0,1.0...|    0|
|[50000.0,1.0,

va2 = vecAssembler_70060ada9069
dataset = [features: vector, label: int]


[features: vector, label: int]

---
# 5. Make a model
Here we going to make four different regression models:
* Linear regression model
* Decission tree regression
* Random forest regression
* Gradient-booster forest regression

But, before giving the data to train a Machine Learning model, let's first split the data into training dataset (`trainSet`) with 80% of the whole data, and test dataset (`testSet`) with 20% of it.

In [6]:
//make a model
//train set 
val Array(trainSet, testSet) = dataset.randomSplit(Array(0.8, 0.2), 2)

//trainSet.show(50)
println(trainSet.select($"label">0).count())
trainSet.orderBy(asc("features")).show()
println(testSet.select($"label">0).count())
testSet.orderBy(asc("features")).show()

23977
+--------------------+-----+
|            features|label|
+--------------------+-----+
|(24,[0,1,2,3,4,5,...|    0|
|(24,[0,1,2,3,4,5,...|    0|
|(24,[0,1,2,3,4,5,...|    0|
|(24,[0,1,2,3,4,5,...|    0|
|(24,[0,1,2,3,4,5,...|    0|
|(24,[0,1,2,3,4,5,...|    0|
|(24,[0,1,2,3,4,5,...|    0|
|(24,[0,1,2,3,4,5,...|    0|
|(24,[0,1,2,3,4,5,...|    0|
|(24,[0,1,2,3,4,5,...|    0|
|(24,[0,1,2,3,4,5,...|    0|
|(24,[0,1,2,3,4,5,...|    0|
|(24,[0,1,2,3,4,5,...|    0|
|(24,[0,1,2,3,4,5,...|    0|
|(24,[0,1,2,3,4,5,...|    0|
|(24,[0,1,2,3,4,5,...|    0|
|(24,[0,1,2,3,4,5,...|    0|
|(24,[0,1,2,3,4,5,...|    0|
|(24,[0,1,2,3,4,5,...|    0|
|(24,[0,1,2,3,4,5,...|    0|
+--------------------+-----+
only showing top 20 rows

6023
+--------------------+-----+
|            features|label|
+--------------------+-----+
|(24,[0,1,2,3,4,5,...|    0|
|(24,[0,1,2,3,4,5,...|    0|
|(24,[0,1,2,3,4,5,...|    0|
|(24,[0,1,2,3,4,5,...|    0|
|(24,[0,1,2,3,4,5,...|    0|
|(24,[0,1,2,3,4,5,...|    0|
|(24,[

trainSet = [features: vector, label: int]
testSet = [features: vector, label: int]


[features: vector, label: int]

## 5.1. Logistic regression model


In [7]:
//Binomial logistic Regression
import org.apache.spark.ml.classification.LogisticRegression
val lr = new LogisticRegression().setFeaturesCol("features").setLabelCol("label")
.setMaxIter(10).setRegParam(0.01).setElasticNetParam(0.1) //regularization parameter
val lrModel = lr.fit(trainSet)


lr = logreg_ae3cf106bb27
lrModel = logreg_ae3cf106bb27


logreg_ae3cf106bb27

Now, use `RegressionEvaluator` to measure the root-mean-square-erroe (RMSE) of the model on the test dataset.

In [8]:
//use RegressionEvaluator to measure the root-mean-square-error
import org.apache.spark.ml.evaluation.RegressionEvaluator

// make predictions on the test data
val predictions = lrModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

// select (prediction, true label) and compute test error.
//val evaluator = new RegressionEvaluator().setMetricName("rmse").setLabelCol("label").setPredictionCol("prediction")
val evaluator = new RegressionEvaluator()
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

//Evaluation
// Extract the summary from the returned LogisticRegressionModel instance trained in the earlier
val trainingSummary = lrModel.binarySummary
// obtain the objective per iteration. /** objective function (scaled loss->cost function + regularization) at each iteration. */
val objectiveHistory = trainingSummary.objectiveHistory
objectiveHistory.foreach(loss => println(loss))
// obtain the ROC as a dataframe and areaUnderROC.
val roc = trainingSummary.roc
roc.show()
println(s"areaUnderROC: ${trainingSummary.areaUnderROC}")
// set the model threshold to maximize F-Measure -> point which optimizes the realation precision and recall
val fMeasure = trainingSummary.fMeasureByThreshold
val maxFMeasure = fMeasure.select(max("F-Measure")).head().getDouble(0)
val bestThreshold = fMeasure.where($"F-Measure" === maxFMeasure).select("threshold").head().getDouble(0)
lrModel.setThreshold(bestThreshold)

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       0.0|    0|(24,[0,1,2,3,4,5,...|
|       0.0|    0|(24,[0,1,2,3,4,5,...|
|       0.0|    0|(24,[0,1,2,3,4,5,...|
|       0.0|    0|(24,[0,1,2,3,4,5,...|
|       0.0|    0|(24,[0,1,2,3,4,5,...|
+----------+-----+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 0.0
0.5294855044371531
0.4332997982573783
0.1435960448802347
0.13150053724995764
0.08690485087941405
0.07461761068300517
0.06963817118136036
0.06787711585315337
0.06761537346103143
0.06746782461963818
0.0673434825217259
+---+--------------------+
|FPR|                 TPR|
+---+--------------------+
|0.0|                 0.0|
|0.0|0.045078888054094664|
|0.0| 0.08996994740796393|
|0.0|  0.1348610067618332|
|0.0|  0.1797520661157025|
|0.0| 0.22464312546957174|
|0.0|  0.2697220135236664|
|0.0|  0.3146130728775357|
|0.0|   0.359504132231405|
|0.0|  0.4043951915852742

predictions = [features: vector, label: int ... 3 more fields]
evaluator = regEval_78714dc0955d
rmse = 0.0
trainingSummary = org.apache.spark.ml.classification.BinaryLogisticRegressionTrainingSummaryImpl@326ae8fa
objectiveHistory = Array(0.5294855044371531, 0.4332997982573783, 0.1435960448802347, 0.13150053724995764, 0.08690485087941405, 0.07461761068300517, 0.06963817118136036, 0.06787711585315337, 0.06761537346103143, 0.06746782461963818, 0.0673434825217259)
roc = [FPR: double, TPR: double]


fMeasure: org.apache.spark...


[FPR: double, TPR: double]

## 5.2. Decision tree regression
Repeat what you have done on Regression Model to build a Decision Tree model. Use the `DecisionTreeRegressor` to make a model and then measure its RMSE on the test dataset.

In [9]:
//5.2 Desicion tree regression

import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.ml.evaluation.RegressionEvaluator

val dt = new DecisionTreeRegressor().setLabelCol("label").setFeaturesCol("features")

// train the model
val dtModel = dt.fit(trainSet)

// make predictions on the test data
val predictions = dtModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

// select (prediction, true label) and compute test error
val evaluator = new RegressionEvaluator().setMetricName("rmse").setLabelCol("label").setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       0.0|    0|(24,[0,1,2,3,4,5,...|
|       0.0|    0|(24,[0,1,2,3,4,5,...|
|       0.0|    0|(24,[0,1,2,3,4,5,...|
|       0.0|    0|(24,[0,1,2,3,4,5,...|
|       0.0|    0|(24,[0,1,2,3,4,5,...|
+----------+-----+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 0.0


dt = dtr_1f9363f10ce3
dtModel = DecisionTreeRegressionModel (uid=dtr_1f9363f10ce3) of depth 1 with 3 nodes
predictions = [features: vector, label: int ... 1 more field]
evaluator = regEval_e0712094fccc
rmse = 0.0


0.0

## 5.3. Random forest regression
Let's try the test error on a Random Forest Model. Youcan use the `RandomForestRegressor` to make a Random Forest model.

In [10]:
//5.3 Random forest regression

import org.apache.spark.ml.regression.RandomForestRegressor
import org.apache.spark.ml.evaluation.RegressionEvaluator

val rf = new RandomForestRegressor().setLabelCol("label").setFeaturesCol("features").setNumTrees(40)

// train the model
val rfModel = rf.fit(trainSet)

// make predictions on the test data
val predictions = rfModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

// select (prediction, true label) and compute test error
val evaluator = new RegressionEvaluator().setMetricName("rmse").setLabelCol("label").setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

+--------------------+-----+--------------------+
|          prediction|label|            features|
+--------------------+-----+--------------------+
| 0.06543312590079996|    0|(24,[0,1,2,3,4,5,...|
| 0.06543312590079996|    0|(24,[0,1,2,3,4,5,...|
|0.056722854389869126|    0|(24,[0,1,2,3,4,5,...|
|  0.0592128527234557|    0|(24,[0,1,2,3,4,5,...|
|  0.0592128527234557|    0|(24,[0,1,2,3,4,5,...|
+--------------------+-----+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 0.06653635362366561


rf = rfr_8c8a126dc6ed
rfModel = RandomForestRegressionModel (uid=rfr_8c8a126dc6ed) with 40 trees
predictions = [features: vector, label: int ... 1 more field]
evaluator = regEval_c84b21b0cdde
rmse = 0.06653635362366561


0.06653635362366561

## 5.4. Gradient-boosted tree regression
Fianlly, we want to build a Gradient-boosted Tree Regression model and test the RMSE of the test data. Use the `GBTRegressor` to build the model.

In [16]:
//5.4 Gradient-boosted tree regression

import org.apache.spark.ml.regression.GBTRegressor
import org.apache.spark.ml.evaluation.RegressionEvaluator

val gb = new GBTRegressor().setLabelCol("label").setFeaturesCol("features").setMaxIter(10).setFeatureSubsetStrategy("auto")

// train the model
val gbModel = gb.fit(trainSet)

// make predictions on the test data
val predictions = gbModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

// select (prediction, true label) and compute test error
val evaluator = new RegressionEvaluator().setMetricName("rmse").setLabelCol("label").setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       0.0|    0|(25,[0,1,2,3,4,5,...|
|       0.0|    0|(25,[0,1,2,3,4,5,...|
|       0.0|    0|(25,[0,1,2,3,4,5,...|
|       0.0|    0|(25,[0,1,2,3,4,5,...|
|       0.0|    0|(25,[0,1,2,3,4,5,...|
+----------+-----+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 0.0


gb = gbtr_14cf7f4c9f63
gbModel = GBTRegressionModel (uid=gbtr_14cf7f4c9f63) with 10 trees
predictions = [features: vector, label: int ... 1 more field]
evaluator = regEval_47897c639727
rmse = 0.0


0.0

---
# 6. Hyperparameter tuning
An important task in Machie Learning is model selection, or using data to find the best model or parameters for a given task. This is also called tuning. Tuning may be done for individual Estimators such as LinearRegression, or for entire Pipelines which include multiple algorithms, featurization, and other steps. Users can tune an entire Pipeline at once, rather than tuning each element in the Pipeline separately. MLlib supports model selection tools, such as `CrossValidator`. These tools require the following items:
* Estimator: algorithm or Pipeline to tune (`setEstimator`)
* Set of ParamMaps: parameters to choose from, sometimes called a "parameter grid" to search over (`setEstimatorParamMaps`)
* Evaluator: metric to measure how well a fitted Model does on held-out test data (`setEvaluator`)

`CrossValidator` begins by splitting the dataset into a set of folds, which are used as separate training and test datasets. For example with `k=3` folds, `CrossValidator` will generate 3 (training, test) dataset pairs, each of which uses 2/3 of the data for training and 1/3 for testing. To evaluate a particular `ParamMap`, `CrossValidator` computes the average evaluation metric for the 3 Models produced by fitting the Estimator on the 3 different (training, test) dataset pairs. After identifying the best `ParamMap`, `CrossValidator` finally re-fits the Estimator using the best ParamMap and the entire dataset.

Below, use the `CrossValidator` to select the best Random Forest model. To do so, you need to define a grid of parameters. Let's say we want to do the search among the different number of trees (1, 5, and 10), and different tree depth (5, 10, and 15).

In [11]:
//Hyperparameter tunning

import org.apache.spark.ml.tuning.ParamGridBuilder
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.tuning.CrossValidator

val paramGrid = new ParamGridBuilder().addGrid(rf.maxDepth, Array(1, 5, 10)).addGrid(rf.numTrees, Array(5, 10, 15)).build()

val evaluator = new RegressionEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("rmse")
val cv = new CrossValidator().setEstimator(rf).setEvaluator(new RegressionEvaluator()).setEstimatorParamMaps(paramGrid).setNumFolds(3)
val cvModel = cv.fit(trainSet)

val predictions = cvModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

val finalP = cvModel.bestModel.transform(trainSet)
finalP.show()

val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

+--------------------+-----+--------------------+
|          prediction|label|            features|
+--------------------+-----+--------------------+
|                 0.0|    0|(24,[0,1,2,3,4,5,...|
|                 0.0|    0|(24,[0,1,2,3,4,5,...|
|0.001360544217687...|    0|(24,[0,1,2,3,4,5,...|
|                 0.0|    0|(24,[0,1,2,3,4,5,...|
|                 0.0|    0|(24,[0,1,2,3,4,5,...|
+--------------------+-----+--------------------+
only showing top 5 rows

+--------------------+-----+--------------------+
|            features|label|          prediction|
+--------------------+-----+--------------------+
|(24,[0,1,2,3,4,5,...|    0|                 0.0|
|(24,[0,1,2,3,4,5,...|    0|                 0.0|
|(24,[0,1,2,3,4,5,...|    0|                 0.0|
|(24,[0,1,2,3,4,5,...|    0|                 0.0|
|(24,[0,1,2,3,4,5,...|    0|                 0.0|
|(24,[0,1,2,3,4,5,...|    0|                 0.0|
|(24,[0,1,2,3,4,5,...|    0|                 0.0|
|(24,[0,1,2,3,4,5,...|   

paramGrid = 


Array({
	rfr_8c8a126dc6ed-maxDepth: 1,
	rfr_8c8a126dc6ed-numTrees: 5
}, {
	rfr_8c8a126dc6ed-maxDepth: 1,
	rfr_8c8a126dc6ed-numTrees: 10
}, {
	rfr_8c8a126dc6ed-maxDepth: 1,
	rfr_8c8a126dc6ed-numTrees: 15
}, {
	rfr_8c8a126dc6ed-maxDepth: 5,
	rfr_8c8a126dc6ed-numTrees: 5
}, {
	rfr_8c8a126dc6ed-maxDepth: 5,
	rfr_8c8a126dc6ed-numTrees: 10
}, {
	rfr_8c8a126dc6ed-maxDepth: 5,
	rfr_8c8a126dc6ed-numTrees: 15
}, {
	rfr_8c8a126dc6ed-maxDepth: 10,
	rfr_8c8a126dc6ed-numTrees: 5
}, {
	rfr_8c8a126dc6ed-maxDepth: 10,
	rfr_8c8a126dc6ed-numTrees: 10
}, {
	rfr_8c8a126dc6ed-maxDepth: 10,
	rfr_8c8a...


---
# 7. Custom transformer
At the end of part two, we added extra columns to the `housing` dataset. Here, we are going to implement a Transformer to do the same task. The Transformer should take the name of two input columns `inputCol1` and `inputCol2`, as well as the name of ouput column `outputCol`. It, then, computes `inputCol1` divided by `inputCol2`, and adds its result as a new column to the dataset. The details of the implemeting a custom Tranfomer is explained [here](https://www.oreilly.com/learning/extend-spark-ml-for-your-own-modeltransformer-types). Please read it before before starting to implement it.

First, define the given parameters of the Transformer and implement a method to validate their schemas (`StructType`).

In [18]:
import org.apache.spark.sql.types.{StructField, StructType, DoubleType, StringType, IntegerType}
import org.apache.spark.ml.param.{ParamMap, Param, Params}

trait MyParams extends Params {
    final val inputCol1 = new Param[String](this, "LIMIT_BAL", "The input column")
    final val inputCol2 = new Param[String](this, "BILL_AMT1", "The input column2")
    final val outputCol = new Param[String](this, "REMAINING_AMT", "The input column3")
    
  protected def validateAndTransformSchema(schema: StructType): StructType = {
    val idx = schema.fieldIndex($(inputCol1))
    val idx2 = schema.fieldIndex($(inputCol2))
    val field = schema.fields(idx)
    val field2 = schema.fields(idx2)
    if (field.dataType != StringType) {
      throw new Exception(s"Input type ${field.dataType} did not match input type StringType")
    }
    if (field2.dataType != StringType) {
      throw new Exception(s"Input type ${field2.dataType} did not match input type StringType")
    }
    // Add the return field
    schema.add(StructField($(outputCol), IntegerType, false))
  }
}


defined trait MyParams


Then, extend the class `Transformer`, and implement its setter functions for the input and output columns, and call then `setInputCol1`, `setInputCol2`, and `setOutputCol`. Morever, you need to override the methods `copy`, `transformSchema`, and the `transform`. The details of what you need to cover in these methods is given [here](https://www.oreilly.com/learning/extend-spark-ml-for-your-own-modeltransformer-types).

In [19]:
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.{ParamMap, Param, Params}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.{col, udf}

class MyTransformer(override val uid: String) extends Transformer with MyParams {
    def this() = this(Identifiable.randomUID("configurablewordcount"))
    
    def setInputCol1(value: String) = set(inputCol1, value)
    
    def setInputCol2(value: String) = set(inputCol2, value)
    
    def setOutputCol(value: String) = set(outputCol, value)

    override def copy(extra: ParamMap): MyTransformer = {
    defaultCopy(extra)
    }

    override def transformSchema(schema: StructType): StructType = {
        val idx = schema.fieldIndex($(inputCol1))
        val idx2 = schema.fieldIndex($(inputCol2))
        val field = schema.fields(idx)
        val field2 = schema.fields(idx2)

        if (field.dataType != StringType) {
          throw new Exception(s"Input type ${field.dataType} did not match input type StringType")
        }
        if (field2.dataType != StringType) {
          throw new Exception(s"Input type ${field2.dataType} did not match input type StringType")
        }
        // Add the return field
        schema.add(StructField($(outputCol), IntegerType, false))
  }
    
    def sub = udf((a: Double, b: Double) => a-b)
    
    override def transform(dataset: Dataset[_]): DataFrame = {
    newCcdefault.withColumn($(outputCol),sub(col("LIMIT_BAL"), col("BILL_AMT1")))
    }
}

defined class MyTransformer


Now, an instance of `MyTransformer`, and set the input columns `total_rooms` and `households`, and the output column `rooms_per_household` and run it over the `housing` dataset.

In [20]:
val myTransformer = new MyTransformer().setInputCol1("LIMIT_BAL").setInputCol2("BILL_AMT1").setOutputCol("REMAINING_AMT")

val myDataset = myTransformer.transform(ccdefault).select("REMAINING_AMT").show(5)

+-------------+
|REMAINING_AMT|
+-------------+
|      16087.0|
|     117318.0|
|      60761.0|
|       3010.0|
|      41383.0|
+-------------+
only showing top 5 rows



myTransformer = configurablewordcount_e41524218f52


myDataset: Unit = ()


configurablewordcount_e41524218f52

---
# 8. Custom estimator (predictor)
Now, it's time to implement your own linear regression with gradient descent algorithm as a brand new Estimator. The whole code of the Estimator is given to you, and you do not need to implement anything. It is just a sample that shows how to build a custom Estimator.

The gradient descent update for linear regression is:
$$
w_{i+1} = w_{i} - \alpha_{i} \sum\limits_{j=1}^n (w_i^\top x_j - y_j)x_j
$$

where $i$ is the iteration number of the gradient descent algorithm, and $j$ identifies the observation. Here, $w$ represents an array of weights that is the same size as the array of features and provides a weight for each of the features when finally computing the label prediction in the form:

$$
prediction = w^\top \cdot\ x
$$

where $w$ is the final array of weights computed by the gradient descent, $x$ is the array of features of the observation point and $prediction$ is the label we predict should be associated to this observation.

The given `Helper` class implements the helper methods:
* `dot`: implements the dot product of two vectors and the dot product of a vector and a scalar
* `sum`: implements addition of two vectors
* `fill`: creates a vector of predefined size and initialize it with the predefined value

What you need to do is to implement the methods of the Linear Regresstion class `LR`, which are
* `rmsd`: computes the Root Mean Square Error of a given RDD of tuples of (label, prediction) using the formula:
$$
rmse = \sqrt{\frac{\sum\limits_{i=1}^n (label - prediction)^2}{n}}
$$
* `gradientSummand`: computes the following formula:
$$
gs_{ij} = (w_i^\top x_j - y_j)x_j
$$
* `gradient`: computes the following formula:
$$
gradient = \sum\limits_{j=1}^n gs_{ij}
$$

In [None]:
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.PredictorParams
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.Matrices
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.ml.{PredictionModel, Predictor}

case class Instance(label: Double, features: Vector)

object Helper extends Serializable {
  def dot(v1: Vector, v2: Vector): Double = {
    val m = Matrices.dense(1, v1.size, v1.toArray)
    m.multiply(v2).values(0)
  }

  def dot(v: Vector, s: Double): Vector = {
    val baseArray = v.toArray.map(vi => vi * s)
    Vectors.dense(baseArray)
  }

  def sumVectors(v1: Vector, v2: Vector): Vector = {
    val baseArray = ((v1.toArray) zip (v2.toArray)).map { case (val1, val2) => val1 + val2 }
    Vectors.dense(baseArray)
  }

  def fillVector(size: Int, fillVal: Double): Vector = Vectors.dense(Array.fill[Double](size)(fillVal));
}

In [None]:
class LR() extends Serializable {
  def calcRMSE(labelsAndPreds: RDD[(Double, Double)]): Double = {
    val regressionMetrics = new RegressionMetrics(labelsAndPreds)
    regressionMetrics.rootMeanSquaredError
  }
  
  def gradientSummand(weights: Vector, lp: Instance): Vector = {
    val mult = (Helper.dot(weights, lp.features) - lp.label)
    val seq = (0 to lp.features.size - 1).map(i => lp.features(i) * mult)
    return Vectors.dense(seq.toArray)
  }
  
  def linregGradientDescent(trainData: RDD[Instance], numIters: Int): (Vector, Array[Double]) = {
    val n = trainData.count()
    val d = trainData.take(1)(0).features.size
    var w = Helper.fillVector(d, 0)
    val alpha = 1.0
    val errorTrain = Array.fill[Double](numIters)(0.0)

    for (i <- 0 until numIters) {
      val labelsAndPredsTrain = trainData.map(lp => (lp.label, Helper.dot(w, lp.features)))
      errorTrain(i) = calcRMSE(labelsAndPredsTrain)

      val gradient = trainData.map(lp => gradientSummand(w, lp)).reduce((v1, v2) => Helper.sumVectors(v1, v2))
      val alpha_i = alpha / (n * scala.math.sqrt(i + 1))
      val wAux = Helper.dot(gradient, (-1) * alpha_i)
      w = Helper.sumVectors(w, wAux)
    }
    (w, errorTrain)
  }
}

In [None]:
abstract class MyLinearModel[FeaturesType, Model <: MyLinearModel[FeaturesType, Model]]
  extends PredictionModel[FeaturesType, Model] {
}

class MyLinearModelImpl(override val uid: String, val weights: Vector, val trainingError: Array[Double])
    extends MyLinearModel[Vector, MyLinearModelImpl] {

  override def copy(extra: ParamMap): MyLinearModelImpl = defaultCopy(extra)

  def predict(features: Vector): Double = {
    println("Predicting")
    val prediction = Helper.dot(weights, features)
    prediction
  }
}

In [None]:
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.PredictorParams
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._

import org.apache.spark.sql.Row
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.Matrices
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.ml.{PredictionModel, Predictor}

abstract class MyLinearRegression[
    FeaturesType,
    Learner <: MyLinearRegression[FeaturesType, Learner, Model],
    Model <: MyLinearModel[FeaturesType, Model]]
  extends Predictor[FeaturesType, Learner, Model] {
}

class MyLinearRegressionImpl(override val uid: String)
    extends MyLinearRegression[Vector, MyLinearRegressionImpl, MyLinearModelImpl] {
  def this() = this(Identifiable.randomUID("linReg"))

  override def copy(extra: ParamMap): MyLinearRegressionImpl = defaultCopy(extra)
  
  def train(dataset: Dataset[_]): MyLinearModelImpl = {
    println("Training")

    val numIters = 10

    val instances: RDD[Instance] = dataset.select(
      col($(labelCol)), col($(featuresCol))).rdd.map {
        case Row(label: Double, features: Vector) =>
          Instance(label, features)
      }

    val (weights, trainingError) = new LR().linregGradientDescent(instances, numIters)

    new MyLinearModelImpl(uid, weights, trainingError)
  }
}

In [None]:
import org.apache.spark.ml.evaluation.RegressionEvaluator

val lr = new MyLinearRegressionImpl().setLabelCol("label").setFeaturesCol("features")
val model = lr.fit(trainSet)
val predictions = model.transform(trainSet)
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")