### (4) Decision Tree Classification

In [1]:

from collections import Counter
from sklearn.impute import KNNImputer

import numpy as np
import pandas as pd
from numpy import genfromtxt
import scipy.io
from sklearn.model_selection import cross_val_score
import io

import random
random.seed(246810)
np.random.seed(246810)


def resultsToCsv(y_test, dataset):
    y_test = y_test.astype(int)
    df = pd.DataFrame({'Category': y_test})
    df.index += 1 # Ensures that the index starts at 1
    df.to_csv(f"{dataset}_predictions.csv", index_label='Id')

class DecisionTree:

    def __init__(self, max_depth=None, feature_labels=None, class_names=None, max_features=False, random_state = None):
        # model hyperparameters
        self.max_depth = max_depth
        self.features = feature_labels
        self.class_names = class_names
        self.max_features = max_features
        self.random_state = random_state

        if random_state is not None:
            self.rs = np.random.RandomState(seed=self.random_state)
        else:
            self.rs = np.random.RandomState()

        # model data and variables
        self.data, self.pred = None, None
        self.root = None
        self.y = None

    class Node:
        def __init__(self):
            self.split_rule = None
            self.right = None
            self.left = None
            self.label = None

    def entropy(self, y):
        # check for division 0
        if len(y) == 0:
            return 0
        # calculate posterior probability 
        p_c =  np.sum(y) / len(y)
        # if posterior probability is 0 or 1 return 0 entropy
        if p_c in [0, 1]:
            return 0
        p_nc = 1  - p_c
        return -(p_c * np.log2(p_c) + p_nc * np.log2(p_nc))

    def split(self, X, y, idx, thresh):
        """
        Calls split_test to perform split on the feature  and threshold
            i.e. split_test(self, X, pclass=1, 1.5)
        X: feature matrix
        y: prediction labels
        idx: the index of the feature {0,1,2....14} for titanic
        thresh: threshold to use for split
        """
        X0, idx0, X1, idx1 = self.split_test(X, idx=idx, thresh=thresh)
        y0, y1 = y[idx0], y[idx1]

        return X0, y0, X1, y1

    def split_test(self, X, idx, thresh):
        """Returns two substers of X based on feature and threshold
        and the indices of the samples idx0, idx1,
        It then uses these indices to split the label vector y into two parts (y0, y1)
        It then returns the split matrices and their indices
        """
        idx0 = np.where(X[:, idx] <= thresh)[0]
        idx1 = np.where(X[:, idx] > thresh)[0]
        X0, X1 = X[idx0, :], X[idx1, :]
        return X0, idx0, X1, idx1
    
    def build_tree(self, X, y, j):

        # some variables
        entropy_before = self.entropy(y)
        best_info_gain = 0
        best_feature = None
        best_threshold = None

        # Change this so we split on random features
        if self.max_features == False:
            indexes = np.arange(X.shape[1])
        elif self.max_features == True:
            indexes = self.rs.choice(range(X.shape[1]), size=M, replace=False)
        
        # iterate through the columns
        #  [b'pclass', b'age', b'sibsp', b'parch', b'fare', b'male', b'female', b'S', b'C', b'Q']
        for i in indexes:
            # initial thresholds are unique categores
            thresholds = np.unique(X[:,i])
            # if thresholds are continous then use percentile
            if thresholds.shape[0] > 12:
                thresholds = [np.quantile(X[:,i], .55)]
            # iterate through the thresholds (only one if a percentile)
            for t in thresholds:
                # make split at the feature and threshold
                x0, y0, x1, y1 = self.split(X,y, i, t)
                # 
                if len(y0) > 0 and len(y1) > 0:
                    posterior_x0, posterior_x1 = (len(y0) / len(y)), (len(y1) / len(y))
                    entropy_after = posterior_x0 * self.entropy(y0)  + posterior_x1 * self.entropy(y1)
                    info_gain = entropy_before - entropy_after
                    if info_gain > best_info_gain:
                        best_info_gain = info_gain
                        best_feature = i
                        best_threshold = t
                        best_x0, best_x1, best_y0, best_y1 = x0, x1, y0, y1


        if best_info_gain <= eps or j == self.max_depth:
            n = self.Node()
            n.label = np.bincount(y).argmax()
            return n
        else:
            node = self.Node()
            node.split_rule = (best_feature, best_threshold)
            node.left = self.build_tree(best_x0,best_y0, j+1)
            node.right = self.build_tree(best_x1,best_y1, j+1)
            return node

    def __repr__(self):
        def recurse(node, depth):
            if node is None:
                return " " * depth + "None\n"
                
            if node.label is not None:
                return " " * depth + f"Predict: {node.label}\n"
            
            feature_name = node.split_rule[0]
            threshold = node.split_rule[1]
            left_repr = recurse(node.left, depth + 1) if node.left else " " * (depth + 1) + "None\n"
            right_repr = recurse(node.right, depth + 1) if node.right else " " * (depth + 1) + "None\n"
            
            node_repr = ("   " * depth + f"If {feature_name} <= {threshold}:\n" + left_repr +
                 " " * depth + f"Else {feature_name} > {threshold}:\n" + right_repr)
            
            return node_repr

        if self.root == None:
            return "empty tree"
        else:
            return recurse(self.root, 0)

    def viz(self):
        with open('custom_tree.dot', 'w') as f:
            self.export_tree(self.root, self.class_names,f)


    def export_tree(self, node, class_names, dot_file):
        def recurse(node, parent_id, next_node_id):
            if node is None:
                return next_node_id
        
            node_id = next_node_id
            
            if node.label is not None:
                label = class_names[node.label]
                dot_file.write(f'{node_id} [label="{label}", shape="box"];\n')
            else:
                feature_name = self.features[node.split_rule[0]]
                label = f'{feature_name} <= {node.split_rule[1]}'
                dot_file.write(f'{node_id} [label="{label}"];\n')

            next_node_id += 1

            if node.left is not None:
                dot_file.write(f'{node_id} -> {next_node_id} [label="true"];\n')
                next_node_id = recurse(node.left, node_id, next_node_id)
            if node.right is not None:
                dot_file.write(f'{node_id} -> {next_node_id} [label="false"];\n')
                next_node_id = recurse(node.right, node_id, next_node_id)
        
            return next_node_id
        
        dot_file.write('digraph G {\n')
        dot_file.write('node [shape=ellipse];\n')
        recurse(node, None, 0)
        dot_file.write('}\n')
    
    def fit(self, X, y):
        self.data = X
        self.y = y
        j = 0
        # build tree and initialize the root with the entire tree
        self.root = self.build_tree(self.data, self.y, j)
        # generate a dot file of the trained tree. To display call the command in first cell block

    def predict(self, X):
        '''
        Takes a test set and returns the prediction of each row
        recursively by calling recurse on the root node, and a row
        This gets a label / prediction or calls recurse again.
        '''
        def rec(n, row):
            '''
            takes the root of the tree and a row
            gows dow the tree and compares the value of the row
            at the split index and and splits accordingly
            returns the prediction for that row
            '''   
            if n.label is not None:
                return n.label
            else:
                idx, threshold = n.split_rule
                #print(f"{self.features[idx]}")
                if row[idx] <= threshold:
                    return rec(n.left, row)
                if row[idx] > threshold:
                    return rec(n.right, row)
                
        # to store predictions
        predictions = []

        if self.root == None:
            return "!ERROR IN PREDICT!"
        
        # make predicitons for each row recursively
        for i in range(X.shape[0]):
            predictions.append(rec(self.root, X[i]))

        # return the prediction made
        return predictions

class BaggedTrees():
    def __init__(self, params=None, n=None):
        if params is None:
            params = {}
        self.params = params
        self.n = n
        self.decision_trees = [DecisionTree(**self.params, random_state=i) for i in range(self.n)]
        
        # for each tree in decision_tree
class RandomForest(BaggedTrees):
    """
    """
    def __init__(self, params=None, n=300, m=True):
        if params is None:
            params = {}
        params['max_features'] = m
        # number of trees in forest
        self.n = n 
        super().__init__(params=params, n=self.n) # call the BaggedTrees constructor

    def fit(self, X, y):
        """
        Takes the training data and uses the decision trees defined by the construcor
        to generate random sample points and train each decision tree
        """

        # for each tree the the constructor defined 
        for t in self.decision_trees:
            # using the random object inside each tree generate sample points and labels
            rand_indices = t.rs.choice(range(X.shape[0]),size=X.shape[0], replace=False)
            random_points = X[rand_indices]
            random_labels = y[rand_indices]
            # fit the tree with the random points and labels
            t.fit(random_points, random_labels)

    def predict(self, X):
        # Get the first trees predictions
        predictions = np.array(self.decision_trees[0].predict(X))[:, np.newaxis]

        for i in range(1, len(self.decision_trees)):
            current_trees_predictions = np.array(self.decision_trees[i].predict(X))[:, np.newaxis]
            predictions = np.hstack((predictions, current_trees_predictions))
        
        final_predictions = scipy.stats.mode(predictions, axis=1)[0].flatten()
        return final_predictions

def preprocess(data, fill_mode=True, min_freq=10, onehot_cols=[]):
    # fill_mode = False
    # Temporarily assign -1 to missing data
    data[data == b''] = '-1'

    # Hash the columns (used for handling strings)
    onehot_encoding = []
    onehot_features = []
    for col in onehot_cols:
        counter = Counter(data[:, col])
        for term in counter.most_common():
            if term[0] == b'-1':
                continue
            if term[-1] <= min_freq:
                break
            onehot_features.append(term[0])
            onehot_encoding.append((data[:, col] == term[0]).astype(float))
        data[:, col] = '0'
    onehot_encoding = np.array(onehot_encoding).T

    data = np.hstack(
        [np.array(data, dtype=float),
         np.array(onehot_encoding)])

    return data, onehot_features

def evaluate(clf):
    print("Cross validation", cross_val_score(clf, X, y))
    if hasattr(clf, "decision_trees"):
        counter = Counter([t.tree_.feature[0] for t in clf.decision_trees])
        first_splits = [
            (features[term[0]], term[1]) for term in counter.most_common()
        ]
        print("First splits", first_splits)

if __name__ == "__main__":
    # pick dataset
    #dataset = "titanic"
    dataset = "spam"
    
    if dataset == "titanic":
        # Load titanic data
        path_train = 'datasets/titanic/titanic_training.csv'
        data = genfromtxt(path_train, delimiter=',', dtype=None)
        path_test = 'datasets/titanic/titanic_testing_data.csv'
        test_data = genfromtxt(path_test, delimiter=',', dtype=None)
        y = data[1:, 0]  # label = survived
        class_names = ["Died", "Survived"]

        labeled_idx = np.where(y != b'')[0]
        y = np.array(y[labeled_idx], dtype=float).astype(int)
        print("\n\nPart (b): preprocessing the titanic dataset")
        X, onehot_features = preprocess(data[1:, 1:], onehot_cols=[1, 5, 7, 8])
        X = X[labeled_idx, :]
        Z, _ = preprocess(test_data[1:, :], onehot_cols=[1, 5, 7, 8])
        assert X.shape[1] == Z.shape[1]
        features = list(data[0, 1:]) + onehot_features

        # Replace -1 wiw nan
        X[X == -1] = np.nan
        Z[Z == -1] = np.nan

        # model parameters
        eps = 1e-2 # determines information gain stopping criterion 
        M = 11 # determines how many random features to use in Random Forest taking into account that 4 of the columns are 0
        w = 'uniform'   # KNN parameter
        
        # impute training data
        imputer = KNNImputer(n_neighbors=5, weights = w)
        X_imputed = imputer.fit_transform(X)

        # impute test data
        test_imputer = KNNImputer(n_neighbors=5, weights=w)
        Z_imputed = test_imputer.fit_transform(Z)

        # Split data 
        shuffled_indices = np.random.permutation(len(X_imputed))
        split_index = int(len(X_imputed) * 0.80)
        X_train = X_imputed[shuffled_indices][:split_index]
        Y_train = y[shuffled_indices][:split_index].astype(int)
        X_valid = X_imputed[shuffled_indices][split_index:]
        Y_valid = y[shuffled_indices][split_index:].astype(int)

        # Random Forest Parameters
        params = {
            "max_depth": 25,
            "feature_labels": features,
            "class_names": class_names,
            }
        
        # Decision Tree
        myTree = DecisionTree(max_depth=25,feature_labels=features, class_names=class_names)
        myTree.fit(X_train, Y_train)
        my_classifiers_predictions = myTree.predict(X_valid)
        accuracy = np.sum(my_classifiers_predictions == Y_valid) / len(Y_valid)
        print(f"Tree Classifier Accuracy for {dataset}:  {accuracy}")
        test_predictions = myTree.predict(Z_imputed)
        # export predictions
        #resultsToCsv(np.array(test_predictions), dataset)
        myTree.viz()
        
        # Random Forest
        random_forest = RandomForest(params)
        random_forest.fit(X_train, Y_train)
        random_forest_validation_predictions = random_forest.predict(X_valid)
        print("Random Forest Accuracy: ", sum(random_forest_validation_predictions == Y_valid)  / len(Y_valid))
        random_forest_test_predictions = random_forest.predict(Z_imputed)
        resultsToCsv(random_forest_test_predictions, f"{dataset}_random_forest")

    elif dataset == "spam":
        
        features = [
            "pain", "private", "bank", "money", "drug", "spam", "prescription",
            "creative", "height", "featured", "differ", "width", "other",
            "energy", "business", "message", "volumes", "revision", "path",
            "meter", "memo", "planning", "pleased", "record", "out",
            "semicolon", "dollar", "sharp", "exclamation", "parenthesis",
            "square_bracket", "ampersand"
        ]
        assert len(features) == 32

        eps = 1e-3 # determines information gain stopping criterion 
        M = 7 # determines how many random features to use in Random Forest taking into account that 4 of the columns are 0

        # Load spam data
        path_train = 'datasets/spam_data/spam_data.mat'
        data = scipy.io.loadmat(path_train)
        X = data['training_data']
        y = np.squeeze(data['training_labels'])
        Z = data['test_data']
        class_names = ["Ham", "Spam"]
        params = {
        "max_depth": 15,
        "feature_labels": features,
        "class_names": class_names
        }
        # shuffle the training data 
        shuffled_indices = np.random.permutation(len(X))
        split_index = int(len(X) * 0.80)


        # split the training data into a train and validation set 
        X_train = X[shuffled_indices][:split_index]
        Y_train = y[shuffled_indices][:split_index].astype(int)
        X_valid = X[shuffled_indices][split_index:]
        Y_valid = y[shuffled_indices][split_index:].astype(int)

        # Decision Tree
        myTree = DecisionTree(max_depth=15,feature_labels=features, class_names=class_names)
        myTree.fit(X_train, Y_train)
        my_classifiers_predictions = myTree.predict(X_valid)
        accuracy = np.sum(my_classifiers_predictions == Y_valid) / len(Y_valid)
        print(f"Tree Classifier Accuracy for {dataset}:  {accuracy}")
        test_predictions = myTree.predict(Z)
        
        # export predictions
        resultsToCsv(np.array(test_predictions), dataset)

        # Random Forest
        random_forest = RandomForest(params)
        random_forest.fit(X_train, Y_train)
        random_forest_validation_predictions = random_forest.predict(X_valid)
        randomForestAccuracy = sum(random_forest_validation_predictions == Y_valid)  / len(Y_valid)
        print(f"Random Forest Accuracy for {dataset}: ", randomForestAccuracy)
        random_forest_test_predictions = random_forest.predict(Z)

        resultsToCsv(random_forest_test_predictions, f"{dataset}_random_forest")
    else:
        raise NotImplementedError("Dataset %s not handled" % dataset)


Tree Classifier Accuracy for spam:  0.8268206039076377
Random Forest Accuracy for spam:  0.8348134991119005
