In [1]:
# import modules
import numpy as np
from sklearn.datasets import load_digits
from abc import abstractmethod

# Base Classes

In [172]:
class Node:
    '''
      this class will later get the following attributes
      all nodes:
          features
          responses
      split nodes additionally:
          left
          right
          split_index
          threshold
      leaf nodes additionally
          prediction
    '''
        

class Tree:
    '''
      base class for RegressionTree and ClassificationTree
    '''
    def __init__(self, n_min=10):
        '''n_min: minimum required number of instances in leaf nodes
        '''
        self.n_min = n_min 
    
    def predict(self, x):
        ''' return the prediction for the given 1-D feature vector x
        '''
        # first find the leaf containing the 1-D feature vector x
        node = self.root
        while not hasattr(node, "prediction"):
            j = node.split_index
            t = np.array(x)
            if x[j] <= node.threshold:
                node = node.left
            else:
                node = node.right
        # finally, return the leaf's prediction
        return node.prediction
        
    def train(self, features, responses, D_try=None):
        '''
        features: the feature matrix of the training set
        response: the vector of responses
        '''
        N, D = features.shape
        assert(responses.shape[0] == N)

        if D_try is None:
            D_try = int(np.sqrt(D)) # number of features to consider for each split decision
        
        # initialize the root node
        self.root = Node()
        self.root.features  = features
        self.root.responses = responses

        # build the tree
        stack = [self.root]
        while len(stack):
            node = stack.pop()
            active_indices = self.select_active_indices(D, D_try)
            left, right = self.make_split_node(node, active_indices)
            if left is None: # no split found
                self.make_leaf_node(node)
            else:
                stack.append(left)
                stack.append(right)
    
    def make_split_node(self, node, indices):
        '''
        node: the node to be split
        indices: a numpy array of length 'D_try', containing the feature 
                         indices to be considered for the present split
                         
        return: None, None -- if no suitable split has been found, or
                left, right -- the children of the split
        '''
        # all responses equal => no improvement possible by any split
        if np.unique(node.responses).shape[0] == 1:
            return None, None
        
        # find best feature j_min (among 'indices') and best threshold t_min for the split
        l_min = float('inf')  # upper bound for the loss, later the loss of the best split
        j_min, t_min = None, None

        for j in indices:
            thresholds = self.find_thresholds(node, j)

            # compute loss for each threshold
            for t in thresholds:
                loss = self.compute_loss_for_split(node, j, t)

                # remember the best split so far 
                # (the condition is never True when loss = float('inf') )
                if loss < l_min:
                    l_min = loss
                    j_min = j
                    t_min = t

        if j_min is None: # no split found
            return None, None

        # create children for the best split
        left, right = self.make_children(node, j_min, t_min)

        # turn the current 'node' into a split node
        # (store children and split condition)
        # ... # your code here
        node.left = left
        node.right = right
        node.split_index = j_min
        node.threshold = t_min
        # raise NotImplementedError("make_split_node(): remove this exception after adding your code above.")
        
        # return the children (to be placed on the stack)
        return left, right
    
    def select_active_indices(self, D, D_try):
        ''' return a 1-D array with D_try randomly selected indices from 0...(D-1).
        '''
        sub_list = np.random.choice(D, size=D_try)
        return sub_list
        #... # your code here
        #raise NotImplementedError("select_active_indices(): remove this exception after adding your code above.")
        
    def find_thresholds(self, node, j):
        ''' return: a 1-D array with all possible thresholds along feature j
        '''
        test = np.array(node.features)
        sorted_feature = np.sort(np.unique(node.features[:,j]))
        thresholds = []
        for i in range(len(sorted_feature) - 1):
            threshold = (sorted_feature[i] + sorted_feature[i+1]) / 2  # Compute the midpoint between adjacent elements
            thresholds.append(threshold)
        #... # your code here
        return thresholds
        # raise NotImplementedError("find_thresholds(): remove this exception after adding your code above.")
        
    def make_children(self, node, j, t):
        ''' execute the split in feature j at threshold t
        
            return: left, right -- the children of the split, with features and responses
                                   properly assigned according to the split
        '''
        left = Node()
        right = Node()
        
        left_indices = node.features[:, j] <= t
        right_indices = node.features[:, j] > t
                
        left.features = node.features[left_indices]
        right.features = node.features[right_indices]
                
        left.responses = node.responses[left_indices]
        right.responses = node.responses[right_indices]
        # ... # your code here
        # raise NotImplementedError("make_children(): remove this exception after adding your code above.")
        
        return left, right
        
    @abstractmethod
    def make_leaf_node(self, node):
        ''' Turn node into a leaf by computing and setting `node.prediction`
        
            (must be implemented in a subclass)
        '''
        raise NotImplementedError("make_leaf_node() must be implemented in a subclass.")
        
    @abstractmethod
    def compute_loss_for_split(self, node, j, t):
        ''' Return the resulting loss when the data are split along feature j at threshold t.
            If the split is not admissible, return float('inf').
        
            (must be implemented in a subclass)
        '''
        raise NotImplementedError("compute_loss_for_split() must be implemented in a subclass.")


# Regression Tree

In [229]:
class RegressionTree(Tree):
    def __init__(self, n_min=10):
        super(RegressionTree, self).__init__(n_min)
        
    def compute_loss_for_split(self, node, j, t):
        # return the loss if we would split the instance along feature j at threshold t
        # or float('inf') if there is no feasible split
        left_indices = node.features[:, j] <= t
        right_indices = node.features[:, j] > t
        
        if len(left_indices) < self.n_min or len(right_indices) < self.n_min:
            return float("inf")

        left_responses = node.responses[left_indices]
        right_responses = node.responses[right_indices]

        left_mean = np.mean(left_responses)
        right_mean = np.mean(right_responses)
                
        left_sse = np.sum((left_responses - left_mean) ** 2)
        right_sse = np.sum((right_responses - right_mean) ** 2)
        total_sse = left_sse + right_sse

        return total_sse
        # ... # your code here
        # raise NotImplementedError("compute_loss_for_split(): remove this exception after adding your code above.")
        
    def make_leaf_node(self, node):
        # turn node into a leaf node by computing `node.prediction`
        # (note: the prediction of a regression tree is a real number)
        unique, counts = np.unique(node.responses, return_counts=True)
        # Find the index of the element with the maximum count
        max_count_index = np.argmax(counts)
        # Get the element with the maximum count
        predicted_element = unique[max_count_index]
        # since it apparently has to be a real number, remask the elements
        predicted_element = 3 if predicted_element == 1 else 9
        node.prediction = predicted_element 
        # ... # your code here
        # raise NotImplementedError("make_leaf_node(): remove this exception after adding your code above.")


# Classification Tree

In [271]:
class ClassificationTree(Tree):
    '''implement classification tree so that it can handle arbitrary many classes
    '''
    
    def __init__(self, classes, n_min=10):
        ''' classes: a 1-D array with the permitted class labels
            n_min: minimum required number of instances in leaf nodes
        '''
        super(ClassificationTree, self).__init__(n_min)
        self.classes = classes
        
    def compute_loss_for_split(self, node, j, t):
        # Return the loss if we would split the instances along feature j at threshold t,
        # or float('inf') if there is no feasible split.
        
        left_indices = node.features[:, j] <= t
        right_indices = node.features[:, j] > t
        
        if len(left_indices) < self.n_min or len(right_indices) < self.n_min:
            return float('inf')
        
        left_responses = node.responses[left_indices]
        right_responses = node.responses[right_indices]
                
        # Calculate the Gini impurity for the left and right subsets
        left_gini = self.calculate_gini_impurity(left_responses)
        right_gini = self.calculate_gini_impurity(right_responses)
        total_gini = (len(left_indices)) * left_gini + (len(right_indices)) * right_gini
        
        return total_gini
        # ... # your code here
        # raise NotImplementedError("compute_loss_for_split(): remove this exception after adding your code above.")
        
    def calculate_gini_impurity(self, responses):
        # Calculate the Gini impurity for each subnode and all classes
        
        _, counts = np.unique(responses, return_counts=True)
        probabilities = counts / len(responses)
        gini_impurity = 1.0 - np.sum(probabilities ** 2)
        
        return gini_impurity
        
    def make_leaf_node(self, node):
        # turn node into a leaf node by computing `node.prediction`
        # (note: the prediction of a classification tree is a class label)
        unique_classes, counts = np.unique(node.responses, return_counts=True)

        # Filter the unique classes based on the permitted classes
        filtered_classes = np.intersect1d(unique_classes, self.classes)

        if len(filtered_classes) > 0:
            predicted_class_index = np.argmax(counts[unique_classes == filtered_classes])
            predicted_class = filtered_classes[predicted_class_index]
        else:
            # If no permitted class is present in the node's responses, select the most frequent class
            # no other useful use of self.classes came into my mind...
            predicted_class_index = np.argmax(counts)
            predicted_class = unique_classes[predicted_class_index]

        node.prediction = predicted_class
        # ... # your code here
        # raise NotImplementedError("make_leaf_node(): remove this exception after adding your code above.")
        

# Evaluation of Regression and Classification Tree

In [7]:
# read and prepare the digits data and extract 3s and 9s
digits = load_digits()
print(digits.data.shape, digits.target.shape)

instances = (digits.target == 3) | (digits.target == 9)
features = digits.data[instances, :]
labels = digits.target[instances]

# for regression, we use labels +1 and -1
responses = np.array([1 if l == 3 else -1 for l in labels])

assert(features.shape[0] == labels.shape[0] == responses.shape[0])

(1797, 64) (1797,)


In [325]:
# perform 5-fold cross-validation (see ex01) with responses +1 and -1 (for 3s and 9s)
# using RegressionTree()
# and comment on your results
# ... # your code here
from sklearn.model_selection import KFold

ratio = 0.75
length = features.shape[0]
idx = round(ratio*length)

X_train, y_train = features[:idx], responses[:idx]
X_test, y_test = features[idx:], responses[idx:]


def regression_cross_validation(X_train, y_train, n_mins):
    for n_min in n_mins:
        n_folds = 5
        k_folds = KFold(n_splits=n_folds)
        mean_rate = np.zeros(len(n_mins))
        for i, (train, val) in enumerate(k_folds.split(X_train, y_train)):
            train_X, train_y = features[train], responses[train]
            X_val, y_val = features[val], responses[val]
            RegTree = RegressionTree(n_min=n_min)
            RegTree.train(train_X, train_y)
            y_preds = []
            for sample in range(X_val.shape[0]):
                y_preds.append(RegTree.predict(X_val[sample]))
            # remask the ground truth responses, as the regression tree shall produce real numbers as predictions
            y_val = np.array([3 if y == 1 else 9 for y in y_val])
            mean_rate[i-1] = np.mean(y_preds != y_val)
        print(f"The mean error rate for training with n_min of {n_min}: {np.mean(mean_rate)}, standard deviation: {np.std(mean_rate)}")

regression_cross_validation(X_train, y_train, [5, 10, 20, 50])

The mean error rate for training with n_min of 5: 0.0967171717171717, standard deviation: 0.039928014418550185
The mean error rate for training with n_min of 10: 0.1287037037037037, standard deviation: 0.04576547539354592
The mean error rate for training with n_min of 20: 0.14267676767676768, standard deviation: 0.0570453123282618
The mean error rate for training with n_min of 50: 0.17112794612794613, standard deviation: 0.1011246194268189


best result for n_min of 5, overall the error rate is decent but not as low as I wished for. Compared to the sample solution in ex1 for the same task with svm's, this has a similar error rate. Trainig time is very fast.

In [326]:
n_min = [5]
RegTree = RegressionTree(n_min[0])
RegTree.train(X_train, y_train, n_min)
y_preds = []
for sample in range(X_test.shape[0]):
    y_preds.append(RegTree.predict(X_test[sample]))
y_test = np.array([3 if y == 1 else 9 for y in y_test])
print(f"The test error with n_min of {n_min[0]}: {np.mean(y_preds != y_test)}")

The test error with n_min of 5: 0.13186813186813187


#### Classification Tree

In [328]:
# perform 5-fold cross-validation with labels 3 and 9
# using ClassificationTree(classes=np.unique(labels))
# and comment on your results

# perform 5-fold cross-validation (see ex01) with responses +1 and -1 (for 3s and 9s)
# using RegressionTree()
# and comment on your results
# ... # your code here
from sklearn.model_selection import KFold

ratio = 0.75
length = features.shape[0]
idx = round(ratio*length)
classes = np.unique(labels)

X_train, y_train = features[:idx], labels[:idx]
X_test, y_test = features[idx:], labels[idx:]


def classification_cross_validation(X_train, y_train, n_mins):
    for n_min in n_mins:
        n_folds = 5
        k_folds = KFold(n_splits=n_folds)
        mean_rate = np.zeros(len(n_mins))
        for i, (train, val) in enumerate(k_folds.split(X_train, y_train)):
            train_X, train_y = features[train], labels[train]
            X_val, y_val = features[val], labels[val]
            ClassTree = ClassificationTree(n_min=n_min, classes=classes)
            ClassTree.train(train_X, train_y)
            y_preds = []
            for sample in range(X_val.shape[0]):
                y_preds.append(ClassTree.predict(X_val[sample]))
            mean_rate[i-1] = np.mean(y_preds != y_val)
        print(f"The mean error rate for training with n_min of {n_min}: {np.mean(mean_rate)}, standard deviation: {np.std(mean_rate)}")

classification_cross_validation(X_train, y_train, [5, 10, 20, 50])

The mean error rate for training with n_min of 5: 0.152020202020202, standard deviation: 0.02365125146256181
The mean error rate for training with n_min of 10: 0.12937710437710437, standard deviation: 0.0476096984690765
The mean error rate for training with n_min of 20: 0.06927609427609427, standard deviation: 0.038039967591057385
The mean error rate for training with n_min of 50: 0.1516835016835017, standard deviation: 0.04947118007320392


best result for n_min = 20. Decent error rate especially for n_min = 20. The training time is a bit longer than for the regressiontree, which might be due to my implementation or due to the fact that it took more calculation steps. But still it was a fast training time with < 10 seconds 

In [329]:
n_min = [20]
ClassTree = ClassificationTree(n_min=n_min[0], classes = classes)
ClassTree.train(X_train, y_train, n_min)
y_preds = []
for sample in range(X_test.shape[0]):
    y_preds.append(ClassTree.predict(X_test[sample]))
# y_test = np.array([3 if y == 1 else 9 for y in y_test])
print(f"The test error with n_min of {n_min[0]}: {np.mean(y_preds != y_test)}")

The test error with n_min of 20: 0.07692307692307693


# Regression and Classification Forest

In [9]:
def bootstrap_sampling(features, responses):
    '''return a bootstrap sample of features and responses
    '''

    # length of dataset
    N = responses.shape[0]
    indices = np.random.randint(low=0, high=N, size=N)
    # features and responses can be selected more than once 
    return features[indices], responses[indices]

In [11]:
bootstrap_features, bootstrap_responses = bootstrap_sampling(digits.data, digits.target)

print(bootstrap_features.shape, bootstrap_responses.shape)

(1797, 64) (1797,)


In [None]:
class RegressionForest():
    def __init__(self, n_trees, n_min=10):
        # create ensemble
        self.trees = [RegressionTree(n_min) for i in range(n_trees)]
    
    def train(self, features, responses):
        for tree in self.trees:
            boostrap_features, bootstrap_responses = bootstrap_sampling(features, responses)
            tree.train(boostrap_features, bootstrap_responses)

    def predict(self, x):
        # compute the response of the ensemble from the individual responses and return it
        responses = np.array([tree.predict(x=x) for tree in self.trees])
        # ensemble response regression: average over all tree responses
        return np.mean(responses)
        


In [None]:
class ClassificationForest():
    def __init__(self, n_trees, classes, n_min=1):
        self.trees = [ClassificationTree(classes, n_min) for i in range(n_trees)]
        self.classes = classes
    
    def train(self, features, labels):
        for tree in self.trees:
            boostrap_features, bootstrap_responses = bootstrap_sampling(features, responses)
            tree.train(boostrap_features, bootstrap_responses)

    def predict(self, x):
        # compute the response of the ensemble from the individual responses and return it
        responses = np.array([tree.predict(x=x) for tree in self.trees])
        # count response for each unique class
        unique, counts = np.unique(responses, return_counts=True)
        # return class with most counts
        return unique[np.argmax(counts)]

# Evaluation of Regression and Decision Forest

In [None]:
# perform 5-fold cross-validation (see ex01) with responses +1 and -1 (for 3s and 9s)
# using RegressionForest(n_trees=10)
# and comment on your results
... # your code here

In [None]:
# perform 5-fold cross-validation with labels 3 and 9
# using DecisionForest(n_trees=10, classes=np.unique(labels))
# and comment on your results
... # your code here

# Multi-class Classification Forest

In [None]:
# Train DecisionForest(n_trees=10, classes=np.unique(digits.target))
# for all 10 digits simultaneously.
# Compute and plot the confusion matrix after 5-fold cross-validation and comment on your results.
... # your code here

# One-against-the-rest classification with RegressionForest

In [None]:
# Train ten one-against-the-rest regression forests for the 10 digits.
# Make sure that all training sets are balanced between the current digit and the rest.
# Assign test instances to the digit with highest score, 
# or to "unknown" if all scores are negative.
# Compute and plot the confusion matrix after 5-fold cross-validation and comment on your results.
... # your code here