In [1]:
import numpy as np
import math

In [2]:
# step 1: get data

# Load the iris table: the first four columns are features, the fifth is the label.
iris_data = np.loadtxt('iris.txt')
data = iris_data[:, :4]
labels = iris_data[:, 4]

# Fraction of every class that goes to the training set.
split_rate = 0.8
# The split below walks the first 150 rows in consecutive groups of 50,
# i.e. it assumes the rows are grouped by class -- TODO confirm against iris.txt.
total_samples = 150
samples_per_class = 50
train_per_class = int(samples_per_class * split_rate)

# Collect the per-class slices and concatenate once at the end; this removes
# the need for a dummy zero row that has to be stripped afterwards.
x_train_parts, x_test_parts = [], []
y_train_parts, y_test_parts = [], []

for index in range(0, total_samples, samples_per_class):
    # `split_rate` now actually drives the split (it used to be ignored in
    # favour of a hard-coded 0.8).
    split_index = index + train_per_class
    class_end = index + samples_per_class

    x_train_parts.append(data[index:split_index])
    x_test_parts.append(data[split_index:class_end])

    y_train_parts.append(labels[index:split_index])
    y_test_parts.append(labels[split_index:class_end])

x_train = np.concatenate(x_train_parts, axis=0)
x_test = np.concatenate(x_test_parts, axis=0)
y_train = np.concatenate(y_train_parts, axis=0)
y_test = np.concatenate(y_test_parts, axis=0)

In [3]:
# utility helpers: label counting, entropy, information gain

def count_class(labels):
    """Tally how many times each distinct label occurs.

    Args:
        labels: 1-D sequence or array of hashable label values.

    Returns:
        dict mapping every distinct label (as produced by ``np.unique``, so in
        sorted order) to the number of times it appears in ``labels``.
    """
    # Seed every known label with zero so the counting loop never has to
    # check for a missing key.
    tally = dict.fromkeys(np.unique(labels), 0)
    for item in labels:
        tally[item] = tally[item] + 1
    return tally

def calc_entropy(labels):
    """Compute the base-2 entropy of a collection of labels.

    Args:
        labels: sequence or array of labels; its class frequencies are
            obtained through ``count_class``.

    Returns:
        ``-sum(p * log2(p))`` over the class proportions ``p``. An empty
        ``labels`` yields ``0`` because there is nothing to sum.
    """
    total = len(labels)
    weighted_log_sum = 0
    for occurrences in count_class(labels).values():
        proportion = occurrences / total
        weighted_log_sum += proportion * math.log(proportion, 2)

    return -1 * weighted_log_sum

def information_gain(true_labels, false_labels, current_entropy):
    """Return the entropy reduction achieved by a two-way split.

    Args:
        true_labels: labels of the samples routed to the "true" side.
        false_labels: labels of the samples routed to the "false" side.
        current_entropy: entropy of the labels before splitting.

    Returns:
        ``current_entropy`` minus the size-weighted entropies of both sides.
    """
    n_true = len(true_labels)
    n_false = len(false_labels)
    # Share of the samples that landed on the "true" side.
    true_share = float(n_true) / (n_true + n_false)

    true_term = true_share * calc_entropy(true_labels)
    false_term = (1 - true_share) * calc_entropy(false_labels)
    return current_entropy - true_term - false_term

def unique_values(data):
    """Return the distinct entries of ``data`` as a NumPy array.

    The order of the result follows set iteration order and is therefore
    not guaranteed to be sorted.
    """
    distinct = set()
    for entry in data[:]:
        distinct.add(entry)
    return np.array(list(distinct))

def is_numeric(value):
    """Tell whether ``value`` is a real number, including NumPy scalars.

    ``np.float64`` subclasses Python's ``float``, but NumPy integer scalars
    (``np.int64`` ...) and other float widths (``np.float32`` ...) do not
    subclass ``int``/``float``. Checking only the builtin types therefore
    misclassified values read from integer or float32 arrays as non-numeric.

    Args:
        value: any object.

    Returns:
        True for ``int``, ``float`` and NumPy integer/floating scalars,
        False otherwise.
    """
    return isinstance(value, (int, float, np.integer, np.floating))

def find_largest_count_of_label(class_counts):
    """Pick the label with the highest count.

    Args:
        class_counts: dict mapping label -> occurrence count.

    Returns:
        The first label (in dict iteration order) whose count is strictly
        larger than every count seen before it, or ``None`` when the dict is
        empty or no count is greater than zero.
    """
    best_label, best_count = None, 0

    for candidate, occurrences in class_counts.items():
        # Strict comparison: on ties the earliest label wins.
        if occurrences > best_count:
            best_label, best_count = candidate, occurrences

    return best_label

In [4]:
# step 2: find the best question (split method)

class Question():
    """A yes/no test applied to one feature of a sample.

    Numeric values are tested with ``>=``; anything else with ``==``
    (numeric-ness is decided by ``is_numeric``).
    """

    def __init__(self, feature_number, value):
        # Index of the feature to inspect and the value to compare it against.
        self.feature_number, self.value = feature_number, value

    def compare(self, data):
        """Return whether the sample ``data`` answers this question with yes."""
        candidate = data[self.feature_number]
        if is_numeric(candidate):
            return candidate >= self.value
        return candidate == self.value

    def __repr__(self):
        if is_numeric(self.value):
            operator = ">="
        else:
            operator = "=="
        return "Question: feature_{} {} {} ?".format(self.feature_number, operator, self.value)

def split_data(data, labels, question):
    """Partition samples and their labels by the answer to ``question``.

    Args:
        data: indexable collection of samples.
        labels: labels aligned with ``data`` by position.
        question: object exposing ``compare(sample)``; a truthy result sends
            the sample to the "true" side.

    Returns:
        Tuple ``(true_data, true_labels, false_data, false_labels)``, each
        converted to a NumPy array.
    """
    matched_rows, matched_labels = [], []
    unmatched_rows, unmatched_labels = [], []

    for position, row in enumerate(data):
        # Choose the destination pair first, then append once.
        if question.compare(row):
            rows, tags = matched_rows, matched_labels
        else:
            rows, tags = unmatched_rows, unmatched_labels
        rows.append(row)
        tags.append(labels[position])

    return (
        np.array(matched_rows),
        np.array(matched_labels),
        np.array(unmatched_rows),
        np.array(unmatched_labels),
    )

def find_best_split_method(data, labels):
    """Search every (feature, value) pair for the split with the highest gain.

    Each value present in each feature column is tried as a ``Question``.
    Splits that leave one side empty are ignored.

    Args:
        data: 2-D array of samples (rows) by features (columns).
        labels: labels aligned with the rows of ``data``.

    Returns:
        Tuple ``(best_gain, best_question)``. ``best_question`` is ``None``
        (with gain ``0``) when no admissible split reached a gain >= 0.
    """
    top_gain, top_question = 0, None
    baseline_entropy = calc_entropy(labels)

    for column in range(data.shape[1]):
        for threshold in data[:, column]:
            candidate = Question(column, threshold)
            parts = split_data(data, labels, candidate)
            yes_labels, no_labels = parts[1], parts[3]

            # A split that sends everything to one side carries no information.
            if not (len(yes_labels) and len(no_labels)):
                continue

            candidate_gain = information_gain(yes_labels, no_labels, baseline_entropy)
            # ">=" means that among equally good splits the last one tried wins.
            if candidate_gain >= top_gain:
                top_gain, top_question = candidate_gain, candidate

    return top_gain, top_question

In [5]:
# step 3: setup the decision tree

class DecisionNode():
    """Internal tree node: a question plus the subtrees for each answer."""

    def __init__(self, question, true_branch, false_branch, depth, data, labels):
        # Splitting rule and the subtrees reached when it answers yes / no.
        self.question = question
        self.true_branch, self.false_branch = true_branch, false_branch
        # Depth value supplied by the tree builder.
        self.depth = depth
        # Samples and labels handed to this node by the tree builder.
        self.data, self.labels = data, labels

class LeafNode():
    """Terminal tree node that predicts the most frequent label it received."""

    def __init__(self, labels):
        label_tally = count_class(labels)
        self.predicted_label = find_largest_count_of_label(label_tally)

class DecisionTree():
    """Binary decision tree classifier built from information-gain splits.

    Usage: construct with the training data, call ``build_tree()``, then use
    ``print_tree``, ``predict`` or ``evaluate``. ``self.tree`` only exists
    after ``build_tree()`` has run.
    """

    def __init__(self, data, labels):
        # Training samples and their labels, consumed by build_tree().
        self.data = data
        self.labels = labels

    def build_tree(self):
        """Grow the tree from the stored training data into ``self.tree``."""
        def build_node(data, labels, depth=0):
            gain, question = find_best_split_method(data, labels)
            # Nothing to gain from any split: stop and predict the majority label.
            if gain == 0:
                return LeafNode(labels)

            depth += 1

            true_data, true_labels, false_data, false_labels = split_data(data, labels, question)
            true_branch = build_node(true_data, true_labels, depth)
            false_branch = build_node(false_data, false_labels, depth)

            return DecisionNode(question, true_branch, false_branch, depth, data, labels)

        self.tree = build_node(self.data, self.labels)

    def print_tree(self):
        """Print an indented text rendering of the tree."""
        # Bullet symbol cycles with the node depth (depth % 3 -> 0, 1, 2).
        list_signs = ("■", "◆", "●")

        def print_tree_node(node, spacing=""):
            if isinstance(node, LeafNode):
                print("{} ↘ Predicted label: {}".format(spacing, node.predicted_label))
                return

            list_sign = list_signs[node.depth % 3]

            print("{} <{}>".format(spacing, node.question))
            print("{} {} True:".format(spacing, list_sign))
            print_tree_node(node.true_branch, spacing + "  ")

            print("{} {} False:".format(spacing, list_sign))
            print_tree_node(node.false_branch, spacing + "  ")

        print_tree_node(self.tree)

    def classify(self, data, node):
        """Route one sample down from ``node`` and return the leaf's label."""
        if isinstance(node, LeafNode):
            return node.predicted_label

        if node.question.compare(data):
            return self.classify(data, node.true_branch)
        else:
            return self.classify(data, node.false_branch)

    def predict(self, data):
        """Return a NumPy array with the predicted label of every sample."""
        return np.array([self.classify(sample, self.tree) for sample in data])

    def evaluate(self, data, labels):
        """Print the prediction accuracy on ``data`` as a percentage.

        The percentage is rounded to two decimals. Previously a misplaced
        parenthesis made it ``round(x)`` and passed ``2`` to ``format`` as an
        unused argument, so the result was rounded to a whole number.

        Raises:
            ZeroDivisionError: if ``data`` yields no predictions.
        """
        answers = self.predict(data)

        count = 0
        for index in range(len(answers)):
            if answers[index] == labels[index]:
                count += 1

        accuracy = round(count * 100 / len(answers), 2)
        print("Accuracy is: {}%".format(accuracy))

In [6]:
# step 4: build the decision tree from the training split and print it
# NOTE(review): the assignment below rebinds the name `DecisionTree` from the
# class to an instance. Re-running this cell without first re-running the cell
# that defines the class fails, because the instance is not callable. Later
# cells use `DecisionTree` as the trained instance.
DecisionTree = DecisionTree(x_train, y_train)
DecisionTree.build_tree()
DecisionTree.print_tree()

<Question: feature_3 >= 1.0 ?>
 ◆ True:
   <Question: feature_3 >= 1.8 ?>
   ● True:
     <Question: feature_2 >= 4.9 ?>
     ■ True:
       ↘ Predicted label: 3.0
     ■ False:
       <Question: feature_1 >= 3.2 ?>
       ◆ True:
         ↘ Predicted label: 2.0
       ◆ False:
         ↘ Predicted label: 3.0
   ● False:
     <Question: feature_2 >= 5.0 ?>
     ■ True:
       <Question: feature_3 >= 1.6 ?>
       ◆ True:
         <Question: feature_2 >= 5.8 ?>
         ● True:
           ↘ Predicted label: 3.0
         ● False:
           ↘ Predicted label: 2.0
       ◆ False:
         ↘ Predicted label: 3.0
     ■ False:
       <Question: feature_3 >= 1.7 ?>
       ◆ True:
         ↘ Predicted label: 3.0
       ◆ False:
         ↘ Predicted label: 2.0
 ◆ False:
   ↘ Predicted label: 1.0


In [7]:
# Evaluate the trained tree on the held-out test split, then run inference on it.
# NOTE: `DecisionTree` here is the instance bound in the previous cell, not the class.

DecisionTree.evaluate(x_test, y_test)

print("-" * 20)

# Last expression of the cell, so the notebook displays the predicted labels.
DecisionTree.predict(x_test)

Accuracy is: 100%
--------------------


array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 2., 2., 2., 2., 2., 2., 2.,
       2., 2., 2., 3., 3., 3., 3., 3., 3., 3., 3., 3., 3.])