In [1]:
import pandas as pd
import math
import numpy as np
from scipy import stats
import time

### Load data

In [2]:
data = pd.read_csv('abalone.data', header=None)

In [3]:
data = data.to_numpy()

In [4]:
N = len(data)
print(N)

4177


### Calculate impurity in data set with given impurity measure

In [5]:
def find_set_impurity(classes, impurity_measure):
    prob_distribution = get_prob_distr(classes)
    impurity = 0
    
    if impurity_measure == 'entropy':
        for x in prob_distribution:
            impurity -= x*math.log(x, 2)
            
    elif impurity_measure == 'gini':
        for x in prob_distribution:
            impurity += x*(1-x)
            
    return impurity

In [6]:
# get the probability distribution of the unique values in given array
def get_prob_distr(classes):
    cl, prob = np.unique(classes, return_counts=True)
    return prob/len(classes)

In [7]:
def get_info_gain(arr, split_point, col, entropy_before, discrete, impurity_measure):    
    if discrete:
        split_arr = split(arr, arr[:,col] == split_point)
    else: 
        split_arr = split(arr, arr[:,col] < split_point)
        
    left, right = split_arr[0], split_arr[1]
        
    entropy_after = find_set_impurity(left[:,-1], impurity_measure)*(len(left)/len(arr))
    entropy_after += find_set_impurity(right[:,-1], impurity_measure)*(len(right)/len(arr))
    
    return entropy_before - entropy_after

In [8]:
def split(arr, cond):
    return [arr[cond], arr[~cond]]

In [9]:
def find_split_feature(arr, max_split_checks, impurity_measure):
    best_IG = 0
    best_feature = -1
    split_point = None
    entropy_before = find_set_impurity(arr[:,-1], impurity_measure)
    
    for feature in range(arr.shape[1]-1): # last column is the label column, all others are features
        feature_IG, ig, best_feature_split = 0, 0, None
        unique = np.unique(arr[:,feature])
        
        if (type(arr[0,feature]) == str):
            # try splitting into 1 | all others, for every type
            for u in unique:
                ig = get_info_gain(arr, u, feature, entropy_before, True, impurity_measure)
                if ig > feature_IG: 
                    feature_IG = ig
                    best_feature_split = u
            
        else:
            if len(unique) <= max_split_checks+1: # try split between every unique value (ascending)
                for i in range(len(unique)-1):
                    split_point = (unique[i] + unique[i+1])/2
                    ig = get_info_gain(arr, split_point, feature, entropy_before, False, impurity_measure)
                    if ig > feature_IG: 
                        feature_IG = ig
                        best_feature_split = split_point
                
            else: # try max_split_checks splits uniformly between min and max of feature
                step = (np.max(arr[:,feature])-np.min(arr[:,feature]))/max_split_checks
                for i in range(1, max_split_checks):
                    split_point = np.min(arr[:,feature])+i*step
                    ig = get_info_gain(arr, split_point, feature, entropy_before, False, impurity_measure)
                    if ig > feature_IG: 
                        feature_IG = ig
                        best_feature_split = split_point
                        
        if feature_IG > best_IG: 
            best_IG = feature_IG
            best_feature = feature
            best_split_point = best_feature_split
    
    if best_feature == -1: raise Exception('Could not find a feature to split')
    return best_feature, best_split_point
            

In [10]:
class decision_tree():
    def __init__(self):
        self.root = None
        self.n_nodes = 1
        
    def learn(self, X, y, impurity_measure='entropy', max_split_checks=35, prune=False, prune_split=0):
        self.impurity_measure = impurity_measure
        self.max_split_checks = max_split_checks
        
        pr = None;
        if prune: # split data into training and pruning sets
            lim = math.ceil(len(X)*(1-prune_split))
            arr = np.column_stack((X[:lim,:],y[:lim]))
            pr = np.column_stack((X[lim:,:],y[lim:]))
        else:
            arr = np.column_stack((X,y))
        
        self.root = decision_tree.node()
        self.build(arr, self.root)
        
        if prune: self.prune(pr)
        
    def build(self, arr, current_node):
        if len(arr) < 1: raise Exception('Recursive call to form node with empty dataframe')
        current_node.data = arr # store for use in pruning
        if len(np.unique(arr[:,-1])) == 1:
            # all data points have the same label, return a leaf with that label
            current_node.label = arr[0,-1]
        else:
            identical_features = True
            for feature in range(arr.shape[1]-1):
                if len(np.unique(arr[:,feature])) > 1:
                    identical_features = False
                    break
                    
            if identical_features:
                # all remaining data points have identical features, set the most common label
                current_node.label = stats.mode(X[:,-1])[0][0]
                
            else: # find a feature and value to split the data set
                feature, split_val = find_split_feature(arr, self.max_split_checks, self.impurity_measure)
                if type(arr[0,feature]) == str:
                    current_node.continuous = False
                    split_arr = split(arr, arr[:,feature] == split_val)
                else: 
                    current_node.continuous = True
                    split_arr = split(arr, arr[:,feature] < split_val)
                
                current_node.left = decision_tree.node()
                current_node.right = decision_tree.node()
                current_node.split = split_val
                current_node.feature = feature
                self.n_nodes += 2
                self.build(split_arr[0], current_node.left)
                self.build(split_arr[1], current_node.right)
                    
    def predict(self, x):
        current_node = self.root
        while current_node.label is None:
            if current_node.continuous:
                if x[current_node.feature] < current_node.split:
                    current_node = current_node.left
                else:
                    current_node = current_node.right
            else:
                if x[current_node.feature] == current_node.split:
                    current_node = current_node.left
                else:
                    current_node = current_node.right
        return current_node.label
    
    def prune(self, pr):
        queue = []
        queue.append(self.root)
        acc = accuracy(self, pr[:,:-1], pr[:,-1]) # accuracy before pruning
        
        while queue:
            n = queue.pop() # current node
            left, right = n.left, n.right # subtree
            n.label = stats.mode(n.data[:,-1])[0][0] # most common label in subtree
            # when a label is set at this node, the predict method will stop here and ignore the subtree
            new_acc = accuracy(self, pr[:,:-1], pr[:,-1]) 
            
            if new_acc >= acc:
                n.left, n.right = None, None
            else:
                n.label = None
                queue.append(n.left)
                queue.append(n.right)
    
    def get_n_nodes(self):
        return self.n_nodes
    
    class node():
        def __init__(self):
            self.left = None
            self.right = None
            self.continuous = None
            self.label = None
            self.split = None
            self.feature = None
            self.data = None

In [11]:
class id3_model():
    def __init__(self):
        self.tree = None
        
    def learn(self, X, y, impurity_measure='entropy', prune=False):
        self.tree = decision_tree()
        self.tree.learn(X, y, impurity_measure=impurity_measure, prune=prune, prune_split=0.1)
        
    def predict(self, X):
        if self.tree is None: raise Exception('Attempting to predict with untrained model.')
        else: return self.tree.predict(X)
        
    def get_n_nodes(self):
        return self.tree.get_n_nodes()

In [16]:
def accuracy(model, X_test, y_test):
    accuracy = 0
    for i in range(len(X_test)):
        if model.predict(X_test[i]) == y_test[i]:
            accuracy += 1
        
    return (accuracy/len(X_test))

### Split data set into training and test 

In [12]:
len(data)*0.9

3759.3

In [13]:
X = data[:3760,:-1]
y = data[:3760,-1]
X_test = data[3760:,:-1]
y_test = data[3760:,-1]

### Learn and test the model with entropy as impurity measure

In [17]:
model = id3_model()
start = time.time()
model.learn(X, y)
end = time.time()
print(accuracy(model, X_test, y_test))
print("Learning time: % 5.1fs" %(end-start))

0.23261390887290168
Learning time:   7.0s


### Learn and test the model with Gini as impurity measure

In [18]:
model = id3_model()
model.learn(X, y, impurity_measure='gini')
print(accuracy(model, X_test, y_test))

0.2158273381294964


### Prune the tree

In [19]:
model.get_n_nodes()

4035

In [20]:
model = id3_model()
model.learn(X, y, prune=True)
model.get_n_nodes()

3679

In [21]:
from sklearn.tree import DecisionTreeClassifier
from sklearn import preprocessing

### Compare to sklearn's DecisionTreeClassifier
I had to use a label encoder, as the DecisionTreeClassifier did not accept mixed data types as input

In [22]:
le = preprocessing.LabelEncoder()
le.fit(data[:,0])
tr_data = le.transform(data[:,0])
tr_data = np.column_stack((tr_data,data[:,1:]))
X = tr_data[:3760,:-1]
y = tr_data[:3760,-1]
y = y.astype('int')
X_test = tr_data[3760:,:-1]
y_test = tr_data[3760:,-1]
y_test = y_test.astype('int')

In [23]:
clf = DecisionTreeClassifier()
start = time.time()
clf.fit(X,y)
end = time.time()
print(clf.score(X_test, y_test))
print("Learning time: % 5.1fs" %(end-start))

0.21342925659472423
Learning time:   0.0s
