In [1]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn import datasets
from collections import Counter

In [2]:
iris = datasets.load_iris()
X = iris.data
Y = iris.target
x_train, x_test, y_train, y_test = train_test_split(X, Y, test_size = 0.33, random_state = 42)

Gini Impurity

In [3]:
def gini(x): 
    counter = Counter(x)
    total = len(x)
    G = 0
    for value in counter.values():
        prob = value/total
        G += prob * (1 - prob)
    return G

def weighted_gini(a, b):
    return (len(a) * gini(a) + len(b) * gini(b))/(len(a) + len(b))

In [4]:
# Used to find the optimal split within a continuous measurement

def split_cont_measure(data, target):
    data = np.array(data)
    
    # sorting the data to find the mid values
    idx = np.argsort(data)
    sorted_data = data[idx]
    
    past_x = sorted_data[0]
    middle_values = [] 
    for x in sorted_data: 
        if x != past_x:
            average_x = 0.5 * (x + past_x)
            middle_values.append(average_x)
            past_x = x
    
    # Finding the gini for each split
    gini_values = np.zeros(len(middle_values))    
    
    for i, mid_val in enumerate(middle_values):
        smaller_idx = data < mid_val
        larger_idx = np.invert(smaller_idx)
        
        g = weighted_gini(target[smaller_idx], target[larger_idx])
        gini_values[i] = g 
    
    # Choosing the best split based on gini value
    best_split = middle_values[np.argmin(gini_values)]
    return best_split, np.min(gini_values)
    

In [5]:
# Used to find the optimal split within a discrete measurement
def split_disc_measures(data, target):
    data = np.array(data)
    
    distinct_data = np.unique(data)
    gini_values = np.zeros(len(distinct_data))
    
    for i, x in enumerate(distinct_data):
        match = data == x
        other = data != x
        
        match_t = target[match]
        other_t = target[other]
    
        gini_values[i] = weighted_gini(match_t, other_t) 
        
    best = np.argmin(gini_values)
    return distinct_data[best], gini_values[best]

In [6]:
def find_probability(targets):
    counts = Counter(targets).most_common()
    total = len(targets)
    results = []
    for key, val in counts:
        results.append([key, val/total])
    return results

In [7]:
class Node:
    
    def __init__(self, data, target, min_node_size = 5):
        self.data = data
        self.target = target
        self.node_gini = gini(target)
        self.node_type = "node"
        self.min_node_size = min_node_size
        self.branch_left = None
        self.branch_right = None 
        self.measurement_idx = None 
        self.split_value = None 
        self.split_method = None
    
    
                
    def expand_branch(self):
        
        if self.min_node_size >= len(self.data):
            self.node_type = "leaf"
        
        else:
            split_values = np.zeros(len(self.data.T))
            gini_values = np.zeros(len(self.data.T))
            methods = []

            for i, measure in enumerate(self.data.T):    
                # If measurement is discrete
                if type(measure[0]) == np.str_:
                    split_values[i], gini_values[i] = split_disc_measures(measure, self.target)
                    methods.append("=")

                # If measurement is continuous
                else:
                    split_values[i], gini_values[i] = split_cont_measure(measure, self.target)
                    methods.append("<")

            best = self.measurement_idx = np.argmin(gini_values)
            self.split_value = split_values[best]
            self.split_method = methods[best]

            # If the split does not improve gini impurity -> make leaf node
            if gini_values[best] >= self.node_gini:
                self.node_type = "leaf"

            # Else split into two new nodes and repeat function 
            else: 
                if self.split_method == "=":
                    left_idx = self.data.T[best] == split_values[best]
                    right_idx = self.data.T[best] != split_values[best]
                else: 
                    left_idx = self.data.T[best] < split_values[best]
                    right_idx = self.data.T[best] >= split_values[best]


                self.branch_left = Node(self.data[left_idx], self.target[left_idx])
                self.branch_right = Node(self.data[right_idx], self.target[right_idx])

                self.branch_left.expand_branch()
                self.branch_right.expand_branch()

    
    
    
    def predict(self, query, style = "classification"):
        if self.node_type == "leaf":
            counter = Counter(self.target)

            if style == "regression": 
                return find_probability(self.target)

            else:
                most_common = find_probability(self.target)[0][0]
                return most_common
            
        # If not end of branch
        else: 
            query_val = query[self.measurement_idx]
            
            if self.split_method == "=":
                if query_val == self.split_value:
                    return left_branch.predict(query, style = style)
                else: 
                    return right_branch.predict(query, style = style)
                
            else: 
                if query_val < self.split_value:
                    return self.branch_left.predict(query, style = style)
                else: 
                    return self.branch_right.predict(query, style = style)
    

In [8]:
dt = Node(x_train, y_train)
dt.expand_branch()

In [9]:
predictions = np.zeros(len(x_test))
for i, x in enumerate(x_test):
    predictions[i] = dt.predict(x)
    

In [10]:
np.average(predictions == y_test)

0.98