In [14]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

from collections import Counter
import math
from math import log

import sys
import pprint


# table 5.1 from book
def create_data():
    """Return the toy training table and its column names.

    Returns
    -------
    datasets : list[list[str]]
        One row per sample: four categorical feature values followed by the
        class label ('yes' / 'no') in the last position.
    labels : list[str]
        Column names matching the five positions of every row.
    """
    labels = ['age', 'have job', 'own house', 'credit situation', 'type']

    # One sample per line, fields separated by single spaces
    # (no individual value contains whitespace, so a plain split is safe).
    raw_rows = (
        'teen no no intermediate no',
        'teen no no good no',
        'teen yes no good yes',
        'teen yes yes intermediate yes',
        'teen no no intermediate no',
        'middle-age no no intermediate no',
        'middle-age no no good no',
        'middle-age yes yes good yes',
        'middle-age no yes very-good yes',
        'middle-age no yes very-good yes',
        'old no yes very-good yes',
        'old no yes good yes',
        'old yes no good yes',
        'old yes no very-good yes',
        'old no no intermediate no',
        'teen no no intermediate yes',
    )
    datasets = [line.split(' ') for line in raw_rows]

    return datasets, labels

In [15]:
def print_node(node, depth=0):
    """Print the subtree rooted at ``node``, one line per node, depth first.

    A node whose ``splitting_feature_id`` is None is printed as a leaf:
    ``depth (None, branch_value, class_label, n_samples)``.
    Any other node is printed as ``depth (feature_id, branch_value)`` and is
    followed by its children at ``depth + 1``.
    """
    is_leaf = node.splitting_feature_id is None

    if not is_leaf:
        header = (node.splitting_feature_id, node.splitting_feature_value)
        print(depth, header)
        next_depth = depth + 1
        for sub_node in node.child:
            print_node(sub_node, next_depth)
        return

    summary = (
        node.splitting_feature_id,
        node.splitting_feature_value,
        node.class_label,
        len(node.data),
    )
    print(depth, summary)

In [16]:
class Node:
    """A single node of the decision tree.

    Internal nodes carry ``splitting_feature_id`` (the column they test) and a
    list of children; leaves carry ``class_label`` and ``data`` (the labels of
    the samples that reached them). Every non-root node also records, in
    ``splitting_feature_value``, the value of the parent's feature that leads
    to it.
    """

    def __init__(self, splitting_feature_id=None, class_label=None, data=None,
                 splitting_feature_value=None):
        # Leaf payload.
        self.class_label = class_label
        self.data = data
        # Split description: which column this node tests, and which value of
        # the parent's column selects this node.
        self.splitting_feature_id = splitting_feature_id
        self.splitting_feature_value = splitting_feature_value
        # Sub-nodes, one per observed value of the tested column.
        self.child = []

    def add_node(self, node):
        """Attach ``node`` as the last child of this node."""
        self.child.append(node)

    def predict(self, test_features):
        """Return the class label for ``test_features``.

        ``test_features`` is indexed by column position. Returns None when no
        child branch matches the sample's value for the tested column.
        """
        if self.class_label is not None:
            return self.class_label

        # Lazy search: the feature lookup only happens if there is a child to
        # compare against, and stops at the first matching branch.
        matching = (
            branch for branch in self.child
            if branch.splitting_feature_value == test_features[self.splitting_feature_id]
        )
        chosen = next(matching, None)
        if chosen is None:
            return None
        return chosen.predict(test_features)

class DTree:
    """ID3 decision tree with optional cost-complexity pruning.

    The tree is grown by recursively splitting on the feature with the largest
    information gain (``fit`` / ``train``). It can afterwards be simplified
    with ``pruning``, which collapses a subtree into a leaf whenever that does
    not increase the cost

        C_alpha(T) = sum over leaves t of N_t * H_t  +  alpha * |T|

    where N_t is the number of samples in leaf t, H_t the entropy of their
    labels and |T| the number of leaves.

    Parameters
    ----------
    epsilon : float
        Information-gain threshold: a node whose best gain is <= epsilon
        becomes a leaf.
    alpha : float
        Per-leaf penalty used by the pruning cost.
    """

    def __init__(self, epsilon=0, alpha=0):
        self.epsilon = epsilon
        self.alpha = alpha
        self.tree = Node()

    # ------------------------------------------------------------------
    # entropy / information gain
    # ------------------------------------------------------------------
    @staticmethod
    def _label_entropy(labels):
        """Entropy (base 2) of a flat sequence of class labels."""
        n = len(labels)
        if n == 0:
            return 0.0
        return -sum((c / n) * log(c / n, 2) for c in Counter(labels).values())

    def cal_entropy(self, datasets):
        """Empirical entropy H(D) of a table of rows.

        ``datasets`` is a sequence of rows; the class label is the LAST
        element of each row.
        """
        n = len(datasets)
        # distribution of the class label, e.g. {'no': 6, 'yes': 9}
        label_count = Counter(row[-1] for row in datasets)
        return -sum([(c / n) * log(c / n, 2) for c in label_count.values()])

    def cal_conditional_entropy(self, datasets, axis=0):
        """Empirical conditional entropy H(D | A) for the feature in column ``axis``."""
        n = len(datasets)
        feature_sets = {}
        for row in datasets:
            feature_sets.setdefault(row[axis], []).append(row)

        return sum([(len(subset) / n) * self.cal_entropy(subset)
                    for subset in feature_sets.values()])

    def info_gain(self, entropy, con_entropy):
        """Information gain g(D, A) = H(D) - H(D | A)."""
        return entropy - con_entropy

    def get_info_gain(self, datasets):
        """Return ``(feature_id, gain)`` for the feature with the largest gain.

        Every column except the last one is treated as a feature. Ties are
        resolved in favour of the lowest column index.
        """
        feature_count = len(datasets[0]) - 1
        empirical_entropy = self.cal_entropy(datasets)
        gains = [
            (c, self.info_gain(empirical_entropy,
                               self.cal_conditional_entropy(datasets, axis=c)))
            for c in range(feature_count)
        ]
        return max(gains, key=lambda x: x[-1])

    # ------------------------------------------------------------------
    # growing the tree
    # ------------------------------------------------------------------
    @staticmethod
    def _majority_label(y_train):
        """Most frequent label of a pandas Series of labels."""
        return y_train.value_counts().sort_values(ascending=False).index[0]

    def train(self, train_data, node):
        '''
        Input : Dataset(DataFrame, label in the last column), node to fill in
        Output : None; ``node`` becomes the root of the (sub)tree for ``train_data``
        '''
        y_train = train_data.iloc[:, -1]
        features = train_data.columns[:-1]

        # 1. if all the data in D belong to the same class C,
        #    make this node a leaf labelled C
        if len(y_train.value_counts()) == 1:
            node.class_label = y_train.iloc[0]
            node.data = y_train
            return

        # 2. if there is no feature left, make this node a leaf labelled
        #    with the majority class
        if len(features) == 0:
            node.class_label = self._majority_label(y_train)
            node.data = y_train
            return

        # 3. find the feature Ag with the largest information gain
        max_feature_id, max_info_gain = self.get_info_gain(np.array(train_data))

        # 4. if the gain does not exceed the threshold, make this node a leaf
        #    labelled with the majority class
        if max_info_gain <= self.epsilon:
            node.class_label = self._majority_label(y_train)
            node.data = y_train
            return

        # 5. split D on every observed value of Ag
        node.splitting_feature_id = max_feature_id
        split_column = train_data.iloc[:, max_feature_id]
        for Di in split_column.value_counts().index:
            child = Node(splitting_feature_value=Di)
            node.add_node(child)
            # fresh 0..n-1 index so every subset is a self-contained frame
            sub_train_data = train_data[split_column == Di].reset_index(drop=True)

            # 6. grow the subtree recursively
            self.train(sub_train_data, child)

    def fit(self, train_data):
        """Grow the tree from ``train_data`` (DataFrame, label in the last column)."""
        # Start from a fresh root so that calling fit() again does not append
        # a second set of branches to the previously grown tree.
        self.tree = Node()
        self.train(train_data, self.tree)

    def predict(self, x_test):
        """Predict the label of one sample given as a positional feature sequence."""
        return self.tree.predict(x_test)

    # ------------------------------------------------------------------
    # pruning
    # ------------------------------------------------------------------
    def _try_prune(self, node, error_min):
        """Collapse ``node`` (whose children are all leaves) if the cost allows.

        Returns True when the node was turned into a leaf, False when the
        original subtree was restored.
        """
        # Never compare against a stale value: the reference cost is that of
        # the tree as it is right now (capped by the caller's bound).
        baseline = min(error_min, self.c_error())

        merged = [label for c in node.child for label in c.data]
        saved = (node.child, node.splitting_feature_id, node.class_label, node.data)

        # tentatively turn the node into a real leaf
        node.child = []
        node.splitting_feature_id = None
        node.class_label = Counter(merged).most_common(1)[0][0]
        node.data = merged

        if self.c_error() <= baseline:
            return True

        # not worth it: restore the previous subtree
        node.child, node.splitting_feature_id, node.class_label, node.data = saved
        return False

    def find_parent(self, node, error_min):
        '''
        Try to prune the subtree rooted at ``node``, bottom-up.

        leaf nodes : class_label -> not None
                     data -> not None
                     splitting_feature_id -> None
                     child -> empty
        ---------------------------------------------
        others :     class_label -> None
                     data -> None
                     splitting_feature_id -> not None
                     splitting_feature_value -> not None(except root)
                     child -> not empty

        ``error_min`` is an upper bound on the cost a prune has to reach; the
        current cost of the tree is additionally recomputed before every
        attempt, so earlier accepted prunes are always taken into account.

        Returns
            0 : nothing the caller needs to react to
            1 : ``node`` itself was collapsed into a leaf
            2 : a direct child of ``node`` was collapsed, so ``node`` may now
                be prunable and should be scanned again by the caller
        '''
        # leaves (and childless nodes) cannot be pruned
        if node.splitting_feature_id is None or not node.child:
            return 0

        # all children are leaves: this node is a pruning candidate
        if all(c.class_label is not None for c in node.child):
            return 1 if self._try_prune(node, error_min) else 0

        # otherwise descend first
        child_collapsed = False
        i = 0
        while i < len(node.child):
            status = self.find_parent(node.child[i], error_min)
            if status == 2:
                # new leaves appeared below this child: scan the same child again
                continue
            if status == 1:
                child_collapsed = True
            i += 1

        return 2 if child_collapsed else 0

    def find_leaf(self, node, leaf):
        """Append the ``data`` of every leaf under (and including) ``node`` to ``leaf``."""
        if node.class_label is not None:
            leaf.append(node.data)
            return
        for c in node.child:
            self.find_leaf(c, leaf)

    def c_error(self):
        """Cost C_alpha(T) of the current tree: sum(N_t * H_t) + alpha * |T|."""
        leaf = []
        self.find_leaf(self.tree, leaf)

        # alpha * |T|
        alpha_T = self.alpha * len(leaf)

        # sum of N_t * H_t, with H_t computed from the labels themselves
        error = sum(len(labels) * self._label_entropy(labels) for labels in leaf)

        return alpha_T + error

    def pruning(self, alpha=0):
        """Prune the tree in place; a non-zero ``alpha`` overrides ``self.alpha``."""
        if alpha:
            self.alpha = alpha

        # A status of 2 means a child of the root collapsed, so the root has
        # to be looked at again. Every such pass removes at least one internal
        # node, hence the loop terminates.
        while self.find_parent(self.tree, self.c_error()) == 2:
            pass

In [17]:
# Build the toy table and grow an unpruned tree on it (epsilon=0).
datasets, labels = create_data()
data_df = pd.DataFrame(data=datasets, columns=labels)
dt = DTree(epsilon=0)
dt.fit(train_data=data_df)

# Dump the tree as grown, then prune with alpha=0.5 and dump it again so the
# two structures can be compared side by side.
print_node(node=dt.tree)
print('=============================')
dt.pruning(alpha=0.5)
print_node(node=dt.tree)

0 (2, None)
1 (1, 'no')
2 (0, 'no')
3 (3, 'teen')
4 (None, 'intermediate', 'no', 3)
4 (None, 'good', 'no', 1)
3 (None, 'middle-age', 'no', 2)
3 (None, 'old', 'no', 1)
2 (None, 'yes', 'yes', 3)
1 (None, 'yes', 'yes', 6)
0 (2, None)
1 (1, 'no')
2 (None, 'no', 'no', 7)
2 (None, 'yes', 'yes', 3)
1 (None, 'yes', 'yes', 6)


In [10]:
# lab: quick truthiness check -- 0 is falsy, so the message below is never printed
a = 0
if bool(a):
    print('here')