# 07_Decision_Tree

In [1]:
import numpy as np
from sklearn import datasets
import warnings
warnings.filterwarnings("ignore")

In [2]:
iris = datasets.load_iris()
data = iris.data

In [3]:
# Note: Parts of the Code are based on: https://medium.com/@cjakuc/building-a-decision-tree-classifier-c00a08815c3

### Implement Decision Tree

In [4]:
class NodeManager:
    """
    NodeManager class is used to build the decision tree by storing
    the following parameters:
    class_prediction: prediction label
    feature_idx: idx of feature for optimal split
    threshold: threshold for optimal split
    left: node below threshold
    right: node above threshold
    leftbranch/rightbranch: indicate which branch node is from parent
    """
    def __init__(self, class_prediction, depth=None):
        self.class_prediction = class_prediction
        # feature index will be used to identify optimal split column
        self.feature_idx = 0
        # threshold will be used to store optimal split threshold
        self.threshold = 0
        # below threshold
        self.left = None
        # above threshold
        self.right = None
        # left to parent
        self.leftbranch = False
        # right to parent
        self.rightbranch = False
        self.depth = depth


class DecisionTree:
    """
    Class to build a decision tree with maximum depth maximum_depth.
    Can be used with datasets with a variable number of features.
    """
    def __init__(self, maximum_depth=None):
        self.maximum_depth = maximum_depth
        self.tree = None

    def fit(self, X_train, y_train):
        """
        Fit the decision tree to a training set.
        """
        # identify number of classes
        self.n_classes = len(np.unique(y_train))
        # build decision tree classifier with build_tree method
        self.tree = self.build_tree(X_train, y_train)
        #print('...decision tree fitted.')

    def next_split(self, data, labels):
        """
        Takes data and labels of a node as input and returns column
        and threshold for optimal next split into child nodes with min
        impurity. If data contains no samples or minimal
        parent impurity was smaller than new minimal 
        impurity, it returns None,None.
        """
        # Check if the subset contains at least one observation
        n_data_points = labels.size
        # If not, return None,None for col,threshold
        if n_data_points <= 1:
            return None, None
        
        # reshape labels
        labels = labels.reshape(n_data_points,)
        # count classes in parent node
        count_in_parent = [np.count_nonzero(labels == c) for c in range(self.n_classes)]

        # Get the minimum impurity of the parent node to compare with new min impurity 
        parent_impurity = 1.0 - sum((n / n_data_points) ** 2 for n in count_in_parent)
        
        # combine data and labels to facilitate sorting
        dataset = np.column_stack((data,labels))
        # Initialize empty G_list
        G_list = []

        # iteraete over columns (features)
        for col in range(0, dataset.shape[1]-1):
            # sort by respective column
            dataset = dataset[dataset[:, col].argsort()]
            
            # empty list to store results for every column
            G_col_list = []

            # iterate over elements in col
            for i in range(0,len(dataset)-1):
                # compute threshold as mean of two elements
                threshold = (dataset[i,col] + dataset[i+1,col])/2
                # number of values equal above threshold
                n1 = (dataset[:,col] >= threshold).sum()
                # number of values below threshold
                n2 = (dataset[:,col] < threshold).sum()
                # total number of elements
                n_all = n1+n2
                # number of elements in node 1 which belong to class 0
                l1_class0 = (dataset[dataset[:,col]>=threshold][:,-1] == 0).sum()
                # number of elements in node 1 which belong to class 1
                l1_class1 = (dataset[dataset[:,col]>=threshold][:,-1] == 1).sum()
                # gini impurity of node 1
                if n1 != 0:
                    l1_G = 1 - (l1_class0/n1)**2 - (l1_class1/n1)**2
                else: 
                    l1_G = 0
                # number of elements in node 2 which belong to class 0
                l2_class0 = (dataset[dataset[:,col]<threshold][:,-1] == 0).sum()
                # number of elements in node 2 which belong to class 1
                l2_class1 = (dataset[dataset[:,col]<threshold][:,-1] == 1).sum()
                # gini impurity of node 2
                if n2 != 0:
                    l2_G = 1 - (l2_class0/n2)**2 - (l2_class1/n2)**2
                else:
                    l2_G = 0
                # gini impurity of whole node (weightes impurities of node 1 and node 2)
                G = n1/n_all*l1_G + n2/n_all*l2_G
                # store gini impority and threshold
                result = np.array([threshold, G])
                G_col_list.append(result)

            # identify and store smallest gini impurity
            G_col_arr = np.array(G_col_list)
            idx_smallest_G = np.array(G_col_list)[:,1].argmin()
            G_list.append(G_col_arr[idx_smallest_G])

        G_arr = np.array(G_list)
        # The column with the smallest impurity
        optimal_col = G_arr[:,-1].argmin()
        # min gini impurity
        min_impurity = G_arr[optimal_col, -1]
        # The respective threshold of the column with the smallest impurity
        optimal_threshold = G_arr[optimal_col][0]
        
        # check if min_impurity is larger than parent impurity (no improvement):
        # If this is the case, return None, None for col and threshold
        if min_impurity >= parent_impurity:
            return None,None

        # If impurity improved, return the optimal column and the optimal threshold
        return optimal_col, optimal_threshold

    def build_tree(self, X, y, depth=0):
        """
        Build the decision tree until the maximum depth is reached.
        """

        # count the occurance of each class
        class_occurrence = [np.count_nonzero(y == i) for i in range(self.n_classes)]
        # class prediction is the class with the largest occurence
        class_prediction = np.argmax(class_occurrence)
        # Instantiate an empty Node using the NodeManager class
        node = NodeManager(class_prediction=class_prediction,depth=depth)
        # Create a variable in the node class for the number of samples
        node.samples = y.size
        # As long as max depth is not reached...
        if depth < self.maximum_depth:
            # apply next_split method to get col and threshold of optimal split
            # Returns None of Leaf is reached (branch is finished)
            col, threshold = self.next_split(X, y)
            # If leaf is not reached...
            if col and threshold:
                # split the data and labels into right and left using col and threshold
                # left index for data points below threshold
                left_idx = X[:, col] < threshold
                # right index for datapoints eqal or above threshold
                right_idx = X[:, col] >= threshold
                # dataset of left node
                X_left = X[left_idx]
                y_left = y[left_idx]
                # dataset of right node
                X_right = X[right_idx]
                y_right = y[right_idx]
                # Store parameters to Node manager
                node.feature_idx = col
                node.threshold = threshold
                # Apply build_tree function inside build_tree to build the next level of nodes with left node
                # Increase depth counter by 1
                node.left = self.build_tree(X_left, y_left, depth+1)
                # store info that node was left side of parent node in node manager
                node.left.leftbranch = True
                # Apply build_tree function inside build_tree to build the next level of nodes with right node
                node.right = self.build_tree(X_right, y_right, depth+1)
                # store info that node was right side of parent node in node manager
                node.right.rightbranch = True
                
        return node

    def predict(self, X_test):
        """
        Takes X_test as input and predicts labels
        using the built decision tree. 
        Uses the stored information about split col,
        split threshold, left or right, leftbranch or 
        right branch from NodeManager.
        """
        # assign root node
        node = self.tree
        # empty prediction list
        y_preds = []
        # Iterate over data points in X_test
        for data_point in X_test:
            # Reasign root node
            node = self.tree
            # Go through the decision nodes of the tree and use optimal
            # split columns and optimal thresholds to predict
            # classification of data point.
            while node.left:
                # Make Predictions with left node
                if data_point[node.feature_idx] < node.threshold:
                    node = node.left
                # Make Predictions with right node
                else:
                    node = node.right
            # append prediction to prediction list
            y_preds.append(node.class_prediction)
            
        # return predictions as numpy array
        return np.array(y_preds)

### Test Decision Tree

In [5]:
def train_test_split(X, y, shuffle=True, test_size=0.3):
    """
    Split data into training and testing set.
    """
    n_train = int(X.shape[0]*(1-test_size))
    indices = np.arange(len(X))
    if shuffle: 
        np.random.shuffle(indices)
    train = indices[:n_train]
    test = indices[n_train:]
    X_train, X_test = X[train], X[test]
    y_train, y_test = y[train], y[test]
    
    return X_train, X_test, y_train, y_test

In [6]:
X = iris.data
y = iris.target
# train test split (30% for testing)
X_train, X_test, y_train, y_test = train_test_split(X=X, y=y, shuffle=True, test_size=0.3)

# For binary classification: merge Iris virginica and Iris setosa into one class.
y_train = np.where(y_train==2, 0, y_train)
y_test = np.where(y_test==2, 0, y_test)

In [7]:
# showcase
decision_tree = DecisionTree(maximum_depth=10)
decision_tree.fit(X_train, y_train)
print('Predictions:')
decision_tree.predict(X_test)

Predictions:


array([0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 1, 0, 1,
       0, 1, 0, 0, 1, 1, 0, 0, 0, 0, 1, 0, 1, 1, 0, 0, 0, 1, 1, 0, 1, 1,
       1])

In [8]:
def get_accuracy(y_pred, y_test):
    """
    Takes y_test and y_pred as input and
    returns accuracy.
    """
    n_accurate = (y_pred == y_test).sum()
    n_total = len(y_test)
    accuracy = n_accurate / n_total
    return accuracy

In [9]:
# Report Accuracy for five runs:
def run_decision_tree(n_iterations=5):
    
    for i in range(0,n_iterations):
        # instantiate decision tree with DecisionTree class
        decision_tree = DecisionTree(maximum_depth=10)
        # generate random subset idx
        subset_idx = np.random.randint(low=0, high=len(X), size=75)
        # select data with subset_idx
        X_sub = X[subset_idx]
        y_sub = y[subset_idx]
        # train test split
        X_train, X_test, y_train, y_test = train_test_split(X=X_sub, y=y_sub, shuffle=True, test_size=0.3)
        # fit model
        decision_tree.fit(X_train, y_train)
        # predict y_hat
        y_pred = decision_tree.predict(X_test)
        # compute accuracy
        acc = get_accuracy(y_pred, y_test)
        print('Accuracy Score run {}: {}'.format(i+1,acc))

In [10]:
run_decision_tree(n_iterations=5)

Accuracy Score run 1: 0.782608695652174
Accuracy Score run 2: 0.6956521739130435
Accuracy Score run 3: 0.6956521739130435
Accuracy Score run 4: 0.6956521739130435
Accuracy Score run 5: 0.9565217391304348
