In [11]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import LabelEncoder
from collections import Counter
import matplotlib.pyplot as plt

In [2]:

#function to find best split by Gini impurity 

def find_best_split(feature_vector, target_vector):
    x = np.unique(feature_vector)
    averages = (x[1:] + x[:-1])/2
    gini_vec = [] 
    splits = []
    if averages.shape[0] == 0:
        return None,None,x, -1    
    for it in np.arange(0,averages.shape[0]):
        nr_l = (averages[it] < feature_vector).mean()
        nr_r = (averages[it] > feature_vector).mean()
        targets_l = target_vector[np.where(averages[it] < feature_vector)]
        targets_r = target_vector[np.where(averages[it] > feature_vector)]
        l_p1 = targets_l.mean()
        l_p0 = 1 - l_p1
        r_p1 = targets_r.mean()
        r_p0 = 1 - r_p1
        gini_vec.append(-nr_r*(1 - r_p0**2 - r_p1**2) - nr_l*(1- l_p0**2 - l_p1**2))
        splits.append((targets_l.shape[0]/feature_vector.shape[0]) * (targets_r.shape[0]/feature_vector.shape[0]))
    splits = np.array(splits)
    gini_vec = np.array(gini_vec)
    best_gini = gini_vec[np.where(splits == np.min(splits[np.where(gini_vec == np.max(gini_vec))]))][0]
    best_average = averages[np.where(splits == np.min(splits[np.where(gini_vec == np.max(gini_vec))]))][0]
    return averages, gini_vec, best_average, best_gini

In [3]:
class DecisionTree:
    def __init__(self, feature_types, max_depth=None, min_samples_split=1, min_samples_leaf=1):
        if np.any(list(map(lambda x: x != "real" and x != "categorical", feature_types))):
            raise ValueError("There is unknown feature type")

        self._tree = {}
        self._feature_types = feature_types
        self._max_depth = max_depth
        self._min_samples_split = min_samples_split
        self._min_samples_leaf = min_samples_leaf
        self._depth = None

    def _fit_node(self, sub_X, sub_y, node):
        
        sub_X = np.array(sub_X)
        sub_y = np.array(sub_y)
        if np.all(sub_y == sub_y[0]):
            node["type"] = "terminal"
            node["class"] = sub_y[0]
            return
        elif sub_X.shape[0] < self._min_samples_leaf:
            node["type"] = "terminal"
            node["class"] = Counter(sub_y).most_common(1)[0][0]
            return
        elif  self._max_depth != None and self._depth > self._max_depth:
            node["type"] = "terminal"
            node["class"] = Counter(sub_y).most_common(1)[0][0]
            return
        self._depth +=1
        feature_best, threshold_best, gini_best, split = None, None, None, None
        for feature in range(1, sub_X.shape[1]):
            feature_type = self._feature_types[feature]
            categories_map = {}

            if feature_type == "real":
                feature_vector = sub_X[:, feature]
            elif feature_type == "categorical":
                counts = Counter(sub_X[:, feature])
                clicks = Counter(sub_X[sub_y == 1, feature])
                ratio = {}
                for key, current_count in counts.items():
                    if key in clicks:
                        current_click = clicks[key]
                    else:
                        current_click = 0
                    ratio[key] = current_click / current_count
                sorted_categories = list(map(lambda x: x[1], sorted(ratio.items(), key=lambda x: x[1])))
                categories_map = dict(zip(list(np.unique(sub_X[:, feature])),sorted_categories))
                feature_vector = np.array(list(map(lambda x: categories_map[x], sub_X[:, feature])))
                
            else:
                raise ValueError

            if len(feature_vector) < 3: 
                continue
            _, _, threshold, gini = find_best_split(feature_vector, sub_y)
            if gini_best is None or gini > gini_best:
                feature_best = feature
                gini_best = gini
                split = feature_vector <= threshold

                if feature_type == "real":
                    threshold_best = threshold
                elif feature_type == "categorical":
                    threshold_best = list(map(lambda x: x[0],
                                              filter(lambda x: x[1] < threshold, categories_map.items())))
                else:
                    raise ValueError

        if feature_best is None:
            node["type"] = "terminal"
            node["class"] = sub_y[0]
            return
        
        if sub_y[split].shape[0] < self._min_samples_split or sub_y[np.logical_not(split)].shape[0] < self._min_samples_split:
            node["type"] = "terminal"
            node["class"] = sub_y[0]
            return            
        
        node["type"] = "nonterminal"
        node["class"] = Counter(sub_y).most_common(1)
        node["feature_split"] = feature_best
        if self._feature_types[feature_best] == "real":
            node["threshold"] = threshold_best
        elif self._feature_types[feature_best] == "categorical":
            node["categories_split"] = threshold_best
        else:
            raise ValueError
            
        
        node["left_child"], node["right_child"] = {}, {}
        self._fit_node(sub_X[split], sub_y[split], node["left_child"])
        self._fit_node(sub_X[np.logical_not(split)], sub_y[np.logical_not(split)], node["right_child"])
        

    def _predict_node(self, x, node):
        if node['type'] == 'terminal' :
            return node['class']
        if self._feature_types[node['feature_split']] == 'real':
            if x[node['feature_split']]> node['threshold']:
                return self._predict_node(x, node['right_child'])
            else:
                return self._predict_node(x, node['left_child'])
        else:
            if np.any(x[node['feature_split']] == node['categories_split']):
                return self._predict_node(x, node['left_child'])
                
            else:
                return self._predict_node(x, node['right_child'])
            pass

    def fit(self, X, y):
        self._depth = 0
        self._fit_node(X, y, self._tree)
        return self
    
    def predict(self, X):
        predicted = []
        X = np.array(X)
        for index in np.arange(0,X.shape[0]):
            predicted.append(self._predict_node(X[index,:], self._tree))
        return np.array(predicted)
    def score(self,X, y):
        return accuracy_score(self.predict(X),y)

In [9]:
#load dataset for test
data = pd.read_csv('data.csv',header = None)
data.head()

Unnamed: 0,0,1,2,3,4,5,6
0,vhigh,vhigh,2,2,small,low,unacc
1,vhigh,vhigh,2,2,small,med,unacc
2,vhigh,vhigh,2,2,small,high,unacc
3,vhigh,vhigh,2,2,med,low,unacc
4,vhigh,vhigh,2,2,med,med,unacc


In [21]:
# make target: 1 - good,vgood; 0 - acc,unacc
data[6][data[6] == 'acc'] = 0
data[6][data[6] == 'unacc'] = 0
data[6][data[6] == 'good'] = 1
data[6][data[6] == 'vgood'] = 1
target = data.pop(6)

for feature in data.columns:
    data[feature] = LabelEncoder().fit_transform(data[feature].astype(str))
data.head()
X_train,X_test,y_train,y_test = train_test_split(data, target, test_size = 0.3)
DecTree = DecisionTree(feature_types= np.full(data.shape[1], 'categorical')).fit(X_train, y_train)
print('Accuracy: %f' %accuracy_score(DecTree.predict(X_test).astype('int64'), y_test.astype('int64')))

Accuracy: 0.878613
