In [1]:
import numpy as np
import math
from numpy.matlib import repmat
import sys
import matplotlib 
import matplotlib.pyplot as plt
from scipy.io import loadmat
import time
import os
import warnings

In [3]:
sys.path.append('')
warnings.filterwarnings("ignore", category=DeprecationWarning) 


# load in some binary test data (labels are -1, +1)
data = loadmat("./ion.mat")
xTrIon  = data['xTr'].T
yTrIon  = data['yTr'].flatten()
xTeIon  = data['xTe'].T
yTeIon  = data['yTe'].flatten()

xTrIon.shape, yTrIon.shape, xTeIon.shape, yTeIon.shape

((281, 34), (281,), (70, 34), (70,))

In [13]:
class TreeNode(object):
    """Tree class.
    
    (You don't need to add any methods or fields here but feel
    free to if you like. Our tests will only reference the fields
    defined in the constructor below, so be sure to set these
    correctly!)
    """
    
    def __init__(self, left, right, parent, cutoff_id, cutoff_val, prediction):
        self.left = left
        self.right = right
        self.parent = parent
        self.cutoff_id = cutoff_id
        self.cutoff_val = cutoff_val
        self.prediction = prediction

In [4]:
def spiraldata(N=300):
    r = np.linspace(1,2*np.pi,N)
    xTr1 = np.array([np.sin(2.*r)*r, np.cos(2*r)*r]).T
    xTr2 = np.array([np.sin(2.*r+np.pi)*r, np.cos(2*r+np.pi)*r]).T
    xTr = np.concatenate([xTr1, xTr2], axis=0)
    yTr = np.concatenate([np.ones(N), -1 * np.ones(N)])
    xTr = xTr + np.random.randn(xTr.shape[0], xTr.shape[1])*0.2
    
    xTe = xTr[::2,:]
    yTe = yTr[::2]
    xTr = xTr[1::2,:]
    yTr = yTr[1::2]
    
    return xTr,yTr,xTe,yTe

xTrSpiral,yTrSpiral,xTeSpiral,yTeSpiral=spiraldata(150)

In [None]:
def sqsplit(xTr,yTr,weights=None):
    """Finds the best feature, cut value, and loss value.
    
    Input:
        xTr:     n x d matrix of data points
        yTr:     n-dimensional vector of labels
        weights: n-dimensional weight vector for data points
    
    Output:
        feature:  index of the best cut's feature
        cut:      cut-value of the best cut
        bestloss: loss of the best cut
    """
    N,D = xTr.shape
    assert D > 0 # must have at least one dimension
    assert N > 1 # must have at least two samples
    if weights is None: # if no weights are passed on, assign uniform weights
        weights = np.ones(N)
    weights = weights/sum(weights) # Weights need to sum to one (we just normalize them)
    bestloss = np.inf
    feature = np.inf
    cut = np.inf
    
    # YOUR CODE HERE
    
    # Optimal the split
    c_list = np.zeros(D, 1)
    for j in range(D):
        X = xTr[:,j]
        Y = yTr
        sorted_idx = np.argsort(X)
        X_sorted, Y_sorted = X[sorted_idx], Y[sorted_idx]
        c_best = None

        for i in range(len(X_sorted) - 1):
            c = (X_sorted[i] + X_sorted[i + 1]) / 2  # Midpoint 
            left_mask = X_sorted <= c
            right_mask = X_sorted > c
    
        

    return feature, cut, bestloss

In [None]:
def sqsplit(xTr, yTr, weights=None):
    """
    Finds the best feature, cut value, and loss value.
    """
    N, D = xTr.shape
    if weights is None:
        weights = np.ones(N)
    weights = weights / np.sum(weights)  # normalize weights
    best_feature = None
    best_cut = None
    best_loss = np.inf
    
    for j in range(D):
        # Sort the data according to the j-th feature
        sorted_idx = np.argsort(xTr[:, j])
        sorted_xTr = xTr[sorted_idx, j]
        sorted_yTr = yTr[sorted_idx]
        sorted_weights = weights[sorted_idx]

        # Compute the total weight and weighted sum of labels
        W_total = np.sum(sorted_weights)
        P_total = np.sum(sorted_weights * sorted_yTr)

        # Initialize left and right sums
        W_L, W_R = 0, W_total
        P_L, P_R = 0, P_total
        Q_L, Q_R = 0, np.sum(sorted_weights * sorted_yTr**2)
        
        for i in range(N - 1):
            # Move data point from right to left
            xi, yi, wi = sorted_xTr[i], sorted_yTr[i], sorted_weights[i]
            W_L += wi
            W_R -= wi
            P_L += wi * yi
            P_R -= wi * yi
            Q_L += wi * yi**2
            Q_R -= wi * yi**2
            
            # Skip if next value is the same
            if sorted_xTr[i] == sorted_xTr[i + 1]:
                continue
            
            # Calculate loss for the current split
            loss_L = Q_L - (P_L**2) / W_L if W_L > 0 else 0
            loss_R = Q_R - (P_R**2) / W_R if W_R > 0 else 0
            loss = loss_L + loss_R
            
            # Update best split if this is the smallest loss so far
            if loss < best_loss:
                best_feature = j
                best_cut = (sorted_xTr[i] + sorted_xTr[i + 1]) / 2.0
                best_loss = loss
    
    return best_feature, best_cut, best_loss


In [None]:
# Based on the TreeNode class definition provided, let's implement the CART function

def cart(xTr, yTr, depth=np.inf, weights=None):
    """Builds a CART tree.

    Args:
        xTr: n x d matrix of data points
        yTr: n-dimensional vector of labels
        maxdepth: maximum tree depth
        weights: n-dimensional weight vector for data points

    Returns:
        tree: root of decision tree
    """
    
    n, d = xTr.shape
    
    # If no weights are passed, assign equal weights to each data point
    if weights is None:
        weights = np.ones(n) / float(n)
    else:
        weights = np.array(weights)  # ensure weights is a numpy array
        weights = weights / np.sum(weights)  # normalize weights
    
    # Base case: if max depth is 0 or there's only one unique label, return a leaf node
    if depth == 0 or len(np.unique(yTr)) == 1:
        return TreeNode(None, None, None, None, None, np.mean(yTr))
    
    # Use sqsplit to determine the best feature to split on and the best cutoff
    feature, cutoff, _ = sqsplit(xTr, yTr, weights)
    
    # If no valid split was found (feature is infinity), return a leaf node
    if np.isinf(feature):
        return TreeNode(None, None, None, None, None, np.mean(yTr))
    
    # Split the data based on the feature and cutoff value found
    left_idx = xTr[:, feature] <= cutoff
    right_idx = xTr[:, feature] > cutoff
    
    # If either side is empty, don't split and return a leaf node
    if np.sum(left_idx) == 0 or np.sum(right_idx) == 0:
        return TreeNode(None, None, None, None, None, np.mean(yTr))
    
    # Create the left and right subtrees recursively, decreasing the depth by 1
    left_subtree = cart(xTr[left_idx], yTr[left_idx], depth-1, weights[left_idx])
    right_subtree = cart(xTr[right_idx], yTr[right_idx], depth-1, weights[right_idx])
    
    # Create and return the current tree node linking to the left and right subtrees
    return TreeNode(left_subtree, right_subtree, None, feature, cutoff, None)

# To test the cart function, we'll need the sqsplit function to be correctly defined in this environment.
# Let's assume that the sqsplit function is available here and is working correctly.


In [None]:
def evaltree(root, xTe):
    """Evaluates xTe using decision tree root."""
    assert root is not None
    
    def traverse(node, x):
        # Iteratively traverse the tree until a leaf node is reached
        while node.cutoff_id is not None:
            if x[node.cutoff_id] <= node.cutoff_val:
                if node.left is not None:
                    node = node.left
                else:
                    return node.prediction
            else:
                if node.right is not None:
                    node = node.right
                else:
                    return node.prediction
        return node.prediction

    # Apply traversal for each data point in xTe
    n = xTe.shape[0]
    pred = np.zeros(n)
    for i in range(n):
        pred[i] = traverse(root, xTe[i])
    
    return pred

# Assuming we have a trained tree 'root' and test set 'xTeSpiral':
# pred = evaltree(root, xTeSpiral)
# Now 'pred' will contain the predictions for 'xTeSpiral' using the provided 'root' tree.


In [None]:
def boosttree(x, y, maxiter=100, maxdepth=2):
    """Learns a boosted decision tree.
    Input:
        x:        n x d matrix of data points
        y:        n-dimensional vector of labels
        maxiter:  maximum number of trees
        maxdepth: maximum depth of a tree
    Output:
        forest: list of TreeNode decision trees of length m
        alphas: m-dimensional weight vector
    """
    assert np.allclose(np.unique(y), np.array([-1, 1]))  # the labels must be -1 and 1
    n, d = x.shape
    weights = np.ones(n) / n
    preds = np.zeros(n)
    forest = []
    alphas = []
    
    for i in range(maxiter):
        # Train a tree with the current weights
        tree = cart(x, y, depth=maxdepth, weights=weights)
        # Make predictions with the tree
        pred = evaltree(tree, x)
        # Compute weighted error rate
        weighted_error = np.sum(weights * (pred != y))
        # Compute alpha (tree weight), which is based on the error rate
        alpha = 0.5 * np.log((1 - weighted_error) / max(weighted_error, 1e-16))
        # Update the weights of the data points
        weights *= np.exp(-alpha * y * pred)
        weights /= np.sum(weights)  # Normalize the weights
        
        # Store the tree and its corresponding alpha
        forest.append(tree)
        alphas.append(alpha)
        
        # Update the predictions with the current tree
        preds += alpha * pred
        # Check if the predictions are perfect
        if np.all(np.sign(preds) == y):
            break  # Stop if predictions are perfect
    
    return forest, alphas

# Now let's create the boosted trees using the spiral data
forest, alphas = boosttree(xTrSpiral, yTrSpiral, maxiter=100, maxdepth=2)

# We will use evalforest to evaluate the boosted trees
# We need to redefine evalforest function to include the alphas
def evalforest(trees, X, alphas=None):
    m = len(trees)
    n, d = X.shape
    if alphas is None:
        alphas = np.ones(m) / m
    pred = np.zeros(n)
    for alpha, tree in zip(alphas, trees):
        pred += alpha * evaltree(tree, X)
    return np.sign(pred)

# Evaluate the boosted forest on the training and testing data
training_preds = evalforest(forest, xTrSpiral, alphas)
testing_preds = evalforest(forest, xTeSpiral, alphas)

# Calculate the error rates
training_error = np.mean(training_preds != yTrSpiral)
testing_error = np.mean(testing_preds != yTeSpiral)

print(f"Boosted forest training error: {training_error:.4f}")
print(f"Boosted forest testing error: {testing_error:.4f}")
