In [4]:
import numpy as np

In [5]:
def read_dataset(filepath):
    """ Read in the dataset from the specified filepath

    Args:
        filepath (str): The filepath to the dataset file

    Returns:
        tuple: returns a tuple of (x, y, classes), each being a numpy array. 
               - x is a numpy array with shape (N, K), 
                   where N is the number of instances
                   K is the number of features/attributes
               - y is a numpy array with shape (N, ), and each element should be 
                   an integer from 0 to C-1 where C is the number of classes 
               - classes : a numpy array with shape (C, ), which contains the 
                   unique class labels corresponding to the integers in y
    """

    x = []
    y_labels = []
    for line in open(filepath):
        if line.strip() != "": # handle empty rows in file
            row = line.strip().split(",")
            x.append(list(map(float, row[:-1]))) 
            y_labels.append(row[-1])
    

    x = np.array(x, dtype=int)
    y = np.array(y_labels)
    return (x, y)

In [25]:
from numpy.random import default_rng

def split_dataset(x, y, test_proportion, random_generator=default_rng()):
    """ Split dataset into training and test sets, according to the given
        test set proportion.

    Args:
        x (np.ndarray): Instances, numpy array with shape (N,K)
        y (np.ndarray): Class labels, numpy array with shape (N,)
        test_proportion (float): the desired proportion of test examples
                                 (0.0-1.0)
        random_generator (np.random.Generator): A random generator

    Returns:
        tuple: returns a tuple of (x_train, x_test, y_train, y_test)
               - x_train (np.ndarray): Training instances shape (N_train, K)
               - x_test (np.ndarray): Test instances shape (N_test, K)
               - y_train (np.ndarray): Training labels, shape (N_train, )
               - y_test (np.ndarray): Test labels, shape (N_train, )
    """

    # TODO: Complete this function
    indices = np.arange(len(x))
    random_generator.shuffle(indices)
    x = x[indices]
    y = y[indices]

    x_row, x_col = x.shape
    partition = 1- int(x_row * test_proportion)
    x_train = x[:partition, :]
    x_test = x[partition:, :]
    y_train = y[:partition]
    y_test = y[partition:]
    print(x_test.shape)

    return (x_train, x_test, y_train, y_test)

In [6]:
(x_test, y_test) = read_dataset("data/test.txt")
print(y_test)

['Q' 'C' 'Q' 'Q' 'G' 'C' 'E' 'C' 'O' 'E' 'A' 'A' 'G' 'A' 'A' 'A' 'G' 'G'
 'E' 'E' 'E' 'E' 'O' 'C' 'Q' 'Q' 'Q' 'C' 'O' 'Q' 'G' 'Q' 'G' 'O' 'Q' 'Q'
 'G' 'O' 'Q' 'A' 'C' 'Q' 'O' 'Q' 'A' 'G' 'O' 'E' 'E' 'A' 'Q' 'G' 'A' 'C'
 'A' 'G' 'A' 'C' 'O' 'C' 'O' 'E' 'G' 'Q' 'O' 'E' 'C' 'A' 'A' 'G' 'A' 'C'
 'C' 'Q' 'O' 'O' 'O' 'A' 'E' 'C' 'A' 'Q' 'O' 'Q' 'C' 'A' 'A' 'Q' 'C' 'E'
 'A' 'A' 'C' 'Q' 'A' 'Q' 'O' 'Q' 'O' 'G' 'C' 'G' 'C' 'C' 'O' 'Q' 'A' 'Q'
 'Q' 'C' 'G' 'Q' 'C' 'Q' 'O' 'Q' 'O' 'G' 'Q' 'E' 'O' 'Q' 'E' 'Q' 'O' 'E'
 'O' 'C' 'O' 'A' 'G' 'Q' 'E' 'G' 'O' 'Q' 'E' 'O' 'C' 'O' 'G' 'G' 'Q' 'Q'
 'E' 'E' 'A' 'C' 'A' 'A' 'A' 'A' 'C' 'O' 'Q' 'G' 'O' 'O' 'E' 'C' 'C' 'Q'
 'Q' 'C' 'C' 'O' 'C' 'A' 'G' 'A' 'C' 'A' 'C' 'G' 'E' 'Q' 'E' 'C' 'O' 'A'
 'C' 'E' 'C' 'G' 'G' 'Q' 'G' 'Q' 'A' 'C' 'C' 'E' 'G' 'E' 'A' 'E' 'Q' 'O'
 'O' 'O']


In [7]:
(x_val, y_val) = read_dataset("data/validation.txt")

In [8]:
(x_full, y_full) = read_dataset("data/train_full.txt")

In [9]:
(x_toy, y_toy) = read_dataset("data/toy.txt")

In [10]:
 def confusion_matrix(y_gold, y_prediction, class_labels=None):
    
    # if no class_labels are given, we obtain the set of unique class labels from
    # the union of the ground truth annotation and the prediction
    if not class_labels:
        class_labels = np.unique(np.concatenate((y_gold, y_prediction)))

    confusion = np.zeros((len(class_labels), len(class_labels)), dtype=np.int64)

    # for each correct class (row), 
    # compute how many instances are predicted for each class (columns)
    for (i, label) in enumerate(class_labels):
        # get predictions where the ground truth is the current class label
        indices = (y_gold == label)
        gold = y_gold[indices]
        predictions = y_prediction[indices]

        # quick way to get the counts per label
        (unique_labels, counts) = np.unique(predictions, return_counts=True)

        # convert the counts to a dictionary
        frequency_dict = dict(zip(unique_labels, counts))

        # fill up the confusion matrix for the current row
        for (j, class_label) in enumerate(class_labels):
            confusion[i, j] = frequency_dict.get(class_label, 0)

    return confusion

In [11]:
 def recall(y_gold, y_prediction):

    confusion = confusion_matrix(y_gold, y_prediction)
    r = np.zeros((len(confusion), ))
    for c in range(confusion.shape[0]):
        if np.sum(confusion[c, :]) > 0:
            r[c] = confusion[c, c] / np.sum(confusion[c, :])

    macro_r = 0.
    if len(r) > 0:
        macro_r = np.mean(r)
    return (r, macro_r)

In [12]:
def precision(y_gold, y_prediction):

    confusion = confusion_matrix(y_gold, y_prediction)
    p = np.zeros((len(confusion), ))
    for c in range(confusion.shape[0]):
        if np.sum(confusion[:, c]) > 0:
            p[c] = confusion[c, c] / np.sum(confusion[:, c])

    macro_p = 0.
    if len(p) > 0:
        macro_p = np.mean(p)
    
    return (p, macro_p)

In [13]:
def f1_score(y_gold, y_prediction):

    (precisions, macro_p) = precision(y_gold, y_prediction)
    (recalls, macro_r) = recall(y_gold, y_prediction)

    # just to make sure they are of the same length
    assert len(precisions) == len(recalls)

    f = np.zeros((len(precisions), ))
    for c, (p, r) in enumerate(zip(precisions, recalls)):
        if p + r > 0:
            f[c] = 2 * p * r / (p + r)

    macro_f = 0.
    if len(f) > 0:
        macro_f = np.mean(f)
    
    return (f, macro_f)

In [14]:
def accuracy(x, y):
    return np.sum(x==y)*100/len(x)

In [79]:
class Node:
    def __init__(self, split_val=None, column=None, label=None):
        self.left = self.right = None
        self.split_val = split_val
        self.column = column
        self.label = label
        
    def add_child(self, child): 
        if self.left == None:
            self.left = child
        elif self.right == None:
            self.right = child
        else:
            print("no children node free")
            exit()

    def is_leaf(self):
        if self.label != None:
            return True
        return False

    def max_depth(self, d=0):

        if self.left == None and self.right == None:
            return 0
        
        left_depth = self.left.max_depth(d) + 1
        right_depth = self.right.max_depth(d) + 1

        if right_depth >= left_depth:
            return right_depth
        else:
            return left_depth


In [119]:
class DecisionTreeClassifier(object):
    """ Basic decision tree classifier
    
    Attributes:
    is_trained (bool): Keeps track of whether the classifier has been trained
    
    Methods:
    fit(x, y): Constructs a decision tree from data X and label y
    predict(x): Predicts the class label of samples X
    prune(x_val, y_val): Post-prunes the decision tree
    """

    def __init__(self, num_layers_to_prune=None, min_samples_split=2, min_samples_leaf=1):
        self.is_trained = False
        self.root = None
        self.num_layers_to_prune = num_layers_to_prune
        self.max_depth = 0
        self.min_samples_split = min_samples_split
        self.min_samples_leaf = min_samples_leaf

    
    def calculate_entropy(self, y):
        values, counts = np.unique(y, return_counts=True)
        probabilities = counts / counts.sum()
        entropy = -np.sum(probabilities * np.log2(probabilities))
        return entropy

    
    def calculate_info_gain(self, x, y, x_val, sort_col):

        # Calculate total entropy for overall data
        total_entropy = self.calculate_entropy(y)

        # Calculate entropy for left and right of split
        left_entropy = self.calculate_entropy(y[x[:, sort_col] < x_val])
        right_entropy = self.calculate_entropy(y[x[:, sort_col] >= x_val])

        # Calculate info gain
        info_gain = total_entropy - ((len(y[x[:, sort_col] < x_val])/len(y) * left_entropy) 
                                    + (len(y[x[:, sort_col] >= x_val])/len(y) * right_entropy))
        
        return info_gain

        
    def find_best_node(self, x, y):
        y = y.reshape(-1, 1)

        # To keep track of info gain
        max_gain = value_to_split_on = column_to_split_on = None
                
        # loop through each column 
        for i in range(x.shape[1]):
            # sort by that column
            index_list = x[:, i].argsort()
            x = x[index_list]
            y = y[index_list]

            starting_label = y[0]
            starting_val = x[:, i][0]

            # loop through the column
            for x_val, y_val in zip(x[:, i], y):
                if (y_val != starting_label) and (x_val != starting_val):

                    # calculate information gain
                    info_gain = self.calculate_info_gain(x, y, x_val, i)
                    
                    # update the max information gain
                    if max_gain is None or info_gain > max_gain:
                        max_gain = info_gain
                        value_to_split_on = x_val
                        column_to_split_on = i

                    # Update starting label
                    starting_label = y_val
                    starting_val = x_val
                    
        return Node(value_to_split_on, column_to_split_on)


    def split_dataset(self, x, y, node):
        # Simplified dataset splitting
        left_mask = x[:, node.column] < node.split_val
        right_mask = ~left_mask  # Inverse of left_mask
        return (x[left_mask], y[left_mask]), (x[right_mask], y[right_mask])


    
    def induce_decision_tree(self, x, y, depth=0):
        if len(np.unique(y)) <= 1 or x.shape[0] < self.min_samples_split or depth == self.max_depth:
            label_set, count = np.unique(y, return_counts=True)
            leaf_node = Node(label=label_set[np.argmax(count)])
            return leaf_node
        else:
            parent = self.find_best_node(x, y)
            if parent.split_val is None or x.shape[0] <= self.min_samples_leaf:
                label_set, count = np.unique(y, return_counts=True)
                leaf_node = Node(label=label_set[np.argmax(count)])
                return leaf_node
            
            (x_left, y_left), (x_right, y_right) = self.split_dataset(x, y, parent)
            
            if len(y_left) >= self.min_samples_leaf and len(y_right) >= self.min_samples_leaf:
                parent.left = self.induce_decision_tree(x_left, y_left, depth + 1)
                parent.right = self.induce_decision_tree(x_right, y_right, depth + 1)
            else:
                label_set, count = np.unique(y, return_counts=True)
                return Node(label=label_set[np.argmax(count)])
            
            return parent


    def fit(self, x, y):

        
        # Make sure that x and y have the same number of instances
        assert x.shape[0] == len(y), \
            "Training failed. x and y must have the same number of instances."
        
        #######################################################################
        #                 ** TASK 2.1: COMPLETE THIS METHOD **
        #######################################################################
        self.max_depth = np.inf
        self.root = self.induce_decision_tree(x, y)
        self.max_depth = self.get_max_depth_from_tree() - self.num_layers_to_prune

        self.root = self.induce_decision_tree(x, y)
        
        # set a flag so that we know that the classifier has been trained
        self.is_trained = True

    
    def classify_instance(self, instance, node):

        if node.label != None:
            return node.label

        if instance[node.column] >= node.split_val:
            return self.classify_instance(instance, node.right)
        
        return self.classify_instance(instance, node.left)
        
    
    def predict(self, x):

        
        # make sure that the classifier has been trained before predicting
        if not self.is_trained:
            raise Exception("DecisionTreeClassifier has not yet been trained.")
        
        # set up an empty (M, ) numpy array to store the predicted labels 
        # feel free to change this if needed
        predictions = np.zeros((x.shape[0],), dtype=object)
        #######################################################################
        #                 ** TASK 2.2: COMPLETE THIS METHOD **
        #######################################################################
        for i in range(len(x)):
            label = self.classify_instance(x[i], self.root)
            predictions[i] = label
    
        return predictions

    def get_max_depth_from_tree(self):
        return self.root.max_depth()

In [120]:
t = DecisionTreeClassifier(8);
t.fit(x_full, y_full)

In [121]:
print(t.get_max_depth_from_tree())

predict = t.predict(x_val)

print(accuracy(predict, y_val))

13
93.0


In [124]:
class RandomForestClassifier(object):

    def __init__(self, treesNum, num_layers_to_prune=0, min_samples_split=2, min_samples_leaf=1, feature_proportion=1):
        self.is_trained = False
        self.roots = []
        self.treesNum = treesNum
        self.prune_layer_num = num_layers_to_prune
        self.min_samples_split = min_samples_split
        self.min_samples_leaf = min_samples_leaf
        self.feature_proportion = feature_proportion

    def fit(self, x, y):

        features_num = int(x.shape[1] * self.feature_proportion)
        
        # for treesNum
        for i in range(self.treesNum):
            # bagged samples from x
            bagged_ints = np.random.randint(0, len(x), len(x), dtype=int)
            x_bagged = x[bagged_ints, :]
            y_bagged = y[bagged_ints]

            features = np.arange(0,x.shape[1])
            np.random.shuffle(features)
            feature_list = features[:features_num]
            x_bagged = x_bagged[:, feature_list]
            
            # train a tree on this subset
            tree = DecisionTreeClassifier(self.prune_layer_num, self.min_samples_split,  self.min_samples_leaf)
            tree.fit(x_bagged, y_bagged)
            
            # add to roots
            self.roots.append((tree, feature_list))

    def predict(self, x):

        predictions = np.zeros((self.treesNum, len(x)), dtype='str')
        # for each tree in roots
        for i, (tree, feature_list) in enumerate(self.roots):
            predictions[i] = tree.predict(x[:, feature_list])
            
        # then find mode of predictions
        mode, count = np.unique(predictions, return_counts=True, axis=0)
        modePredictions = mode[np.argmax(count)]
        return modePredictions
       

In [133]:
test = RandomForestClassifier(70, 8, 2, 1, 0.4);
test.fit(x_full, y_full)

In [134]:
thirty_predict = test.predict(x_val)
print(thirty_predict)

['A' 'A' 'C' 'Q' 'Q' 'C' 'G' 'Q' 'G' 'O' 'G' 'O' 'C' 'O' 'A' 'O' 'C' 'G'
 'C' 'A' 'A' 'C' 'A' 'E' 'O' 'G' 'A' 'C' 'E' 'E' 'O' 'A' 'C' 'G' 'E' 'Q'
 'O' 'G' 'A' 'O' 'Q' 'Q' 'O' 'G' 'C' 'E' 'G' 'O' 'C' 'E' 'G' 'O' 'G' 'Q'
 'O' 'C' 'C' 'A' 'O' 'E' 'E' 'E' 'O' 'Q' 'Q' 'O' 'G' 'E' 'O' 'G' 'A' 'Q'
 'Q' 'E' 'C' 'E' 'G' 'E' 'C' 'O' 'O' 'C' 'Q' 'A' 'A' 'C' 'O' 'A' 'A' 'A'
 'Q' 'G' 'Q' 'O' 'A' 'A' 'A' 'A' 'O' 'C']


In [135]:
confusion_thirty = confusion_matrix(y_val, thirty_predict)
print(confusion_thirty)

[[19  0  0  0  0  0]
 [ 0 13  0  0  0  0]
 [ 0  0 11  4  1  2]
 [ 1  2  1  9  0  1]
 [ 0  1  1  0 17  1]
 [ 0  1  0  2  3 10]]


In [136]:
print(accuracy(thirty_predict, y_val))

79.0


In [137]:
test_predict = test.predict(x_test)
print(test_predict)

['C' 'C' 'Q' 'Q' 'G' 'O' 'C' 'C' 'C' 'E' 'A' 'A' 'G' 'A' 'A' 'A' 'G' 'Q'
 'E' 'E' 'E' 'E' 'O' 'C' 'Q' 'Q' 'Q' 'C' 'C' 'O' 'E' 'Q' 'G' 'O' 'Q' 'G'
 'C' 'O' 'Q' 'A' 'C' 'G' 'O' 'Q' 'A' 'E' 'Q' 'E' 'E' 'Q' 'Q' 'Q' 'O' 'C'
 'A' 'G' 'G' 'C' 'G' 'C' 'O' 'E' 'G' 'Q' 'O' 'E' 'G' 'A' 'A' 'O' 'A' 'C'
 'Q' 'E' 'G' 'O' 'O' 'A' 'E' 'C' 'A' 'Q' 'C' 'E' 'C' 'A' 'A' 'Q' 'C' 'E'
 'A' 'A' 'C' 'Q' 'A' 'Q' 'O' 'Q' 'O' 'Q' 'C' 'G' 'C' 'C' 'O' 'Q' 'A' 'Q'
 'Q' 'C' 'E' 'Q' 'C' 'Q' 'Q' 'Q' 'O' 'G' 'Q' 'E' 'O' 'Q' 'E' 'Q' 'Q' 'E'
 'O' 'C' 'O' 'A' 'Q' 'Q' 'E' 'G' 'G' 'G' 'E' 'C' 'C' 'O' 'G' 'G' 'Q' 'Q'
 'E' 'E' 'A' 'C' 'A' 'G' 'O' 'A' 'C' 'O' 'Q' 'G' 'O' 'O' 'E' 'C' 'C' 'Q'
 'Q' 'C' 'C' 'O' 'C' 'A' 'Q' 'A' 'C' 'A' 'C' 'G' 'E' 'Q' 'E' 'C' 'O' 'A'
 'C' 'E' 'G' 'O' 'G' 'Q' 'G' 'Q' 'Q' 'C' 'C' 'G' 'G' 'E' 'A' 'G' 'Q' 'O'
 'O' 'O']


In [138]:
print(accuracy(test_predict, y_test))

79.5


In [81]:
predictions = np.zeros((10, len(x_val)), dtype='str')

for i in range(1,101, 10):
    test = RandomForestClassifier(i)
    test.fit(x_full, y_full)

    predictions[i//10] = (test.predict(x_val))
    
    

In [82]:
for i, list in enumerate(predictions):
    print(f"{i}: {accuracy(list, y_val)}")

0: 88.0
1: 82.0
2: 85.0
3: 86.0
4: 84.0
5: 82.0
6: 91.0
7: 85.0
8: 92.0
9: 89.0


In [140]:
# save each configuration
    # 6x? array
    # 6th index = forest

# ranges of configurations
    # num of trees
    # pruning layers (0-10)
    # min split sample size (2-7)
    # min leaf sample size (1-6)
    # proportions of features (0.3 - 0.9, steps of 0.05)

# create array for configs
configurations = []

# num of trees
for i in range(1,101,10):

    # pruning range
    for j in range(0,10):
    
        # min split range
        for k in range(2,7):
    
            # min leaf range
            for l in range(1,6):
    
                # feature range
                for m in np.arange(0.3, 0.9, 0.05):
    
                    # create random forest
                    tempForest = RandomForestClassifier(i, j, k, l, m)
                    tempForest.fit(x_full, y_full)
                    
                    # save to list
                    config = (i, j, k, l, m, tempForest)
                    configurations.append(config)
                    


KeyboardInterrupt: 