# **Global Constants**

In [0]:
JAVA_HOME = "/usr/lib/jvm/java-8-openjdk-amd64"
GDRIVE_DIR = "/content/gdrive"
GDRIVE_HOME_DIR = GDRIVE_DIR + "/My Drive"
GDRIVE_DATA_DIR = GDRIVE_HOME_DIR + "/Teaching/2019-20-BDC/datasets"
DATASET_URL = "https://github.com/gtolomei/big-data-computing/raw/master/datasets/bank-marketing.csv.bz2"
GDRIVE_DATASET_FILE = GDRIVE_DATA_DIR + "/" + DATASET_URL.split("/")[-1]

RANDOM_SEED = 42 # for reproducibility

# **Spark + Google Colab Setup**

## **1.** Install PySpark and related dependencies

In [0]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = JAVA_HOME

## **2.** Import useful Python packages

In [0]:
import requests
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

## **3.** Create Spark context

In [0]:
# create the session
conf = SparkConf().set("spark.ui.port", "4050").set('spark.executor.memory', '4G').set('spark.driver.memory', '45G').set('spark.driver.maxResultSize', '10G')

# create the context
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

## **4.** Create <code>ngrok</code> tunnel to check the Spark UI

In [0]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
get_ipython().system_raw('./ngrok http 4050 &')
!curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"

## **5.** Link Colab to our Google Drive

In [0]:
# Point Colaboratory to our Google Drive

from google.colab import drive

drive.mount(GDRIVE_DIR, force_remount=True)

## **6.** Check everything is ok

In [0]:
spark

In [0]:
sc._conf.getAll()

# **The Prediction Task**

In this notebook, we will be using a dataset from [Kaggle](https://www.kaggle.com/rouseguy/bankbalanced/data) containing a _balanced_ random sample of **11,162 instances** extracted from the original (_unbalanced_) dataset of 45,211 examples, available from the [UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/datasets/Bank+Marketing).

The dataset relates to direct marketing campaigns (i.e., phone calls) of a Portuguese banking institution. Each record $(\mathbf{x}_i, y_i)$ contains customer information represented by **16 features** (i.e., $\mathbf{x}_i = (x_{i,1}, \ldots, x_{i,16})$), along with a **binary response** ($y_i$), which indicates whether the given customer subscribes ($y_i = 1$) or not ($y_i = 0$) to the term deposit proposed by the phone marketing campaign.

The classification goal is, given a _new_ customer, to predict whether she/he will subscribe to a term deposit.

# **1. Data Collection**

This is the first step we need to accomplish before going any further. The dataset will be downloaded directly to our Google Drive, as usual.

### **Download dataset file from URL directly to our Google Drive**

In [0]:
def get_data(dataset_url, dest, chunk_size=1024):
  # Stream the file to disk in chunks; raise an exception on any HTTP error
  # instead of silently writing nothing
  response = requests.get(dataset_url, stream=True)
  response.raise_for_status()
  with open(dest, "wb") as file:
    for block in response.iter_content(chunk_size=chunk_size):
      if block:
        file.write(block)

In [0]:
print("Retrieving dataset from URL: {} ...".format(DATASET_URL))
get_data(DATASET_URL, GDRIVE_DATASET_FILE)
print("Dataset successfully retrieved and stored at: {}".format(GDRIVE_DATASET_FILE))

### **Read dataset file into a Spark Dataframe**

In [0]:
bank_df = spark.read.load(GDRIVE_DATASET_FILE, 
                         format="csv", 
                         sep=",", 
                         inferSchema="true", 
                         header="true"
                         )

### **Check the shape of the loaded dataset, i.e., number of rows and columns**

In [0]:
print("The shape of the dataset is {:d} rows by {:d} columns".format(bank_df.count(), len(bank_df.columns)))

### **Print out the schema of the loaded dataset**

In [0]:
bank_df.printSchema()

### **Dataset Shape and Schema**

The dataset contains **11,162** records of marketing campaigns; each record is represented by the following set of **17** columns:
- `age`: The customer's age (_numerical_, _discrete_);
- `job`: The customer's type of job (_categorical_, _nominal_: "`admin.`", "`blue-collar`", "`entrepreneur`", "`housemaid`", "`management`", "`retired`", "`self-employed`", "`services`", "`student`", "`technician`", "`unemployed`", "`unknown`");
- `marital`: The customer's marital status (_categorical_, _nominal_: "`divorced`", "`married`", "`single`", "`unknown`") [**NOTE:** "`divorced`" means divorced or widowed];
- `education`: The customer's level of education (_categorical_, _ordinal_: "`basic.4y`", "`basic.6y`", "`basic.9y`", "`high.school`", "`illiterate`", "`professional.course`", "`university.degree`", "`unknown`");
- `default`: Indicates if the customer has credit in default (_categorical_, _nominal_: "`no`", "`yes`", "`unknown`");
- `balance`: The customer's average yearly balance in Euro (_numerical_, _continuous_);
- `housing`: Indicates if the customer has a housing loan (_categorical_, _nominal_: "`no`", "`yes`", "`unknown`");
- `loan`: Indicates if the customer has a personal loan (_categorical_, _nominal_: "`no`", "`yes`", "`unknown`");
- `contact`:  The customer's contact medium type (_categorical_, _nominal_: "`cellular`", "`telephone`");
- `day`: Last contact day of the month (_numerical_, _discrete_: ranging from `1` to `31`);
- `month`: Last contact month of year (_categorical_, _nominal_: "`jan`", "`feb`", "`mar`", ..., "`nov`", "`dec`");
- `duration`: Last contact duration, in seconds (_numerical_, _continuous_). [**NOTE:** This attribute highly affects the output target (e.g., if `duration = 0` then `deposit = "no"`). Yet, the duration is not known before a call is performed. Also, after the end of the call `deposit` is obviously known. Thus, this feature should only be included for benchmark purposes and should be discarded if the intention is to have a realistic predictive model.]
- `campaign`: The number of contacts performed during this campaign and for this customer (_numerical_, _discrete_) [**NOTE:** includes last contact];
- `pdays`: The number of days that passed by after the client was last contacted from a previous campaign (_numerical_, _discrete_) [**NOTE:** `999` means client was not previously contacted];
- `previous`: The number of contacts performed before this campaign and for this client (_numerical_, _discrete_);
- `poutcome`: The outcome of the previous marketing campaign (_categorical_, _nominal_: "`failure`", "`nonexistent`", "`success`");
- **`deposit`**: Indicates if the customer subscribed to a term deposit (_categorical_, _nominal_: "`yes`", "`no`") **[This is the _binary target_ variable we want to predict]**.


In [0]:
# Let's define some constants which we will use throughout this notebook
NUMERICAL_FEATURES = ["age", 
                      "balance",
                      "day",
                      "duration",
                      "campaign",
                      "pdays",
                      "previous"
                      ]
CATEGORICAL_FEATURES = ["job", 
                        "marital", 
                        "education", 
                        "default", 
                        "housing",
                        "loan",
                        "contact",
                        "month",
                        "poutcome"
                        ]
TARGET_VARIABLE = "deposit"

In [0]:
print("{:d} Numerical features = [{:s}]".format(len(NUMERICAL_FEATURES), ", ".join(["`{:s}`".format(nf) for nf in NUMERICAL_FEATURES])))
print("{:d} Categorical features = [{:s}]".format(len(CATEGORICAL_FEATURES), ", ".join(["`{:s}`".format(nf) for nf in CATEGORICAL_FEATURES])))
print("1 Target variable = `{:s}`".format(TARGET_VARIABLE))

### **Display the first 5 rows of the dataset**

In [0]:
bank_df.show(5)

### **Check for any missing values**

In [0]:
for c in bank_df.columns:
  print("N. of missing values of column `{:s}` = {:d}".format(c, bank_df.where(col(c).isNull()).count()))

# **2. Data Exploration**

### **Summary of Descriptive Statistics**

In [0]:
bank_df.describe().toPandas().transpose() # Transpose will allow a better visualization

In [0]:
# To access plotting libraries, we need to first transform our PySpark DataFrame into a Pandas DataFrame
bank_pdf = bank_df.toPandas() 

In [0]:
# Set some default plotting configuration using seaborn properties
sns.set_style("darkgrid")
sns.set_context("notebook", rc={"lines.linewidth": 2, 
                                "xtick.labelsize":14, 
                                "ytick.labelsize":14,
                                "axes.labelsize": 18,
                                "axes.titlesize": 20,
                                })

### **Analysis of Data Distributions: Numerical Features**

### 1. Distributions of individual numerical features

In [0]:
# Plot the distribution of values of each column of interest
n_rows = 4
n_cols = 2

fig, axes = plt.subplots(n_rows, n_cols, figsize=(14,20))

for i,f in enumerate(NUMERICAL_FEATURES):
    _ = sns.distplot(bank_pdf[f],
                    kde_kws={"color": "#ca0020", "lw": 1}, 
                    hist_kws={"histtype": "bar", "edgecolor": "k", "linewidth": 1,"alpha": 0.8, "color": "#92c5de"},
                    ax=axes[i//n_cols, i%n_cols]
                    )

fig.delaxes(axes[3][1]) # Remove the last cell of the plot

fig.tight_layout(pad=1.5)

### 2. Pairwise regression plots

In [0]:
# Let's now plot the pairwise relationship between our numerical features
_ = sns.pairplot(data=bank_pdf, 
                 vars=sorted(NUMERICAL_FEATURES), 
                 hue=TARGET_VARIABLE, 
                 kind="reg",
                 diag_kind='hist',
                 diag_kws = {'alpha':0.55, 'bins':20},
                 markers=["o", "s"]
                )

### **Observations**

The plots show no pair of strongly correlated numerical features. Therefore, we will initially keep all of them for the model. Note that the feature `day` does not seem very informative: it is roughly uniformly distributed across its values, both for customers who opt for a deposit and for those who don't.

### **Analysis of Data Distributions: Categorical Features**

### 1. Histograms of individual categorical features

In [0]:
# For categorical variables, 'countplot' is the way to go
# Create a Figure containing 3x3 subplots
n_rows = 3
n_cols = 3

fig, axes = plt.subplots(n_rows, n_cols, figsize=(14,14))

for i,f in enumerate(sorted(CATEGORICAL_FEATURES)): 
    ax = sns.countplot(bank_pdf[f], ax=axes[i//n_cols, i%n_cols])
    _ = ax.set_xticklabels(ax.get_xticklabels(), rotation=45, horizontalalignment='right')

fig.tight_layout(pad=1.5)

### 2. Relationship between _categorical_ features and the _target variable_ (`deposit`)

In [0]:
n_rows = 3
n_cols = 3

fig, axes = plt.subplots(n_rows, n_cols, figsize=(14,14))

i = 0
for c in sorted(CATEGORICAL_FEATURES):
    tmp_data = pd.crosstab(bank_pdf.loc[:, c], bank_pdf[TARGET_VARIABLE])
    # pandas.crosstab returns an mxn table where m is the number of values for the first argument (x) 
    # and n for the second argument (y)
    # As the second argument is always `TARGET_VARIABLE` (i.e., `deposit`), n = 2 (`deposit` is binary!)
    # e.g., x = 'housing'; y = 'deposit'
    # the following apply is used to transform the crosstab into a "normalized" table as follows:
    # each entry in the table displays how the i-th categorical value of x (i.e., i-th row) is distributed across
    # all the possible values of y (i.e., Y/N)
    tmp_data = tmp_data.apply(lambda x: x/tmp_data.sum(axis=1))
    ax = tmp_data.plot.bar(stacked=True, color=['red','green'], grid=False, ax=axes[i//n_cols, i % n_cols], legend=True)
    _ = ax.set_xticklabels(ax.get_xticklabels(), rotation=45, horizontalalignment='right')
    i += 1

fig.tight_layout(pad=1.5)

# **3. The Learning Pipeline**

### **Balanced vs. Unbalanced Dataset**

So far, we haven't looked at how the binary target variable `deposit` is distributed across the instances of our dataset. In this "lucky" example, we know that _positive_ examples (i.e., instances where `deposit = 1`) and _negative_ examples (i.e., instances where `deposit = 0`) are roughly balanced (i.e., around 50% of the instances are positives and the other 50% are negatives). That is due to the way this sample dataset has been extracted from the original one.

Most often, though, we have to deal with (very) unbalanced datasets where the minority class (which is usually the one we are interested in!) is accounting only for a small fraction of the total number of training instances. For example, consider the click-through rate (CTR) prediction problem, where we want to foresee whether an advertisement (or, in general, a web page) will be clicked by a user. There, most of the advertisements will not be clicked (negatives), whilst only a tiny fraction (even smaller than 1%) of them will be.

Whether a dataset is balanced or unbalanced affects the procedure we should use to correctly split it into a _training_ and a _test_ set. In particular:

- If the dataset is (almost) balanced, we can safely use a **simple random sampling** strategy, which assigns to every instance the same probability of being selected (i.e., if there are $m$ instances, each one will be picked with the same uniform probability $p = 1/m$);
- If the dataset is (very) unbalanced, simple random sampling might lead to a poor split where, for instance, the test set ends up containing only examples labeled with the majority class. To overcome this issue, **stratified random sampling** is the right choice, as it guarantees that both the training and the test split follow the same class distribution observed in the original dataset (e.g., if the dataset contains 99% of negative instances and 1% of positive ones, so will the training and the test set). This works by first "stratifying" the data according to the two groups (i.e., positives vs. negatives), and then applying simple random sampling within each group. For example, if our original dataset contains $m$ instances so that $m = m^+ + m^-$ and $m^+ \ll m^-$ (e.g., $\frac{m^+}{m} = 0.01$) and we want to sample $k < m$ instances out of the dataset, we will first stratify the original dataset and then select $k^+ = \frac{km^+}{m}$ positive instances and $k^- = \frac{km^-}{m}$ negative instances, respectively.
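
The stratified procedure described above can be sketched in plain Python. This is only an illustration under simplifying assumptions: `stratified_sample` is a hypothetical helper (not part of any library), the data is a toy list of labels with $\frac{m^+}{m} = 0.01$, and each per-class quota $k^c = \frac{km^c}{m}$ is rounded to the nearest integer.

```python
import random
from collections import defaultdict

def stratified_sample(labels, k, seed=42):
    """Return the sorted indices of about k instances, preserving class proportions."""
    rng = random.Random(seed)
    m = len(labels)
    # Group the instance indices by class label: these are the "strata"
    strata = defaultdict(list)
    for idx, y in enumerate(labels):
        strata[y].append(idx)
    sampled = []
    for y, indices in strata.items():
        # Per-class quota k_c = k * m_c / m, rounded to the nearest integer
        k_c = round(k * len(indices) / m)
        # Simple random sampling (without replacement) within the stratum
        sampled.extend(rng.sample(indices, k_c))
    return sorted(sampled)

# Toy dataset: 990 negatives (0) and 10 positives (1)
labels = [0] * 990 + [1] * 10
sample = stratified_sample(labels, k=200)
n_pos = sum(labels[i] for i in sample)
print(len(sample), n_pos)  # 200 sampled instances, 2 of them positive
```

On a Spark DataFrame, a similar effect can be obtained with `DataFrame.sampleBy`, which takes a per-class sampling fraction rather than an exact count.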

### Let's first verify our dataset is actually _balanced_

In [0]:
bank_df.groupBy(TARGET_VARIABLE).count().show()

### **Dataset Splitting: Training vs. Test Set**

Before moving along with any preprocessing involving data transformations, we will split our dataset into **2** portions:
- _training set_ (e.g., accounting for **80%** of the total number of instances);
- _test set_ (e.g., accounting for the remaining **20%** of instances)
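
Note that a random split of this kind is usually only approximately 80/20. To our understanding, Spark's `randomSplit` decides the destination of each row independently at random, so the resulting sizes fluctuate around the requested proportions. The plain-Python sketch below illustrates that idea; `bernoulli_split` is a hypothetical helper and not Spark's actual implementation.

```python
import random

def bernoulli_split(rows, train_fraction=0.8, seed=42):
    """Send each row to the training set with probability `train_fraction`."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        # One independent random draw per row decides where it goes
        if rng.random() < train_fraction:
            train.append(row)
        else:
            test.append(row)
    return train, test

rows = list(range(11162))  # same number of instances as our dataset
train_rows, test_rows = bernoulli_split(rows)
# The two sizes sum to 11162, but the ratio is only close to 80/20
print(len(train_rows), len(test_rows))
```

Fixing the seed makes the split reproducible, which is why we pass `RANDOM_SEED` when splitting the actual dataset.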

In [0]:
# Randomly split our original dataset `bank_df` into 80÷20 for training and test, respectively
train_df, test_df = bank_df.randomSplit([0.8, 0.2], seed=RANDOM_SEED)

In [0]:
print("Training set size: {:d} instances".format(train_df.count()))
print("Test set size: {:d} instances".format(test_df.count()))

### **Working on the Training Set only**

From now on, we will be working on the training set portion only. The test set will come back into play when we evaluate our learned model.

### **Transform Categorical features into Numerical using One-Hot Encoding**

Note that this step is not always mandatory (e.g., decision trees are able to work nicely with categorical features without the need of transforming them to numerical). Still, other methods (like logistic regression) are designed to operate with numerical inputs only.

To transform _categorical_ features into _numerical_ ones we proceed as follows.
We set up a pipeline composed of the following steps:
- [`StringIndexer`](https://spark.apache.org/docs/latest/ml-features#stringindexer): encodes a string column of labels to a column of label indices. The indices are in `[0, numLabels)`, and 4 ordering options are supported (default `frequencyDesc`, which assigns the most frequent label the index `0`, and so on and so forth).
- [`OneHotEncoderEstimator`](https://spark.apache.org/docs/latest/ml-features#onehotencoderestimator): maps a categorical feature, represented as a label index, to a binary vector with at most a single one-value indicating the presence of a specific feature value from among the set of all feature values. An important parameter is `handleInvalid`, which indicates how to deal with previously unseen labels. By default this raises an error, but it can be set to `keep` to assign previously unseen labels a fallback value. (In Spark 3.0 and later, this class is named `OneHotEncoder`.)
- [`VectorAssembler`](https://spark.apache.org/docs/latest/ml-features#vectorassembler): is a transformer that combines a given list of columns into a single vector column.
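
To make the first two steps concrete, here is a plain-Python sketch of what indexing and one-hot encoding do to a single categorical column. It only illustrates the idea and is not Spark's implementation: `fit_string_indexer` and `one_hot` are hypothetical helpers, ties in frequency are broken alphabetically by assumption, and the output is a dense list, whereas Spark produces sparse vectors and by default drops the last category.

```python
from collections import Counter

def fit_string_indexer(values):
    """Map each label to an index, most frequent label first."""
    counts = Counter(values)
    # Sort by decreasing frequency; break ties alphabetically (an assumption)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: idx for idx, label in enumerate(ordered)}

def one_hot(value, index, keep_unseen=True):
    """Encode `value` as a binary vector; unseen labels use an extra last slot."""
    size = len(index) + (1 if keep_unseen else 0)
    vector = [0] * size
    if value in index:
        vector[index[value]] = 1
    elif keep_unseen:
        vector[-1] = 1  # fallback slot, mimicking handleInvalid="keep"
    else:
        raise ValueError("Unseen label: {}".format(value))
    return vector

marital = ["married", "single", "married", "divorced", "married", "single"]
index = fit_string_indexer(marital)
print(index)                      # {'married': 0, 'single': 1, 'divorced': 2}
print(one_hot("single", index))   # [0, 1, 0, 0]
print(one_hot("widowed", index))  # [0, 0, 0, 1]
```

The key point is that the index is learned once (on the training data) and then reused unchanged on new data, which is why unseen labels need an explicit policy.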

In [0]:
# This function implements the pipeline above for transforming categorical features into numerical ones
def to_numerical(df, numerical_features, categorical_features, target_variable):

    """
    Args:
        - df: the input dataframe
        - numerical_features: the list of column names in `df` corresponding to numerical features
        - categorical_features: the list of column names in `df` corresponding to categorical features
        - target_variable: the column name in `df` corresponding to the target variable

    Return:
        - transformer: the pipeline of transformation fit to `df` (for future usage)
        - df_transformed: the dataframe transformed according to the pipeline
    """
    
    from pyspark.ml import Pipeline
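    from pyspark.ml.feature import StringIndexer, VectorAssembler
    try:
        from pyspark.ml.feature import OneHotEncoderEstimator  # Spark 2.3 and 2.4
    except ImportError:
        # In Spark 3.x the same estimator is named `OneHotEncoder`
        from pyspark.ml.feature import OneHotEncoder as OneHotEncoderEstimator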


    # 1. Create a list of indexers, i.e., one for each categorical feature
    indexers = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c), handleInvalid="keep") for c in categorical_features]

    # 2. Create the one-hot encoder for the list of features just indexed (this encoder will keep any unseen label in the future)
    encoder = OneHotEncoderEstimator(inputCols=[indexer.getOutputCol() for indexer in indexers], 
                                    outputCols=["{0}_encoded".format(indexer.getOutputCol()) for indexer in indexers], 
                                    handleInvalid="keep")

    # 3. Indexing the target column (i.e., transform it into 0/1) and rename it as "label"
    # Note that by default StringIndexer will assign the value `0` to the most frequent label, which in the case of `deposit` is `no`
    # As such, this nicely resembles the idea of having `deposit = 0` if no deposit is subscribed, or `deposit = 1` otherwise.
    label_indexer = StringIndexer(inputCol = target_variable, outputCol = "label")
    
    # 4. Assemble all the features (both one-hot-encoded categorical and numerical) into a single vector
    assembler = VectorAssembler(inputCols=encoder.getOutputCols() + numerical_features, outputCol="features")

    # 5. Populate the stages of the pipeline
    stages = indexers + [encoder] + [label_indexer] + [assembler]

    # 6. Setup the pipeline with the stages above
    pipeline = Pipeline(stages=stages)

    # 7. Transform the input dataframe accordingly
    transformer = pipeline.fit(df)
    df_transformed = transformer.transform(df)

    # 8. Eventually, return both the transformed dataframe and the transformer object for future transformations
    return transformer, df_transformed 

In [0]:
# Remove `duration` from the list of NUMERICAL_FEATURES
NUMERICAL_FEATURES.remove("duration")
print("Removing `duration` from the set of numerical features: [{:s}]".format(", ".join([nf for nf in NUMERICAL_FEATURES])))

In [0]:
# Transform the training set and get back both the transformer and the new dataset
oh_transformer, oh_train_df = to_numerical(train_df, NUMERICAL_FEATURES, CATEGORICAL_FEATURES, TARGET_VARIABLE)

In [0]:
# Show the result of numerical transformation
oh_train_df.show(5)

In [0]:
# Select `features` and `label` (i.e., formerly `deposit`) target variable only
train = oh_train_df.select(["features", "label"])

In [0]:
train.show(5, truncate=False)

# **Logistic Regression**

We first train a logistic regression model, using the training set above. To do so, we use the `LogisticRegression` object provided by the [PySpark API](https://spark.apache.org/docs/latest/ml-classification-regression.html#logistic-regression) within the package `pyspark.ml.classification`.

The API is similar to the one we have seen for Linear Regression (i.e., implementing the **Elastic Net** regularization framework), except for the loss function, which is now **cross-entropy** rather than **mean squared error**. Writing the labels as $y_i \in \{-1, +1\}$ (instead of $\{0, 1\}$), the objective is:
$$
\boldsymbol{\theta}^* = \text{argmin}_{\boldsymbol{\theta}\in \mathbb{R}^n} \frac{1}{m} \sum_{i=1}^m \log_e\big(1 + e^{-y_i\boldsymbol{\theta}^T\mathbf{x}_i}\big) + \lambda\Big(\alpha \|\boldsymbol{\theta}\|_1 + (1-\alpha)\|\boldsymbol{\theta}\|_2^2\Big)
$$
In particular, we can specify the following parameters:

- `regParam` is the regularization parameter (or $\lambda$);
- `elasticNetParam` is the tradeoff parameter for regularization penalties (or $\alpha$);
  - `regParam = 0` means there is no regularization (whatever the value of `elasticNetParam`);
  - `regParam > 0` and `elasticNetParam = 0` means there is only L2-regularization; 
  - `regParam > 0` and `elasticNetParam = 1` means there is only L1-regularization;
  - `regParam > 0` and `0 < elasticNetParam < 1` means there is both L1- and L2-regularization (Elastic Net);
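
As a sanity check of how $\lambda$ and $\alpha$ enter the objective above, the following plain-Python sketch evaluates it on a tiny hand-made dataset. It is an illustration only: `elastic_net_logistic_loss` is a hypothetical helper, labels are assumed to be coded as $-1/+1$ (as the term $e^{-y_i\boldsymbol{\theta}^T\mathbf{x}_i}$ implies), there is no separate intercept, and the penalty follows the formula in this notebook, so Spark's internal scaling of the terms may differ.

```python
import math

def elastic_net_logistic_loss(theta, X, y, lam=0.0, alpha=0.0):
    """Average logistic loss plus Elastic Net penalty, for labels in {-1, +1}."""
    m = len(X)
    data_term = 0.0
    for x_i, y_i in zip(X, y):
        # Margin y_i * theta^T x_i: large and positive when the prediction is right
        margin = y_i * sum(t * x for t, x in zip(theta, x_i))
        data_term += math.log(1.0 + math.exp(-margin))
    data_term /= m
    l1 = sum(abs(t) for t in theta)          # L1 norm of theta
    l2_squared = sum(t * t for t in theta)   # squared L2 norm of theta
    return data_term + lam * (alpha * l1 + (1.0 - alpha) * l2_squared)

# Toy data (made up for illustration): 4 instances, 2 features
X = [[1.0, 2.0], [2.0, -1.0], [-1.0, -1.5], [-2.0, 0.5]]
y = [+1, +1, -1, -1]
theta = [2.0, -1.0]

no_reg = elastic_net_logistic_loss(theta, X, y)                        # regParam = 0
only_l2 = elastic_net_logistic_loss(theta, X, y, lam=0.1, alpha=0.0)   # L2 only
only_l1 = elastic_net_logistic_loss(theta, X, y, lam=0.1, alpha=1.0)   # L1 only
print(no_reg, only_l2, only_l1)
```

With $\boldsymbol{\theta} = (2, -1)$ the L1 norm is 3 and the squared L2 norm is 5, so the two regularized values exceed the unregularized one by $0.1 \cdot 3$ and $0.1 \cdot 5$, respectively.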

As is always the case, the optimal values of those **hyperparameters** should be tuned using a dedicated portion of the dataset (i.e., **validation set**) or by performing $k$**-fold cross validation**.

**A Note on the Optimizer**

Spark implements two algorithms to solve logistic regression: **Mini-Batch Gradient Descent** ([`pyspark.mllib.classification.LogisticRegressionWithSGD`](http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html?highlight=logisticregressionwithsgd)) and **L-BFGS** ([`pyspark.mllib.classification.LogisticRegressionWithLBFGS`](http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html?highlight=logisticregressionwithlbfgs)). By default, it uses (and recommends) L-BFGS, as it generally converges faster than gradient descent: it is a **quasi-Newton** method that approximates **second-order** (curvature) information, as opposed to a purely **first-order** method like gradient descent.

In [0]:
from pyspark.ml.classification import LogisticRegression # This corresponds to LogisticRegressionWithLBFGS

# This setting corresponds to no regularization at all (i.e., both regParam=0 and elasticNetParam=0)
log_reg = LogisticRegression(featuresCol = "features", labelCol = "label", maxIter=100)
log_reg_model = log_reg.fit(train)

### **Intercept ($\theta_0$) and Coefficients ($\theta_1, \ldots, \theta_n$)**

In [0]:
print("Intercept: {:.5f}".format(log_reg_model.intercept))
print("{:d} Coefficients: [{:s}]".format(len(log_reg_model.coefficients), ",".join(["{:.3f}".format(c) for c in log_reg_model.coefficients])))

### **Plot Coefficients**

In [0]:
# Convert the coefficient vector to a NumPy array before sorting
theta = np.sort(log_reg_model.coefficients.toArray())

fig, ax = plt.subplots(1, 1, figsize=(8,6))
_ = sns.lineplot(x=np.arange(len(theta)), y=theta, marker="o", ax=ax)
_ = ax.set_xlabel("Theta Index", labelpad=20)
_ = ax.set_ylabel("Theta Value (log odds)", labelpad=20)

### **Summarize model performance on the Training Set**

In [0]:
# Collect training summary
training_summary = log_reg_model.summary

#### **Precision vs. Recall**

In [0]:
precision_recall = training_summary.pr.toPandas()

fig, ax = plt.subplots(1, 1, figsize=(8,6))
_ = sns.lineplot(x=precision_recall['recall'], y=precision_recall['precision'], marker="s", ax=ax)
_ = ax.set_xlabel("Recall", labelpad=20)
_ = ax.set_ylabel("Precision", labelpad=20)
_ = ax.set_title("Precision vs. Recall")

#### **Receiver-Operating Characteristic (ROC) and Area Under the ROC (AUC)**

In [0]:
roc = training_summary.roc.toPandas()

fig, ax = plt.subplots(1, 1, figsize=(8,6))
_ = sns.lineplot(x=roc['FPR'], y=roc['TPR'], marker="s", ax=ax)
_ = ax.set_xlabel("False Positive Rate", labelpad=20)
_ = ax.set_ylabel("True Positive Rate", labelpad=20)
_ = ax.set_title("ROC Curve")

In [0]:
# Print out the Area Under the ROC Curve (AUC)
print('Training Set AUC: {:.3f}'.format(training_summary.areaUnderROC))

### **Use the One-Hot encoding pipeline to transform the Test Set**

In [0]:
# Here, we use the same transformer as the one returned by the `to_numerical` function above yet applied to the test set
oh_test_df = oh_transformer.transform(test_df)

In [0]:
oh_test_df.show(5)

In [0]:
# Select `features` and `label` only
test = oh_test_df.select(["features", "label"])
test.show(5)

### **Compute predictions on the Test Set according to the model learned on the Training Set**

In [0]:
# `log_reg_model` is a Transformer which can be used to "transform" our test set
predictions = log_reg_model.transform(test)

In [0]:
# `predictions` is a dataframe containing (among other things) the predictions made by `log_reg_model` on the test set
predictions.select("features", "prediction", "label").show(10)

### **Evaluate model performance on the Test Set**

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator()
print('Test Set AUC: {:.3f}'.format(evaluator.evaluate(predictions)))
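
A useful way to read the AUC value printed above: it equals the probability that a randomly chosen positive instance receives a higher score than a randomly chosen negative one. The plain-Python sketch below computes it in exactly that pairwise form on made-up scores; `roc_auc` is a hypothetical helper, and Spark computes the area from the ROC curve itself rather than by enumerating pairs.

```python
def roc_auc(scores, labels):
    """Fraction of (positive, negative) pairs ranked correctly; ties count 1/2."""
    positives = [s for s, y in zip(scores, labels) if y == 1]
    negatives = [s for s, y in zip(scores, labels) if y == 0]
    correct = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                correct += 1.0   # positive ranked above negative
            elif p == n:
                correct += 0.5   # tie
    return correct / (len(positives) * len(negatives))

# Made-up scores: one positive (0.35) is ranked below one negative (0.6)
scores = [0.9, 0.8, 0.35, 0.6, 0.2, 0.1]
labels = [1, 1, 1, 0, 0, 0]
auc = roc_auc(scores, labels)
print("{:.3f}".format(auc))  # 8 of the 9 pairs are ordered correctly -> 0.889
```

An AUC of 0.5 therefore corresponds to random ranking, and 1.0 to a perfect separation of the two classes.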

## **Tuning Hyperparameters**

In the following, we wrap the whole pipeline in a single function and use $k$-fold cross validation to get a better estimate of the generalization performance of our logistic regression model.

More specifically, we will tune the two hyperparameters: $\lambda$ = `regParam` and $\alpha$ = `elasticNetParam`. 
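
Before looking at the Spark code, it may help to see how much work a grid search with $k$-fold cross validation implies. The sketch below is plain Python and only counts model fits: `k_fold_indices` is a hypothetical helper that assigns instances to folds in round-robin order (Spark assigns them at random), and the grid values are the ones used in this notebook.

```python
from itertools import product

def k_fold_indices(m, k):
    """Yield (train_indices, validation_indices) for each of the k folds."""
    # Round-robin assignment of the m instances to k folds (an assumption)
    folds = [list(range(i, m, k)) for i in range(k)]
    for i in range(k):
        validation = folds[i]
        train = [idx for j, fold in enumerate(folds) if j != i for idx in fold]
        yield train, validation

reg_params = [0.0, 0.05, 0.1]          # candidate values for lambda
elastic_net_params = [0.0, 0.5, 1.0]   # candidate values for alpha
grid = list(product(reg_params, elastic_net_params))

k = 5
n_fits = 0
for lam, alpha in grid:
    for train_idx, val_idx in k_fold_indices(m=20, k=k):
        # Here one would fit on train_idx and score on val_idx
        n_fits += 1

print(len(grid), n_fits)  # 9 settings, 45 fits
```

Each of the 9 settings is scored by averaging its metric over the 5 validation folds; the best setting is then refit once more on the whole training set.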

In [0]:
# This function defines the general pipeline for logistic regression
def logistic_regression_pipeline(train, 
                                 numerical_features, 
                                 categorical_features, 
                                 target_variable, 
                                 with_std=True,
                                 with_mean=True,
                                 k_fold=5):

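    from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
    try:
        from pyspark.ml.feature import OneHotEncoderEstimator  # Spark 2.3 and 2.4
    except ImportError:
        # In Spark 3.x the same estimator is named `OneHotEncoder`
        from pyspark.ml.feature import OneHotEncoder as OneHotEncoderEstimator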
    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    from pyspark.ml import Pipeline

    # Configure a logistic regression pipeline, which consists of the following stages: 
    # 1) convert categorical features to numerical ones
    # 2) standardize feature values (optional)
    # ... add any other custom transformation here ...
    # n) fit a logistic regression model


    # 1.a Create a list of indexers, i.e., one for each categorical feature
    indexers = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c), handleInvalid="keep") for c in categorical_features]

    # 1.b Create the one-hot encoder for the list of features just indexed (this encoder will keep any unseen label in the future)
    encoder = OneHotEncoderEstimator(inputCols=[indexer.getOutputCol() for indexer in indexers], 
                                    outputCols=["{0}_encoded".format(indexer.getOutputCol()) for indexer in indexers], 
                                    handleInvalid="keep")

    # 1.c Indexing the target column (i.e., transform it into 0/1) and rename it as "label"
    # Note that by default StringIndexer will assign the value `0` to the most frequent label, which in the case of `deposit` is `no`
    # As such, this nicely resembles the idea of having `deposit = 0` if no deposit is subscribed, or `deposit = 1` otherwise.
    label_indexer = StringIndexer(inputCol = target_variable, outputCol = "label")
    
    # 1.d Assemble all the features (both one-hot-encoded categorical and numerical) into a single vector
    assembler = VectorAssembler(inputCols=encoder.getOutputCols() + numerical_features, outputCol="features")

    # 2.a Create the StandardScaler
    # scaler = StandardScaler(inputCol=assembler.getOutputCol(), outputCol="std_"+assembler.getOutputCol(), withStd=with_std, withMean=with_mean)
    # ...

    # 3 Populate the stages of the pipeline with all the preprocessing steps
    stages = indexers + [encoder] + [label_indexer] + [assembler] # + [scaler] + ...

    # 4. Create the logistic regression estimator
    log_reg = LogisticRegression(featuresCol="features", labelCol="label", maxIter=100) # change to `featuresCol="std_features"` if scaler is used

    # 5. Add the logistic regression estimator to the pipeline stages (i.e., the last one)
    stages += [log_reg]

    # 6. Set up the pipeline
    pipeline = Pipeline(stages=stages)

    # We use a ParamGridBuilder to construct a grid of parameters to search over.
    # A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
    # With 3 values for log_reg.regParam ($\lambda$) and 3 values for log_reg.elasticNetParam ($\alpha$),
    # this grid will have 3 x 3 = 9 parameter settings for CrossValidator to choose from.
    param_grid = ParamGridBuilder()\
    .addGrid(log_reg.regParam, [0.0, 0.05, 0.1]) \
    .addGrid(log_reg.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()
    
    cross_val = CrossValidator(estimator=pipeline, 
                               estimatorParamMaps=param_grid,
                               evaluator=BinaryClassificationEvaluator(metricName="areaUnderROC"), # default = "areaUnderROC", alternatively "areaUnderPR"
                               numFolds=k_fold,
                               collectSubModels=True # this flag allows us to store ALL the models trained during k-fold cross validation
                               )

    # Run cross-validation, and choose the best set of parameters.
    cv_model = cross_val.fit(train)

    return cv_model

In [0]:
cv_model = logistic_regression_pipeline(train_df, NUMERICAL_FEATURES, CATEGORICAL_FEATURES, TARGET_VARIABLE)

In [0]:
# This function summarizes all the models trained during k-fold cross validation
def summarize_all_models(cv_models):
    for k, models in enumerate(cv_models):
        print("*************** Fold #{:d} ***************\n".format(k+1))
        for i, m in enumerate(models):
            print("--- Model #{:d} out of {:d} ---".format(i+1, len(models)))
            print("\tParameters: lambda=[{:.3f}]; alpha=[{:.3f}] ".format(m.stages[-1]._java_obj.getRegParam(), m.stages[-1]._java_obj.getElasticNetParam()))
            print("\tModel summary: {}\n".format(m.stages[-1]))
        print("***************************************\n")

In [0]:
# Call the function above
summarize_all_models(cv_model.subModels)

In [0]:
for i, avg_roc_auc in enumerate(cv_model.avgMetrics):
    print("Avg. ROC AUC computed across k-fold cross validation for model setting #{:d}: {:.3f}".format(i+1, avg_roc_auc))

In [0]:
print("Best model according to k-fold cross validation: lambda=[{:.3f}]; alpha=[{:.3f}]".
      format(cv_model.bestModel.stages[-1]._java_obj.getRegParam(), 
             cv_model.bestModel.stages[-1]._java_obj.getElasticNetParam(),
             )
      )
print(cv_model.bestModel.stages[-1])

### **Summarize model performance on the Training Set**

In [0]:
# `bestModel` is the best resulting model according to k-fold cross validation, which is also entirely retrained on the whole `train_df`
training_result = cv_model.bestModel.stages[-1].summary
print("***** Training Set *****")
print("Area Under ROC Curve (ROC AUC): {:.3f}".format(training_result.areaUnderROC))
print("***** Training Set *****")

### **Using the best model from $k$-fold cross validation to make predictions**

In [0]:
# Make predictions on the test set (`cv_model` contains the best model according to the result of k-fold cross validation)
# `test_df` will follow exactly the same pipeline defined above, and already fit to `train_df`
test_predictions = cv_model.transform(test_df)

In [0]:
test_predictions.select("features", "prediction", "label").show(5)

In [0]:
def evaluate_model(predictions, metric="areaUnderROC"):
    
    from pyspark.ml.evaluation import BinaryClassificationEvaluator

    evaluator = BinaryClassificationEvaluator(metricName=metric)

    return evaluator.evaluate(predictions)

### **Evaluate model performance on the Test Set**
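
As a reminder of what the ROC AUC reported here measures, the sketch below computes it on toy data as the fraction of (positive, negative) pairs in which the positive example receives the higher score. The function `roc_auc` and the toy labels and scores are illustrative assumptions; this is not how `BinaryClassificationEvaluator` is implemented internally, only an equivalent way to read the number.

```python
def roc_auc(labels, scores):
    """ROC AUC as the fraction of (positive, negative) pairs ranked correctly.

    Ties between a positive and a negative score count as half a correct pair.
    """
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Hypothetical toy data: 2 positives, 2 negatives, one pair ranked wrongly
toy_labels = [1, 0, 1, 0]
toy_scores = [0.9, 0.8, 0.7, 0.1]
print("Toy ROC AUC: {:.2f}".format(roc_auc(toy_labels, toy_scores)))
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking, which is why this metric does not depend on any particular classification threshold.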

In [0]:
print("***** Test Set *****")
print("Area Under ROC Curve (ROC AUC): {:.3f}".format(evaluate_model(test_predictions)))
print("Area Under Precision-Recall Curve: {:.3f}".format(evaluate_model(test_predictions, metric="areaUnderPR")))
print("***** Test Set *****")

# **Decision Tree**

We now train a decision tree (i.e., a classification tree) using the training set above. Remember that decision trees natively handle categorical features, extend to multi-class classification, do not require feature scaling, and are able to capture non-linearities and feature interactions.

We will use the `DecisionTreeClassifier` object provided by the [PySpark API](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.DecisionTreeClassifier) within the package `pyspark.ml.classification`.
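
The grid search in the pipeline compares the two impurity measures supported by `DecisionTreeClassifier`, `gini` and `entropy`. The following is a minimal pure-Python sketch of how each measure scores a candidate split; the helper names (`gini`, `entropy`, `weighted_impurity`) and the class counts are hypothetical and only serve as illustration.

```python
import math


def gini(counts):
    """Gini impurity of a node, given its per-class example counts."""
    total = sum(counts)
    return 1.0 - sum((c / total) ** 2 for c in counts)


def entropy(counts):
    """Entropy (in bits) of a node, given its per-class example counts."""
    total = sum(counts)
    return -sum((c / total) * math.log2(c / total) for c in counts if c > 0)


def weighted_impurity(children, impurity):
    """Impurity of a split: children impurities weighted by their size."""
    n = sum(sum(child) for child in children)
    return sum(sum(child) / n * impurity(child) for child in children)


# Hypothetical node with 50 "no" and 50 "yes" examples, split into two children
parent = [50, 50]
split = [[40, 10], [10, 40]]

for name, fn in [("gini", gini), ("entropy", entropy)]:
    gain = fn(parent) - weighted_impurity(split, fn)
    print("{:s}: parent impurity={:.3f}, gain of the split={:.3f}".format(name, fn(parent), gain))
```

At each node, the tree greedily picks the split with the largest gain. The two measures usually rank splits similarly, which is why they are treated as a hyperparameter to be tuned rather than fixed in advance.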

In [0]:
# This function defines the general pipeline for a decision tree
def decision_tree_pipeline(train, 
                           numerical_features, 
                           categorical_features, 
                           target_variable, 
                           with_std=True,
                           with_mean=True,
                           k_fold=5):

    # Import only what is used below (`OneHotEncoderEstimator` no longer exists in Spark 3.x)
    from pyspark.ml.feature import StringIndexer, VectorAssembler
    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
    from pyspark.ml.classification import DecisionTreeClassifier
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    from pyspark.ml import Pipeline

    # Configure a decision tree pipeline, which consists of the following stages: 

    indexers = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c), handleInvalid="keep") for c in categorical_features]

    # Indexing the target column (i.e., transform it into 0/1) and rename it as "label"
    # Note that by default StringIndexer will assign the value `0` to the most frequent label, which in the case of `deposit` is `no`
    # As such, this nicely resembles the idea of having `deposit = 0` if no deposit is subscribed, or `deposit = 1` otherwise.
    label_indexer = StringIndexer(inputCol = target_variable, outputCol = "label")
    
    # Assemble all the features (both indexed categorical and numerical) into a single vector
    assembler = VectorAssembler(inputCols=[indexer.getOutputCol() for indexer in indexers] + numerical_features, outputCol="features")

    # Populate the stages of the pipeline with all the preprocessing steps
    stages = indexers + [label_indexer] + [assembler] # + ...

    # Create the decision tree estimator
    dt = DecisionTreeClassifier(featuresCol="features", labelCol="label") # change `featuresCol=std_features` if scaler is used

    # Add the decision tree estimator to the pipeline stages (i.e., the last one)
    stages += [dt]

    # Set up the pipeline
    pipeline = Pipeline(stages=stages)

    # A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
    # We use a ParamGridBuilder to construct a grid of parameters to search over.
    # With 3 values for dt.maxDepth and 2 values for dt.impurity
    # this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
    param_grid = ParamGridBuilder()\
    .addGrid(dt.maxDepth, [3, 5, 8]) \
    .addGrid(dt.impurity, ["gini", "entropy"]) \
    .build()
    
    cross_val = CrossValidator(estimator=pipeline, 
                               estimatorParamMaps=param_grid,
                               evaluator=BinaryClassificationEvaluator(metricName="areaUnderROC"), # default = "areaUnderROC", alternatively "areaUnderPR"
                               numFolds=k_fold,
                               collectSubModels=True # this flag allows us to store ALL the models trained during k-fold cross validation
                               )

    # Run cross-validation, and choose the best set of parameters.
    cv_model = cross_val.fit(train)

    return cv_model

In [0]:
cv_model = decision_tree_pipeline(train_df, NUMERICAL_FEATURES, CATEGORICAL_FEATURES, TARGET_VARIABLE)

In [0]:
# This function summarizes all the models trained during k-fold cross validation

def summarize_all_models(cv_models):
    for k, models in enumerate(cv_models):
        print("*************** Fold #{:d} ***************\n".format(k+1))
        for i, m in enumerate(models):
            print("--- Model #{:d} out of {:d} ---".format(i+1, len(models)))
            print("\tParameters: maxDepth=[{:d}]; impurity=[{:s}] ".format(m.stages[-1]._java_obj.getMaxDepth(), m.stages[-1]._java_obj.getImpurity()))
            print("\tModel summary: {}\n".format(m.stages[-1]))
        print("***************************************\n")

In [0]:
summarize_all_models(cv_model.subModels)

In [0]:
for i, avg_roc_auc in enumerate(cv_model.avgMetrics):
    print("Avg. ROC AUC computed across k-fold cross validation for model setting #{:d}: {:.3f}".format(i+1, avg_roc_auc))

In [0]:
print("Best model according to k-fold cross validation: maxDepth=[{:d}]; impurity=[{:s}]".
      format(cv_model.bestModel.stages[-1]._java_obj.getMaxDepth(), 
             cv_model.bestModel.stages[-1]._java_obj.getImpurity(),
             )
      )
print(cv_model.bestModel.stages[-1])

### **Using the best model from $k$-fold cross validation to make predictions**

In [0]:
# Make predictions on the test set (`cv_model` contains the best model according to the result of k-fold cross validation)
# `test_df` will follow exactly the same pipeline defined above, and already fit to `train_df`
test_predictions = cv_model.transform(test_df)

In [0]:
test_predictions.select("features", "prediction", "label").show(5)

### **Evaluate model performance on the Test Set**

In [0]:
print("***** Test Set *****")
print("Area Under ROC Curve (ROC AUC): {:.3f}".format(evaluate_model(test_predictions)))
print("Area Under Precision-Recall Curve: {:.3f}".format(evaluate_model(test_predictions, metric="areaUnderPR")))
print("***** Test Set *****")

## **Observations**

As it turns out, a single decision tree performed worse than logistic regression (ROC AUC = 0.532 vs. 0.757, respectively), because it is too weak a learner given the range of different features. The prediction accuracy of decision trees can be improved by ensemble methods, such as **Random Forests** (**RF**) and **Gradient Boosted Decision Trees** (**GBDT**).
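
To see why combining many weak trees can help, here is a small idealized calculation: the probability that a majority vote among `n` classifiers is correct, when each one is right with probability `p`. It assumes the classifiers err independently, which real trees trained on the same data do not, so it should be read as intuition for voting ensembles rather than as a description of how Spark aggregates trees. The function name `majority_vote_accuracy` and the value `p = 0.55` are assumptions made for the example.

```python
from math import comb


def majority_vote_accuracy(n_voters, p_correct):
    """Probability that more than half of `n_voters` independent voters are right."""
    if n_voters % 2 == 0:
        raise ValueError("use an odd number of voters to avoid ties")
    k_min = n_voters // 2 + 1  # smallest number of correct votes that wins
    return sum(
        comb(n_voters, k) * p_correct ** k * (1 - p_correct) ** (n_voters - k)
        for k in range(k_min, n_voters + 1)
    )


# Hypothetical weak learner that is right 55% of the time
for n in (1, 11, 101):
    print("{:3d} voters -> majority correct with prob. {:.3f}".format(n, majority_vote_accuracy(n, 0.55)))
```

Under the independence assumption, the majority becomes more reliable as voters are added. Random forests try to approach this regime by decorrelating trees through bootstrap sampling and random feature subsets.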

# **Random Forests**

In [0]:
# This function defines the general pipeline for a random forest
def random_forest_pipeline(train, 
                           numerical_features, 
                           categorical_features, 
                           target_variable, 
                           with_std=True,
                           with_mean=True,
                           k_fold=5):

    # Import only what is used below (`OneHotEncoderEstimator` no longer exists in Spark 3.x)
    from pyspark.ml.feature import StringIndexer, VectorAssembler
    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
    from pyspark.ml.classification import RandomForestClassifier
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    from pyspark.ml import Pipeline

    # Configure a random forest pipeline, which consists of the following stages: 

    indexers = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c), handleInvalid="keep") for c in categorical_features]

    # Indexing the target column (i.e., transform it into 0/1) and rename it as "label"
    # Note that by default StringIndexer will assign the value `0` to the most frequent label, which in the case of `deposit` is `no`
    # As such, this nicely resembles the idea of having `deposit = 0` if no deposit is subscribed, or `deposit = 1` otherwise.
    label_indexer = StringIndexer(inputCol = target_variable, outputCol = "label")
    
    # Assemble all the features (both indexed categorical and numerical) into a single vector
    assembler = VectorAssembler(inputCols=[indexer.getOutputCol() for indexer in indexers] + numerical_features, outputCol="features")

    # Populate the stages of the pipeline with all the preprocessing steps
    stages = indexers + [label_indexer] + [assembler] # + ...

    # Create the random forest estimator
    rf = RandomForestClassifier(featuresCol="features", labelCol="label") # change `featuresCol=std_features` if scaler is used

    # Add the random forest estimator to the pipeline stages (i.e., the last one)
    stages += [rf]

    # Set up the pipeline
    pipeline = Pipeline(stages=stages)

    # A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
    # We use a ParamGridBuilder to construct a grid of parameters to search over.
    # With 3 values for rf.maxDepth and 3 values for rf.numTrees
    # this grid will have 3 x 3 = 9 parameter settings for CrossValidator to choose from.
    param_grid = ParamGridBuilder()\
    .addGrid(rf.maxDepth, [3, 5, 8]) \
    .addGrid(rf.numTrees, [10, 50, 100]) \
    .build()
    
    cross_val = CrossValidator(estimator=pipeline, 
                               estimatorParamMaps=param_grid,
                               evaluator=BinaryClassificationEvaluator(metricName="areaUnderROC"), # default = "areaUnderROC", alternatively "areaUnderPR"
                               numFolds=k_fold,
                               collectSubModels=True # this flag allows us to store ALL the models trained during k-fold cross validation
                               )

    # Run cross-validation, and choose the best set of parameters.
    cv_model = cross_val.fit(train)

    return cv_model

In [0]:
cv_model = random_forest_pipeline(train_df, NUMERICAL_FEATURES, CATEGORICAL_FEATURES, TARGET_VARIABLE)

In [0]:
for i, avg_roc_auc in enumerate(cv_model.avgMetrics):
    print("Avg. ROC AUC computed across k-fold cross validation for model setting #{:d}: {:.3f}".format(i+1, avg_roc_auc))

In [0]:
print("Best model according to k-fold cross validation: maxDepth=[{:d}]".
      format(cv_model.bestModel.stages[-1]._java_obj.getMaxDepth(), 
             )
      )
print(cv_model.bestModel.stages[-1])

### **Using the best model from $k$-fold cross validation to make predictions**

In [0]:
# Make predictions on the test set (`cv_model` contains the best model according to the result of k-fold cross validation)
# `test_df` will follow exactly the same pipeline defined above, and already fit to `train_df`
test_predictions = cv_model.transform(test_df)

In [0]:
test_predictions.select("features", "prediction", "label").show(5)

### **Evaluate model performance on the Test Set**

In [0]:
print("***** Test Set *****")
print("Area Under ROC Curve (ROC AUC): {:.3f}".format(evaluate_model(test_predictions)))
print("Area Under Precision-Recall Curve: {:.3f}".format(evaluate_model(test_predictions, metric="areaUnderPR")))
print("***** Test Set *****")

## **Observations**

Using Random Forest, we improve ROC AUC to **0.780**, up from 0.532 for a single decision tree! Let's see if we can do even better using GBDT.
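
Before running the GBDT grid search, it helps to estimate its cost. The sketch below counts how many pipeline fits `CrossValidator` performs for the grid used in the next cell (3 values of `maxDepth`, 3 values of `maxIter`, 5 folds), including the final refit of the best setting on the whole training set. The helper `cv_fit_count` is hypothetical and only does the arithmetic; it does not call Spark.

```python
from itertools import product


def cv_fit_count(grid, k_fold):
    """Return (number of settings, total fits) for a grid searched with k-fold CV.

    Total fits = one fit per (setting, fold) + one final refit of the best setting.
    """
    settings = list(product(*grid.values()))
    return len(settings), len(settings) * k_fold + 1


# Same values as the GBDT parameter grid defined in the pipeline
gbdt_grid = {"maxDepth": [3, 5, 8], "maxIter": [10, 50, 100]}
k_fold = 5

n_settings, n_fits = cv_fit_count(gbdt_grid, k_fold)

# Boosting rounds are sequential, so they dominate the running time
rounds_in_cv = k_fold * sum(max_iter for _, max_iter in product(*gbdt_grid.values()))

print("Settings: {:d}; pipeline fits: {:d}; boosting rounds during CV: {:d}".format(n_settings, n_fits, rounds_in_cv))
```

Since each boosting round depends on the previous one, the rounds within a single fit cannot be parallelized the way the trees of a random forest can, so this cell is expected to take noticeably longer than the previous ones.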

# **Gradient Boosted Decision Trees**

In [0]:
# This function defines the general pipeline for gradient boosted decision trees
def gbdt_pipeline(train,
                  numerical_features,
                  categorical_features,
                  target_variable,
                  with_std=True,
                  with_mean=True,
                  k_fold=5):

    # Import only what is used below (`OneHotEncoderEstimator` no longer exists in Spark 3.x)
    from pyspark.ml.feature import StringIndexer, VectorAssembler
    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
    from pyspark.ml.classification import GBTClassifier
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    from pyspark.ml import Pipeline

    # Configure a gradient boosted decision tree pipeline, which consists of the following stages: 

    indexers = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c), handleInvalid="keep") for c in categorical_features]

    # Indexing the target column (i.e., transform it into 0/1) and rename it as "label"
    # Note that by default StringIndexer will assign the value `0` to the most frequent label, which in the case of `deposit` is `no`
    # As such, this nicely resembles the idea of having `deposit = 0` if no deposit is subscribed, or `deposit = 1` otherwise.
    label_indexer = StringIndexer(inputCol = target_variable, outputCol = "label")
    
    # Assemble all the features (both indexed categorical and numerical) into a single vector
    assembler = VectorAssembler(inputCols=[indexer.getOutputCol() for indexer in indexers] + numerical_features, outputCol="features")

    # Populate the stages of the pipeline with all the preprocessing steps
    stages = indexers + [label_indexer] + [assembler] # + ...

    # Create the gradient boosted decision tree estimator
    gbdt = GBTClassifier(featuresCol="features", labelCol="label") # change `featuresCol=std_features` if scaler is used

    # Add the gradient boosted decision tree estimator to the pipeline stages (i.e., the last one)
    stages += [gbdt]

    # Set up the pipeline
    pipeline = Pipeline(stages=stages)

    # A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
    # We use a ParamGridBuilder to construct a grid of parameters to search over.
    # With 3 values for gbdt.maxDepth and 3 values for gbdt.maxIter (i.e., boosting rounds)
    # this grid will have 3 x 3 = 9 parameter settings for CrossValidator to choose from.
    param_grid = ParamGridBuilder()\
    .addGrid(gbdt.maxDepth, [3, 5, 8]) \
    .addGrid(gbdt.maxIter, [10, 50, 100]) \
    .build()
    
    cross_val = CrossValidator(estimator=pipeline, 
                               estimatorParamMaps=param_grid,
                               evaluator=BinaryClassificationEvaluator(metricName="areaUnderROC"), # default = "areaUnderROC", alternatively "areaUnderPR"
                               numFolds=k_fold,
                               collectSubModels=True # this flag allows us to store ALL the models trained during k-fold cross validation
                               )

    # Run cross-validation, and choose the best set of parameters.
    cv_model = cross_val.fit(train)

    return cv_model

In [0]:
cv_model = gbdt_pipeline(train_df, NUMERICAL_FEATURES, CATEGORICAL_FEATURES, TARGET_VARIABLE)

In [0]:
for i, avg_roc_auc in enumerate(cv_model.avgMetrics):
    print("Avg. ROC AUC computed across k-fold cross validation for model setting #{:d}: {:.3f}".format(i+1, avg_roc_auc))

In [0]:
print("Best model according to k-fold cross validation: maxDepth=[{:d}]; maxIter=[{:d}]".
      format(cv_model.bestModel.stages[-1]._java_obj.getMaxDepth(), 
             cv_model.bestModel.stages[-1]._java_obj.getMaxIter()
             )
      )
print(cv_model.bestModel.stages[-1])

### **Using the best model from $k$-fold cross validation to make predictions**

In [0]:
# Make predictions on the test set (`cv_model` contains the best model according to the result of k-fold cross validation)
# `test_df` will follow exactly the same pipeline defined above, and already fit to `train_df`
test_predictions = cv_model.transform(test_df)

In [0]:
test_predictions.select("features", "prediction", "label").show(5)

### **Evaluate model performance on the Test Set**

In [0]:
print("***** Test Set *****")
print("Area Under ROC Curve (ROC AUC): {:.3f}".format(evaluate_model(test_predictions)))
print("Area Under Precision-Recall Curve: {:.3f}".format(evaluate_model(test_predictions, metric="areaUnderPR")))
print("***** Test Set *****")

## **Final Remarks**

GBDT improves on the ROC AUC obtained by Random Forest, reaching **0.789**: this is the highest score amongst all the models we have evaluated.