Import packages.

In [1]:
import torch
import torch.nn.functional as F
import torch.nn as nn

from sklearn import datasets
from sklearn.model_selection import train_test_split

import matplotlib
%matplotlib inline
import matplotlib.pyplot as plt

# 1. Decision Trees and Bagging

In this problem, we'll implement a simplified version of random forests. We'll be using the `iris` dataset, which has 4 features that are discretized to $0.1$ steps between $0$ and $8$ (i.e. the set of all possible features is $\{0.1, 0.2, \dots, 7.8, 7.9\}$. Thus, all thresholds that we'll need to consider are $\{0.15, 0.25, \dots, 7.75, 7.85\}$.

Your task in this first part is to finish the implementation of decision trees. We've provided a template for some of the decision tree code, following which you'll finish the bagging algorithm to get a random forest.

1. Entropy (2pts): calculate the entropy of a given vector of labels in the `entropy` function. Note that the generalization of entropy to 3 classes is $H = -\sum_{i=1}^3 p_i\log(p_i)$ where $p_i$ is the proportion of examples with label $i$.
2. Find the best split (1pt): finish the `find_split` function by finding the feature index and value that results in the split minimizing entropy.
3. Build the tree (2pts): finish the `expand_node` function by completing the recursive call for building the decision tree.
4. Predict with tree (2pts): implement the `predict_one` function, which makes a prediction for a single example.

Throughout these problems, the way we represent the decision tree is by using python dicts. In particular, a node is a `dict` that can have the following keys:

1. `node['label']` Return the label that we should predict upon reaching this node. node should **only** have a `label` entry if it is a leaf node.
2. `node['left']` points to another node dict that represents this node's left child.
3. `node['right']` points to another node dict that represents this node's right child.
4. `node['split']` is a tuple containing the feature index and value that this node splits left versus right on.

In our implementation, all comparisons will be **greater than** comparisons, and "yes" answers go **left**. In other words, if `node['split'] = (2, 1.25)`, then we expect to find all data remaining at this node with feature 2 value **greater** than 1.25 in `node['left']`, and feature value **less** than 1.25 in `node['right']`.

Tips:
+ If you have NaNs, you may be dividing by zero.


In [2]:
class DecisionTree:
  def entropy(self, y):
      # Calculate the entropy of a given vector of labels in the `entropy` function.
      #
      # y := Tensor(int) of size (m) -- the given vector of labels
      # Return a float that is your calculated entropy

      # Fill in the rest
      m = y.size(0)
      if m == 0:
        return 0
      frequency = torch.bincount(y).float() / m
      # print(frequency)
      return -torch.sum(frequency * torch.log(frequency))


  def find_split(self, node, X, y, k=4):
      # Find the best split over all possible splits that minimizes entropy.
      #
      # node := Map(string: value) -- the tree represented as a Map, the key will take four
      #   different string: 'label', 'split','left','right' (See implementation below)
      #   'label': a label node with value as the mode of the labels
      #   'split': the best split node with value as a tuple of feature id and threshold
      #   'left','right': the left and right branch with value as the label node
      # X := Tensor of size (m,d) -- Batch of m examples of demension d
      # y := Tensor(int) of size (m) -- the given vectors of labels of the examples
      # k := int -- the number of classes, with default value as 4
      # Return := tuple of (int, float) -- the feature id and threshold of the best split
      m, d = X.size()
      result = None
      min_loss = torch.inf

      features_all = [i for i in range(0, d)]
      features_random = torch.randint(0, 4, (k,))

      for f_idx in features_all:
        for threshold in torch.arange(0.15, 7.9, 0.1):
          # print('threshold : {}'.format(threshold))
          mask = X[:, f_idx] > threshold
          if mask.sum() == 0 or mask.sum() == mask.size(0):
            continue

          # print('mask : {}'.format(mask))
          m_left, y_left = mask.sum(), y[mask]
          m_right, y_right = (~mask).sum(), y[~mask]
          # print('left : {} \n right : {}'.format(left, right))
          curr_loss = m_left / (m_left + m_right) * self.entropy(y_left)
          curr_loss += m_right / (m_left + m_right) * self.entropy(y_right)
          min_loss, result = curr_loss, [f_idx, threshold] if min_loss > curr_loss or result is None else result

      return result


  def expand_node(self, node, X, y, max_depth=1, k=4):
      # Completing the recursive call for building the decision tree
      # node := Map(string: value) -- the tree represented as a Map, the key will take four
      #   different string: 'label', 'split','left','right' (See implementation below)
      #   'label': a label node with value as the mode of the labels
      #   'split': the best split node with value as a tuple of feature id and threshold
      #   'left','right': the left and right branch with value as the label node
      # X := Tensor of size (m,d) -- Batch of m examples of demension d
      # y := Tensor(int) of size (m) -- the given vectors of labels of the examples
      # max_depth := int == the deepest level of the the decision tree
      # k := int -- the number of classes, with default value as 4
      # Return := tuple of (int, float) -- the feature id and threshold of the best split

      H = self.entropy(y)
      if H == 0 or max_depth == 0:
        return

      best_split = self.find_split(node, X, y, k=k)

      if best_split is None:
        return

      mask = X[:, best_split[0]] > best_split[1]
      X_left, y_left = X[mask], y[mask]
      X_right, y_right = X[~mask], y[~mask]

      del node['label']
      node['split'] = best_split
      node['left'] = {'label' : y_left.mode().values}
      node['right'] = {'label' : y_right.mode().values}

      self.expand_node(node['left'], X_left, y_left, max_depth-1, k)
      self.expand_node(node['right'], X_right, y_right, max_depth-1, k)

      return


  def predict_one(self, node, x):
      # Makes a prediction for a single example.
      # node := Map(string: value) -- the tree represented as a Map, the key will take four
      #   different string: 'label', 'split','left','right' (See implementation below)
      #   'label': a label node with value as the mode of the labels
      #   'split': the best split node with value as a tuple of feature id and threshold
      #   'left','right': the left and right branch with value as the label node
      # x := Tensor(float) of size(d,) -- the single example in a batch
      # Fill in the rest
      # print(node)
      if 'label' in node:
        return node['label']

      f_idx, threshold = node['split']
      if x[f_idx] > threshold:
        return self.predict_one(node['left'], x)
      else:
        return self.predict_one(node['right'], x)

In [3]:
def fit_decision_tree(X,y, k=4):
    # The function will fit data with decision tree with the expand_node method implemented above

    root = { 'label': y.mode().values }
    dt = DecisionTree()
    dt.expand_node(root, X, y, max_depth=10, k=k)
    return root

def predict(node, X):
    # return the predict result of the entire batch of examples using the predict_one function above.
    dt = DecisionTree()
    return torch.stack([dt.predict_one(node, x) for x in X])

Test with the Iris Dataset!

In [4]:
iris = datasets.load_iris()
data = train_test_split(
    iris.data,
    iris.target,
    test_size=0.5,
    random_state=123
)

X, X_te, y, y_te = [torch.from_numpy(A) for A in data]
X, X_te, y, y_te = X.float(), X_te.float(), y.long(), y_te.float()

DT = fit_decision_tree(X, y, k=4)
print('Train Accuracy : {}'.format((predict(DT, X) == y).float().mean().item()))
print('Test Accuracy : {}'.format((predict(DT, X_te) == y_te).float().mean().item()))

Train Accuracy : 0.800000011920929
Test Accuracy : 0.7733333110809326


### Bagging Decision Trees for Random forests

Note that our `find_split` implementation can use a random subset of the features when searching for the right split via the argument $k$. For the vanilla decision tree, we defaulted to $k=4$. Since there were 4 features, this meant that the decision tree could always considered all 4 features. By reducing the value of $k$ to $\sqrt(k)=2$, we can introduce variance into the decision trees for the bagging algorithm.

You'll now implement the bagging algorithm. Note that if you use the `clf` and `predict` functions given as keyword arguments, you can pass this section in the autograder without needing a correct implementation for decision trees from the previous section.

1. Bootstrap (1pt): Implement `bootstrap` to draw a random bootstrap dataset from the given dataset.
2. Fitting a random forest (1pt): Implement `random_forest_fit` to train a random forest that fits the data.
3. Predicting with a random forest (1pt): Implement `predict_forest_fit` to make predictions given a random forest.

Tip:
+ If you're not sure whether your bootstrap is working or not, remember that on average, there will be $1-1/e\approx 0.632$ unique samples in a bootstrapped dataset.

In [7]:
def bootstrap(X,y):
    # Draw a random bootstrap dataset from the given dataset.
    #
    # X := Tensor(float) of size (m,d) -- Batch of m examples of demension d
    # y := Tensor(int) of size (m) -- the given vectors of labels of the examples
    #
    # Return := Tuple of (Tensor(float) of size (m,d),Tensor(int) of size(m,)) -- the random bootstrap
    #       dataset of X and its correcting lable Y
    # Fill in the rest
    m = X.size(0)
    random_idx = torch.randint(0, m, (m,))
    return X[random_idx], y[random_idx]


def random_forest_fit(X, y, m, k, clf, bootstrap):
    # Train a random forest that fits the data.
    # X := Tensor(float) of size (n,d) -- Batch of n examples of demension d
    # y := Tensor(int) of size (n) -- the given vectors of labels of the examples
    # m := int -- number of trees in the random forest
    # k := int -- number of classes of the features
    # clf := function -- the decision tree model that the data will be trained on
    # bootstrap := function -- the function to use for bootstrapping (pass in "bootstrap")
    #
    # Return := the random forest generated from the training datasets
    # Fill in the rest

    # DT = fit_decision_tree(X,y,k=4)
    # clf is the root of the decision tree
    random_forest = [None for _ in range(m)]
    sqrt_k = int(k**0.5)
    for i in range(m):
      X_bag, y_bag = bootstrap(X, y)
      curr_tree = clf(X_bag, y_bag, sqrt_k)
      random_forest[i] = curr_tree

    return random_forest

def random_forest_predict(X, clfs, predict):
    # Implement `predict_forest_fit` to make predictions given a random forest.
    # X := Tensor(float) of size (m,d) -- Batch of m examples of demension d
    # clfs := list of functions -- the random forest
    # predict := function that predicts (will default to your "predict" function)
    # Return := Tensor(int) of size (m,) -- the predicted label from the random forest
    # Fill in the rest
    predictions = [predict(tree, X) for tree in clfs]
    stacked = torch.stack(predictions)
    mode_predictions, _ = torch.mode(stacked, dim=0)
    return mode_predictions

Test with the Iris Dataset!

In [6]:
torch.manual_seed(42)
RF = random_forest_fit(X, y, 50, 2, clf=fit_decision_tree, bootstrap=bootstrap)

print('Train accuracy: ', (random_forest_predict(X, RF, predict) == y).float().mean().item())
print('Test accuracy: ', (random_forest_predict(X_te, RF, predict) == y_te).float().mean().item())

Train accuracy:  0.8666666746139526
Test accuracy:  0.7733333110809326


# 2. Boosting

In this problem, you'll implement a basic boosting algorithm on the binary classification breast cancer dataset. Here, we've provided the following weak learner for you: an $\ell_2$ regularized logistic classifier trained with gradient descent.

In [8]:
class Logistic(nn.Module):
    def __init__(self):
        super(Logistic, self).__init__()
        self.linear = nn.Linear(30, 1)    # A linear layer with 30 in feature, 1 out feature (i.e. Xw+b where X is m x 30)

    def forward(self, X):
        out = self.linear(X)
        return out.squeeze()


def fit_logistic_clf(X, y):
    clf = Logistic()
    opt = torch.optim.Adam(clf.parameters(), lr=0.1, weight_decay=1e2)  # Adam optimization function, 𝜆 = 1e2
    loss = torch.nn.BCEWithLogitsLoss() # Binary Cross Entropy Loss with Logit!

    # 200 epoch for optimization!
    for t in range(200):
        out = clf(X)     # Predict for X
        opt.zero_grad()  # Intialize gradient into 0 in every iteration.
        loss(out, (y>0).float()).backward() # Calculate the gradient of the difference between the prediction (out) and the actual value (y)
        opt.step()       # Update the parameters with the optimzed value.

    return clf


def predict_logistic_clf(X, clf):
    return torch.sign(clf(X)).squeeze()

Your task is to boost this logistic classifier to reduce its bias. Implement the following two functions:

+ Finish the boosting algorithm: we've provided a template for the boosting algorithm in `boosting_fit`, however it is missing several components. Fill in the missing snippets of code.
+ Prediction after boosting (2pts): implement `boosting_predict` to make predictions with a given boosted model.

### Adaboost
- Algorithm)
  - Initialize $H_1 = 0$.
    - i.e.) Classify every example to zero
  - `for` $i=1,2,\cdots,T$
    - $\text{Calculate } h_{t+1}^* \text{ from } D_{\mathbf{w}}\sim\text{Multinomial}(\mathbf{w})$
    - $\text{Calculate } \epsilon_{t}, \alpha^* \text{ using } h_{t+1}^*$
      - cf.) Refer to [this](https://github.com/JoonHyeok-hozy-Kim/upenn_mcit/blob/main/2408_Fall_2024/CIS_520/notes/15.md#tech-how-to-get-optimal-learning-rate-%CE%B1) for how to get $\alpha^*$
    - $\text{Update } H_{t+1}\leftarrow H_t + \alpha^* h_{t+1}$
    - $\text{Update } w_i = \exp(-y_i H_{t+1}(\mathbf{x}_i)) / Z \text{ for next } D_\mathbf{w}$
- Parameter Optimizations
  - $\displaystyle h_{t+1}^*  = \arg\min_{h_{t+1}\in\mathcal{H}}  \sum_{i : h_{t+1}(\mathbf{x}_i) \ne y_i} w_i$
    - where
      - $\displaystyle w_i = \frac{1}{Z} \exp(-y_i H_t(\mathbf{x}_i))$
      - $\displaystyle Z = \sum_{i=1}^n \exp(-y_i H_t(\mathbf{x}_i))$
  - $\alpha^* = 0.5\ln\left(\frac{1-\epsilon_t}{\epsilon_t}\right)$

In [19]:
def boosting_fit(X, y, T, fit_logistic_clf, predict_logistic_clf):
    # X := Tensor(float) of size (m,d) -- Batch of m examples of demension d
    # y := Tensor(int) of size (m) -- the given vectors of labels of the examples
    # T := Maximum number of models to be implemented

    m = X.size(0)
    clfs = []
    while len(clfs) < T:
        # Calculate the weights for each sample mu. You may need to
        # divide this into the base case and the inductive case.

        if len(clfs) == 0:
            mu = torch.full((m,), 1/m)  # Initialize weights uniformly
            H_prev = torch.zeros((m,))  # Initialize ensemble prediction H_1 = 0

        else:
            prev_alpha, prev_clf = clfs[-1]
            # H_prev += prev_alpha * predict_logistic_clf(X, prev_clf)
            H_prev += (prev_alpha * predict_logistic_clf(X, prev_clf)).sign()

            mu = torch.exp(-y * H_prev)
            mu /= mu.sum()

        # Here, we draw samples according to mu and fit a weak classifier
        mask = torch.multinomial(mu, m, replacement=True)
        X0, y0 = X[mask], y[mask]
        clf = fit_logistic_clf(X0, y0)

        # Calculate the epsilon error term
        predictions = predict_logistic_clf(X, clf).sign()
        wrongs = (predictions != y).float()
        eps = (mu * wrongs).sum()

        if eps > 0.5:
            # In the unlikely even that gradient descent fails to
            # find a good classifier, we'll skip this one and try again
            continue

        # Calculate the alpha term
        alpha = 0.5 * torch.log2((1-eps) / eps)

        clfs.append((alpha, clf))

    return clfs


def boosting_predict(X, clfs, predict_logistic_clf):
    # X := Tensor(float) of size (m,d) -- Batch of m examples of demension d
    # clfs := list of tuples of (float, logistic classifier) -- the list of boosted classifiers
    # Return := Tnesor(int) of size (m) -- the predicted labels of the dataset

    m = X.size(0)
    prediction = torch.zeros((m,))
    for alpha, clf in clfs:
        # prediction = prediction + alpha * predict_logistic_clf(X, clf)
        prediction = (prediction + alpha * predict_logistic_clf(X, clf)).sign()

    return prediction.sign()

Test out your code on the breast cancer dataset. As a sanity check, your statndard logistic classifier will get a train/test accuracy of around 80% while the boosted logistic classifier will get a train/test accuracy of around 90%.

In [21]:
from sklearn.datasets import load_breast_cancer
cancer = datasets.load_breast_cancer()
data=train_test_split(cancer.data,cancer.target,test_size=0.2,random_state=123)

torch.manual_seed(123)

X,X_te,y,y_te = [torch.from_numpy(A) for A in data]
X,X_te,y,y_te = X.float(), X_te.float(), torch.sign(y.long()-0.5), torch.sign(y_te.long()-0.5)


logistic_clf = fit_logistic_clf(X,y)
print("Logistic classifier accuracy:")
print('Train accuracy: ', (predict_logistic_clf(X, logistic_clf) == y).float().mean().item())
print('Test accuracy: ', (predict_logistic_clf(X_te, logistic_clf) == y_te).float().mean().item())

boosting_clfs = boosting_fit(X,y, 10, fit_logistic_clf, predict_logistic_clf)
print("\nBoosted logistic classifier accuracy:")
print('Train accuracy: ', (boosting_predict(X, boosting_clfs, predict_logistic_clf) == y).float().mean().item())
print('Test accuracy: ', (boosting_predict(X_te, boosting_clfs, predict_logistic_clf) == y_te).float().mean().item())

Logistic classifier accuracy:
Train accuracy:  0.795604407787323
Test accuracy:  0.7982456088066101

Boosted logistic classifier accuracy:
Train accuracy:  0.8813186883926392
Test accuracy:  0.9298245906829834
