In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [2]:
def entropy(y):
    hist = np.bincount(y)
    ps = hist / np.sum(hist)
    return -np.sum([p * np.log2(p) for p in ps if p > 0])

In [3]:
def mse(y):
    return np.mean((y - np.mean(y)) ** 2)

In [4]:
def gini(y):
    hist = np.bincount(y)
    n = np.sum(hist)
    return 1 - sum([(i/n)**2 for i in hist])

In [5]:
class Node:
    def __init__(self, left, right, rule):
        self.left = left
        self.right = right
        self.feature = rule[0]
        self.thresh = rule[1]

In [6]:
class Leaf:
    def __init__(self, value):
        '''
        if classifier == True:, value = array of probs
        else: value = mean of region
        '''
        self.value = value

In [7]:
class DecisionTree:
    def __init__(self, classifier=True, max_depth=None, n_feats=None, criterion='entropy',seed=None,):
        if seed: np.random.seed(seed)
        self.depth = 0
        self.root = None
        self.n_feats = n_feats
        self.criterion = criterion
        self.classifier = classifier
        self.max_depth = max_depth if max_depth else np.inf
        
        if not classifier and criterion in ['gini', 'entropy']:
            raise ValueError(f"{criterion} valid when classifier = True")
        if classifier and criterion == 'mse':
            raise ValueError(f"{criterion} valid when classifier = False")
        
    def fit(self, x, y):
        #let target classes represented as integers so max(y) = 0,1,2,3..n 
        self.n_classes = max(y) + 1 if self.classifier else None
        #if num of feats is None then n_cols in x, otherwise min cols
        self.n_feats = x.shape[1] if not self.n_feats else min(self.n_feats, x.shape[1])
        self.root = self._grow(x, y)
    
    def predict(self, x):
        return np.array([self._traverse(xi, self.root) for xi in x])
    
    def predict_class_probs(self, x):
        assert self.classifier, "'predict_class_probs' undefined for classifier = False"
        return np.array([self._traverse(xi, self.root, prob=True) for xi in x])

    # Recursively Builds Tree
    def _grow(self, x, y, cur_depth=0):
        if len(set(y)) == 1: # all y are same class
            if self.classifier: # classification problem
                prob = np.zeros(self.n_classes) #Prob of All classes = 0 
                prob[y[0]] = 1.0                #except present class
            return Leaf(prob) if self.classifier else Leaf(y[0])
        
        if cur_depth >= self.max_depth: #Max depth reached, stopping condition
            v = np.mean(y, axis=0)
            if self.classifier: #prob of each class = no. of class vals/total vals
                v = np.bincount(y, minlength=self.n_classes) / len(y) 
            return Leaf(v)
        # max depth not reached
        cur_depth += 1
        # current depth is max of depth stored in tree and tracked depth
        self.depth = max(self.depth, cur_depth)
        
        n, m = x.shape
        # Randomizing the order of column indexes of x[0,1,2,3]
        # After randomization feats_idx = [3,0,1,2] etc
        feats_idx = np.random.choice(m, self.n_feats, replace=False)
        # greedily select the best split according to `criterion`
        feat, thresh = self._segment(x, y, feats_idx)
        
        # Determining left and right children like Binary Search Trees
        # Left are values less-equal to thresh
        # Right are values greater than thresh
        l = np.argwhere(x[:, feat] <= thresh).flatten()
        r = np.argwhere(x[:, feat] > thresh).flatten()
        left = self._grow(x[l,:], y[l], cur_depth)
        right = self._grow(x[r,:], y[r], cur_depth)
        
        return Node(left, right, (feat, thresh))

    def _segment(self, x, y, feat_idx):
        '''
        Find optimal split rule(feature index, and split threshold value)
        for data according to self.criterion i.e.,entropy, mse, gini 
        feat_idx = randomized order of feature indexes
        '''
        best_gain = -np.inf # negative infinity
        split_idx, split_thresh = None, None
        for i in feat_idx:
            vals = x[:, i]  # vector of feature(column) values
            levels = np.unique(vals) # all possible values of the feature
            # if levels = [1,2,3,4,5], we are selecting ([1,2,3,4] + [2,3,4,5])/ 2 which will be [1.5,2.5,3.5,4.5] 
            thresh = (levels[:-1] + levels[1:]) / 2 if len(levels) > 1 else levels

            # Reduction in impurity achieved by splitting given feat at t
            gains = np.array([self._impurity_gain(y, t, vals) for t in thresh])

            # Noting feature, split_index, and minimum impurity 
            if gains.max() > best_gain:
                split_idx = i # index
                best_gain = gains.max() # minimum impurity
                split_thresh = thresh[gains.argmax()] # threshold value                
        return split_idx, split_thresh

    def _impurity_gain(self, y, split_thresh, feat_values):
        '''
        Computing impurity gain with given split
        Impurity_gain(split) = loss(parent) - weighted_avg[loss(left_child), loss(right_child)]
        '''
        # func pointer to selected loss function(polymorphism)
        if self.criterion == 'entropy':
            loss = entropy
        elif self.criterion == 'gini':
            loss = gini
        elif self.criterion == 'mse':
            loss = mse
        parent_loss = loss(y) #loss before split

        # splitting vector based on threshold value
        left = np.argwhere(feat_values <= split_thresh).flatten()
        right = np.argwhere(feat_values > split_thresh).flatten()
        n_l, n_r = len(left), len(right)

        # No gain in impurity i.e, parent = [0,1,2,3] , l or r = [0,1,2,3]
        if n_l == 0 or n_r == 0: return 0 # No gains in making given split

        # Computing weighted avg. of loss for children
        n = len(y)
        e_l, e_r = loss(y[left]), loss(y[right])
        child_loss = (n_l / n) * e_l + (n_r / n) * e_r
        return parent_loss - child_loss

    def _traverse(self, x, node, prob=False):
        '''Simple recursive depth first traversal of tree'''
        if isinstance(node, Leaf):
            if self.classifier:
                return node.value if prob else node.value.argmax()
            return node.value
        
        if x[node.feature] <= node.thresh:
            return self._traverse(x, node.left, prob)
        return self._traverse(x, node.right, prob) 
    
    def accuracy(self, x, y_true):
        y_pred = self.predict(x)
        return np.sum(y_pred == y_true)/len(y_true)

In [8]:
file = pd.read_csv('../ML/Datasets/carstats.csv')
print("Total Records: ", len(file))
data = file.values
file.head()

Total Records:  1728


Unnamed: 0,buying,maint,doors,persons,lug_boot,safety,class
0,vhigh,vhigh,2,2,small,low,unacc
1,vhigh,vhigh,2,2,small,med,unacc
2,vhigh,vhigh,2,2,small,high,unacc
3,vhigh,vhigh,2,2,med,low,unacc
4,vhigh,vhigh,2,2,med,med,unacc


In [9]:
columns = []
for idx in range(len(data[0])):
    columns.append(np.unique(data[:, idx]))
columns

[array(['high', 'low', 'med', 'vhigh'], dtype=object),
 array(['high', 'low', 'med', 'vhigh'], dtype=object),
 array(['2', '3', '4', '5more'], dtype=object),
 array(['2', '4', 'more'], dtype=object),
 array(['big', 'med', 'small'], dtype=object),
 array(['high', 'low', 'med'], dtype=object),
 array(['acc', 'good', 'unacc', 'vgood'], dtype=object)]

In [10]:
for i in range(len(data[0])):
    feature = data[:, i]
    classes = columns[i]
    for idx, class_ in enumerate(classes):
        feature[feature == class_] = idx
columns = []
for idx in range(len(data[0])):
    columns.append(np.unique(data[:, idx]))
columns

[array([0, 1, 2, 3], dtype=object),
 array([0, 1, 2, 3], dtype=object),
 array([0, 1, 2, 3], dtype=object),
 array([0, 1, 2], dtype=object),
 array([0, 1, 2], dtype=object),
 array([0, 1, 2], dtype=object),
 array([0, 1, 2, 3], dtype=object)]

In [11]:
x, y = data[:,:-1].astype(int), data[:,-1:].flatten().astype(int)
x_test, y_test = x[int(0.8*len(x)):], y[int(0.8*len(y)):]
x, y = x[:int(0.8*len(x))], y[:int(0.8*len(y))]
print(f"Train Data: {len(x)}, Test Data: {len(x_test)}")
x,y

Train Data: 1382, Test Data: 346


(array([[3, 3, 0, 0, 2, 1],
        [3, 3, 0, 0, 2, 2],
        [3, 3, 0, 0, 2, 0],
        ...,
        [1, 3, 3, 0, 2, 0],
        [1, 3, 3, 0, 1, 1],
        [1, 3, 3, 0, 1, 2]]),
 array([2, 2, 2, ..., 2, 2, 2]))

In [12]:
tree = DecisionTree(max_depth=10, criterion='entropy')
tree.fit(x, y)

In [13]:
print("Train Accuracy: ", tree.accuracy(x, y))
print("Test Accuracy: ", tree.accuracy(x_test, y_test))

Train Accuracy:  0.9876989869753979
Test Accuracy:  0.6705202312138728
