In [1]:
import numpy as np
import matplotlib.pyplot as plt
from numpy import argmax
from numpy.random import default_rng

# Step 1: Loading Data #

In [2]:

def split_dataset(data, test_proportion, val_proportion, random_generator=default_rng()):
    """ Split dataset into training and test sets, according to the given 
        test set proportion.
    
    Args:
        *******
        test_proprotion (float): the desired proportion of test examples 
                                 (0.0-1.0)
        random_generator (np.random.Generator): A random generator

    Returns:
        tuple: returns a tuple of (x_train, x_test, y_train, y_test) 
               - x_train (np.ndarray): Training instances shape (N_train, K)
               - x_test (np.ndarray): Test instances shape (N_test, K)
    """

    shuffled_indices = random_generator.permutation(len(data))
    n_test = round(len(data) * test_proportion)
    n_val = round(len(data) * val_proportion)
    n_train = len(data) - (n_test + n_val)

    training_data = data[shuffled_indices[:n_train]]
    test_data = data[shuffled_indices[n_train : n_train + n_test]]
    validation_data = data[shuffled_indices[n_train + n_test : ]]

    return (training_data, test_data, validation_data)


# load dataset from local machine 
clean_data = np.loadtxt("wifi_db/clean_dataset.txt", dtype=int)
noisy_data = np.loadtxt("wifi_db/noisy_dataset.txt", dtype=float)

seed = 60012
rg = default_rng(seed)

clean_data_train, clean_data_test, clean_data_val = split_dataset(clean_data, test_proportion=0.1, val_proportion=0.1,random_generator=rg)
noisy_data_train, noisy_data_test, noisy_data_val = split_dataset(noisy_data, test_proportion=0.1, val_proportion=0.1,random_generator=rg)


# Step 2: Creating Decision Trees #

In [31]:

class Node:
    """ Class representing a node data structure

    Attributes:
        left (Node): Object reference to this node's left child.
        right (Node): Object reference to this node's right child.
        attribute (Node): The attribute index to be tested on (0-7).
        value (float): 
                Decision Node: The value at the indexed attribute, to be compared with the split point
                Leaf Node:     The label of any feature vector, where the decision tree path leads to this leaf node. 
        leaf (Boolean): Specifies whether the node is a leaf node (True) or not (False) 

    """

    def __init__(self, left=None, right=None, attribute=None, value=None, leaf=None):
        """ Constructor 
        Args:
            left (Node): Object reference to this node's left child.
            right (Node): Object reference to this node's right child.
            attribute (Node): The attribute index to be tested on (0-7).
            value (float): 
                Decision Node: The value at the indexed attribute, to be compared with the split point
                Leaf Node:     The label of any feature vector, where the decision tree path leads to this leaf node. 
            leaf (Boolean): Specifies whether the node is a leaf node (True) or not (False)

        """

        self.left = left
        self.right = right
        self.attribute = attribute
        self.value = value
        self.leaf = leaf
        self.label_count = 0
    
    # Getters -------------------------------------------------------------------------------
    def get_left(self):
        return self.left

    def get_label_count(self):
        return self.label_count
    
    def get_right(self):
        return self.right
    
    def get_attribute(self):
        return self.attribute
    
    def get_value(self):
        return self.value
    
    def is_leaf(self):
        return self.leaf
    
    #----------------------------------------------------------------------------------------

    # Setters -------------------------------------------------------------------------------
    def set_left(self,left):
        self.left = left
    
    def set_right(self,right):
        self.right = right
    
    def set_attribute(self,attribute):
        self.attribute = attribute
    
    def set_value(self,value):
        self.value = value
    
    def set_leaf(self,leaf):
        self.leaf = leaf

    def set_label_count(self, label_count):
        self.label_count = label_count
    #----------------------------------------------------------------------------------------



class DecisionTree:
    """  Class schema for Decision Tree implementation

     Attributes:
        unique_labels (array): Array of unique labels in the datatset 
        root (Node): The root node of the tree (which contains an object reference to its children) 
        depth (int): Maximum path length from root to a leaf
        
    """

    def __init__(self, dataset, validation):
        """ Constructor 

            Args:
                dataset (numpy.ndarray): The datatset that is to be fit with a decision tree, with shape (N,8)
                validation(numpy.ndarray): The dataset that is to be used to evaluate pruning performance, with shape (N,8).
        """
        self.validation = validation
        self.unique_labels = self.get_labels(dataset)
        self.root, self.depth = self.decision_tree_learning(dataset, 0)
        
    
    def get_labels(self, dataset):
        """ Creates a list of the unique labels, and assigns to the unique_labels attribute

            Args:
                dataset (numpy.ndarray): Data instances (first 7 columns) + data labels (8th column), with shape (N,8)
        """

        return np.unique(dataset[:,-1:])
    
    

    def predict(self, samples):
        """ Traverses the already built decision tree, in order to make a classification label for a set of samples, relying on predict_single()

            Args:
                sample (numpy.ndarray): A dataset of unseen samples samples to be classified, with shape (N,8) 

             Returns:
                labels (numpy.ndarray): Array containing label prediction of each sample, with shape (N,)
        """
        return np.asarray([self.predict_single(self.root, sample) for sample in samples])

    def predict_single(self, node, sample):
        """ Traverses the already built decision tree, in order to make a classification label for a single data sample, using recursion.

            Args:
                node (Node): Object reference to the current node in the decision tree.
                sample (numpy.ndarray): A single data sample (row) to be classified, with shape (1,8)

             Returns:
                label (int): The predicted label of our sample
        """

        # Base Case
        if node.is_leaf():
            return node.get_value()

        # Recursive case. Find the attribute we are splitting on and get the value
        node_attribute = node.get_attribute()
        node_value = node.get_value()
        sample_value = sample[node_attribute]
        
        # compare
        if sample_value <= node_value:
            return self.predict_single(node.get_left(), sample)
        else:
            return self.predict_single(node.get_right(), sample)


    def decision_tree_learning(self, dataset, depth):
        """ This function fits a binary decision tree to dataset, using recursion
        
        Args: 
            dataset (np.ndarray): Data instances (first 7 columns) + data labels (8th column), with shape (N,8)
            depth (int): Maximum path length from root to a leaf. Always initialised to 0 
        
        Returns:
            root (Node): The root node of the tree (which contains an object reference to its children)
            depth (int): Maximum path length from root to a leaf
        """
        # 1. Base Case: All dataset samples have the same label. 
        if self.have_same_label(dataset):  
            return self.create_leaf_node(dataset), depth
        

        # Stated assumption that there are no inconsistent data points i.e. identical feature vectors having different classes, thus no need for second base case.


        # 2. Inductive Case
        split_attribute, split_value = self.FIND_SPLIT(dataset)
        # print("Found split:  {} {}".format(split_attribute, split_value))
        
        # instantiate node
        new_node = Node()    
        new_node.set_value(split_value)
        new_node.set_attribute(split_attribute)
         
        left_dataset, right_dataset = self.split_dataset(dataset, split_attribute, split_value)

        # recursive step (depth-first fashion)
        l_branch, l_depth = self.decision_tree_learning(left_dataset, depth+1)
        r_branch, r_depth = self.decision_tree_learning(right_dataset, depth+1)

        new_node.set_left(l_branch)
        new_node.set_right(r_branch)
        depth = max(l_depth, r_depth)

        return new_node, depth

    
    def shared_label(self, dataset):
        """ Returns the label shared by every sample in dataset 
            
            *Function is called iff have_same_label() returns True

            Args: 
                dataset (np.ndarray): Data instances (first 7 columns) + data labels (8th column), with shape (N,8)

            Returns:
                label (float): The label shared by all samples in dataset
        """

        label = dataset[0,-1]
        return label


    def have_same_label(self, dataset):
        """ Checks if all samples in dataset share the same label.

            Args: 
                dataset (np.ndarray): Data instances (first 7 columns) + data labels (8th column), with shape (N,8)

            Returns:
                shared (Boolean): True  = all samples in dataset share the same label
                                  False = variety of labels **(meaning it can be split further)

        """

        labels = dataset[:, -1:]
        shared = np.all(labels == labels[0])
        return shared


    def create_leaf_node(self, dataset):
        """ Creates a leaf node with `value` set as the label

            Args: 
                value (float): The label that is to be assigned to the node's `value` attribute 

            Returns:
                node (Node): A leaf node that is to be appended to the node path
        """
        value = self.shared_label(dataset)
        label_count = len(dataset)
        node = Node()
        node.set_leaf(True)
        node.set_value(value)
        node.set_label_count(label_count)
        return node


    def FIND_SPLIT(self, dataset):
        """ Chooses the attribute and the value that results in the highest information gain

            Args: 
                dataset (np.ndarray): Data instances (first 7 columns) + data labels (8th column), with shape (N,8)

            Returns:
                Tuple:
                    attribute (int): The index of the optimal attribute to be split at
                    value: (float) :  The decision boundary of the attribute, which will split the incoming data
        """

        # (info_gain, attribute, split_value)
        max_info_gain = (0, 0, 0) 

        # -1 so we don't loop over labels
        for col in range(dataset.shape[1] - 1): 
            sorted_by_col = dataset[dataset[:, col].argsort()] # ascending order

            # loop over rows of specific attribute (column) being evaluated
            for row in range(sorted_by_col.shape[0] - 1): 

                # Determine information gain for a trialled split value
                split_value = (sorted_by_col[row,col] + sorted_by_col[row+1, col]) / 2
                left_branch, right_branch = self.split_dataset(dataset, col, split_value)
                info_gain = self.information_gain(dataset, left_branch, right_branch)

                # update maximal 
                if info_gain > max_info_gain[0]:
                    max_info_gain = (info_gain, col, split_value)


        # select splitting attribute + splitting value
        attribute = max_info_gain[1]
        value = max_info_gain[2]
        return (attribute, value)



    def entropy(self,dataset):
        """ Returns the entropy of a given dataset

            Args: 
                dataset (np.ndarray): Data instances (first 7 columns) + data labels (8th column), with shape (N,8)

            Returns:
                entropy (float): The calculated entropy of the dataset 
        """
        sum = 0
        for label in self.unique_labels:
            proportion = self.p_label(dataset, label)
            log_of_proportion = 0 if proportion == 0 else np.log2(proportion)  # edge case
            sum += (proportion * log_of_proportion)

        return -sum

    def p_label(self, dataset, label):
        """ Returns the proportion of the dataset that contains labels that are equal to a given label parameter

            Args: 
                dataset (np.ndarray): Data instances (first 7 columns) + data labels (8th column), with shape (N,8)
                label (int): The label we are evaluating the proportion of

            Returns:
                entropy (float): The calculated entropy of the dataset 

        """
        # edge case: empty dataset
        return 0 if len(dataset) == 0 else sum([int(row[-1]) == int(label) for row in dataset]) / len(dataset)

    def information_gain(self, dataset, left, right):
        """ Returns the information gain from a given split of the dataset

            Args: 
                dataset (np.ndarray): Data instances (first 7 columns) + data labels (8th column), with shape (N,8)
                left (np.ndarray): The left split of dataset
                right (np.ndarray): The right split of dataset

            Returns:
                information gain (float): The calculated information gain

        """

        return self.entropy(dataset) - self.remainder(left,right)

    def remainder(self, left, right):
        """ Returns the entropy remaining after a given split. The information gain function subtracts this value 
            from the overall entropy, in order to calculate the information gained. 

            Args: 
                left (np.ndarray): The left split of the dataset
                right (np.ndarray): The right split of the dataset

            Returns:
                remainder (float): The calculated entropy remainder
                
        """
        
        # components
        size_left = left.shape[0]
        size_right = right.shape[0]
        entropy_left = self.entropy(left)
        entropy_right = self.entropy(right)

        return ((size_left / (size_left + size_right)) * entropy_left) + ((size_right / (size_left + size_right))* entropy_right)

    def split_dataset(self, dataset, attribute, split_value):
        """ Splits the dataset into left + right datasets, from a given split value on a specific attribute.

            Args: 
                dataset (np.ndarray): Data instances (first 7 columns) + data labels (8th column), with shape (N,8)
                attribute (int): The column index in dataset, of the specific attribute that will be split over.
                split_value (float): The value that each row of data will compared with. 
                                     When the value of the row indexed at attribute <= split value, that row is moved into the left branch.
                                     When the value of the row indexed at attribute > split value, that row is moved into the right branch
            Returns:
                left (np.ndarray): The left split of the dataset
                right (np.ndarray): The right split of the dataset
                
        """

        # implementational design to go left when equal to split_value
        left = dataset[((dataset[:,attribute] <= split_value))]
        right = dataset[((dataset[:,attribute] > split_value))] 

        return left, right



    def parents_of_leaves(self, node, parents):
        """ Generates a list of all nodes in tree that are parents of two leaves.

            Args: 
                node (Node): Object reference to the current node in the decision tree.
                parents (array): The list of parent nodes, initialised to an empty list

            The function does not return the parents list, it updates the parents list.
                
        """

        # 1. Base case
        if node.is_leaf():
            return

        # 2. Base Case
        if node.get_left().is_leaf() and node.get_right().is_leaf():
            parents.append(node) # parent found
            return
        
        # 3. Inductive Case (check children)
        self.parents_of_leaves(node.get_left(), parents) 
        self.parents_of_leaves(node.get_right(), parents)
        return 


    def majority(self, node):
        """ Returns the majority classification label of a sub-tree rooted at `node`.

            Args: 
                node (Node): Object reference to the current node in the decision tree.
                votes (array): An array of votes for each unique class label

            The function does not return the votes list, it updates the votes list.  
        """
        left = node.get_left()
        right = node.get_right()
        left_label_counts = left.get_label_count()
        right_label_counts = right.get_label_count()

        if left_label_counts >= right_label_counts:
            return left.get_value(), left
        else:
            return right.get_value(), right



    def evaluate(self, test_db, trained_tree):
        # We are inside decision tree, so we don't explicitly use trained_tree, but include to be consistent with the specification.
        y_predictions = self.predict(test_db)
        y_gold = test_db[:,-1]

        correct = sum([y_predictions[i] == y_gold[i] for i in range(len(test_db))])
        accuracy = correct / len(test_db)
        return accuracy


    def prune(self):
        """ Attempts to prune the already built decision tree (attempts because pruning may not improve performance, and will not be applied)

            This function takes the  and does not return anything. Rather it modifies the current decision tree object.
        """

        # Compute the error before pruning.
        prev_error = (1 - self.evaluate(self.validation, self))

        parents_of_leaves = []
        self.parents_of_leaves(self.root, parents_of_leaves)
        
        for parent in parents_of_leaves:

            m_label, m_node = self.majority(parent)

            # Backup value (in case prune needs to be undone)
            old_value = parent.get_value()

            # Prune tree 
            parent.set_leaf(True)
            parent.set_value(m_label)

            # Test performance of pruned tree
            new_error = (1 - self.evaluate(self.validation, self))

            #print("New error is: {}".format(new_error))
            #print("Old error is: {}".format(prev_error))
            #print("")
            if new_error <= prev_error:                      #Set back to <= to
                # success. Try pruning again.
                parent.set_label_count(m_node.get_label_count())
                self.prune()
                return
            else:
                # revert to previous tree
                parent.set_leaf(False)
                parent.set_value(old_value)
        return



In [32]:
# pre prune
decision_tree = DecisionTree(noisy_data_train, noisy_data_val)
print("Pre-pruning vaidation accuracy: {}".format(decision_tree.evaluate(noisy_data_val, decision_tree)))


Pre-pruning vaidation accuracy: 0.815


In [33]:
# post prune
decision_tree.prune()
print("Post-pruning vaidation accuracy: {}".format(decision_tree.evaluate(noisy_data_val, decision_tree)))

Post-pruning vaidation accuracy: 0.895


In [26]:
votes_per_label = np.zeros(5, dtype=int)
print(votes_per_label)

[0 0 0 0 0]


# Step 3: Evaluation

In [6]:


def k_fold_split(k_folds, n_instances, random_generator=default_rng()):
    """ Split k_instances into n mutually exclusive splits at random.
    
    Args:
        n_splits (int): Number of splits
        n_instances (int): Number of instances to split
        random_generator (np.random.Generator): A random generator

    Returns:
        list: a list (length n_splits). Each element in the list should contain a 
            numpy array giving the indices of the instances in that split.
    """

    # generate a random permutation of indices from 0 to n_instances
    shuffled_indices = random_generator.permutation(n_instances)

    # split shuffled indices into almost equal sized splits
    split_indices = np.array_split(shuffled_indices, k_folds)

    return split_indices


def train_test_k_fold(k_folds, n_instances, random_generator=default_rng()):
    """ Generate train and test indices at each fold.
    
    Args:
        k_folds (int): Number of folds
        n_instances (int): Total number of instances
        random_generator (np.random.Generator): A random generator

    Returns:
        list: a list of length k_folds. Each element in the list is a list (or tuple) 
            with two elements: a numpy array containing the train indices, and another 
            numpy array containing the test indices.
    """

    # split the dataset into k splits
    split_indices = k_fold_split(k_folds, n_instances, random_generator)

    folds = []
    for k in range(k_folds):
        test_indices = split_indices[k]
        val_indices = split_indices[(k + 1) % k_folds]
        train_indices = np.array([])
        for i in range(k_folds):
            if i != k and i!= (k + 1) % k_folds:
                train_indices = np.concatenate((train_indices, split_indices[i]))
        folds.append([np.array(train_indices, dtype=int), np.array(test_indices), np.array(val_indices)])

    return folds


def confusion_matrix(y_gold, y_prediction, class_labels=None):
    """ Compute the confusion matrix.
        
    Args:
        y_gold (np.ndarray): the correct ground truth/gold standard labels
        y_prediction (np.ndarray): the predicted labels
        class_labels (np.ndarray): a list of unique class labels. 
                               Defaults to the union of y_gold and y_prediction.

    Returns:
        np.array : shape (C, C), where C is the number of classes. 
                   Rows are ground truth per class, columns are predictions
    """

    # if no class_labels are given, we obtain the set of unique class labels from
    # the union of the ground truth annotation and the prediction
    if not class_labels:
        class_labels = np.unique(np.concatenate((y_gold, y_prediction)))

    confusion = np.zeros((len(class_labels), len(class_labels)), dtype=np.int)

    # for each correct class (row), 
    # compute how many instances are predicted for each class (columns)
    for (i, label) in enumerate(class_labels):
        # get predictions where the ground truth is the current class label
        indices = (y_gold == label)
        gold = y_gold[indices]
        predictions = y_prediction[indices]

        # quick way to get the counts per label
        (unique_labels, counts) = np.unique(predictions, return_counts=True)

        # convert the counts to a dictionary
        frequency_dict = dict(zip(unique_labels, counts))

        # fill up the confusion matrix for the current row
        for (j, class_label) in enumerate(class_labels):
            confusion[i, j] = frequency_dict.get(class_label, 0)

    return confusion


In [7]:
decision_tree = DecisionTree(noisy_data_train, noisy_data_val)
y_predictions = decision_tree.predict(noisy_data_test)
y_gold = noisy_data_test[:,-1]
confusion_matrix = confusion_matrix(y_gold, y_predictions)

print(confusion_matrix)


[[37  6  3  1]
 [ 2 41  4  3]
 [ 3  3 51  1]
 [ 4  0  2 39]]


Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  confusion = np.zeros((len(class_labels), len(class_labels)), dtype=np.int)


In [34]:

def k_fold_evaluation(data, k_folds):
    accuracies_before = np.zeros((k_folds, ))
    accuracies_after = np.zeros((k_folds, ))

    for i, (train_indices, test_indices, val_indices) in enumerate(train_test_k_fold(k_folds, len(data), rg)):
        print(i)

        decision_tree = DecisionTree(data[train_indices], data[val_indices])
        acc = decision_tree.evaluate(data[test_indices], decision_tree)

        decision_tree.prune()
        acc_prune = decision_tree.evaluate(data[test_indices], decision_tree)

        accuracies_before[i] = acc
        accuracies_after[i] = acc_prune
        
    return (accuracies_before, accuracies_after)

k_fold_evaluation(noisy_data, 10)

0
1
2
3
4
5
6
7
8
9


(array([0.745, 0.795, 0.795, 0.79 , 0.775, 0.795, 0.8  , 0.795, 0.815,
        0.77 ]),
 array([0.885, 0.88 , 0.925, 0.86 , 0.9  , 0.895, 0.91 , 0.86 , 0.91 ,
        0.825]))

In [None]:
print(accuracies)

[0.825 0.775 0.81  0.805 0.82  0.83  0.84  0.82  0.845 0.77 ]


In [None]:
import numpy as np

arr = np.array([[1, 2, 3, 4, 5], [9, 2, 3, 4, 5], [7, 2, 3, 4, 5]])
mask = [row[0] < 9 for row in arr]
filtered = arr[mask]
arr[0][1] = 10
print(filtered)
print(arr)

[[1 2 3 4 5]
 [7 2 3 4 5]]
[[ 1 10  3  4  5]
 [ 9  2  3  4  5]
 [ 7  2  3  4  5]]


# Step 4: Pruning and re-evaluation

In [None]:
def parents_of_leaves(self, node, parents):

    if node.is_leaf():
        return

    if node.get_left().is_leaf() and node.get_right().is_leaf():
        parents.append(node)
        return

    self.parents_of_leaves(node.get_left())
    self.parents_of_leaves(node.get_right())
    return 


def majority(self):
    return np.argmax(self.majority_label(self.root, np.zeros(len(self.unique_labels) + 1)))

def majority_label(self, node, labels):

    if node.is_leaf():
        labels[node.get_value()]+=1
        return
    else:
        self.majority_label(node.get_left(), labels)
        self.majority_label(node.get_right(), labels)


def prune(self):
    parents_of_leaves = self.parents_of_leaves(self.root)
    for parent in parents_of_leaves():
        # Get majority label of child nodes
        m_label = self.majority()
        # Backup this node's old split value
        old_value = parent.get_value()
        # Make this node appear as a leaf and set majority label
        parent.set_leaf(True)
        parent.set_value(m_label)
        # Test whether making this node a leaf improves performance
        improved = self.improved_performance()
        if improved:
            self.prune()
        else:
            parent.set_leaf(False)
            parent.set(old_value)
    return
        
    

