In [120]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt


data = {
    'Outlook': ['Sunny', 'Sunny', 'Cloudy', 'Rainy', 'Rainy', 'Rainy', 'Cloudy', 'Sunny', 'Sunny', 'Rainy', 'Sunny', 'Cloudy', 'Cloudy', 'Rainy'],
    'Humidity': ['High', 'High', 'High', 'High', 'Normal', 'Normal', 'Normal', 'High', 'Normal', 'Normal', 'Normal', 'High', 'Normal', 'High'],
    'Wind': ['Weak', 'Strong', 'Weak', 'Weak', 'Weak', 'Strong', 'Strong', 'Weak', 'Weak', 'Weak', 'Strong', 'Strong', 'Weak', 'Strong'],
    'PlayTennis': ['No', 'No', 'Yes', 'Yes', 'Yes', 'No', 'Yes', 'No', 'Yes', 'Yes', 'Yes', 'Yes', 'Yes', 'No']
}

df = pd.DataFrame(data)
print(df.sort_values(by=['Outlook', 'Humidity', 'Wind']))

# Encode every column as integer category codes (0, 1, 2, ...).
# mapping_dict_list[i] maps code -> original label for column i.
df_copy = df.copy()
mapping_dict_list = []
for c in df_copy.columns:
    as_category = df_copy[c].astype('category')
    mapping_dict = list(as_category.cat.categories)
    mapping_dict_list.append({code: label for code, label in enumerate(mapping_dict)})

    # replace the string labels by their integer codes
    df_copy[c] = as_category.cat.codes

# The last column is the target; everything before it is a feature.
feature_names = list(df_copy.columns)
X = df_copy[feature_names[:-1]].values
y = df_copy[feature_names[-1]].values
feature_names, mapping_dict_list

   Outlook Humidity    Wind PlayTennis
11  Cloudy     High  Strong        Yes
2   Cloudy     High    Weak        Yes
6   Cloudy   Normal  Strong        Yes
12  Cloudy   Normal    Weak        Yes
13   Rainy     High  Strong         No
3    Rainy     High    Weak        Yes
5    Rainy   Normal  Strong         No
4    Rainy   Normal    Weak        Yes
9    Rainy   Normal    Weak        Yes
1    Sunny     High  Strong         No
0    Sunny     High    Weak         No
7    Sunny     High    Weak         No
10   Sunny   Normal  Strong        Yes
8    Sunny   Normal    Weak        Yes


(['Outlook', 'Humidity', 'Wind', 'PlayTennis'],
 [{0: 'Cloudy', 1: 'Rainy', 2: 'Sunny'},
  {0: 'High', 1: 'Normal'},
  {0: 'Strong', 1: 'Weak'},
  {0: 'No', 1: 'Yes'}])

In [121]:
def select_data(X, y, index: int):
    '''
    Partition (X, y) by the distinct values found in column `index` of X.

    X: np.array(n, k)
    y: np.array(n)
    index: column of X to split on

    Returns a dict {value: (X_rows, y_rows)} with one entry per distinct
    value of the column, in the sorted order produced by np.unique.
    Only rows are selected; the columns of X are left untouched.
    '''
    column = X[:, index]
    row_masks = ((value, column == value) for value in np.unique(column))
    return {value: (X[mask], y[mask]) for value, mask in row_masks}


def compute_entropy(y, binary=False):
    '''
    Shannon entropy (base 2) in three flavours, chosen by the input:

    - y iterable, binary=False: entropy of the label distribution of y
      (y holds non-negative integer class codes).
    - y iterable, binary=True: for every class, the two-outcome entropy of
      "this class vs. the rest"; returns (largest value, its class index).
    - y scalar: y is read as a probability p and the two-outcome entropy
      of (p, 1 - p) is returned.
    '''
    tiny = 1e-30  # keeps log2 finite when a probability is exactly 0

    def plogp(prob):
        return -prob * np.log2(prob + tiny)

    def two_outcome(prob):
        return plogp(prob) + plogp(1 - prob)

    is_sequence = hasattr(y, "__iter__")

    if binary:
        assert is_sequence, "input in binary mode must be iterable!"

        # np.bincount(y) counts occurrences of each value from 0 to max(y)
        per_class = two_outcome(np.bincount(y) / len(y))
        best = per_class.argmax()
        return per_class[best], best

    if not is_sequence:
        assert 0 <= y <= 1, "p must satisfy 0 <= p <= 1"
        return two_outcome(y)

    return np.sum(plogp(np.bincount(y) / len(y)))


def compute_gini(y, binary=False):
    '''
    Gini impurity in three flavours, chosen by the input:

    - y iterable, binary=False: 1 - sum(p_k^2) over the label distribution
      of y (y holds non-negative integer class codes).
    - y iterable, binary=True: for every class, the two-outcome impurity of
      "this class vs. the rest"; returns (largest value, its class index).
    - y scalar: y is read as a probability p and 1 - p^2 - (1-p)^2 is returned.
    '''
    def two_outcome(prob):
        return 1 - prob**2 - (1 - prob)**2

    is_sequence = hasattr(y, "__iter__")

    if binary:
        assert is_sequence, "input in binary mode must be iterable!"

        # np.bincount(y) counts occurrences of each value from 0 to max(y)
        per_class = two_outcome(np.bincount(y) / len(y))
        best = per_class.argmax()
        return per_class[best], best

    if not is_sequence:
        assert 0 <= y <= 1, "p must satisfy 0 <= p <= 1"
        return two_outcome(y)

    freq = np.bincount(y) / len(y)
    return 1 - np.sum(freq**2)


def split_gain(X, y, features=None, criterion="entropy"):
    '''
    Score every candidate feature of X as a split for the labels y.

    X: np.array(n, k)
    y: np.array(n)
    features: int list of column indices to score, e.g. [0, 2];
              None means every column of X
    criterion: "entropy" or "gini"

    Returns a dict {feature index: score}.
    - "entropy": score is the information gain,
      H(y) - sum(|y_v|/|y| * H(y_v)); larger is better.
    - "gini": score is the weighted Gini impurity of the split,
      sum(|y_v|/|y| * Gini(y_v)); smaller is better.

    Raises NotImplementedError for any other criterion.
    '''
    if criterion == "entropy":
        baseline = compute_entropy(y)
    elif criterion == "gini":
        baseline = 0
    else:
        raise NotImplementedError

    columns = range(X.shape[1]) if features is None else features

    scores = {}
    for col in columns:
        values = X[:, col]
        total = len(values)
        score = baseline

        for level in np.unique(values):
            labels = y[values == level]
            weight = len(labels) / total

            if criterion == "entropy":
                # gain = H(all) - weighted entropy of the subsets
                score -= weight * compute_entropy(labels)
            elif criterion == "gini":
                # weighted Gini impurity of the subsets
                score += weight * compute_gini(labels)

        scores[col] = score

    return scores

In [107]:
'''
1. compute information entropy
2. compute information gain
3. select best feature
4. recursively construct decision tree
'''


class Node:
    '''
    One node of the decision tree.

    feature: Rainy? Child: {Yes: No playing; No: Node(Sunny? ...)}

    Attributes:
    - feature: index of the column this node splits on; -1 for a leaf.
    - child: dict {encoded feature value: Node}; empty for a leaf.
    - label: encoded class label for a leaf; None for an internal node.
    '''
    def __init__(self, feature=None, label=None, child=None):
        self.feature = feature  # -1 iff leaf node
        self.child = child if child is not None else {}  # empty iff leaf node
        self.label = label  # "Yes" or "No" iff leaf node; None if not leaf node

    # recursively print tree
    def __repr__(self, indent="", depth=0, feature_names=None, mapping_dict_list=None):
        '''
        Render the subtree rooted at this node as indented text.

        indent: prefix put in front of the node's own line
        depth: nesting level of this node (4 spaces per level)
        feature_names: column names, indexed by feature index
        mapping_dict_list: per column, a dict {code: original label}

        Both lookup tables are optional. Without them the raw integer
        codes are printed, so plain repr(node) works instead of raising
        TypeError on the None defaults.
        '''
        def decode(column, code):
            # translate an integer code back to its label when possible
            if mapping_dict_list is None:
                return code
            return mapping_dict_list[column][code]

        if not self.child:
            # Leaf labels are decoded with the class column, which is the
            # last one (-1); also covers leaves created with feature=None.
            column = -1 if self.feature is None else self.feature
            return indent + f"_{decode(column, self.label)}_"

        if feature_names is None:
            name = f"feature_{self.feature}"
        else:
            name = feature_names[self.feature]

        pad = 4 * " "
        result = indent + depth * pad + f"{name}?\n"
        for v, child in self.child.items():
            result += (depth + 1) * pad + f"{decode(self.feature, v)}: "
            result += child.__repr__(indent, depth + 1, feature_names, mapping_dict_list) + "\n"

        # Drop the trailing newline, then the leading pad: the caller has
        # already written "<value>: " in front of a nested subtree.
        return result.strip("\n").strip()


# https://zhuanlan.zhihu.com/p/197476119
class ID3:
    '''
    Decision tree classifier for categorical features.

    The tree is grown greedily: at every node the feature with the best
    split score is chosen ("entropy": largest information gain, "gini":
    smallest weighted Gini impurity), and each feature is used at most
    once along a path. After growing, the tree is pruned with pessimistic
    error pruning.

    Attributes set by make_tree():
    - feature_names: column names of the training frame (target last).
    - mapping_dict_list: per column, dict {integer code: original label}.
    - inv_mapping_list: per column, dict {original label: integer code}.
    - root: root Node of the tree.
    '''
    def __init__(self, criterion='entropy'):
        '''
        criterion: "entropy" or "gini"; anything else raises ValueError.
        '''
        if criterion not in ["entropy", "gini"]:
            raise ValueError('criterion must be one of ["entropy", "gini"]')
        self.criterion = criterion

        self.feature_names = None
        self.mapping_dict_list = None
        self.inv_mapping_list = None
        self.root = None

    def make_tree(self, df: pd.DataFrame, max_depth=None, min_samples_leaf=1, alpha=0.5):
        '''
        Encode `df`, grow the tree and prune it.

        default: y is at the last column of 'df'. fit() uses this.

        Parameters:
        - df: training table; every column is treated as categorical.
        - max_depth, min_samples_leaf, alpha: forwarded to
          pessimistic_error_pruning(), which always runs.
        '''
        df = df.copy()  # to prevent astype() alter original table
        self.feature_names = df.columns.tolist()

        mapping_dict_list = []
        inv_mapping_list = []
        # turn categories labels into int [0, 1, 2, ...]
        for c in df.columns:
            categorical = df[c].astype('category')
            d = dict(enumerate(categorical.cat.categories.tolist()))
            mapping_dict_list.append(d)
            inv_mapping_list.append({v: k for (k, v) in d.items()})

            # turn string labels into int
            df[c] = categorical.cat.codes

        X = df.iloc[:, :-1].values
        y = df.iloc[:, -1].values

        self.mapping_dict_list = mapping_dict_list
        # predict() uses this
        self.inv_mapping_list = inv_mapping_list

        self.root = self.fit(X, y)

        self.pessimistic_error_pruning(
            X, y, max_depth, min_samples_leaf, alpha
            )

    def fit(self, X, y, features=None):
        '''
        Grow a tree on integer-encoded data and return its root Node.

        e.g. fit(X_sub, y_sub, [0, 2])
        Columns of X are not reduced during recursion; only rows are selected.

        Parameters:
        - X: np.array(n, k) of encoded feature values.
        - y: np.array(n) of non-negative integer class codes.
        - features: column indices that may be used for splitting;
          None or an empty list means every column of X.

        Raises ValueError when y is empty.
        '''
        if len(y) == 0:
            raise ValueError("cannot fit a tree on an empty dataset")

        if features is None or len(features) == 0:
            features = range(X.shape[1])

        return self._grow(X, y, list(features))

    def _grow(self, X, y, features):
        '''
        Recursive worker of fit(). Here an empty `features` list really
        means "nothing left to split on".
        '''
        ys = np.unique(y)
        if len(ys) == 1:
            # default: y is at the last column of 'df'
            return Node(feature=-1, label=ys[0])  # leaf node

        if not features:
            # Every feature is used up but the labels still disagree
            # (contradictory rows): stop with a majority-vote leaf. Falling
            # back to "all features" here would recurse forever.
            return Node(feature=-1, label=np.bincount(y).argmax())

        # compute score for each candidate feature
        gains = split_gain(X, y, features, self.criterion)  # e.g. {0: 1.1, 2: 0.5}

        # information gain is maximised, weighted Gini impurity is minimised
        pick = max if self.criterion == "entropy" else min
        best_feature = pick(gains.items(), key=lambda item: item[1])[0]

        # a feature is used at most once along a path
        remaining = [f for f in features if f != best_feature]

        node = Node(feature=best_feature)
        for value, (X_sub, y_sub) in select_data(X, y, best_feature).items():
            # value is the integer code of one category of best_feature
            node.child[value] = self._grow(X_sub, y_sub, remaining)

        return node

    # naive version
    def pessimistic_error_pruning(
        self, X, y, max_depth=None, min_samples_leaf=1, alpha=0.5
        ):
        '''
        Estimate each node with train set (to judge whether to prune or not).
        The tree under self.root is modified in place.

        Parameters:
        - X: Training features.
        - y: Training labels.
        - max_depth: Maximum depth of the tree (optional). The root is at depth 1.
        - min_samples_leaf: Minimum number of samples required to be at a leaf node (optional).
        - alpha: Pessimistic error adjustment factor (optional).

        https://www.bilibili.com/read/cv11072586/
        https://www.cnblogs.com/zxz666/p/15364751.html
        https://zhuanlan.zhihu.com/p/368912628

        When e' <= e + se(e), prune. Pruning can reduce the upper bound of error rate at the node.
        When e' >  e + se(e), do not prune. That will make things worse.

        e = misclassified samples of the subtree + alpha * number of its leaves
        se = sqrt(np(1-p)), where p = e/n. Binomial distribution.
        e' = misclassified samples of a majority-vote leaf + alpha
        '''
        def calculate_errors(node, X, y):
            if node.label is not None:
                # leaf node: number of samples whose label differs from the leaf's
                return int(np.sum(y != node.label))

            # internal node: add up the errors of all branches
            # TODO!: can we reduce time?
            errors = 0
            for value, child in node.child.items():
                mask = X[:, node.feature] == value
                errors += calculate_errors(child, X[mask], y[mask])
            return errors

        # TODO: store num_leaf for each node
        # TODO: use update_num_leaf() after pruning
        def count_leaf_nodes(node):
            if node.label is not None:  # leaf node
                return 1
            return sum(count_leaf_nodes(child) for child in node.child.values())

        def compute_pessimistic_error(node, X, y, alpha):
            e = calculate_errors(node, X, y)
            num_leaf = count_leaf_nodes(node)
            return e + alpha * num_leaf

        def prune_node(node, X, y, alpha=0.5, depth=0):
            '''
            Recursively called to prune nodes.
            (X, y): subset of dataset that reaches `node`
            alpha: penalty coefficient, default=0.5

            steps:
            1. check whether to prune or not
                leaf node? node too deep? samples too few? reduces error rate?
            2. do pruning
            '''
            def prune(node):
                # collapse the subtree into a majority-vote leaf
                node.feature = -1
                node.child = {}
                node.label = np.bincount(y).argmax()

            # 1. check whether to prune or not
            # leaf node
            if node.label is not None:
                return

            # node is too deep
            if max_depth is not None and depth > max_depth:
                prune(node)
                return

            # samples are too few (rows of X are samples, columns are features)
            if X.shape[0] < min_samples_leaf:
                prune(node)
                return

            # e' <= e + se(e)
            n = len(y)
            e = compute_pessimistic_error(node, X, y, alpha)
            # se = sqrt(np(1-p)) = sqrt(e(n-e)/n); clamped so that a large
            # alpha (e > n) cannot produce the square root of a negative number
            se = np.sqrt(max(e * (n - e) / n, 0.0))

            # pruned node: a single majority-vote leaf
            majority = np.bincount(y).argmax()
            e_pruned = int(np.sum(y != majority)) + alpha

            if e_pruned <= e + se:
                prune(node)
            else:
                for value, child in node.child.items():
                    mask = X[:, node.feature] == value
                    prune_node(child, X[mask], y[mask], alpha, depth+1)

        # -----------------------------------------------------------------------
        prune_node(self.root, X, y, alpha, depth=1)

    def predict(self, x_str):
        '''
        Predict the class label for one sample or for a batch of samples.

        x_str: either one sample (a sequence of original feature labels, one
        per feature column, in training column order) or a sequence of such
        samples.

        Returns the original class label for a single sample, or a list of
        labels for a batch. An empty input gives an empty list.
        Raises KeyError for a category that was not seen during training.
        '''
        if len(x_str) == 0:
            return []

        first = x_str[0]
        if isinstance(first, str) or not hasattr(first, "__iter__"):
            # single sample
            n = len(x_str)
            assert n == len(self.feature_names) - 1, f"{n} != {len(self.feature_names) - 1}"

            X = [self.inv_mapping_list[i][x_str[i]] for i in range(n)]  # str -> int
            current = self.root
            while current.child:
                assert current.label is None  # internal nodes carry no label

                current = current.child[X[current.feature]]

            return self.mapping_dict_list[current.feature][current.label]

        # batch of samples
        return [self.predict(row) for row in x_str]

    def __repr__(self) -> str:
        if self.root is None:
            return f"ID3(criterion={self.criterion!r}, not fitted)"
        return self.root.__repr__(feature_names=self.feature_names,
                                  mapping_dict_list=self.mapping_dict_list)



# Demo: build trees with the entropy criterion on the tennis data.
criterion = 'entropy'
tree = ID3(criterion=criterion)
# make_tree() always runs pessimistic_error_pruning() with its arguments, so
# "Original" here means "default pruning settings", not "never pruned".
tree.make_tree(df)
print('Original tree:\n', tree, '\n')

# Rebuild with max_depth=1: nodes deeper than that are collapsed into
# majority-vote leaves (the root counts as depth 1).
tree.make_tree(df, max_depth=1, min_samples_leaf=1)
print('Pruned tree:\n', tree, '\n')

# Per-feature split scores on the encoded data; keys are column indices of X.
gains = split_gain(X, y, criterion=criterion)
print(f"split gains: {gains}, \n")

# `tree` is the depth-limited tree built just above.
tree.predict(['Sunny', 'Normal', 'Weak'])

Original tree:
 Outlook?
    Cloudy: _Yes_
    Rainy: Wind?
        Strong: _No_
        Weak: _Yes_
    Sunny: Humidity?
        High: _No_
        Normal: _Yes_ 

Pruned tree:
 Outlook?
    Cloudy: _Yes_
    Rainy: _Yes_
    Sunny: _No_ 

split gains: {0: 0.24674981977443933, 1: 0.15183550136234164, 2: 0.04812703040826949}, 



'No'

In [108]:
# Same data with the Gini criterion and default pruning settings.
# Note: this rebinds `tree`, replacing the entropy tree from the previous cell.
tree = ID3(criterion='gini')
tree.make_tree(df)
print(tree)
tree.predict(['Sunny', 'Normal', 'Weak'])

Outlook?
    Cloudy: _Yes_
    Rainy: Wind?
        Strong: _No_
        Weak: _Yes_
    Sunny: Humidity?
        High: _No_
        Normal: _Yes_


'Yes'

In [110]:
# Scalar input is treated as a probability p: two-outcome entropy of (0.4, 0.6).
compute_entropy(0.4)

0.9709505944546686

In [115]:
# p*log2(p) for p = 0.6; its negation is one of the two terms of the entropy above.
np.log2(0.6)*0.6

-0.44217935649972373