# Machine Learning With Spark ML
In this lab assignment, you will complete a project by going through the following steps:
1. Get the data.
2. Discover the data to gain insights.
3. Prepare the data for Machine Learning algorithms.
4. Select a model and train it.
5. Fine-tune your model.
6. Present your solution.

As a dataset, we use the California Housing Prices dataset from the StatLib repository. This dataset was based on data from the 1990 California census. The dataset has the following columns
1. `longitude`: a measure of how far west a house is (a higher value is farther west)
2. `latitude`: a measure of how far north a house is (a higher value is farther north)
3. `housing_,median_age`: median age of a house within a block (a lower number is a newer building)
4. `total_rooms`: total number of rooms within a block
5. `total_bedrooms`: total number of bedrooms within a block
6. `population`: total number of people residing within a block
7. `households`: total number of households, a group of people residing within a home unit, for a block
8. `median_income`: median income for households within a block of houses
9. `median_house_value`: median house value for households within a block
10. `ocean_proximity`: location of the house w.r.t ocean/sea

---
# 1. Get the data
Let's start the lab by loading the dataset. The can find the dataset at `data/housing.csv`. To infer column types automatically, when you are reading the file, you need to set `inferSchema` to true. Moreover enable the `header` option to read the columns' name from the file.

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Lab 1").getOrCreate()

filename = "data/housing.csv"

housing = spark.read.csv(filename,header=True,inferSchema=True)

---
# 2. Discover the data to gain insights
Now it is time to take a look at the data. In this step we are going to take a look at the data a few different ways:
* See the schema and dimension of the dataset
* Look at the data itself
* Statistical summary of the attributes
* Breakdown of the data by the categorical attribute variable
* Find the correlation among different attributes
* Make new attributes by combining existing attributes

## 2.1. Schema and dimension
Print the schema of the dataset

In [2]:
housing.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



Print the number of records in the dataset.

In [3]:
housing.count()

20640

## 2.2. Look at the data
Print the first five records of the dataset.

In [4]:
housing.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

Print the number of records with population more than 10000.

In [5]:
housing.where(housing.population>10000).count()

23

## 2.3. Statistical summary
Print a summary of the table statistics for the attributes `housing_median_age`, `total_rooms`, `median_house_value`, and `population`. You can use the `describe` command.

In [6]:
housing.describe(['housing_median_age','total_rooms','median_house_value','population']).show()

+-------+------------------+------------------+------------------+------------------+
|summary|housing_median_age|       total_rooms|median_house_value|        population|
+-------+------------------+------------------+------------------+------------------+
|  count|             20640|             20640|             20640|             20640|
|   mean|28.639486434108527|2635.7630813953488|206855.81690891474|1425.4767441860465|
| stddev| 12.58555761211163|2181.6152515827944|115395.61587441359|  1132.46212176534|
|    min|               1.0|               2.0|           14999.0|               3.0|
|    max|              52.0|           39320.0|          500001.0|           35682.0|
+-------+------------------+------------------+------------------+------------------+



Print the maximum age (`housing_median_age`), the minimum number of rooms (`total_rooms`), and the average of house values (`median_house_value`).

In [7]:
from pyspark.sql.functions import col,max,min,avg

max_housing_median_age = housing.select(max(col('housing_median_age')).alias('max_housing_median_age')).collect()[0].asDict()['max_housing_median_age']
min_total_rooms = housing.select(min(col('total_rooms')).alias('min_total_rooms')).collect()[0].asDict()['min_total_rooms']
avg_median_house_value = housing.select(avg(col('median_house_value')).alias('avg_median_house_value')).collect()[0].asDict()['avg_median_house_value']

print(f"max={max_housing_median_age} min={min_total_rooms} avg={avg_median_house_value}")

max=52.0 min=2.0 avg=206855.81690891474


## 2.4. Breakdown the data by categorical data
Print the number of houses in different areas (`ocean_proximity`), and sort them in descending order.

In [8]:
from pyspark.sql.functions import desc

housing.groupBy("ocean_proximity").count().sort(desc("count")).show()

+---------------+-----+
|ocean_proximity|count|
+---------------+-----+
|      <1H OCEAN| 9136|
|         INLAND| 6551|
|     NEAR OCEAN| 2658|
|       NEAR BAY| 2290|
|         ISLAND|    5|
+---------------+-----+



Print the average value of the houses (`median_house_value`) in different areas (`ocean_proximity`), and call the new column `avg_value` when print it.

In [9]:
housing.groupBy("ocean_proximity").agg(avg("median_house_value").alias("avg_value")).show()

+---------------+------------------+
|ocean_proximity|         avg_value|
+---------------+------------------+
|         ISLAND|          380440.0|
|     NEAR OCEAN|249433.97742663656|
|       NEAR BAY|259212.31179039303|
|      <1H OCEAN|240084.28546409807|
|         INLAND|124805.39200122119|
+---------------+------------------+



Rewrite the above question in SQL.

In [10]:
housing.createOrReplaceTempView("df")
spark.sql("SELECT ocean_proximity, AVG(median_house_value) AS avg_value FROM df GROUP BY ocean_proximity").show()

+---------------+------------------+
|ocean_proximity|         avg_value|
+---------------+------------------+
|         ISLAND|          380440.0|
|     NEAR OCEAN|249433.97742663656|
|       NEAR BAY|259212.31179039303|
|      <1H OCEAN|240084.28546409807|
|         INLAND|124805.39200122119|
+---------------+------------------+



## 2.5. Correlation among attributes
Print the correlation among the attributes `housing_median_age`, `total_rooms`, `median_house_value`, and `population`. To do so, first you need to put these attributes into one vector. Then, compute the standard correlation coefficient (Pearson) between every pair of attributes in this new vector. To make a vector of these attributes, you can use the `VectorAssembler` Transformer.

In [11]:
import numpy as np
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

va = VectorAssembler(inputCols=['housing_median_age', 'total_rooms', 'median_house_value', 'population'],outputCol='attributes')
housing_assembled = va.transform(housing)
housing_assembled.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|          attributes|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+--------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|[41.0,880.0,45260...|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|[21.0,7099.0,3585...|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|[52.0,1467.0,3521...|
|  -122.25|   37.85|              52.0|     12

In [12]:
coeff = Correlation.corr(housing_assembled, "attributes").head()
print("Pearson correlation matrix:\n" + str(coeff[0]))

Pearson correlation matrix:
DenseMatrix([[ 1.        , -0.3612622 ,  0.10562341, -0.29624424],
             [-0.3612622 ,  1.        ,  0.13415311,  0.85712597],
             [ 0.10562341,  0.13415311,  1.        , -0.02464968],
             [-0.29624424,  0.85712597, -0.02464968,  1.        ]])




## 2.6. Combine and make new attributes
Now, let's try out various attribute combinations. In the given dataset, the total number of rooms in a block is not very useful, if we don't know how many households there are. What we really want is the number of rooms per household. Similarly, the total number of bedrooms by itself is not very useful, and we want to compare it to the number of rooms. And the population per household seems like also an interesting attribute combination to look at. To do so, add the three new columns to the dataset as below. We will call the new dataset the `housingExtra`.
```
rooms_per_household = total_rooms / households
bedrooms_per_room = total_bedrooms / total_rooms
population_per_household = population / households
```

In [13]:
housingCol1 = housing.withColumn('rooms_per_household', housing.total_rooms / housing.households)
housingCol2 = housingCol1.withColumn('bedrooms_per_room', housingCol1.total_bedrooms / housingCol1.total_rooms)
housingExtra = housingCol2.withColumn('population_per_household', housingCol2.population / housingCol2.households)

housingExtra.select("rooms_per_household", "bedrooms_per_room", "population_per_household").show(5)

+-------------------+-------------------+------------------------+
|rooms_per_household|  bedrooms_per_room|population_per_household|
+-------------------+-------------------+------------------------+
|  6.984126984126984|0.14659090909090908|      2.5555555555555554|
|  6.238137082601054|0.15579659106916466|       2.109841827768014|
|  8.288135593220339|0.12951601908657123|      2.8022598870056497|
| 5.8173515981735155|0.18445839874411302|       2.547945205479452|
|  6.281853281853282| 0.1720958819913952|      2.1814671814671813|
+-------------------+-------------------+------------------------+
only showing top 5 rows



---
## 3. Prepare the data for Machine Learning algorithms
Before going through the Machine Learning steps, let's first rename the label column from `median_house_value` to `label`.

In [14]:
renamedHousing = housingExtra.withColumnRenamed("median_house_value","label")
renamedHousing.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- label: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)
 |-- rooms_per_household: double (nullable = true)
 |-- bedrooms_per_room: double (nullable = true)
 |-- population_per_household: double (nullable = true)



Now, we want to separate the numerical attributes from the categorical attribute (`ocean_proximity`) and keep their column names in two different lists. Moreover, since we don't want to apply the same transformations to the predictors (features) and the label, we should remove the label attribute from the list of predictors. 

In [16]:
# // label columns
# val colLabel = "label"

# // categorical columns
# val colCat = "ocean_proximity"

# // numerical columns
# val colNum = renamedHousing.columns.filter(_ != colLabel).filter(_ != colCat)

colLabel = renamedHousing.select("label")
colCat = renamedHousing.select("ocean_proximity")
colNum = renamedHousing.drop("label").drop("ocean_proximity")

print("Categorical:")
colCat.printSchema()

print("Numerical:")
colNum.printSchema()

print("Labels:")
colLabel.printSchema()

Categorical:
root
 |-- ocean_proximity: string (nullable = true)

Numerical:
root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- rooms_per_household: double (nullable = true)
 |-- bedrooms_per_room: double (nullable = true)
 |-- population_per_household: double (nullable = true)

Labels:
root
 |-- label: double (nullable = true)



## 3.1. Prepare continuous attributes
### Data cleaning
Most Machine Learning algorithms cannot work with missing features, so we should take care of them. As a first step, let's find the columns with missing values in the numerical attributes. To do so, we can print the number of missing values of each continues attributes, listed in `colNum`.

In [17]:
from pyspark.sql.functions import col,sum
colNum.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in colNum.columns)).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+-------------------+-----------------+------------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|rooms_per_household|bedrooms_per_room|population_per_household|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+-------------------+-----------------+------------------------+
|        0|       0|                 0|          0|           207|         0|         0|            0|                  0|              207|                       0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+-------------------+-----------------+------------------------+



As we observerd above, the `total_bedrooms` and `bedrooms_per_room` attributes have some missing values. One way to take care of missing values is to use the `Imputer` Transformer, which completes missing values in a dataset, either using the mean or the median of the columns in which the missing values are located. To use it, you need to create an `Imputer` instance, specifying that you want to replace each attribute's missing values with the "median" of that attribute.

In [18]:
# // TODO: Replace <FILL IN> with appropriate code

# import org.apache.spark.ml.feature.Imputer

# val imputer = new Imputer().setStrategy("median").<FILL IN>                                  
# val imputedHousing = imputer.fit(renamedHousing).transform(renamedHousing)

# imputedHousing.select("total_bedrooms", "bedrooms_per_room").show(5)

from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=['total_bedrooms','bedrooms_per_room'], 
    outputCols=['total_bedrooms','bedrooms_per_room']
    ).setStrategy("median")

imputedHousing = imputer.fit(renamedHousing).transform(renamedHousing)
imputedHousing.select("total_bedrooms", "bedrooms_per_room").show(5)

+--------------+-------------------+
|total_bedrooms|  bedrooms_per_room|
+--------------+-------------------+
|         129.0|0.14659090909090908|
|        1106.0|0.15579659106916466|
|         190.0|0.12951601908657123|
|         235.0|0.18445839874411302|
|         280.0| 0.1720958819913952|
+--------------+-------------------+
only showing top 5 rows



In [19]:
imputedHousing.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in imputedHousing.columns)).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+-----+---------------+-------------------+-----------------+------------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|label|ocean_proximity|rooms_per_household|bedrooms_per_room|population_per_household|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+-----+---------------+-------------------+-----------------+------------------------+
|        0|       0|                 0|          0|             0|         0|         0|            0|    0|              0|                  0|                0|                       0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+-----+---------------+-------------------+-----------------+------------------------+



### Scaling
One of the most important transformations you need to apply to your data is feature scaling. With few exceptions, Machine Learning algorithms don't perform well when the input numerical attributes have very different scales. This is the case for the housing data: the total number of rooms ranges from about 6 to 39,320, while the median incomes only range from 0 to 15. Note that scaling the label attribues is generally not required.

One way to get all attributes to have the same scale is to use standardization. In standardization, for each value, first it subtracts the mean value (so standardized values always have a zero mean), and then it divides by the variance so that the resulting distribution has unit variance. To do this, we can use the `StandardScaler` Estimator. To use `StandardScaler`, again we need to convert all the numerical attributes into a big vectore of features using `VectorAssembler`, and then call `StandardScaler` on that vactor.

In [67]:
from pyspark.ml.feature import VectorAssembler,StandardScaler

va = VectorAssembler(inputCols=colNum.schema.names,outputCol='assembledNumFeatures')
featuredHousing = va.transform(imputedHousing)

scaler = StandardScaler(inputCol="assembledNumFeatures", outputCol="scaledAssembledNumFeatures", withStd=True, withMean=True)
scaledHousing = scaler.fit(featuredHousing).transform(featuredHousing)
# .drop("assembledNumFeatures").withColumnRenamed("scaledFeatures","features")
scaledHousing.select("assembledNumFeatures","scaledAssembledNumFeatures").show(5)

+--------------------+--------------------------+
|assembledNumFeatures|scaledAssembledNumFeatures|
+--------------------+--------------------------+
|[-122.23,37.88,41...|      [-1.3278030546902...|
|[-122.22,37.86,21...|      [-1.3228118684350...|
|[-122.24,37.85,52...|      [-1.3327942409452...|
|[-122.25,37.85,52...|      [-1.3377854272003...|
|[-122.25,37.85,52...|      [-1.3377854272003...|
+--------------------+--------------------------+
only showing top 5 rows



## 3.2. Prepare categorical attributes
After imputing and scaling the continuous attributes, we should take care of the categorical attributes. Let's first print the number of distict values of the categirical attribute `ocean_proximity`.

In [21]:
# [l["ocean_proximity"] for l in renamedHousing.select("ocean_proximity").distinct().collect()]

scaledHousing.groupBy("ocean_proximity").count().sort(desc("count")).show()

+---------------+-----+
|ocean_proximity|count|
+---------------+-----+
|      <1H OCEAN| 9136|
|         INLAND| 6551|
|     NEAR OCEAN| 2658|
|       NEAR BAY| 2290|
|         ISLAND|    5|
+---------------+-----+



### String indexer
Most Machine Learning algorithms prefer to work with numbers. So let's convert the categorical attribute `ocean_proximity` to numbers. To do so, we can use the `StringIndexer` that encodes a string column of labels to a column of label indices. The indices are in [0, numLabels), ordered by label frequencies, so the most frequent label gets index 0.

In [22]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="ocean_proximity", outputCol="indexed_ocean_proximity", stringOrderType="frequencyDesc")
idxHousing = indexer.fit(scaledHousing).transform(scaledHousing)
# .drop("ocean_proximity").withColumnRenamed("indexedOceanProximity","ocean_proximity")
idxHousing.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+--------------------+--------------------------+-----------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|   label|ocean_proximity|rooms_per_household|  bedrooms_per_room|population_per_household|assembledNumFeatures|scaledAssembledNumFeatures|indexed_ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+--------------------+--------------------------+-----------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|452600.0|       NEAR BAY|  6.984126984126984|0.14659090909090908|      2.5555555555555554|[-122.23,37.8

Now we can use this numerical data in any Machine Learning algorithm. You can look at the mapping that this encoder has learned using the `labels` method: "<1H OCEAN" is mapped to 0, "INLAND" is mapped to 1, etc.

In [23]:
indexer.fit(renamedHousing).labels

['<1H OCEAN', 'INLAND', 'NEAR OCEAN', 'NEAR BAY', 'ISLAND']

### One-hot encoding
Now, convert the label indices built in the last step into one-hot vectors. To do this, you can take advantage of the `OneHotEncoderEstimator` Estimator.

In [27]:
# // TODO: Replace <FILL IN> with appropriate code

# import org.apache.spark.ml.feature.OneHotEncoderEstimator

# val encoder = new OneHotEncoderEstimator().<FILL IN>
# val ohHousing = encoder.fit(idxHousing).transform(idxHousing)

# ohHousing.show(5)

from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(inputCol="indexed_ocean_proximity", outputCol="ocean_proximity_onehot")
# ohHousing = encoder.transform(idxHousing)
# # .drop("ocean_proximity").withColumnRenamed("oh_ocean_proximity","ocean_proximity")
# ohHousing.show(5)

# encoder = OneHotEncoder(inputCols=["categoryIndex1", "categoryIndex2"],
#                         outputCols=["categoryVec1", "categoryVec2"])
model = encoder.fit(idxHousing)
ohHousing = model.transform(idxHousing)
ohHousing.show(5)

# from pyspark.ml.feature import OneHotEncoderEstimator
# encoder = OneHotEncoderEstimator(inputCols=["indexed_ocean_proximity"], outputCols=["features"])
# ohHousing = ohe.fit(idxHousing).transform(idxHousing)
# ohHousing.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+--------------------+--------------------------+-----------------------+----------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|   label|ocean_proximity|rooms_per_household|  bedrooms_per_room|population_per_household|assembledNumFeatures|scaledAssembledNumFeatures|indexed_ocean_proximity|ocean_proximity_onehot|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+--------------------+--------------------------+-----------------------+----------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|452600.0|       NEAR BAY|  6.98412

In [29]:
ohHousing.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- label: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)
 |-- rooms_per_household: double (nullable = true)
 |-- bedrooms_per_room: double (nullable = true)
 |-- population_per_household: double (nullable = true)
 |-- assembledNumFeatures: vector (nullable = true)
 |-- scaledAssembledNumFeatures: vector (nullable = true)
 |-- indexed_ocean_proximity: double (nullable = false)
 |-- ocean_proximity_onehot: vector (nullable = true)



---
# 4. Pipeline
As you can see, there are many data transformation steps that need to be executed in the right order. For example, you called the `Imputer`, `VectorAssembler`, and `StandardScaler` from left to right. However, we can use the `Pipeline` class to define a sequence of Transformers/Estimators, and run them in order. A `Pipeline` is an `Estimator`, thus, after a Pipeline's `fit()` method runs, it produces a `PipelineModel`, which is a `Transformer`.

Now, let's create a pipeline called `numPipeline` to call the numerical transformers you built above (`imputer`, `va`, and `scaler`) in the right order from left to right, as well as a pipeline called `catPipeline` to call the categorical transformers (`indexer` and `encoder`). Then, put these two pipelines `numPipeline` and `catPipeline` into one pipeline.

In [26]:
renamedHousing.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- label: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)
 |-- rooms_per_household: double (nullable = true)
 |-- bedrooms_per_room: double (nullable = true)
 |-- population_per_household: double (nullable = true)



In [30]:
# // TODO: Replace <FILL IN> with appropriate code

# import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}

# val numPipeline = <FILL IN>
# val catPipeline = <FILL IN>
# val pipeline = new Pipeline().setStages(Array(numPipeline, catPipeline))
# val newHousing = pipeline.fit(renamedHousing).transform(renamedHousing)

# newHousing.show(5)

from pyspark.ml import Pipeline

numPipeline = Pipeline(stages=[imputer, va, scaler])
catPipeline = Pipeline(stages=[indexer, encoder])

pipeline = Pipeline(stages=[numPipeline, catPipeline])

newHousing = pipeline.fit(renamedHousing).transform(renamedHousing)

newHousing.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+--------------------+--------------------------+-----------------------+----------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|   label|ocean_proximity|rooms_per_household|  bedrooms_per_room|population_per_household|assembledNumFeatures|scaledAssembledNumFeatures|indexed_ocean_proximity|ocean_proximity_onehot|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+--------------------+--------------------------+-----------------------+----------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|452600.0|       NEAR BAY|  6.98412

In [31]:
newHousing.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- label: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)
 |-- rooms_per_household: double (nullable = true)
 |-- bedrooms_per_room: double (nullable = true)
 |-- population_per_household: double (nullable = true)
 |-- assembledNumFeatures: vector (nullable = true)
 |-- scaledAssembledNumFeatures: vector (nullable = true)
 |-- indexed_ocean_proximity: double (nullable = false)
 |-- ocean_proximity_onehot: vector (nullable = true)



Now, use `VectorAssembler` to put all attributes of the final dataset `newHousing` into a big vector, and call the new column `features`.

In [32]:
va2 = VectorAssembler(inputCols=['scaledAssembledNumFeatures', 'ocean_proximity_onehot'],outputCol='features')
dataset = va2.transform(newHousing).select("features", "label")

dataset.show(5)

+--------------------+--------+
|            features|   label|
+--------------------+--------+
|[-1.3278030546902...|452600.0|
|[-1.3228118684350...|358500.0|
|[-1.3327942409452...|352100.0|
|[-1.3377854272003...|341300.0|
|[-1.3377854272003...|342200.0|
+--------------------+--------+
only showing top 5 rows



---
# 5. Make a model
Here we going to make four different regression models:
* Linear regression model
* Decission tree regression
* Random forest regression
* Gradient-booster forest regression

But, before giving the data to train a Machine Learning model, let's first split the data into training dataset (`trainSet`) with 80% of the whole data, and test dataset (`testSet`) with 20% of it.

In [33]:
trainSet, testSet = dataset.randomSplit([0.8,0.2], seed=None)

## 5.1. Linear regression model
Now, train a Linear Regression model using the `LinearRegression` class. Then, print the coefficients and intercept of the model, as well as the summary of the model over the training set by calling the `summary` method.

In [37]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression()

lrModel = lr.fit(trainSet)
trainingSummary = lrModel.summary

print(f"numIterations: {trainingSummary.totalIterations}")   ## CHECK WHY ONLY 1 ITERTION !!!!!
print(f"objectiveHistory: {trainingSummary.objectiveHistory}")
trainingSummary.residuals.show()
print(f"Train RMSE: {trainingSummary.rootMeanSquaredError}")
print(f"Train r2: {trainingSummary.r2}")

print(f"Coefficients: {lrModel.coefficients}")
print(f"Intercept: {lrModel.intercept}")

numIterations: 0
objectiveHistory: [0.0]
+-------------------+
|          residuals|
+-------------------+
|-107229.14641356701|
|-20805.540958396043|
| -41280.62814564741|
| -100419.9955033815|
| -70136.91864769114|
|-107296.86110512674|
|-101841.77386634814|
|-157582.15175529965|
|-44337.852193730476|
| -42645.41607461218|
|  -86968.2189656446|
| -43496.45007687865|
|-54945.826443584985|
|-103096.89369958764|
|-102065.85526528902|
| -75885.17643575219|
|-22563.653938805975|
| -74073.92679707694|
| -83549.25741300103|
| -82274.79235590465|
+-------------------+
only showing top 20 rows

Train RMSE: 67603.46298975957
Train r2: 0.6544407103615614
Coefficients: [-53413.97688102941,-53717.2305857711,13454.017707220415,6298.134352496961,2564.6018155392085,-45291.00283250605,41820.80235295144,78318.08912884528,6301.655136412218,16026.465291655324,17.616291556954103,-121331.44144643401,-156532.0924733863,-116428.59913364051,-125280.9469876201]
Intercept: 338721.41988067946


Now, use `RegressionEvaluator` to measure the root-mean-square-erroe (RMSE) of the model on the test dataset.

In [38]:
from pyspark.ml.evaluation import RegressionEvaluator

predictions = lrModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

+------------------+--------+--------------------+
|        prediction|   label|            features|
+------------------+--------+--------------------+
|135727.72278051544| 68300.0|[-2.2960931881793...|
|162923.34038962072|109400.0|[-2.2960931881793...|
|157121.70868038066| 76900.0|[-2.2911020019242...|
|104680.82836256246| 69500.0|[-2.2911020019242...|
|130393.19926924375| 66800.0|[-2.2811196294140...|
+------------------+--------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 69076.2


## 5.2. Decision tree regression
Repeat what you have done on Regression Model to build a Decision Tree model. Use the `DecisionTreeRegressor` to make a model and then measure its RMSE on the test dataset.

In [39]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

dt = DecisionTreeRegressor()
dtModel = dt.fit(trainSet)

predictions = dtModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

+------------------+--------+--------------------+
|        prediction|   label|            features|
+------------------+--------+--------------------+
|136009.52380952382| 68300.0|[-2.2960931881793...|
| 201717.3605059733|109400.0|[-2.2960931881793...|
|136009.52380952382| 76900.0|[-2.2911020019242...|
|143479.76205583755| 69500.0|[-2.2911020019242...|
|136009.52380952382| 66800.0|[-2.2811196294140...|
+------------------+--------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 68730.8


## 5.3. Random forest regression
Let's try the test error on a Random Forest Model. Youcan use the `RandomForestRegressor` to make a Random Forest model.

In [40]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

rf = RandomForestRegressor()
rfModel = rf.fit(trainSet)

predictions = rfModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

+------------------+--------+--------------------+
|        prediction|   label|            features|
+------------------+--------+--------------------+
| 151654.6878749982| 68300.0|[-2.2960931881793...|
|204972.55615037243|109400.0|[-2.2960931881793...|
| 154127.4033989689| 76900.0|[-2.2911020019242...|
|143084.80998519008| 69500.0|[-2.2911020019242...|
| 154127.4033989689| 66800.0|[-2.2811196294140...|
+------------------+--------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 67073.7


## 5.4. Gradient-boosted tree regression
Fianlly, we want to build a Gradient-boosted Tree Regression model and test the RMSE of the test data. Use the `GBTRegressor` to build the model.

In [41]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

gb = GBTRegressor()
gbModel = gb.fit(trainSet)

predictions = gbModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

+------------------+--------+--------------------+
|        prediction|   label|            features|
+------------------+--------+--------------------+
| 83329.14239377546| 68300.0|[-2.2960931881793...|
|133873.50650821248|109400.0|[-2.2960931881793...|
| 85989.05465711399| 76900.0|[-2.2911020019242...|
| 96436.42587214964| 69500.0|[-2.2911020019242...|
| 93776.10577926389| 66800.0|[-2.2811196294140...|
+------------------+--------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 56504.1


---
# 6. Hyperparameter tuning
An important task in Machine Learning is model selection, or using data to find the best model or parameters for a given task. This is also called tuning. Tuning may be done for individual Estimators such as LinearRegression, or for entire Pipelines which include multiple algorithms, featurization, and other steps. Users can tune an entire Pipeline at once, rather than tuning each element in the Pipeline separately. MLlib supports model selection tools, such as `CrossValidator`. These tools require the following items:
* Estimator: algorithm or Pipeline to tune (`setEstimator`)
* Set of ParamMaps: parameters to choose from, sometimes called a "parameter grid" to search over (`setEstimatorParamMaps`)
* Evaluator: metric to measure how well a fitted Model does on held-out test data (`setEvaluator`)

`CrossValidator` begins by splitting the dataset into a set of folds, which are used as separate training and test datasets. For example with `k=3` folds, `CrossValidator` will generate 3 (training, test) dataset pairs, each of which uses 2/3 of the data for training and 1/3 for testing. To evaluate a particular `ParamMap`, `CrossValidator` computes the average evaluation metric for the 3 Models produced by fitting the Estimator on the 3 different (training, test) dataset pairs. After identifying the best `ParamMap`, `CrossValidator` finally re-fits the Estimator using the best ParamMap and the entire dataset.

Below, use the `CrossValidator` to select the best Random Forest model. To do so, you need to define a grid of parameters. Let's say we want to do the search among the different number of trees (1, 5, and 10), and different tree depth (5, 10, and 15).

In [42]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor

rf = RandomForestRegressor()

paramGrid = ParamGridBuilder().addGrid(rf.numTrees, [1,5,10]).addGrid(rf.maxDepth, [5,10,15]).build()

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, parallelism=2)
cvModel = cv.fit(trainSet)

predictions = cvModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

+-----------------+--------+--------------------+
|       prediction|   label|            features|
+-----------------+--------+--------------------+
|88087.27777777778| 68300.0|[-2.2960931881793...|
|138805.1666666667|109400.0|[-2.2960931881793...|
|78888.40196078431| 76900.0|[-2.2911020019242...|
|81690.33333333333| 69500.0|[-2.2911020019242...|
|79671.67857142857| 66800.0|[-2.2811196294140...|
+-----------------+--------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 53798.9


---
# 7. An End-to-End Classification Test
As the last step, you are given a dataset called `data/ccdefault.csv`. The dataset represents default of credit card clients. It has 30,000 cases and 24 different attributes. More details about the dataset is available at `data/ccdefault.txt`. In this task you should make three models, compare their results and conclude the ideal solution. Here are the suggested steps:
1. Load the data.
2. Carry out some exploratory analyses (e.g., how various features and the target variable are distributed).
3. Train a model to predict the target variable (risk of `default`).
  - Employ three different models (logistic regression, decision tree, and random forest).
  - Compare the models' performances (e.g., AUC).
  - Defend your choice of best model (e.g., what are the strength and weaknesses of each of these models?).
4. What more would you do with this data? Anything to help you devise a better solution?

## 1. Load the data

In [83]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Lab 1").getOrCreate()

filename = "data/ccdefault.csv"

df = spark.read.csv(filename,header=True,inferSchema=True)

## 2. Exploratory Data Analysis

### Schema

In [84]:
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- LIMIT_BAL: integer (nullable = true)
 |-- SEX: integer (nullable = true)
 |-- EDUCATION: integer (nullable = true)
 |-- MARRIAGE: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- PAY_0: integer (nullable = true)
 |-- PAY_2: integer (nullable = true)
 |-- PAY_3: integer (nullable = true)
 |-- PAY_4: integer (nullable = true)
 |-- PAY_5: integer (nullable = true)
 |-- PAY_6: integer (nullable = true)
 |-- BILL_AMT1: integer (nullable = true)
 |-- BILL_AMT2: integer (nullable = true)
 |-- BILL_AMT3: integer (nullable = true)
 |-- BILL_AMT4: integer (nullable = true)
 |-- BILL_AMT5: integer (nullable = true)
 |-- BILL_AMT6: integer (nullable = true)
 |-- PAY_AMT1: integer (nullable = true)
 |-- PAY_AMT2: integer (nullable = true)
 |-- PAY_AMT3: integer (nullable = true)
 |-- PAY_AMT4: integer (nullable = true)
 |-- PAY_AMT5: integer (nullable = true)
 |-- PAY_AMT6: integer (nullable = true)
 |-- DEFAULT: integer (nullable = tru

### Show Dataframe

In [85]:
# col 'LIMIT_BAL' is money (first column in the dataframe)
nonMoneyFeatures = ['SEX','EDUCATION','MARRIAGE','AGE']
pastPayments = ['PAY_0','PAY_2','PAY_3','PAY_4','PAY_5','PAY_6']
billStatement = ['BILL_AMT1','BILL_AMT2','BILL_AMT3','BILL_AMT4','BILL_AMT5','BILL_AMT6']
prevPayment = ['PAY_AMT1','PAY_AMT2','PAY_AMT3','PAY_AMT4','PAY_AMT5','PAY_AMT6']

#### Non money-related attributes

In [86]:
df.select(['LIMIT_BAL']+nonMoneyFeatures).show(5)

+---------+---+---------+--------+---+
|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|
+---------+---+---------+--------+---+
|    20000|  2|        2|       1| 24|
|   120000|  2|        2|       2| 26|
|    90000|  2|        2|       2| 34|
|    50000|  2|        2|       1| 37|
|    50000|  1|        2|       1| 57|
+---------+---+---------+--------+---+
only showing top 5 rows



#### History of past payment

In [87]:
df.select(pastPayments).show(5)

+-----+-----+-----+-----+-----+-----+
|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|
+-----+-----+-----+-----+-----+-----+
|    2|    2|   -1|   -1|   -2|   -2|
|   -1|    2|    0|    0|    0|    2|
|    0|    0|    0|    0|    0|    0|
|    0|    0|    0|    0|    0|    0|
|   -1|    0|   -1|    0|    0|    0|
+-----+-----+-----+-----+-----+-----+
only showing top 5 rows



#### Amount of bill statement (NT dollar)

In [88]:
df.select(billStatement).show(5)

+---------+---------+---------+---------+---------+---------+
|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|
+---------+---------+---------+---------+---------+---------+
|     3913|     3102|      689|        0|        0|        0|
|     2682|     1725|     2682|     3272|     3455|     3261|
|    29239|    14027|    13559|    14331|    14948|    15549|
|    46990|    48233|    49291|    28314|    28959|    29547|
|     8617|     5670|    35835|    20940|    19146|    19131|
+---------+---------+---------+---------+---------+---------+
only showing top 5 rows



#### Amount of previous payment (NT dollar)

In [89]:
df.select(prevPayment).show(5)

+--------+--------+--------+--------+--------+--------+
|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|
+--------+--------+--------+--------+--------+--------+
|       0|     689|       0|       0|       0|       0|
|       0|    1000|    1000|    1000|       0|    2000|
|    1518|    1500|    1000|    1000|    1000|    5000|
|    2000|    2019|    1200|    1100|    1069|    1000|
|    2000|   36681|   10000|    9000|     689|     679|
+--------+--------+--------+--------+--------+--------+
only showing top 5 rows



### Count

In [90]:
df.count()

30000

### Statistical Properties of PySpark Dataframe

#### Non money-related attributes

In [91]:
df.describe(nonMoneyFeatures).show()

+-------+------------------+------------------+------------------+-----------------+
|summary|               SEX|         EDUCATION|          MARRIAGE|              AGE|
+-------+------------------+------------------+------------------+-----------------+
|  count|             30000|             30000|             30000|            30000|
|   mean|1.6037333333333332|1.8531333333333333|1.5518666666666667|          35.4855|
| stddev|0.4891291960902602|0.7903486597207269|0.5219696006132467|9.217904068090155|
|    min|                 1|                 0|                 0|               21|
|    max|                 2|                 6|                 3|               79|
+-------+------------------+------------------+------------------+-----------------+



#### History of past payments

In [92]:
df.describe(pastPayments).show()

+-------+------------------+--------------------+------------------+--------------------+------------------+-----------------+
|summary|             PAY_0|               PAY_2|             PAY_3|               PAY_4|             PAY_5|            PAY_6|
+-------+------------------+--------------------+------------------+--------------------+------------------+-----------------+
|  count|             30000|               30000|             30000|               30000|             30000|            30000|
|   mean|           -0.0167|-0.13376666666666667|           -0.1662|-0.22066666666666668|           -0.2662|          -0.2911|
| stddev|1.1238015279973335|  1.1971859730345495|1.1968675684465686|  1.1691386224023357|1.1331874060027525|1.149987625607897|
|    min|                -2|                  -2|                -2|                  -2|                -2|               -2|
|    max|                 8|                   8|                 8|                   8|                 8|   

#### Amount of bill statement (NT dollar)

In [93]:
df.describe(billStatement).show()

+-------+-----------------+-----------------+-----------------+------------------+-----------------+----------------+
|summary|        BILL_AMT1|        BILL_AMT2|        BILL_AMT3|         BILL_AMT4|        BILL_AMT5|       BILL_AMT6|
+-------+-----------------+-----------------+-----------------+------------------+-----------------+----------------+
|  count|            30000|            30000|            30000|             30000|            30000|           30000|
|   mean|       51223.3309|49179.07516666667|       47013.1548| 43262.94896666666|40311.40096666667|      38871.7604|
| stddev|73635.86057552966|71173.76878252832|69349.38742703677|64332.856133916444|60797.15577026471|59554.1075367459|
|    min|          -165580|           -69777|          -157264|           -170000|           -81334|         -339603|
|    max|           964511|           983931|          1664089|            891586|           927171|          961664|
+-------+-----------------+-----------------+-----------

#### Amount of previous payment (NT dollar)

In [94]:
df.describe(prevPayment).show()

+-------+-----------------+------------------+-----------------+------------------+------------------+-----------------+
|summary|         PAY_AMT1|          PAY_AMT2|         PAY_AMT3|          PAY_AMT4|          PAY_AMT5|         PAY_AMT6|
+-------+-----------------+------------------+-----------------+------------------+------------------+-----------------+
|  count|            30000|             30000|            30000|             30000|             30000|            30000|
|   mean|        5663.5805|         5921.1635|        5225.6815| 4826.076866666666| 4799.387633333334|5215.502566666667|
| stddev|16563.28035402577|23040.870402057186|17606.96146980311|15666.159744032062|15278.305679144742|17777.46577543531|
|    min|                0|                 0|                0|                 0|                 0|                0|
|    max|           873552|           1684259|           896040|            621000|            426529|           528666|
+-------+-----------------+-----

### Count the missing values in a column of PySpark Dataframe

In [95]:
for col in df.columns:
    print(col, "\t", "with null values: ", df.filter(df[col].isNull()).count())

ID 	 with null values:  0
LIMIT_BAL 	 with null values:  0
SEX 	 with null values:  0
EDUCATION 	 with null values:  0
MARRIAGE 	 with null values:  0
AGE 	 with null values:  0
PAY_0 	 with null values:  0
PAY_2 	 with null values:  0
PAY_3 	 with null values:  0
PAY_4 	 with null values:  0
PAY_5 	 with null values:  0
PAY_6 	 with null values:  0
BILL_AMT1 	 with null values:  0
BILL_AMT2 	 with null values:  0
BILL_AMT3 	 with null values:  0
BILL_AMT4 	 with null values:  0
BILL_AMT5 	 with null values:  0
BILL_AMT6 	 with null values:  0
PAY_AMT1 	 with null values:  0
PAY_AMT2 	 with null values:  0
PAY_AMT3 	 with null values:  0
PAY_AMT4 	 with null values:  0
PAY_AMT5 	 with null values:  0
PAY_AMT6 	 with null values:  0
DEFAULT 	 with null values:  0


### Checking for correlation

#### Among the non money-related columns

In [96]:
import numpy as np
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

va = VectorAssembler(inputCols=nonMoneyFeatures,outputCol='attributes')
df_assembled = va.transform(df)
df_assembled.select(nonMoneyFeatures + ['attributes']).show(5)

coeff = Correlation.corr(df_assembled, "attributes").head()
print("Pearson correlation matrix:\n" + str(coeff[0]))

+---+---------+--------+---+------------------+
|SEX|EDUCATION|MARRIAGE|AGE|        attributes|
+---+---------+--------+---+------------------+
|  2|        2|       1| 24|[2.0,2.0,1.0,24.0]|
|  2|        2|       2| 26|[2.0,2.0,2.0,26.0]|
|  2|        2|       2| 34|[2.0,2.0,2.0,34.0]|
|  2|        2|       1| 37|[2.0,2.0,1.0,37.0]|
|  1|        2|       1| 57|[1.0,2.0,1.0,57.0]|
+---+---------+--------+---+------------------+
only showing top 5 rows

Pearson correlation matrix:
DenseMatrix([[ 1.        ,  0.01423194, -0.03138884, -0.09087365],
             [ 0.01423194,  1.        , -0.14346434,  0.17506066],
             [-0.03138884, -0.14346434,  1.        , -0.41416992],
             [-0.09087365,  0.17506066, -0.41416992,  1.        ]])


#### Among history of past payments (* alarming state of correlation)

In [97]:
import numpy as np
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

va = VectorAssembler(inputCols=pastPayments,outputCol='attributes')
df_assembled = va.transform(df)
df_assembled.select(pastPayments + ['attributes']).show(5)

coeff = Correlation.corr(df_assembled, "attributes").head()
print("Pearson correlation matrix:\n" + str(coeff[0]))

+-----+-----+-----+-----+-----+-----+--------------------+
|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|          attributes|
+-----+-----+-----+-----+-----+-----+--------------------+
|    2|    2|   -1|   -1|   -2|   -2|[2.0,2.0,-1.0,-1....|
|   -1|    2|    0|    0|    0|    2|[-1.0,2.0,0.0,0.0...|
|    0|    0|    0|    0|    0|    0|           (6,[],[])|
|    0|    0|    0|    0|    0|    0|           (6,[],[])|
|   -1|    0|   -1|    0|    0|    0|(6,[0,2],[-1.0,-1...|
+-----+-----+-----+-----+-----+-----+--------------------+
only showing top 5 rows

Pearson correlation matrix:
DenseMatrix([[1.        , 0.67216438, 0.57424509, 0.53884063, 0.50942606,
              0.47455309],
             [0.67216438, 1.        , 0.76655168, 0.66206713, 0.62278025,
              0.57550086],
             [0.57424509, 0.76655168, 1.        , 0.77735887, 0.68677451,
              0.63268359],
             [0.53884063, 0.66206713, 0.77735887, 1.        , 0.81983531,
              0.71644948],
            

##### Among bill statements (*more alarming state of correlation)

In [98]:
import numpy as np
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

va = VectorAssembler(inputCols=billStatement,outputCol='attributes')
df_assembled = va.transform(df)
df_assembled.select(billStatement + ['attributes']).show(5)

coeff = Correlation.corr(df_assembled, "attributes").head()
print("Pearson correlation matrix:\n" + str(coeff[0]))

+---------+---------+---------+---------+---------+---------+--------------------+
|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|          attributes|
+---------+---------+---------+---------+---------+---------+--------------------+
|     3913|     3102|      689|        0|        0|        0|[3913.0,3102.0,68...|
|     2682|     1725|     2682|     3272|     3455|     3261|[2682.0,1725.0,26...|
|    29239|    14027|    13559|    14331|    14948|    15549|[29239.0,14027.0,...|
|    46990|    48233|    49291|    28314|    28959|    29547|[46990.0,48233.0,...|
|     8617|     5670|    35835|    20940|    19146|    19131|[8617.0,5670.0,35...|
+---------+---------+---------+---------+---------+---------+--------------------+
only showing top 5 rows

Pearson correlation matrix:
DenseMatrix([[1.        , 0.95148367, 0.89227853, 0.86027219, 0.82977861,
              0.80265019],
             [0.95148367, 1.        , 0.92832626, 0.89248229, 0.85977831,
              0.83159356]

#### Among previous payments

In [99]:
import numpy as np
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

va = VectorAssembler(inputCols=prevPayment,outputCol='attributes')
df_assembled = va.transform(df)
df_assembled.select(prevPayment + ['attributes']).show(5)

coeff = Correlation.corr(df_assembled, "attributes").head()
print("Pearson correlation matrix:\n" + str(coeff[0]))

+--------+--------+--------+--------+--------+--------+--------------------+
|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|          attributes|
+--------+--------+--------+--------+--------+--------+--------------------+
|       0|     689|       0|       0|       0|       0|     (6,[1],[689.0])|
|       0|    1000|    1000|    1000|       0|    2000|[0.0,1000.0,1000....|
|    1518|    1500|    1000|    1000|    1000|    5000|[1518.0,1500.0,10...|
|    2000|    2019|    1200|    1100|    1069|    1000|[2000.0,2019.0,12...|
|    2000|   36681|   10000|    9000|     689|     679|[2000.0,36681.0,1...|
+--------+--------+--------+--------+--------+--------+--------------------+
only showing top 5 rows

Pearson correlation matrix:
DenseMatrix([[1.        , 0.28557553, 0.25219114, 0.19955793, 0.14845928,
              0.18573526],
             [0.28557553, 1.        , 0.24477045, 0.18010674, 0.18090775,
              0.15763392],
             [0.25219114, 0.24477045, 1.        , 0.

### Scaling the monetary numbers to similar ranges

In [100]:
from pyspark.ml.feature import VectorAssembler,StandardScaler

money_cols = ['LIMIT_BAL']+pastPayments+billStatement+prevPayment
va = VectorAssembler(inputCols=money_cols,outputCol='assembledMoneyFeatures')
df_assembled = va.transform(df)

scaler = StandardScaler(inputCol="assembledMoneyFeatures", outputCol="scaledAssembledMoneyFeatures", withStd=True, withMean=True)
df_moneyScaled = scaler.fit(df_assembled).transform(df_assembled).drop('assembledMoneyFeatures')
df_moneyScaled.select('scaledAssembledMoneyFeatures').show(5)

+----------------------------+
|scaledAssembledMoneyFeatures|
+----------------------------+
|        [-1.1367012005089...|
|        [-0.3659744005642...|
|        [-0.5971924405476...|
|        [-0.9054831605255...|
|        [-0.9054831605255...|
+----------------------------+
only showing top 5 rows



### Preparing features and labels for Machine Learning models

In [101]:
from pyspark.ml.feature import VectorAssembler,StandardScaler

va = VectorAssembler(inputCols=nonMoneyFeatures,outputCol='nonMoneyFeatures')
dataset = va.transform(df_moneyScaled)

dataset.select("nonMoneyFeatures").show(5)

+------------------+
|  nonMoneyFeatures|
+------------------+
|[2.0,2.0,1.0,24.0]|
|[2.0,2.0,2.0,26.0]|
|[2.0,2.0,2.0,34.0]|
|[2.0,2.0,1.0,37.0]|
|[1.0,2.0,1.0,57.0]|
+------------------+
only showing top 5 rows



In [102]:
va = VectorAssembler(inputCols=['nonMoneyFeatures', 'scaledAssembledMoneyFeatures'],outputCol='features')
dataset = va.transform(dataset)
dataset = dataset.withColumnRenamed('DEFAULT','label')

dataset.select("features", "label").show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[2.0,2.0,1.0,24.0...|    1|
|[2.0,2.0,2.0,26.0...|    1|
|[2.0,2.0,2.0,34.0...|    0|
|[2.0,2.0,1.0,37.0...|    0|
|[1.0,2.0,1.0,57.0...|    0|
+--------------------+-----+
only showing top 5 rows



### Split into train (80%) and test (20%)

In [103]:
training, test = dataset.randomSplit([0.8, 0.2])
print("Training set:",training.count())
print("Testing set:",test.count())

Training set: 24092
Testing set: 5908


### 3. Machine Learning Models

Employ three different models (logistic regression, decision tree, and random forest).

Compare the models' performances (e.g., AUC).

Defend your choice of best model (e.g., what are the strength and weaknesses of each of these models?).

What more would you do with this data? Anything to help you devise a better solution?

#### 1. logistic regression

In [105]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression()
lrModel = lr.fit(training)

predictions = lrModel.transform(test).select("prediction", "label", "features")
predictions.show(5)

trainingSummary = lrModel.summary

accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
auROC = trainingSummary.areaUnderROC
print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s\nauROC: %s"
      % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall, auROC))

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       0.0|    1|[2.0,2.0,1.0,24.0...|
|       0.0|    1|[2.0,2.0,2.0,26.0...|
|       0.0|    0|[1.0,1.0,2.0,29.0...|
|       0.0|    0|[2.0,2.0,2.0,23.0...|
|       0.0|    0|[2.0,3.0,3.0,23.0...|
+----------+-----+--------------------+
only showing top 5 rows

Accuracy: 0.8108500747135978
FPR: 0.6026546167989388
TPR: 0.8108500747135978
F-measure: 0.7708984162649318
Precision: 0.7963149441774864
Recall: 0.8108500747135978
auROC: 0.7210958314838776


#### 2. decision tree

In [106]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

dt = DecisionTreeClassifier()
dtModel = dt.fit(training)

predictions = dtModel.transform(test).select("prediction", "label", "features")
predictions.show(5)

accuracy = MulticlassClassificationEvaluator(metricName="accuracy").evaluate(predictions)
falsePositiveRate = MulticlassClassificationEvaluator(metricName="weightedTruePositiveRate").evaluate(predictions)
truePositiveRate = MulticlassClassificationEvaluator(metricName="weightedFalsePositiveRate").evaluate(predictions)
fMeasure = MulticlassClassificationEvaluator(metricName="weightedFMeasure").evaluate(predictions) # 
precision = MulticlassClassificationEvaluator(metricName="weightedPrecision").evaluate(predictions)
recall = MulticlassClassificationEvaluator(metricName="weightedRecall").evaluate(predictions)
auROC = BinaryClassificationEvaluator(rawPredictionCol='prediction').evaluate(predictions)

print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s\nauROC: %s"
      % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall, auROC))

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       1.0|    1|[2.0,2.0,1.0,24.0...|
|       1.0|    1|[2.0,2.0,2.0,26.0...|
|       0.0|    0|[1.0,1.0,2.0,29.0...|
|       0.0|    0|[2.0,2.0,2.0,23.0...|
|       0.0|    0|[2.0,3.0,3.0,23.0...|
+----------+-----+--------------------+
only showing top 5 rows

Accuracy: 0.8226134055517942
FPR: 0.8226134055517942
TPR: 0.5014725735656529
F-measure: 0.8010458822761062
Precision: 0.8072160610154474
Recall: 0.8226134055517942
auROC: 0.6605704159930705


#### 3. random forest

In [107]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

rf = RandomForestClassifier()
rfModel = rf.fit(training)

predictions = rfModel.transform(test).select("prediction", "label", "features")
predictions.show(5)

accuracy = MulticlassClassificationEvaluator(metricName="accuracy").evaluate(predictions)
falsePositiveRate = MulticlassClassificationEvaluator(metricName="weightedTruePositiveRate").evaluate(predictions)
truePositiveRate = MulticlassClassificationEvaluator(metricName="weightedFalsePositiveRate").evaluate(predictions)
fMeasure = MulticlassClassificationEvaluator(metricName="weightedFMeasure").evaluate(predictions) # 
precision = MulticlassClassificationEvaluator(metricName="weightedPrecision").evaluate(predictions)
recall = MulticlassClassificationEvaluator(metricName="weightedRecall").evaluate(predictions)
auROC = BinaryClassificationEvaluator(rawPredictionCol='prediction').evaluate(predictions)

print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s\nauROC: %s"
      % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall, auROC))

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       1.0|    1|[2.0,2.0,1.0,24.0...|
|       0.0|    1|[2.0,2.0,2.0,26.0...|
|       0.0|    0|[1.0,1.0,2.0,29.0...|
|       0.0|    0|[2.0,2.0,2.0,23.0...|
|       0.0|    0|[2.0,3.0,3.0,23.0...|
+----------+-----+--------------------+
only showing top 5 rows

Accuracy: 0.8209207853757616
FPR: 0.8209207853757617
TPR: 0.5198069964105043
F-measure: 0.7962858970701344
Precision: 0.8056767297674514
Recall: 0.8209207853757617
auROC: 0.6505568944826288


In [108]:
# Logistic Regression:
#     Accuracy: 0.8108500747135978  (close to winner)
#     FPR: 0.6026546167989388       (winner)
#     TPR: 0.8108500747135978       (winner)
#     F-measure: 0.7708984162649318 (close to winner)
#     Precision: 0.7963149441774864 (close to winner)
#     Recall: 0.8108500747135978    (close to winner)
#     auROC: 0.7210958314838776     (winner)

# Decision Tree:
#     Accuracy: 0.8226134055517942  (winner)
#     FPR: 0.8226134055517942       
#     TPR: 0.5014725735656529
#     F-measure: 0.8010458822761062 (winner)
#     Precision: 0.8072160610154474
#     Recall: 0.8226134055517942
#     auROC: 0.6605704159930705

# Random Forest:
#     Accuracy: 0.8209207853757616
#     FPR: 0.8209207853757617
#     TPR: 0.5198069964105043
#     F-measure: 0.7962858970701344
#     Precision: 0.8056767297674514
#     Recall: 0.8209207853757617
#     auROC: 0.6505568944826288

### Compare the models
##### Defend your choice of best model (e.g., what are the strength and weaknesses of each of these models?).


From the metrics above, we notice that Logistic Regression performs the best between the three models.

One considerable difference between LR and the other 2 models is FPR and TPR. 
    - FPR for LR (0.60) is much smaller than the other 2 models (~ 0.82).
    - TPR for LR (0.81) is much higher than the other 2 models (~ 0.50).

- Having high FPR means that the model will have a lot of False Positives which is not good for the model.

- Having low TPR means that the model will have few True Positives which also not good for the model.

On the other hand, all other metrics are either the highest or very close to the highest.

Thus, Logistic Regression is the best from these results.

## 4. Possible Improvements

###### What more would you do with this data? Anything to help you devise a better solution?


Possible improvements are:

- There is a high correlation between the columns "Among history of past payments" (X6 - X11) and "bill statements" (X12-X17), thus one possible solution is to drop some of these columns that cause correlation, since one might be a linear combination of other columns which could result in the model overfitting.


- Cross validation could be employed and hyper parameter search like was done in section 6 with the previous dataset