In [1]:
from sklearn.datasets import load_iris
from sklearn import tree
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split
iris = load_iris()
X, y = iris.data, iris.target
# clf = tree.DecisionTreeClassifier()
# clf = clf.fit(X, y)
iris.keys()

dict_keys(['data', 'target', 'frame', 'target_names', 'DESCR', 'feature_names', 'filename', 'data_module'])

In [34]:
from sklearn.datasets import load_wine
wine = load_wine()
X, y = wine.data, wine.target
wine.keys()

dict_keys(['data', 'target', 'frame', 'target_names', 'DESCR', 'feature_names'])

In [36]:
X.shape, y.shape

((178, 13), (178,))

In [37]:
import numpy as np
from collections import Counter
import random

In [38]:
def gini_impurity(y):
    gini = 0
    total = len(y)

    n_classes = len(set(y))
    counts = Counter(y)
    
    for k,v in counts.items():
        gini += (v/total)**2
    gini_impurity = 1 - gini
    return gini_impurity


def feature_split_interval(X_feature, interval_granularity=0.2, default_max_len_intervals=5):
    """
    Get the intervals for a particular feature to find the gini impurity values for the split.
    """
    X_feature = X_feature.reshape(-1,)

    feature_max, feature_min = max(X_feature), min(X_feature)

    max_len_intervals = max(default_max_len_intervals, int(interval_granularity * X_feature.shape[0]))
    # print(max_len_intervals)

    X_feature_sorted = np.sort(X_feature)

    unique_values = np.unique(X_feature)

    if unique_values.shape[0] == 1:
        # print(np.array([unique_values[0]]))
        return np.array([unique_values[0]])
    elif unique_values.shape[0] == 2:
        # print(np.array([unique_values.mean()]))
        return np.array([unique_values.mean()])

    # return X_feature_sorted

    cum_diff = np.roll(X_feature_sorted, -1) - X_feature_sorted
    cum_diff = cum_diff[:-1]
    max_diff, min_diff = max(cum_diff), min(cum_diff)
    median_diff = np.median(cum_diff)
    median_nonzero_diff = np.median(cum_diff[cum_diff!=0])
    min_nonzero_diff = np.min(cum_diff[cum_diff!=0])

    start = feature_min + min_nonzero_diff/2
    end = feature_max - min_nonzero_diff/2
    min_nonzero_diff_interval = np.arange(start, end, min_nonzero_diff)

    # print(start, end, min_nonzero_diff_interval)

    if min_nonzero_diff_interval.shape[0] > max_len_intervals:
        return np.linspace(start, end, max_len_intervals)

    return min_nonzero_diff_interval


def optimal_feature_split(X_feature, y):
    split_intervals = feature_split_interval(X_feature)

    parent_node_gini_impurity = gini_impurity(y)

    min_weighted_gini_impurity = 1

    # print(split_intervals, X_feature)
    optimal_split_values = [split_intervals[0]]

    for split_value in split_intervals:
        # print(split_value)
        ge_split = y[X_feature >= split_value]
        lt_split = y[X_feature < split_value]
        weighted_gini_impurity = (gini_impurity(lt_split) * len(lt_split) \
            + gini_impurity(ge_split) * len(ge_split))/ len(y)
        
        if weighted_gini_impurity <= min_weighted_gini_impurity:
            if weighted_gini_impurity == min_weighted_gini_impurity:
                optimal_split_values.append(split_value)
            else:
                optimal_split_values = [split_value]
                min_weighted_gini_impurity = weighted_gini_impurity
    
    return np.mean(optimal_split_values), min_weighted_gini_impurity

In [39]:
class Node:
    """
    A class to represent a node(leaf) of a decision tree.
    """

    def __init__(self, X, y, height, max_tree_height):
        self.X = X
        self.y = y
        self.height = height
        self.max_tree_height = max_tree_height
        self.gini_impurity = gini_impurity(y)
        
        self.num_samples, self.num_features = X.shape
        self.final_class = Counter(y).most_common()[0][0]
        self.left_child = None
        self.right_child = None

        # print(self.gini_impurity)

        if self.gini_impurity == 0 or self.height >= max_tree_height:
            self.is_terminal = True
            # print(self.gini_impurity, self.height, self.max_tree_height)
        else:
            self.is_terminal = False
            self.feature_split = random.choice([*range(self.num_features)])
            # print(X[:,self.feature_split])
            split_intervals = feature_split_interval(X[:,self.feature_split])
            # print(split_intervals)
            self.split_value = np.random.choice(split_intervals)
        
        # print(X.shape, y.shape, self.gini_impurity, self.is_terminal)


    def fit(self):
        # print(self.X.shape)
        min_gini_impurity = 1
        for i in range(self.num_features):
            split_value, split_gini_impurity = optimal_feature_split(self.X[:,i], self.y)
            if split_gini_impurity < min_gini_impurity:
                min_gini_impurity = split_gini_impurity
                best_feature = i
                best_feature_split_value = split_value
        
        X_feature = self.X[:, best_feature]
        
        X_lt = self.X[X_feature < best_feature_split_value, :]
        X_ge = self.X[X_feature >= best_feature_split_value, :]

        y_lt = self.y[X_feature < best_feature_split_value]
        y_ge = self.y[X_feature >= best_feature_split_value]

        self.feature_split = best_feature
        self.split_value = best_feature_split_value
        self.split_gini_impurity = min_gini_impurity

        # print(best_feature, best_feature_split_value, min_gini_impurity, self.height)

        if self.gini_impurity <= min_gini_impurity:
            # This means the split did not improve the gini impurity, we need to stop
            return self

        self.left_child = Node(X_lt, y_lt, self.height+1, self.max_tree_height)
        self.right_child = Node(X_ge, y_ge, self.height+1, self.max_tree_height)

        if not self.left_child.is_terminal:
            self.left_child.fit()

        if not self.right_child.is_terminal:
            self.right_child.fit()

        return self


    def predict_sample(self, X_sample):
        if self.is_terminal:
            return self.final_class
        if X_sample[self.feature_split] >= self.split_value:
            # Pass on to the right child
            if self.right_child is not None:
                return self.right_child.predict_sample(X_sample)
            else:
                return self.final_class
        else:
            if self.left_child is not None:
                return self.left_child.predict_sample(X_sample)
            else:
                return self.left_child

In [40]:
class DecisionTreeClassifier:
    """
    A Decision Tree Classifier Machine Learning algorithm.

    Parameters:
        max_depth (int): The maximum depth of the decision tree
        criterion (str): The criterion for splitting - 'gini' or 'information gain'
    """

    def __init__(self, max_depth: int=20, criterion: str='gini'):
        self.max_depth = max_depth


    def fit(self, X: np.array, y: np.array):
        n_samples, n_features = X.shape
        assert n_samples == y.shape[0]
        assert len(y.shape) == 1 or y.shape[1] == 1

        self.root_node = Node(X, y, 1, self.max_depth)
        self.root_node.fit()

        return self

    
    def predict(self, X):
        y_pred = np.array([self.root_node.predict_sample(X_sample) for X_sample in X])
        return y_pred

In [41]:
X_train, X_test, y_train, y_test = train_test_split(X,y, test_size=0.25)

In [64]:
clf = DecisionTreeClassifier(max_depth=4)
clf.fit(X_train, y_train)
y_test_pred = clf.predict(X_test)
y_train_pred = clf.predict(X_train)
accuracy_score(y_train, y_train_pred), accuracy_score(y_test, y_test_pred)

(0.9924812030075187, 0.9333333333333333)

In [65]:
clf = tree.DecisionTreeClassifier()
clf.fit(X_train, y_train)
y_test_pred = clf.predict(X_test)
y_train_pred = clf.predict(X_train)
accuracy_score(y_train, y_train_pred), accuracy_score(y_test, y_test_pred)

(1.0, 0.9333333333333333)