In [None]:
import numpy as np
import matplotlib.pyplot as plt
from numpy import argmax
from numpy.random import default_rng

# Step 1: Loading Data #

The code cell below contains code for reading in both the clean and noisy datasets (assumed to be stored in a local folder), and splitting these into training, testing, and validation sets. To load an unseen dataset, we have provided a commented out line of code, to which you need only pass in the local file path as an argument.

In [None]:

def split_dataset(data, test_proportion, val_proportion, random_generator=default_rng()):
    """ Split dataset into training, test, and validation sets according to the given 
        proportions.
    
    Args:
        data (numpy.ndarray): The datatset that is to be split, with shape (N,8)
        test_proprotion (float): the desired proportion of test examples.
                                 (0.0-1.0)
        val_proprotion (float): the desired proportion of validation examples. 
                                 (0.0-1.0)
        random_generator (np.random.Generator): A random generator.

    Returns:
        tuple: returns a tuple of (training_data, test_data, validation_data) 
               - training_data (np.ndarray): Training instances shape (n_train, 8)
               - test_data (np.ndarray): Test instances shape (n_test, 8)
               - validation_data (np.ndarray): Validation instances shape (n_val, 8)
    """

    # Shuffle indices and split into train, validation, and test.
    shuffled_indices = random_generator.permutation(len(data))
    n_test = round(len(data) * test_proportion)
    n_val = round(len(data) * val_proportion)
    n_train = len(data) - (n_test + n_val)

    # Split the actual data using the shuffled indices.
    training_data = data[shuffled_indices[:n_train]]
    test_data = data[shuffled_indices[n_train : n_train + n_test]]
    validation_data = data[shuffled_indices[n_train + n_test : ]]

    return (training_data, test_data, validation_data)


# Load datasets from local machine.
clean_data = np.loadtxt("wifi_db/clean_dataset.txt", dtype=int)
noisy_data = np.loadtxt("wifi_db/noisy_dataset.txt", dtype=float)

# To load unseen data ...
# unseen_data = np.loadtxt("wifi_db/...", dtype=float)

# Set default rng.
rg = default_rng()


# Step 2: Creating Decision Trees (and pruning) #

The section that follows contains code to implement and test a decision tree on both the noisy and clean datasets.

### 2.1 Implementation ###

The code cell that follows contains the implementation of a decision tree class (with pruning methods) defined over a recursively constructed set of interlinked nodes.

In [None]:

class Node:
    """ Class representing a node data structure.

    Attributes:
        left (Node): Object reference to this node's left child.
        right (Node): Object reference to this node's right child.
        attribute (Node): The attribute index to be tested on (0-6).
        value (float): 
                Decision Node: The value at the indexed attribute, to be compared with the split point.
                Leaf Node:     The label of any feature vector for which the tree path leads to this leaf node. 

        leaf (Boolean): Specifies whether the node is a leaf node (True) or not (False) 
        label_count (int) : The number of training examples at a leaf node (always 0 for a decision node)

    """

    def __init__(self, left=None, right=None, attribute=None, value=None, leaf=None):
        """ Constructor 
        Args:
            left (Node): Object reference to this node's left child.
            right (Node): Object reference to this node's right child.
            attribute (Node): The attribute index to be tested on (0-6).
            value (float): 
                Decision Node: The value at the indexed attribute, to be compared with the split point.
                Leaf Node:     The label of any feature vector for which the tree path leads to this leaf node. 

            leaf (Boolean): Specifies whether the node is a leaf node (True) or not (False)

        """

        self.left = left
        self.right = right
        self.attribute = attribute
        self.value = value
        self.leaf = leaf
        self.label_count = 0
    
    # Getters -------------------------------------------------------------------------------
    def get_left(self):
        return self.left

    def get_label_count(self):
        return self.label_count
    
    def get_right(self):
        return self.right
    
    def get_attribute(self):
        return self.attribute
    
    def get_value(self):
        return self.value
    
    def is_leaf(self):
        return self.leaf
    
    #----------------------------------------------------------------------------------------

    # Setters -------------------------------------------------------------------------------
    def set_left(self,left):
        self.left = left
    
    def set_right(self,right):
        self.right = right
    
    def set_attribute(self,attribute):
        self.attribute = attribute
    
    def set_value(self,value):
        self.value = value
    
    def set_leaf(self,leaf):
        self.leaf = leaf

    def set_label_count(self, label_count):
        self.label_count = label_count

    #----------------------------------------------------------------------------------------



class DecisionTree:
    """  Class representing a decision tree classifier.

     Attributes:
        unique_labels (array): Array of unique labels in the datatset.
        root (Node): The root node of the tree (which contains an object reference to its children).
        depth (int): Maximum path length from root to a leaf.
        validation(numpy.ndarray): The dataset that is to be used to evaluate pruning performance, with shape (N,8).
        
    """

    def __init__(self, dataset, validation):
        """ Constructor 

            Args:
                dataset (numpy.ndarray): The datatset that is to be fit with a decision tree, with shape (N,8).
                validation(numpy.ndarray): The dataset that is to be used to evaluate pruning performance, with shape (N,8).
        """
        self.validation = validation
        self.unique_labels = self.get_labels(dataset)
        self.root, self.depth = self.decision_tree_learning(dataset, 0)
        
    
    def get_labels(self, dataset):
        """ Creates a list of the unique labels and assigns them to the unique_labels attribute.

            Args:
                dataset (numpy.ndarray): Data instances (first 7 columns) + data labels (8th column), with shape (N,8).
        """
        return np.unique(dataset[:,-1:])
    

    def predict(self, samples):
        """ Traverses the already built decision tree in order to make a classification label for a set of samples, relying on predict_single().

            Args:
                sample (numpy.ndarray): A dataset of unseen samples to be classified, with shape (N,8).

             Returns:
                labels (numpy.ndarray): Array containing label prediction of each sample, with shape (N,).
        """
        return np.asarray([self.predict_single(self.root, sample) for sample in samples])


    def predict_single(self, node, sample):
        """ Traverses the already built decision tree in order to make a classification label for a single data sample, using recursion.

            Args:
                node (Node): Object reference to the current node in the decision tree.
                sample (numpy.ndarray): A single data sample (row) to be classified, with shape (1,8).

             Returns:
                label (int): The predicted label of our sample.
        """

        # Base case.
        if node.is_leaf():
            return node.get_value()

        # Recursive case. Find the attribute we are splitting on and get the value.
        node_attribute = node.get_attribute()
        node_value = node.get_value()
        sample_value = sample[node_attribute]
        
        # Compare.
        if sample_value <= node_value:
            return self.predict_single(node.get_left(), sample)
        else:
            return self.predict_single(node.get_right(), sample)


    def decision_tree_learning(self, dataset, depth):
        """ This function fits a binary decision tree to dataset, using recursion.
        
        Args: 
            dataset (np.ndarray): Data instances (first 7 columns) + data labels (8th column), with shape (N,8)
            depth (int): Maximum path length from root to a leaf. Always initialised to 0.
        
        Returns:
            root (Node): The root node of the tree (which contains an object reference to its children).
            depth (int): Maximum path length from root to a leaf.
        """

        # 1. Base Case: All dataset samples have the same label. 
        if self.have_same_label(dataset):  
            return self.create_leaf_node(dataset), depth
        
        # Stated assumption that there are no inconsistent data points 
        # i.e. identical feature vectors having different classes, thus no need for second base case.

        # 2. Inductive case: Further splitting to do.
        split_attribute, split_value = self.FIND_SPLIT(dataset)
        
        # Instantiate node.
        new_node = Node()    
        new_node.set_value(split_value)
        new_node.set_attribute(split_attribute)
         
        left_dataset, right_dataset = self.split_dataset(dataset, split_attribute, split_value)

        # Recursive step (depth-first fashion).
        l_branch, l_depth = self.decision_tree_learning(left_dataset, depth+1)
        r_branch, r_depth = self.decision_tree_learning(right_dataset, depth+1)

        new_node.set_left(l_branch)
        new_node.set_right(r_branch)
        depth = max(l_depth, r_depth)

        return new_node, depth

    
    def shared_label(self, dataset):
        """ Returns the label shared by every sample in dataset. 
            
            *Function is called iff have_same_label() returns True.

            Args: 
                dataset (np.ndarray): Data instances (first 7 columns) + data labels (8th column), with shape (N,8).

            Returns:
                label (float): The label shared by all samples in dataset.
        """
        label = dataset[0,-1]
        return label


    def have_same_label(self, dataset):
        """ Checks if all samples in dataset share the same label.

            Args: 
                dataset (np.ndarray): Data instances (first 7 columns) + data labels (8th column), with shape (N,8).

            Returns:
                shared (Boolean): True  = all samples in dataset share the same label
                                  False = variety of labels **(meaning it can be split further)

        """
        labels = dataset[:, -1:]
        shared = np.all(labels == labels[0])
        return shared


    def create_leaf_node(self, dataset_at_node):
        """ Creates a leaf node with `value` set as the label, and determines the number of training samples that reach this leaf node.
            Args: 
                dataset_at_node (numpy.ndarray): Data instances (first 7 columns) + data labels (8th column), with shape (N,8).
            Returns:
                node (Node): A leaf node that is to be appended to the node path.
        """
        value = self.shared_label(dataset_at_node)
        label_count = len(dataset_at_node) 

        # instantiate and set values
        node = Node()
        node.set_leaf(True)
        node.set_value(value)
        node.set_label_count(label_count)
        return node


    def FIND_SPLIT(self, dataset):
        """ Chooses the attribute and the splitting value that results in the highest information gain.

            Args: 
                dataset (np.ndarray): Data instances (first 7 columns) + data labels (8th column), with shape (N,8).

            Returns:
                Tuple:
                    attribute (int): The index of the optimal attribute to be split at.
                    value: (float) :  The decision boundary of the attribute, which will split the incoming data.
        """
        # (info_gain, attribute, split_value)
        max_info_gain = (0, 0, 0) 

        # -1 so we don't loop over labels.
        for col in range(dataset.shape[1] - 1): 
            sorted_by_col = dataset[dataset[:, col].argsort()] # Ascending order.

            # Loop over rows of specific attribute (column) being evaluated.
            for row in range(sorted_by_col.shape[0] - 1): 

                # Determine information gain for a trialled split value.
                split_value = (sorted_by_col[row,col] + sorted_by_col[row+1, col]) / 2 # Midpoint
                left_branch, right_branch = self.split_dataset(dataset, col, split_value)
                info_gain = self.information_gain(dataset, left_branch, right_branch)

                # Update maximal 
                if info_gain > max_info_gain[0]:
                    max_info_gain = (info_gain, col, split_value)

        # Select splitting attribute + splitting value.
        attribute = max_info_gain[1]
        value = max_info_gain[2]
        return (attribute, value)


    def entropy(self,dataset):
        """ Returns the entropy of a given dataset.

            Args: 
                dataset (np.ndarray): Data instances (first 7 columns) + data labels (8th column), with shape (N,8).

            Returns:
                entropy (float): The calculated entropy of the dataset.
        """
        sum = 0
        for label in self.unique_labels:
            proportion = self.p_label(dataset, label)
            log_of_proportion = 0 if proportion == 0 else np.log2(proportion)  # Handle edge case.
            sum += (proportion * log_of_proportion)

        return -sum

    def p_label(self, dataset, label):
        """ Returns the proportion of the dataset that contains labels that are equal to a given label parameter.

            Args: 
                dataset (np.ndarray): Data instances (first 7 columns) + data labels (8th column), with shape (N,8).
                label (int): The label we are evaluating the proportion of.

            Returns:
                entropy (float): The calculated entropy of the dataset. 

        """
        # Edge case: empty dataset.
        return 0 if len(dataset) == 0 else sum([int(row[-1]) == int(label) for row in dataset]) / len(dataset)


    def information_gain(self, dataset, left, right):
        """ Returns the information gain from a given split of the dataset.

            Args: 
                dataset (np.ndarray): Data instances (first 7 columns) + data labels (8th column), with shape (N,8).
                left (np.ndarray): The left split of dataset.
                right (np.ndarray): The right split of dataset.

            Returns:
                information gain (float): The calculated information gain.

        """

        return self.entropy(dataset) - self.remainder(left,right)

    def remainder(self, left, right):
        """ Returns the entropy remaining after a given split. The information gain function subtracts this value 
            from the overall entropy in order to calculate the information gained. 

            Args: 
                left (np.ndarray): The left split of the dataset.
                right (np.ndarray): The right split of the dataset.

            Returns:
                remainder (float): The calculated entropy remainder.
                
        """
        
        # Components.
        size_left = left.shape[0]
        size_right = right.shape[0]
        entropy_left = self.entropy(left)
        entropy_right = self.entropy(right)

        return ((size_left / (size_left + size_right)) * entropy_left) + ((size_right / (size_left + size_right))* entropy_right)

    def split_dataset(self, dataset, attribute, split_value):
        """ Splits the dataset into left + right datasets, from a given split value on a specific attribute.

            Args: 
                dataset (np.ndarray): Data instances (first 7 columns) + data labels (8th column), with shape (N,8)
                attribute (int): The column index in dataset, of the specific attribute that will be split over.
                split_value (float): The value that each row of data will compared with. 
                                     When the value of the row indexed at attribute <= split value, that row is moved into the left branch.
                                     When the value of the row indexed at attribute > split value, that row is moved into the right branch.
            Returns:
                left (np.ndarray): The left split of the dataset.
                right (np.ndarray): The right split of the dataset.
                
        """

        # Implementational design to go left when equal to split_value.
        left = dataset[((dataset[:,attribute] <= split_value))]
        right = dataset[((dataset[:,attribute] > split_value))] 

        return left, right


    def parents_of_leaves(self, node, parents):
        """ Generates a list of all nodes in tree that are parents of two leaves.

            Args: 
                node (Node): Object reference to the current node in the decision tree.
                parents (array): The list of parent nodes, initialised to an empty list.

            The function does not return the parents list, it updates the parents list.
                
        """

        # 1. Base case. 
        if node.is_leaf():
            return

        # 2. Base case.
        if node.get_left().is_leaf() and node.get_right().is_leaf():
            parents.append(node) # Parent found
            return
        
        # 3. Inductive Case (check children)
        self.parents_of_leaves(node.get_left(), parents) 
        self.parents_of_leaves(node.get_right(), parents)
        return 


    def majority(self, node):
        """ Returns the majority classification label of a sub-tree rooted at `node`,
            where the given node is a parent of two leaf nodes.

            Args: 
                node (Node): Object reference to the current node in the decision tree.

            Returns:
                value (int): The majority class label of node's children.
                node: (Node): The object reference to the child with the majority vote.

        """

        left = node.get_left()
        right = node.get_right()

        # Number of training samples.
        left_label_counts = left.get_label_count()
        right_label_counts = right.get_label_count()

        # Pick majority.
        if left_label_counts >= right_label_counts:
            return left.get_value(), left
        else:
            return right.get_value(), right



    def evaluate(self, test_db, trained_tree):
        """ Computes the accuracy of an already trained decision tree on the validation/test data set.

            Args: 
                test_db (numpy.ndarray): Data instances (first 7 columns) + data labels (8th column), with shape (N,8).

            Returns:
                accuracy (float): The accuracy of the decision tree on the given datset.
                                  (0.0-1.0)

        """

        # We are inside decision tree, so we don't explicitly use trained_tree, but include to be consistent with the specification.
        y_predictions = self.predict(test_db)
        y_gold = test_db[:,-1]

        correct = sum([y_predictions[i] == y_gold[i] for i in range(len(test_db))])
        accuracy = correct / len(test_db)
        return accuracy


    def prune(self):
        """ Attempts to prune the already built decision tree 
        (we say attempts because pruning may not improve performance, in which case it will not be applied).

            This function takes no parameters and does not return anything. Rather it modifies the current decision tree object.
        """

        # Compute the error before pruning.
        prev_error = (1 - self.evaluate(self.validation, self))

        parents_of_leaves = []
        self.parents_of_leaves(self.root, parents_of_leaves)
        
        for parent in parents_of_leaves:

            m_label, m_node = self.majority(parent)

            # Backup value (in case prune needs to be undone).
            old_value = parent.get_value()

            # Prune tree.
            parent.set_leaf(True)
            parent.set_value(m_label)

            # Test performance of pruned tree.
            new_error = (1 - self.evaluate(self.validation, self))

            if new_error <= prev_error:                     
                # Success. Set children to null and try pruning again.
                parent.set_left(None)
                parent.set_right(None)
                parent.set_label_count(m_node.get_label_count())
                self.prune()
                return
            else:
                # Revert to previous tree.
                parent.set_leaf(False)
                parent.set_value(old_value)
                
        self.update_tree_depth()
        return


    def update_tree_depth(self):
        """" Updates the depth of this decision tree object by calling get_tree_depth() on the root node.
             This function takes no parameters and does not return anything.
        """

        self.depth = self.get_depth_at_node(self.root)
        return
    

    def get_tree_depth(self):
        """ Returns the depth of this decision tree (the longest path from root to leaf).
            This function takes no arguments.
            
            Returns:
                depth (int): The depth of the tree.
        """
        return self.depth


    def get_depth_at_node(self, node):
        """ Computes the depth the of the tree. Assumes root node as
            initial input.

            Args:
                node (Node): Initially the root node of the decision tree.

            Returns:
                depth (int): maximum path length from the root to a leaf.
        """
        if node.is_leaf():
            return 0
        else:
            l_depth = self.get_depth_at_node(node.get_left())
            r_depth = self.get_depth_at_node(node.get_right())
            return max(l_depth, r_depth) + 1



### 2.1 Testing the implementation of the decision tree  ###

The two code cells that follow build and then test the decision tree on both the noisy and clean datasets. In both cases the tree is built, and then tested for accuracy and depth. This is simply to test that the model builds and works as intended.

In [None]:
# Pre pruning decision tree accuracy on the clean dataset.
clean_data_train, clean_data_test, clean_data_val = split_dataset(clean_data, test_proportion=0.1, val_proportion=0,random_generator=rg)

decision_tree = DecisionTree(clean_data_train, None)
print("Test accuracy: {}".format(decision_tree.evaluate(clean_data_test, decision_tree)))
print("Depth: {}".format(decision_tree.get_tree_depth()))


In [None]:
# Pre pruning decision tree accuracy on the noisy dataset.
noisy_data_train, noisy_data_test, noisy_data_val = split_dataset(noisy_data, test_proportion=0.1, val_proportion=0,random_generator=rg)

decision_tree = DecisionTree(noisy_data_train, None)
print("Test accuracy: {}".format(decision_tree.evaluate(noisy_data_test, decision_tree)))
print("Depth: {}".format(decision_tree.get_tree_depth()))


### 2.2 (optional): Visualizing the tree ###

Not implemented.

# Step 3: Evaluation

In the cell that follows we provide code for performing k-fold cross validation, nested k-fold cross validation and computing confusion matrices. These methods are then used in later sections and subsections to evaluate the decision tree performance.

In [None]:
 # All functions in this cell have been inspired by and adapted from the lab solutions.

def k_fold_split(k_folds, n_instances, random_generator=default_rng()):
   
    """ Split k_instances into n mutually exclusive splits at random.
    
    Args:
        k_folds (int): Number of folds.
        n_instances (int): Number of instances to split.
        random_generator (np.random.Generator): A random generator.

    Returns:
        split_indices: a list (length k_folds). Each element in the list should contain a 
                       numpy array giving the indices of the instances in that split.
    """

    # Generate a random permutation of indices from 0 to n_instances.
    shuffled_indices = random_generator.permutation(n_instances)

    # Split shuffled indices into almost equal sized folds.
    split_indices = np.array_split(shuffled_indices, k_folds)

    return split_indices


def nested_k_fold_cross_val(k_folds, n_instances, random_generator=default_rng()):
    """ Generate train, test, and validation indices at each fold.
    
    Args:
        k_folds (int): Number of folds.
        n_instances (int): Total number of instances.
        random_generator (np.random.Generator): A random generator.

    Returns:
        folds: a list of length k_folds. Each element in the list is a 
               list with three elements: 
                - train_indices (numpy.ndarray): The randomly shuffled training indices.
                - test_indices (numpy.ndarray): The randomly shuffled test indices.
                - val_indices (numpy.ndarray): The randomly shuffled val indices.

    """

    # Split the dataset into k folds.
    split_indices = k_fold_split(k_folds, n_instances, random_generator)

    folds = []
    for k in range(k_folds):
        # Single test + training fold.
        test_indices = split_indices[k]
        # Train_indices = np.hstack(split_indices[:k] + split_indices[k+1:]).
        remaining_training_blocks = set(range(k_folds)) - {k}

        for i in remaining_training_blocks: 
            # Single validation fold.
            val_indices = split_indices[i]
            
            # Remaining data assigned as training data.
            train_indices = []
            for j in (remaining_training_blocks - {i}):
                train_indices.append(split_indices[j])
            train_indices = np.array(train_indices, dtype=int).flatten()

            folds.append([train_indices, np.array(test_indices), np.array(val_indices)])

    return folds


def k_fold_cross_val(k_folds, n_instances, random_generator=default_rng()):
    """ Generate train, test and validation indices at each fold.
    
    Args:
        k_folds (int): Number of folds.
        n_instances (int): Total number of instances.
        random_generator (np.random.Generator): A random generator.

    Returns:
        folds: a list of length k_folds. Each element in the list is a 
               list with three elements: 
                - train_indices (numpy.ndarray): The randomly shuffled training indices.
                - test_indices (numpy.ndarray): The randomly shuffled test indices.
                - val_indices (numpy.ndarray): The randomly shuffled val indices.

    """

    # Split the dataset into k splits.
    split_indices = k_fold_split(k_folds, n_instances, random_generator)

    folds = []
    for k in range(k_folds):
        # Pick k as test.
        test_indices = split_indices[k]

        # Combine remaining splits as train.
        train_indices = np.hstack(split_indices[:k] + split_indices[k+1:])

        folds.append([train_indices, test_indices])

    return folds


def confusion_matrix(y_gold, y_prediction, class_labels=None):
    """ Compute the confusion matrix.
        
    Args:
        y_gold (np.ndarray): the correct ground truth/gold standard labels.
        y_prediction (np.ndarray): the predicted labels.
        class_labels (np.ndarray): a list of unique class labels. 
                               Defaults to the union of y_gold and y_prediction.

    Returns:
        confusion (np.array): The confusion matrix with shape (C, C), where C is the number of classes. 
                              Rows are ground truth per class, columns are predictions.
    """

    # If no class_labels are given, we obtain the set of unique class labels from 
    # the union of the ground truth annotation and the prediction.
    if not class_labels:
        class_labels = np.unique(np.concatenate((y_gold, y_prediction)))

    confusion = np.zeros((len(class_labels), len(class_labels)), dtype=int)

    # For each correct class (row), 
    # compute how many instances are predicted for each class (columns).
    for (i, label) in enumerate(class_labels):
        
        # Get predictions where the ground truth is the current class label.
        indices = (y_gold == label)
        gold = y_gold[indices]
        predictions = y_prediction[indices]

        # Quick way to get the counts per label.
        (unique_labels, counts) = np.unique(predictions, return_counts=True)

        # Convert the counts to a dictionary.
        frequency_dict = dict(zip(unique_labels, counts))

        # Fill up the confusion matrix for the current row.
        for (j, class_label) in enumerate(class_labels):
            confusion[i, j] = frequency_dict.get(class_label, 0)

    return confusion


## 3.1 Evaluate clean and noisy data sets with k-fold-cross validation ##

In the two cells that follow we provide code for computing confusion matrices and tree depth over the clean and noisy dataset using k-fold cross validation (with 10 folds). These are printed out in section 3.2.

In [None]:

def k_fold_evaluation(data, k_folds):
    """ Performs k-fold cross validation on a provided dataset and computes the accuracies and confusion matrices at every fold, before and after pruning the tree.
        
    Args:
        data (np.ndarray):  The datatset that is to be evaluated, with shape (N,8).
        k_folds (int): The number of folds.

    Returns:
        confusions (list): A list where the elements are the individual confusion matrices of type (numpy.ndarray), at every fold.
        depths (list): A list of integers, corresponding to the depth of each fold.

    """

    # Initialise as empty.
    confusions = []
    depths = []
    
    for i, (train_indices, test_indices) in enumerate(k_fold_cross_val(k_folds, len(data), rg)):
        print("fold {}".format(i)) # Check progress.
        
        # Compute accuracy + confusion matrix before pruning.
        decision_tree = DecisionTree(data[train_indices], None)
        predictions = decision_tree.predict(data[test_indices])
        confusion = confusion_matrix(data[test_indices][:,-1], predictions)
        depth = decision_tree.get_tree_depth()

        # Update accuracy list + confusion matrix list (before and after).
        confusions.append(confusion)
        depths.append(depth)
        
    return confusions, depths



In [None]:
# Compute depth and confusion matrices for clean and noisy datasets over 10 folds.
pre_confusions_clean, pre_depths_clean = k_fold_evaluation(clean_data, 10)
pre_confusions_noisy, pre_depths_noisy = k_fold_evaluation(noisy_data, 10)

## 3.2 Cross validation classification metrics ##

In the cell that follows we define code to compute the mean confusion matrix after cross validation. From this, we provide code to compute the accuracy, precision and recall. These are used in sections 3.2.1 and 3.2.1 to evaluate the performance of the tree.

In [None]:
from numpy import average

def compute_metrics(confusions):
    """" Computes the mean confusion matrix, accuracy, precision and recall
         from a list of confusion matrices (obtained from cross validation).

        Args: 
            confusions (list): A list of confusion matrices.

         Returns:
            mean_confusion (numpy.ndarray): The element-wise mean confusion matrix of the classifier, with shape (N,N) where N is number of class labels.
            accuracy (float): The accuracy of the classifier.
            precision (list): A list of precisions per label of the data.
            recall (list): A list of recalls per label of the data.
            f1_score (list): A list of f1 scores per label of the data.
    """
    mean_confusion = np.mean(confusions, axis = 0)
    accuracy = accuracy_from_confusion(mean_confusion)
    precision = precision_from_confusion(mean_confusion)
    recall = recall_from_confusion(mean_confusion)
    f1_score = f1_score_from_confusion(mean_confusion)
    return (mean_confusion, accuracy, precision, recall, f1_score)
   

def accuracy_from_confusion(confusion):
    """" Computes the accuracy of the classifier from the given confusion matrix.

         Args: 
            confusion (numpy.ndarray): A confusion matrix, with shape (N,N) where N is number of class labels.

         Returns:
            accuracy (float): The accuracy of the classifier.

    """
    if np.sum(confusion) > 0:
        return np.sum(np.diag(confusion)) / np.sum(confusion)
    else:
        return 0


def precision_from_confusion(confusion):
    """" Computes the precision of the classifier from the given confusion matrix.

        Args: 
             confusion (numpy.ndarray): A confusion matrix, with shape (N,N) where N is number of class labels.

         Returns:
            precision (list): A list of precisions per label of the data.
    """
    p = np.zeros((len(confusion), ))
    for c in range(confusion.shape[0]):
        if np.sum(confusion[:, c]) > 0:
            p[c] = confusion[c, c] / np.sum(confusion[:, c])
    return p


def recall_from_confusion(confusion):
    """" Computes the recall of the classifier from the given confusion matrix.

        Args: 
            confusion (numpy.ndarray): A confusion matrix, with shape (N,N) where N is number of class labels.

         Returns:
            recall (list): A list of recalls per label of the data.
    """
    r = np.zeros((len(confusion), ))
    for c in range(confusion.shape[0]):
        if np.sum(confusion[c, :]) > 0:
            r[c] = confusion[c, c] / np.sum(confusion[c, :])
    return r


def f1_score_from_confusion(confusion):
    """" Computes the f1 score of the classifier from the given confusion matrix.

         Args: 
            confusion (numpy.ndarray): A confusion matrix, with shape (N,N) where N is number of class labels.

         Returns:
            f1_score (list): A list of f1 scores per label of the data.
    """
    precisions = precision_from_confusion(confusion)
    recalls = recall_from_confusion(confusion)

    # Make sure they are of the same length.
    assert len(precisions) == len(recalls)

    f = np.zeros((len(precisions), ))
    for c, (p, r) in enumerate(zip(precisions, recalls)):
        if p + r > 0:
            f[c] = 2 * p * r / (p + r)
    return f




### 3.2.1 Clean data ###

In the cell that follows we print out the average depth and confusion matrix for an unpruned tree trained on the clean dataset using cross validation with 10 folds. 

In [None]:
average_clean_depth_pre = np.mean(pre_depths_clean)
mean_confusion, accuracy, precision, recall, f1_score = compute_metrics(pre_confusions_clean)
print("mean confusion: \n{}".format(mean_confusion))
print("\naccuracy: {}".format(accuracy))
print("\nprecision: {}".format(precision))
print("\nrecall: {}".format(recall))
print("\nf1 score: {}".format(f1_score))
print("\naverage depth of tree: {}".format(average_clean_depth_pre))


### 3.2.1 Noisy data ###

In the cell that follows we print out the average depth and confusion matrix for an unpruned tree trained on the noisy dataset using cross validation with 10 folds. 

In [None]:
average_noisy_depth_pre = np.mean(pre_depths_noisy)
mean_confusion, accuracy, precision, recall, f1_score = compute_metrics(pre_confusions_noisy)
print("mean confusion: \n{}".format(mean_confusion))
print("\naccuracy: {}".format(accuracy))
print("\nprecision: {}".format(precision))
print("\nrecall: {}".format(recall))
print("\nf1 score: {}".format(f1_score))
print("\naverage depth of tree: {}".format(average_noisy_depth_pre))

# Step 4: Pruning (implemented in 2.1)

The following section deals with tree evaluation post pruning. In particular, we perform nested k-fold cross validation and compare the mean confusion matrices, precision, recall, f1 score and depth.

## 4.1 Evaluate clean and noisy datsets after pruning with nested k-fold cross validation ##

The two code cells below compute the confusion matrices and depths over all 90 nested k-fold cross validaton sets.

In [None]:
def k_fold_nested_evaluation(data, k_folds):
    """ Performs k-fold cross validation on a provided dataset and computes the accuracies and confusion matrices at every fold, before and after pruning the tree.
        
    Args:
        data (np.ndarray):  The datatset that is to be evaluated, with shape (N,8).
        k_folds (int): The number of folds.

    Returns:
        confusions (list): A list where the elements are the individual confusion matrices of type (numpy.ndarray), at every fold.
        depths (list): A list of integers, corresponding to the depth of each fold.
        
    """

    # Initialise as empty.
    confusions = []
    depths = []
    
    for i, (train_indices, test_indices, val_indices) in enumerate(nested_k_fold_cross_val(k_folds, len(data), rg)):
        print("fold {}".format(i))
        
        # Compute accuracy + confusion matrix before pruning.
        decision_tree = DecisionTree(data[train_indices], data[val_indices])

        decision_tree.prune()
        
        predictions = decision_tree.predict(data[test_indices])
        confusion = confusion_matrix(data[test_indices][:,-1], predictions)
        depth = decision_tree.get_tree_depth()

        # Update accuracy list + confusion matrix list (before and after).
        confusions.append(confusion)
        depths.append(depth)
        
    return confusions, depths



In [None]:
pruned_confusions_clean, pruned_depths_clean = k_fold_nested_evaluation(clean_data, 10)

In [None]:
pruned_confusions_noisy, pruned_depths_noisy = k_fold_nested_evaluation(noisy_data, 10)

## 4.2 Cross Validation classification metrics

In the two subsections that follow we print out the evaluation metrics after performing nested k-fold cross validation on the clean and noisy datasets.

### 4.2.1 Clean data

The code cell below prints out the nested k-fold cross validation averages for the depth, confusion matrix, precision, recall and f1 score for the clean dataset after pruning the tree. 

In [None]:
# Clean
average_clean_depth_pruned = np.mean(pruned_depths_clean)
mean_confusion, accuracy, precision, recall, f1_score = compute_metrics(pruned_confusions_clean)
print("mean confusion: \n{}".format(mean_confusion))
print("\naccuracy: {}".format(accuracy))
print("\nprecision: {}".format(precision))
print("\nrecall: {}".format(recall))
print("\nf1 score: {}".format(f1_score))
print("\naverage depth of tree: {}".format(average_clean_depth_pruned))

### 4.1.2 Noisy data

The code cell below prints out the nested k-fold cross validation averages for the depth, confusion matrix, precision, recall and f1 score for the noisy dataset after pruning the tree. 

In [None]:
# Noisy
average_noisy_depth = np.mean(pruned_depths_noisy)
mean_confusion, accuracy, precision, recall, f1_score = compute_metrics(pruned_confusions_noisy)
print("mean confusion: \n{}".format(mean_confusion))
print("\naccuracy: {}".format(accuracy))
print("\nprecision: {}".format(precision))
print("\nrecall: {}".format(recall))
print("\nf1 score: {}".format(f1_score))
print("\naverage depth of tree: {}".format(average_noisy_depth))
