In [1]:
import numpy as np
import pandas as pd
import itertools
import operator
import pprint
import tqdm.auto as tqdm

In [2]:
def flatten(container):
    """Lazily yield the leaf items of a nested list/tuple structure.

    Only ``list`` and ``tuple`` instances are descended into; every other
    object (strings, NumPy arrays, dicts, ...) is yielded as a single item.
    Items are produced depth-first, left to right.
    """
    # Explicit stack of iterators instead of nested recursive generators.
    pending = [iter(container)]
    while pending:
        for element in pending[-1]:
            if isinstance(element, (list, tuple)):
                # Descend; the outer iterator resumes once this one is drained.
                pending.append(iter(element))
                break
            yield element
        else:
            # Innermost iterator exhausted: go back up one level.
            pending.pop()

In [3]:
def calculate_prob(y, prob_class):
    """Count the leaf items of ``y`` that compare equal to ``prob_class``.

    ``y`` is walked with ``flatten``, so nested lists/tuples are searched too.
    Despite the name, the value returned is the accumulated number of
    matches, not a normalised probability.
    """
    matches = 0
    for label in flatten(y):
        matches = matches + (prob_class == label)

    return matches

In [4]:
def format_input(X):
    """Coerce ``X`` into a pandas object.

    * ``DataFrame`` / ``Series`` inputs are returned unchanged (same object).
    * ``dict`` inputs become a ``DataFrame`` whose columns are the dict keys.
    * Anything else accepted by ``pd.DataFrame`` (2-D arrays, lists of rows,
      1-D arrays/lists) becomes a ``DataFrame`` with generated column names
      ``x0, x1, ...``.

    The column count is read from the constructed frame rather than from
    ``X.shape``, so inputs without a ``shape`` attribute (plain lists) and
    1-D inputs (a single column) are supported as well.
    """
    if isinstance(X, (pd.DataFrame, pd.Series)):
        return X

    if isinstance(X, dict):
        return pd.DataFrame(X)

    frame = pd.DataFrame(X)
    frame.columns = ['x' + str(i) for i in range(frame.shape[1])]

    return frame

In [5]:
def get_class_counts(y):
    """Return ``{class_label: count}`` for every distinct value in ``y``.

    ``y`` may be a DataFrame, Series, array or list; all of its elements are
    counted together. Keys are the values produced by ``np.unique`` (in
    sorted order) and counts are plain Python ``int``s. An empty ``y``
    yields an empty dict.
    """
    # One pass via np.unique instead of re-scanning the labels once (in fact
    # twice) per class and converting 1-element arrays with int().
    classes, counts = np.unique(np.asarray(y), return_counts=True)

    return {cls: int(cnt) for cls, cnt in zip(classes, counts)}

In [6]:
def get_datatypes(X):
    """Map each column name of ``X`` to ``'numerical'`` or ``'categorical'``.

    A column is ``'numerical'`` when pandas reports a numeric dtype for it
    (integers, floats, booleans, ...); every other dtype is ``'categorical'``.
    Testing for "numeric" rather than for "object" keeps ``category`` and
    dedicated string dtypes from being reported as numerical.
    """
    return {
        name: 'numerical' if pd.api.types.is_numeric_dtype(dtype) else 'categorical'
        for name, dtype in X.dtypes.items()
    }

In [7]:
def calculate_entropy(y):
    """Return the Shannon entropy (base 2) of the class labels in ``y``.

    Class frequencies come from ``get_class_counts``; each class contributes
    ``-p * log2(p)`` with ``p = count / total``. When there are no classes
    the result is the integer ``0``.
    """
    class_counts = get_class_counts(y)
    total = sum(class_counts.values())

    entropy = 0
    for count in class_counts.values():
        entropy = entropy + (-count / total * np.log2(count / total))

    return entropy

In [8]:
def get_split_condition(X, col):
    """Return the distinct values of the column at position ``col`` of ``X``.

    Values are returned in order of first appearance; each one is a
    candidate for an ``== value`` split.
    """
    column = X.iloc[:, col]
    candidates = column.unique()
    return candidates

In [9]:
def get_subset(X, col, val):
    """Split ``X`` by whether the column at position ``col`` equals ``val``.

    Returns ``(eq_X, not_eq_X)``: the rows where the column equals ``val``
    and all remaining rows, each in original row order.

    Rows are selected with a positional boolean mask rather than by dropping
    index labels, so the two parts always partition ``X`` even when the
    index contains duplicate labels.
    """
    # Missing comparison results (nullable dtypes) count as "not equal".
    matches = (X.iloc[:, col] == val).fillna(False).to_numpy(dtype=bool)

    return X.loc[matches], X.loc[~matches]

In [10]:
def is_pure(y):
    """Return ``True`` when ``y.values`` holds exactly one distinct value.

    An empty ``y`` has no distinct values and is therefore not pure.
    """
    distinct = np.unique(y.values)
    return distinct.size == 1

In [11]:
def _majority_class(y):
    """Return the most frequent label in ``y`` (first in sorted order on ties)."""
    labels, counts = np.unique(y.values, return_counts=True)
    return labels[np.argmax(counts)]


def _find_best_split(X, y):
    """Return ``(weighted_entropy, col_idx, val)`` of the best split, or ``None``.

    Every distinct value of every column is tried as an ``== val`` test.
    Candidates are tracked per (column, value) pair, so equal values in
    different columns do not overwrite each other. Splits that leave one
    side empty are skipped because they cannot make progress. On ties the
    first candidate encountered wins.
    """
    best = None

    for col_idx in tqdm.trange(X.shape[1], position=0, leave=True):
        for val in get_split_condition(X, col_idx):
            eq_X, not_eq_X = get_subset(X, col_idx, val)
            if len(eq_X) == 0 or len(not_eq_X) == 0:
                continue

            eq_y, not_eq_y = y.loc[eq_X.index, :], y.loc[not_eq_X.index, :]

            avg_entr = (len(eq_y) / len(y) * calculate_entropy(eq_y)) + (
                len(not_eq_y) / len(y) * calculate_entropy(not_eq_y))

            if best is None or avg_entr < best[0]:
                best = (avg_entr, col_idx, val)

    return best


def _grow_branch(X, y, max_depth, min_samples):
    """Return a leaf label or a nested subtree for one side of a split."""
    if is_pure(y):
        return np.unique(y)[0]

    depth_exhausted = max_depth is not None and max_depth <= 0
    too_small = min_samples is not None and len(y) < min_samples
    if depth_exhausted or too_small:
        return _majority_class(y)

    split = _find_best_split(X, y)
    if split is None:
        # Identical feature rows with different labels: nothing left to split on.
        return _majority_class(y)

    return _apply_split(X, y, split, max_depth, min_samples, {})


def _apply_split(X, y, split, max_depth, min_samples, tree):
    """Record ``split`` as a node in ``tree`` and grow both of its branches."""
    best_split_value, col_idx, best_split_key = split

    eq_X, not_eq_X = get_subset(X, col_idx, best_split_key)
    eq_y, not_eq_y = y.loc[eq_X.index, :], y.loc[not_eq_X.index, :]

    print('Best split', best_split_key, best_split_value)

    # Formatting (instead of concatenating) also accepts non-string names/values.
    condition = f'{X.columns[col_idx]} == {best_split_key}'
    child_depth = None if max_depth is None else max_depth - 1

    tree[condition] = []
    tree[condition].append(_grow_branch(eq_X, eq_y, child_depth, min_samples))
    tree[condition].append(
        _grow_branch(not_eq_X, not_eq_y, child_depth, min_samples))

    return tree


def build_tree(X,
               y,
               max_depth=None,
               min_samples=None,
               tree=None):
    """Build a binary decision tree by greedy entropy minimisation.

    Each node is a dict with one key, ``'<column> == <value>'``, mapped to a
    two-item list ``[branch_if_equal, branch_if_not_equal]``. A branch is
    either a class label (leaf) or another node dict.

    Parameters
    ----------
    X : DataFrame of features.
    y : DataFrame of labels, row-indexed with the same labels as ``X``
        (rows are looked up with ``y.loc[index, :]``).
    max_depth : optional int. Maximum number of split levels; the root is
        always split, so values below 1 behave like 1. ``None`` = unlimited.
    min_samples : optional int. A branch with fewer rows than this is turned
        into a leaf instead of being split. ``None`` = no limit.
    tree : optional dict that receives the root node (a new dict is created
        when omitted).

    Branches that are impure but cannot or may not be split further become
    leaves holding their most frequent label.

    Returns
    -------
    The ``tree`` dict containing the root node.

    Raises
    ------
    ValueError
        If no column value separates the rows of ``X`` into two non-empty
        groups, so not even a root node can be built.
    """
    if tree is None:
        tree = {}

    split = _find_best_split(X, y)
    if split is None:
        raise ValueError(
            'build_tree: no feature value separates the samples into two '
            'non-empty groups')

    return _apply_split(X, y, split, max_depth, min_samples, tree)

In [12]:
def get_prediction(instance, tree):
    """Classify a single-row DataFrame by walking ``tree``.

    ``tree`` is a dict with one key of the form ``'<column> <op> <value>'``
    mapped to ``[branch_if_true, branch_if_false]``; a branch is either a
    label or a nested dict of the same form. Supported operators are
    ``==``, ``<=`` and ``<`` (strictly less than).

    The condition is split on the operator surrounded by single spaces, so
    column names and values may themselves contain spaces. Because the value
    is stored as text, numeric feature values are compared against its
    numeric form, and other non-string feature values against their string
    form.

    Raises ``ValueError`` when a node key contains no supported operator.
    """
    node = list(tree.keys())[0]

    operators = {'==': operator.eq, '<=': operator.le, '<': operator.lt}

    for symbol, compare in operators.items():
        col, sep, val = node.partition(' ' + symbol + ' ')
        if sep:
            break
    else:
        raise ValueError('Cannot parse split condition: ' + repr(node))

    observed = instance[col].values[0]

    is_number = (isinstance(observed, (int, float, np.integer, np.floating))
                 and not isinstance(observed, bool))
    if is_number:
        try:
            val = float(val)
        except ValueError:
            # Threshold is not numeric text: fall back to comparing as text.
            observed = str(observed)
    elif not isinstance(observed, str):
        observed = str(observed)

    ans = tree[node][0] if compare(observed, val) else tree[node][1]

    if isinstance(ans, dict):
        return get_prediction(instance, ans)

    return ans

In [13]:
def predict(X, tree):
    """Return a list with one predicted label per row of ``X``.

    Each row is handed to ``get_prediction`` as a one-row DataFrame, in row
    order.
    """
    return [
        get_prediction(X.iloc[[row], :], tree)
        for row in range(len(X))
    ]

In [14]:
def score(X, y, tree):
    """Return the accuracy of ``tree`` on ``(X, y)`` as a percentage (0-100).

    ``y`` must hold exactly one label per row of ``X`` (a single-column
    DataFrame, a Series or a 1-D sequence). All rows are predicted in one
    ``predict`` call and compared label by label.

    An empty ``X`` yields ``0.0`` instead of dividing by zero.

    Raises ``ValueError`` when the number of labels differs from the number
    of rows.
    """
    if len(X) == 0:
        return 0.0

    actual = np.ravel(np.asarray(y))
    if len(actual) != len(X):
        raise ValueError(
            'score: expected one label per row, got %d labels for %d rows'
            % (len(actual), len(X)))

    predictions = predict(X, tree)
    correct = sum(1 for pred, truth in zip(predictions, actual) if pred == truth)

    return correct / len(X) * 100

In [15]:
# Toy dataset: ten samples described by three string-valued attributes and a
# Yes/No target. The raw dicts keep their own names so that X and y only
# ever refer to the formatted DataFrames.
raw_features = {
    'Taste': ['Salty', 'Spicy', 'Spicy', 'Spicy', 'Spicy',
              'Sweet', 'Salty', 'Sweet', 'Spicy', 'Salty'],
    'Temperature': ['Hot', 'Hot', 'Hot', 'Cold', 'Hot',
                    'Cold', 'Cold', 'Hot', 'Cold', 'Hot'],
    'Texture': ['Soft', 'Soft', 'Hard', 'Hard', 'Hard',
                'Soft', 'Soft', 'Soft', 'Soft', 'Hard'],
}

raw_target = {
    'Eat': ['No', 'No', 'Yes', 'No', 'Yes',
            'Yes', 'No', 'Yes', 'Yes', 'Yes'],
}

X = format_input(raw_features)
y = format_input(raw_target)

In [16]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the rows for testing; the fixed random_state keeps the
# split (and every output below) reproducible across runs.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=123,
)

In [17]:
# Fit the decision tree on the training split only.
tree = build_tree(X=X_train, y=y_train)


Best split Sweet 0.75



Best split Cold 0.9182958340544896



Best split Spicy 0.6666666666666666



Best split Soft 0.0



Best split Soft 0.0


In [18]:
# Show the nested {condition: [if_true, if_false]} structure of the fitted tree.
pprint.PrettyPrinter(width=50).pprint(tree)

{'Taste == Sweet': ['Yes',
                    {'Temperature == Cold': [{'Taste == Spicy': [{'Texture == Soft': ['Yes',
                                                                                      'No']},
                                                                 'No']},
                                             {'Texture == Soft': ['No',
                                                                  'Yes']}]}]}


In [19]:
# Report accuracy (in percent, rounded to two decimals) on both splits.
for label, features, target in (
        ('Train Accuracy:', X_train, y_train),
        ('Test Accuracy:', X_test, y_test),
):
    print(label, round(score(features, target, tree), 2))

Train Accuracy: 100.0
Test Accuracy: 100.0
