In [1]:
from sklearn.datasets import load_wine

In [3]:
dataset = load_wine()

In [7]:
X = dataset["data"]
y = dataset["target"]

In [10]:
dataset["target_names"]

array(['class_0', 'class_1', 'class_2'], dtype='<U7')

There is not much information to what the classes represent, but this is not much of a problem, since this dataset is just to test the classifier.

In [224]:
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y)

Let's try running PCA to see how the classes are distributed.

In [225]:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

pca_pipeline = Pipeline([
    ("scaler", StandardScaler()),
    ("pca", PCA(n_components=3))
])

In [226]:
X_pca = pca_pipeline.fit_transform(X_train)

In [227]:
from mpl_toolkits import mplot3d
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from IPython.display import HTML

In [228]:
plt.ioff()
fig = plt.figure(figsize=(8, 5))
ax  = plt.axes(projection = "3d")

def anim(frame):
    global ax
    ax.view_init(30, frame)
    
ax.scatter3D(*X_pca[y_train==0, :].T, label="Class 0")
ax.scatter3D(*X_pca[y_train==1, :].T, label="Class 1")
ax.scatter3D(*X_pca[y_train==2, :].T, label="Class 2")
plt.legend()
anim = FuncAnimation(fig, anim, frames = 360, interval=60)
plt.close()
HTML(anim.to_html5_video())

As can be seen, even after PCA, most of the datapoints are distinct in their clusters. This is a key insight, since a decision tree might beniefit strongly from dataset of this type and the PCA itself.

In [394]:
from sklearn.base import ClassifierMixin, BaseEstimator
import numpy as np
from collections import deque

class DecisionNode:
    def __init__(self, c, value, samples, depth, score = None, attrib = None, threshold = None):
        self.c = c
        self.depth = depth
        self.value = value
        self.samples = samples
        self.score = score
        self.attrib = attrib
        self.threshold = threshold
        self.left_child = None
        self.right_child = None
        
    def __hash__(self):
        return self.depth*hash(self.c)*self.samples

class DecisionTree(BaseEstimator, ClassifierMixin):
    
    def gini(self, node):
        return 1 - sum(((val/node.samples)**2 for val in node.value))

    def entropy(self, node):
        return -sum((p*np.log2(p) for p in (val/node.samples for val in node.value if val != 0)))
    
    def split_score(self, left_classes, right_classes):
        m_left = len(left_classes)
        m_right = len(right_classes)
        m = m_left + m_right
        _, left_counts = np.unique([x[2] for x in left_classes], return_counts=True)
        _, right_counts = np.unique([x[2] for x in right_classes], return_counts=True)
        
        if self.criterion == "gini":
            left_score = 1 - sum((lc/m_left)**2 for lc in left_counts)
            right_score = 1 - sum((rc/m_right)**2 for rc in right_counts)
            return m_left/m*left_score + m_right/m*right_score
        elif self.criterion == "entropy":
            left_score = -sum((p*np.log2(p) for p in (lc/m_left for lc in left_counts if lc != 0)))
            right_score = -sum((p*np.log2(p) for p in (rc/m_right for rc in right_counts if rc != 0)))
            return m_left/m*left_score + m_right/m*right_score
    
    def build_root(self, X, y):
        y_values, y_counts = np.unique(y, return_counts=True)
        y_most_frequent = y_values[np.argmax(y_counts)]
        samples = len(y)
        value = y_counts
        
        root = DecisionNode(y_most_frequent, value, samples, depth=1)
    
        return y_values, root
    
    def __init__(self, criterion = "gini", max_depth = None, min_samples_split = 2):
        self.criterion = criterion
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
    
    def predict(self, X):
        result = np.zeros((X.shape[0],))
        for idx, row in enumerate(X):
            node = self.root
            while node.attrib:
                if row[node.attrib] <= node.threshold:
                    node = node.left_child
                else:
                    node = node.right_child
            result[idx] = node.c
        return result
    
    def fit(self, X, y):
        #create the root node
        self.classes, self.root = self.build_root(X, y)
        node_queue = deque()
        node_queue.append(self.root)
        
        self.depth = self.root.depth
        
        node_indices = {}
        
        node_indices[self.root] = np.arange(0, X.shape[0])
        
        self.nattribs = X.shape[1]
        
        while node_queue:
            node = node_queue.pop()
            
            #calculate node score
            if self.criterion == "gini":
                node.score = self.gini(node)
            elif self.criterion == "entropy":
                node.score = self.entropy(node)
            else:
                raise ValueError("Invalid criterion specified")
                        
            if abs(node.score) <= 1e-10:
                continue
                
            
            if node.samples < self.min_samples_split:
                continue
            if self.max_depth and node.depth >= self.max_depth:
                continue
                
            best_split_attrib = None
            best_split_value = None
            best_split_score = node.score
            left_split_indices = []
            right_split_indices = []
            
            splitted_node_indices = node_indices[node]
                
            #try to split the node
            for attrib_index in range(self.nattribs):
                splitted_node_attribs = X[splitted_node_indices,attrib_index]
                splitted_node_targets = y[splitted_node_indices]
                splitted_node_sorted = sorted(
                    zip(splitted_node_attribs, splitted_node_indices, splitted_node_targets))                
                
                bucket_split_index = 1
                #everything with index < bucket_split_index goes to the left split
                while bucket_split_index < len(splitted_node_sorted)-1:
                    split_score = self.split_score(splitted_node_sorted[:bucket_split_index],
                                                  splitted_node_sorted[bucket_split_index:])
                    if split_score < best_split_score:
                        best_split_score = split_score
                        best_split_attrib = attrib_index
                        best_split_value = np.mean((splitted_node_sorted[bucket_split_index-1][0],
                                                    splitted_node_sorted[bucket_split_index][0]))
                        left_split_indices = [x[1] for x in splitted_node_sorted[:bucket_split_index]]
                        right_split_indices = [x[1] for x in splitted_node_sorted[bucket_split_index:]]
                    bucket_split_index += 1
                    
            if best_split_attrib:
                #calculate most frequent class for left and right splits
                left_dummy_targets = np.concatenate([y[left_split_indices], self.classes])
                right_dummy_targets = np.concatenate([y[right_split_indices], self.classes])
                left_targets, left_counts = np.unique(left_dummy_targets, return_counts=True)
                left_counts = [x-1 for x in left_counts]
                right_targets, right_counts = np.unique(right_dummy_targets, return_counts=True)
                right_counts = [x-1 for x in right_counts]
                                
                node.attrib = best_split_attrib
                node.threshold = best_split_value
                                
                left_node = DecisionNode(c = left_targets[np.argmax(left_counts)],
                                        value = left_counts,
                                        samples = len(left_dummy_targets)-len(self.classes),
                                        depth = node.depth+1)
                node.left_child = left_node
                node_indices[left_node] = left_split_indices
                
                right_node = DecisionNode(c = right_targets[np.argmax(right_counts)],
                                         value = right_counts,
                                         samples = len(right_dummy_targets) - len(self.classes),
                                         depth = node.depth+1)
                node.right_child = right_node
                node_indices[right_node] = right_split_indices
                
                node_queue.append(left_node)
                node_queue.append(right_node)
                
                if left_node.depth > self.depth:
                    self.depth = left_node.depth
                                     
        return self
        
        

In [395]:
dt = DecisionTree(criterion="gini")
dt.fit(X_train, y_train)
dt.depth

4

In [396]:
from sklearn.metrics import accuracy_score

In [397]:
y_pred = dt.predict(X_test)

In [398]:
accuracy_score(y_pred, y_test)

0.9555555555555556

In [399]:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

pca_pipeline = Pipeline([
    ("scaler", StandardScaler()),
    ("pca", PCA(n_components=13))
])

In [400]:
X_train_pca = pca_pipeline.fit_transform(X_train)

In [401]:
X_test_pca = pca_pipeline.transform(X_test)

In [402]:
dt.fit(X_train_pca, y_train)

DecisionTree()

In [403]:
y_pca_pred =  dt.predict(X_test_pca)

In [404]:
accuracy_score(y_pca_pred, y_test)

0.7111111111111111

The result is very dependant on the split. Let's try to evaluate it using cross validation.

In [424]:
from sklearn.model_selection import cross_val_score

In [425]:
np.mean(cross_val_score(dt, X, y, scoring="accuracy", cv=20))

0.8944444444444445

In [426]:
from sklearn.tree import DecisionTreeClassifier

sklearn_dtc = DecisionTreeClassifier()

In [427]:
np.mean(cross_val_score(sklearn_dtc, X, y, scoring="accuracy", cv=20))

0.8944444444444445