In [18]:
import pandas as pd
import numpy as np
from math import pi, sqrt, exp, log2, ceil
from queue import Queue

In [19]:
class BTreeNode():
    def __init__(self, instance):
        self.node = instance
        self.left_child = None
        self.right_child = None

    def __repr__(self):
        return (str(self.node))

In [20]:
class BinaryTree():
    def __init__(self, root=None):
        root_node = BTreeNode(root) if root is not None else None
        self.root = root_node
        # self.size = 1


    def BFS_traverse(self):
        # print out Binary Tree in BFS traversal order
        unvisited = Queue()
        unvisited.put(self.root)
        nodes_in_the_same_level = []

        while not unvisited.empty():
            curr_node = unvisited.get()
            nodes_in_the_same_level.append([curr_node])
            if curr_node is not None:
                unvisited.put(curr_node.left_child)
                unvisited.put(curr_node.right_child)
        count = 0
        exp_of_two = 1
        _ = []
        for i in range(len(nodes_in_the_same_level)):
            count += 1
            _.append(nodes_in_the_same_level[i])
            if count == exp_of_two:
                print(f"level {log2(exp_of_two)}")
                for j in range(len(_)):
                    yes_or_no = "Yes" if j % 2 == 0 else "No"
                    print(f"{yes_or_no}: {_[j]}")
                count = 0
                exp_of_two *= 2
                _ = []
        print(f"level {ceil(log2(exp_of_two))}")
        for j in range(len(_)):
            yes_or_no = "Yes" if j % 2 == 0 else "No"
            print(f"{yes_or_no}: {_[j]}")

In [21]:
class DecisionTreeClassifier():
    def __init__(self, D, class_labels, is_categorical=False):
        # param: D(pd.DataFrame) is a data set. class_labels(pd.DataFrame) is a class label(y) for each point xj in D
        # param: is_categorical(Bool) true if D based on categorical attributes
        # Merge D and class_labels. The last column of data_set will contain the class labels for each data point
        if D.shape[0] != class_labels.shape[0]: raise IndexError("Data and Class Label must have the same number of rows")
        data_set = D.copy()
        #print(class_labels.iloc[:,0])
        data_set.insert(loc=len(data_set.columns), column='classes', value=class_labels.iloc[:,0])
        self.data_set = data_set  # class labels of each data point inserted in the last column
        self.is_categorical = is_categorical
        self.class_domain = list(set(class_labels.iloc[:, 0]))
        self.decision_tree = BinaryTree()
        self.is_trained = False


    def calculate_entropy(self, nvi_lst):
        #param: nvi_lst(list) # of counts of attribute Xj <=v(split point) and xj=Class i in the data set
        try:
            total_pts_counts = sum(nvi_lst)  # total number of data points in the Data
            entropy = 0
            for i in range(len(nvi_lst)):
                prob_class_i_given_D = nvi_lst[i] / total_pts_counts
                entropy += prob_class_i_given_D * log2(prob_class_i_given_D)
        except ZeroDivisionError:
            # this happens when computing the entropy of data set with no data point
            # In this case, I assumed the entropy would be zero as it shouldn't affect the result of the algorithm
            entropy = 0
        return entropy * -1


    def calc_info_gain(self, nvi_lst, nci_lst):
        # param: nvi_lst(list) # of counts of attribute Xj <=v(split point) and xj=Class i in the data set(or splitted)
        # param:nci_lst(list) # of counts of xj=Class i in the data set(can be splitted data set)
        D_pts_counts = sum(nci_lst)
        Y_pts_counts = sum(nvi_lst)
        N_pts_counts = D_pts_counts - Y_pts_counts
        entroy_D = self.calculate_entropy(nci_lst)
        entroy_D_Y = self.calculate_entropy(nvi_lst)
        nvi_complement_lst = [nci_lst[i] - nvi_lst[i] for i in range(len(nci_lst))]
        entroy_D_N = self.calculate_entropy(nvi_complement_lst)
        info_gain = entroy_D - (Y_pts_counts/D_pts_counts*entroy_D_Y) - (N_pts_counts/D_pts_counts*entroy_D_N)
        return info_gain


    def search_optimal_numeric_split_point(self, attr_index):
        # param: attr_index(int) indicates the column index of attribute where we want to find the best split point
        modified_D = self.data_set.loc[:, [self.data_set.columns[attr_index], 'classes']]
        Xj_attribute_class_lst = [[modified_D.iloc[i,0],modified_D.iloc[i,1]] for i in range(len(modified_D))]
        Xj_attribute_class_lst.sort(key=lambda x: x[0])  # 2d list. first column: attribute, second column: class
        midp_lst = []
        all_n_v_class_i_lst = []  # list of lists containing nvi for all midpoints v
        n_class_i_lst = [0] * len(self.class_domain)  # list containing the counts of data points for all cluster in data set
        for k in range(len(Xj_attribute_class_lst)):  # loop through all data points except for the last data point
            if k < len(Xj_attribute_class_lst) - 1:
                midpoint = (Xj_attribute_class_lst[k][0] + Xj_attribute_class_lst[k+1][0])/2
            n_class_i_lst[self.class_domain.index(Xj_attribute_class_lst[k][1])] += 1
            if k > 0 and Xj_attribute_class_lst[k][0] <= midp_lst[-1]:  # this happens when two same values of attribute Xj exist
                all_n_v_class_i_lst[-1] = n_class_i_lst[:]
            if k == 0 or midp_lst[-1] != midpoint:  # midpoint must be unique
                midp_lst.append(midpoint)
                all_n_v_class_i_lst.append(n_class_i_lst[:])
        optimal_split_point, best_score = None, -float('inf')
        for k in range(len(midp_lst)):
            score = self.calc_info_gain(all_n_v_class_i_lst[k], n_class_i_lst)
            print(f"{attr_index}-th column split point {midp_lst[k]} information gain: {score}")
            if best_score < score:
                optimal_split_point = midp_lst[k]
                best_score = score
        return optimal_split_point, best_score


    def search_optimal_categorical_split_point(self, attr_index):
        # param: attr_index(int) indicates the column index of attribute where we want to find the best split point
        modified_D = self.data_set.loc[:, [self.data_set.columns[attr_index], 'classes']]
        Xj_attribute_class_lst = [[modified_D.iloc[i, 0], modified_D.iloc[i, 1]] for i in range(len(modified_D))]
        n_class_i_lst = [0] * len(self.class_domain)  # contains the counts of data points for all cluster in data set
        all_n_v_class_i_dict = {}  # dictionary containing as value nvi for all categories vi in dom(Xj), (key: vi)
        for i in range(len(Xj_attribute_class_lst)):
            n_class_i_lst[self.class_domain.index(Xj_attribute_class_lst[i][1])] += 1
            if all_n_v_class_i_dict.get(Xj_attribute_class_lst[i][0]) is None:
                all_n_v_class_i_dict[Xj_attribute_class_lst[i][0]] = [0]*len(self.class_domain)
        for i in range(len(Xj_attribute_class_lst)):
            class_index = self.class_domain.index(Xj_attribute_class_lst[i][1])
            category_attr_val = Xj_attribute_class_lst[i][0]
            all_n_v_class_i_dict[category_attr_val][class_index] += 1
        optimal_split_point, best_score = None, -float('inf')
        for key, val in all_n_v_class_i_dict.items():
            score = self.calc_info_gain(val, n_class_i_lst)
            print(f"{attr_index}-th column split point {key} information gain: {score}")
            if best_score < score:
                optimal_split_point = key
                best_score = score
        return optimal_split_point, best_score


    def paritioning_regions(self, D, purity_threshold, num_pts_threshold, parent_split_pt=None):
        # param: D is data set(or Region/splitted data set)
        # param: purity_threshold indicates that if max purity of D <= this threshold, then leaf node created
        # param: num_pts_threshold(int, default=0) indicates that if |D| is >= this threshold, then leaf node created
        # param: parent_split_pt(tuple) is a split point and its attribute column index of parent region
        # return list containing split points in the order of Depth-First
        print(f"Parent split point: {parent_split_pt}")
        n = len(D)
        n_class_i_lst = [0] * len(self.class_domain)
        for i in range(len(D)):
            n_class_i_lst[self.class_domain.index(D.iloc[i, -1])] += 1
        max_purity, c_star = -1, None  # c_star is the class with the max purity
        for i in range(len(n_class_i_lst)):
            if max_purity < n_class_i_lst[i]:
                max_purity = n_class_i_lst[i]
                c_star = self.class_domain[i]
        if n <= num_pts_threshold or max_purity / n >= purity_threshold:
            return BTreeNode((D, c_star))
        best_split_pt, max_score = None, -float('inf')
        for col_j in range(D.shape[1] - 1):
            if self.is_categorical:
                split_pt, score = self.search_optimal_categorical_split_point(col_j)
            else:
                split_pt, score = self.search_optimal_numeric_split_point(col_j)
            if max_score < score and ((split_pt, col_j) != parent_split_pt):
                best_split_pt = (split_pt, col_j)
                max_score = score
        quot = "'" if type(best_split_pt[0]) == str else ""
        yes_expr = "==" if self.is_categorical else "<="
        no_expr = "!=" if self.is_categorical else ">"
        query1 = f"{D.columns[best_split_pt[1]]} {yes_expr} {quot}{best_split_pt[0]}{quot}"
        query2 = f"{D.columns[best_split_pt[1]]} {no_expr} {quot}{best_split_pt[0]}{quot}"
        D_Y = D.query(query1)
        D_N = D.query(query2)
        node = BTreeNode(best_split_pt)
        if self.decision_tree.root is None:
            self.decision_tree.root = node
        node.left_child = self.paritioning_regions(D_Y, purity_threshold, num_pts_threshold, best_split_pt)
        node.right_child = self.paritioning_regions(D_N, purity_threshold, num_pts_threshold, best_split_pt)
        return node


    def train(self, purity_threshold, num_pts_threshold=0):
        # param: purity_threshold indicates that if max purity of D <= this threshold, then leaf node created
        # param: num_pts_threshold(int, default=0) indicates that if |D| is >= this threshold, then leaf node created
        # return list containing split points in the order of Depth-First
        self.paritioning_regions(self.data_set, purity_threshold, num_pts_threshold)
        self.is_trained = True
        return self.decision_tree

In [22]:
price = [10, 10, 20, 10, 10, 10, 20, 20]
chef = ['A', 'A', 'A', 'B', 'B', 'B', 'B', 'B']
class_quality = ['L', 'L', 'H', 'H', 'H', 'H', 'L', 'H']
D = pd.DataFrame({'Price': price, 'Chef': chef})
class_labels = pd.DataFrame({'Quality': class_quality})

DTC = DecisionTreeClassifier(D, class_labels, is_categorical=True)
DTC.train(purity_threshold=0.75)
if DTC.is_trained:
    DTC.decision_tree.BFS_traverse()
else:
    print("Not Trained")

Parent split point: None
0-th column split point 10 information gain: 0.003228943620363578
0-th column split point 20 information gain: 0.0032289436203635224
1-th column split point A information gain: 0.15886800584993
1-th column split point B information gain: 0.15886800584993005
Parent split point: ('B', 1)
Parent split point: ('B', 1)
0-th column split point 10 information gain: 0.003228943620363578
0-th column split point 20 information gain: 0.0032289436203635224
1-th column split point A information gain: 0.15886800584993
1-th column split point B information gain: 0.15886800584993005
Parent split point: (10, 0)
Parent split point: (10, 0)
level 0.0
Yes: [('B', 1)]
level 1.0
Yes: [(   Price Chef classes
3     10    B       H
4     10    B       H
5     10    B       H
6     20    B       L
7     20    B       H, 'H')]
No: [(10, 0)]
level 2.0
Yes: [None]
No: [None]
Yes: [(   Price Chef classes
0     10    A       L
1     10    A       L, 'L')]
No: [(   Price Chef classes
2     20