In [30]:
import numpy as np
import pandas as pd
from pandas.api.types import is_numeric_dtype
from copy import deepcopy
from statistics import mode, mean

from sklearn.metrics import confusion_matrix, mean_squared_error
from sklearn.model_selection import train_test_split

In [31]:
debug = 1

# Decision Tree Implementation

In [32]:
class Node:
    """
    A Decision Tree Node

    Parameters
    ----------
    feature - the value of a descriptive/target feature of a node
    data - the partitioned dataset resulting from the parent node on a feature value
    branch - the feature value from the parent node
    parent - the immediate adjacent node along the path from the root
    leaf - denotes a terminal node whose prediction is based on the path from the root to the node
    depth - the number of levels from the root to a node
    children - the nodes resulting from each unique feature value of the parent
    """
    def __init__(
        self,
        *,
        feature=None,
        data=None,
        branch=None,
        parent=None,
        leaf=False,
        depth=0,
        children=[]
    ):
        self.feature = feature
        self.data = data
        self.branch = branch
        self.parent = parent
        self.leaf = leaf
        self.depth = depth
        self.children = children

    def __str__(self):
        return self.depth * '\t' + f"{self.feature} (Branch: {self.branch})"

    @property
    def is_leaf(self):
        """Returns whether a node is terminal"""
        return self.leaf

    @property
    def X(self):
        """Returns the partitioned feature matrix of a node"""
        return self.data.iloc[:, :-1]

    @property
    def y(self):
        """Returns the partitioned target vector of a node"""
        return self.data.iloc[:, -1]

    @property
    def left(self):
        """Returns the left child"""
        return self.children[0]

    @property
    def right(self):
        """Returns the right child"""
        return self.children[1]

In [33]:
class DecisionTreeEstimator:
    """A Decision Tree Estimator"""
    def __init__(self, criterion={}):
        """
        Parameters
        ----------
        root: the starting node of the decision tree
        n_levels: contains a list of all unique feature values for each descriptive feature
        criterion (pre-pruning): {max_depth, partition_threshold, low_gain}
        """
        self.root = None
        self.n_levels = None
        self.criterion = criterion

    def __repr__(self, node=None):
        """Displays the decision tree (Pre-Order Traversal)"""
        if not node:
            node = self.root
        return str(node) + ''.join(['\n' + self.__repr__(child) for child in node.children])

    def find_optimal_threshold(self, df, d):
        thresholds = []
        for i in range(len(df)-1):
            pairs = df.iloc[i:i+2, -1]
            if any(pairs.iloc[0] != val for val in pairs.values):
                thresholds.append(df.loc[pairs.index, d].mean())

        levels = []
        for mkr in thresholds:
            X_a, X_b = df.loc[df[d] < mkr], df.loc[df[d] >= mkr]

            weight_a = len(X_a.loc[X_a[d] < mkr]) / len(df)
            weight_b = len(X_b.loc[X_b[d] >= mkr]) / len(df)

            entropy = self.metric(df.iloc[:, :-1], df.iloc[:, -1])

            entropy_left = self.metric(X_a.iloc[:, :-1], X_a.iloc[:, -1])
            entropy_right = self.metric(X_b.iloc[:, :-1], X_b.iloc[:, -1])

            rem = entropy_left * weight_a + entropy_right * weight_b

            levels.append(entropy - rem)

        return max(levels), thresholds[np.argmax(levels)], rem

    def partition(self, X, y, d, t):
        """Returns a subset of the training data with feature (d) with level (t)"""
        D = pd.concat([X.loc[X[d]==t], y.loc[X[d]==t]], axis=1)
        D = D.drop([d], axis=1)
        return D.iloc[:, :-1], D.iloc[:, -1], t

    def fit(self, X, y):
        self.n_levels = {d: X[d].unique() for d in X.columns}
        self.root = self.make_tree(X, y)
        return self

    def predict(self, x):
        node = self.root
        while not node.is_leaf:
            query_branch = x.get(node.feature).values
            if is_numeric_dtype(query_branch):
                node = node.left if node.left.branch < query_branch else node.right
            else:
                for child in node.children:
                    if child.branch == query_branch:
                        node = child
                        break
                else:
                    raise ValueError(f"Branch {node.feature} -> {x.get(node.feature).values} does not exist")
        return node

    def score(self, X, y):
        return [self.predict(X.iloc[x].to_frame().T).feature for x in range(len(X))]

## Decision Tree Classifier

###  Gini Index
$Gini(t, \mathcal{D})=1-\sum_{l\in levels(t)}P(t=l)^2$

### Entropy *(Bits)*
$\mathcal{H}(t, \mathcal{D})=-\sum_{l\in levels(t)\\}^{}{P(t=l)\cdot\log_2(P(t=l))}$

### Rem
$rem(d,\mathcal{D})=\sum_{l\in levels(t)}{}\frac{|\mathcal{D}_{d=l}|}{\mathcal{D}}\cdot \mathcal{H}(t, \mathcal{D}_{d=l})$

### Information Gain
$IG(d, \mathcal{D})=\mathcal{H}(t, \mathcal{D})-rem(d, \mathcal{D})$

### Information Gain Ratio
$GR(d, \mathcal{D})=\frac{IG(d, \mathcal{D})}{\mathcal{H}(d, \mathcal{D})}$

In [34]:
class DecisionTreeClassifier(DecisionTreeEstimator):
    """A Rudimentary Decision Tree Classifier"""
    def __init__(self, *, metric="entropy", eval="info_gain", criterion={}):
        """
        Metric: {gain, gini}
        Eval: {info_gain, gain_ratio}
        """
        super().__init__(criterion)
        self.metric = self.entropy if metric == "entropy" else self.gini
        self.eval = self.information_gain if eval == "info_gain" else self.information_gain_ratio

        self.optimal_threshold = {}

    def gini(self, X, y):
        proba = lambda t: len(X.loc[y==t]) / len(X)
        return 1 - np.sum([proba(t)**2 for t in y.unique()])

    def entropy(self, X, y):
        """Measures the amount of uncertainty/impurity/heterogeneity in (X, y)"""
        proba = lambda t: len(X.loc[y==t]) / len(X)
        return -np.sum([proba(t) * np.log2(proba(t)) for t in y.unique()])

    def rem(self, X, y, d):
        """Measures the entropy after partitioning (X, y) on feature (d)"""
        weight = lambda t: len(X.loc[X[d]==t]) / len(X)
        return np.sum([weight(t) * self.metric(X.loc[X[d]==t], y.loc[X[d]==t]) for t in X[d].unique()])

    def information_gain(self, X, y, d):
        """Measures the reduction in the overall entropy in (X, y) achieved by testing on feature (d)"""
        if is_numeric_dtype(X[d]):
            df = pd.concat([X, y], axis=1)
            df.sort_values(by=[d], inplace=True)
            gain, optimal_threshold, rem = self.find_optimal_threshold(df, d)

            self.optimal_threshold[d] = optimal_threshold

            if debug:
                print(f"{d} = {self.metric(X, y):.3f} - {rem:.3f} = {gain:.3f}") 
            return gain

        if debug:
            print(f"{d} = {self.metric(X, y):.3f} - {self.rem(X, y, d):.3f} = {self.metric(X, y) - self.rem(X, y, d):.3f}") 

        return self.metric(X, y) - self.rem(X, y, d)

    def information_gain_ratio(self, X, y, d):
        proba = lambda t: len(X.loc[X[d]==t]) / len(X)
        entropy = lambda: -np.sum([proba(t) * np.log2(proba(t)) for t in X[d].unique()])

        if debug:
            print(f"{d} = ({self.metric(X, y):.3f} - {self.rem(X, y, d):.3f}) / {entropy()} = {(self.metric(X, y) - self.rem(X, y, d)) / entropy()}")
        return self.metric(X, y) - self.rem(X, y, d) / entropy()

    def make_tree(self, X, y, *, parent=None, branch=None, depth=0):
        """Performs the ID3 algorithm

        Base Cases
        ----------
        - all instances have the same target feature values
        - dataset is empty, return a leaf node labeled with the majority class of the parent
        - if all feature values are identical
        - max_depth reached
        - max number of instances in partitioned dataset reached
        """
        make_node = lambda f, t: Node(feature=f, data=pd.concat([X, y], axis=1), branch=branch, parent=parent, depth=depth, leaf=t)

        if len(y.unique()) == 1:  # all instances have the same target feature values
            if debug:
                print("All instances have the same target feature value\n")
            return make_node(y.iat[0], True)
        elif X.empty:  # dataset is empty, return a leaf node labeled with the majority class of the parent
            if debug:
                print("Dataset is empty\n")
            return make_node(mode(parent.y), True)
        elif all((X[d] == X[d].iloc[0]).all() for d in X.columns):  # if all feature values are identical
            if debug:
                print("All instances have the same descriptive features\n")
            return make_node(mode(y), True)
        if self.criterion.get("max_depth", float('inf')) <= depth:  # max depth reached
            if debug:
                print("Stopping at Max Depth\n")
            return make_node(mode(y), True)
        if self.criterion.get("partition_threshold", float('-inf')) >= len(X):  # max number of instances in partitioned dataset reached
            if debug:
                print(f"Stopping at {len(X)} instances\n")
            return make_node(mode(y), True)

        if debug:
            print("===Information Gain===")

        max_gain = np.argmax([self.eval(X, y, d) for d in X.columns])

        if self.criterion.get('low_gain', float('-inf')) >= max_gain:
            if debug:
                print(f"Stopping at Gain={max_gain}\n")
            return make_node(mode(y), True)

        best_feature = X.columns[max_gain]
        best_node = deepcopy(make_node(best_feature, False))

        if debug:
            print()
            print("===Best Feature===")
            print(best_feature)
            print()

        if is_numeric_dtype(X[best_feature]):
            eps = self.optimal_threshold[best_feature]
            left = [X.loc[X[best_feature] < eps], y.loc[X[best_feature] < eps], eps]
            right = [X.loc[X[best_feature] >= eps], y.loc[X[best_feature] >= eps], eps]
            X_levels = [left, right]
        else:
            X_levels = [self.partition(X, y, best_feature, level) for level in self.n_levels[best_feature]]

        for *d, level in X_levels:
            if debug:
                print(f"===Partitioned Dataset ({level})===")
                print(pd.concat(d, axis=1).head())
                print()
            best_node.children.append(self.make_tree(*d, parent=best_node, branch=level, depth=depth+1))
        return best_node

    def score(self, X, y):
        y_hat = super().score(X, y)
        return confusion_matrix(y, y_hat, labels=y.unique())

In [35]:
# Testing with Continuous Feature Values

df = pd.DataFrame({
    'Stream': ["false", "true", "true", "false", "false", "true", "true"],
    'Slope': ["steep", "moderate", "steep", "steep", "flat", "steep", "steep"],
    'Elevation': [3900, 300, 1500, 1200, 4450, 5000, 3000],
    'Vegetation': ["chapparal", "riparian", "riparian", "chapparal", "conifer", "conifer", "chapparal"],
})

X, y = df.iloc[:, :-1], df.iloc[:, -1]

dt = DecisionTreeClassifier(metric="entropy")
dt.fit(X, y)

===Information Gain===
Stream = 1.557 - 1.251 = 0.306
Slope = 1.557 - 0.979 = 0.577
Elevation = 1.557 - 0.694 = 0.863

===Best Feature===
Elevation

===Partitioned Dataset (4175.0)===
  Stream     Slope  Elevation Vegetation
0  false     steep       3900  chapparal
1   true  moderate        300   riparian
2   true     steep       1500   riparian
3  false     steep       1200  chapparal
6   true     steep       3000  chapparal

===Information Gain===
Stream = 0.971 - 0.551 = 0.420
Slope = 0.971 - 0.649 = 0.322
Elevation = 0.971 - 0.551 = 0.420

===Best Feature===
Stream

===Partitioned Dataset (false)===
   Slope  Elevation Vegetation
0  steep       3900  chapparal
3  steep       1200  chapparal

All instances have the same target feature value

===Partitioned Dataset (true)===
      Slope  Elevation Vegetation
1  moderate        300   riparian
2     steep       1500   riparian
6     steep       3000  chapparal

===Information Gain===
Slope = 0.918 - 0.667 = 0.252
Elevation = 0.918 - -0

Elevation (Branch: None)
	Stream (Branch: 4175.0)
		chapparal (Branch: false)
		Elevation (Branch: true)
			riparian (Branch: 2250.0)
			chapparal (Branch: 2250.0)
	conifer (Branch: 4175.0)

In [36]:
dt

Elevation (Branch: None)
	Stream (Branch: 4175.0)
		chapparal (Branch: false)
		Elevation (Branch: true)
			riparian (Branch: 2250.0)
			chapparal (Branch: 2250.0)
	conifer (Branch: 4175.0)

## Decision Tree Regressor

$var(t, \mathcal{D})=\frac{\sum_{i=1}^n(t_i-\bar{t})^2}{n-1}$

$weighted\ var(t, \mathcal{D}) = \sum_{l\in levels(d)}{} \frac{|\mathcal{D}_{d=l}|}{|\mathcal{D}|} \times var(t, \mathcal{D}_{d=l})$

In [37]:
class DecisionTreeRegressor(DecisionTreeEstimator):
    """A Rudimentary Decision Tree Regressor"""
    def __init__(self, *, criterion={}):
        super().__init__(criterion)
        self.metric = self.variance

    def variance(self, X, y):
        if len(X) == 1:
            return 0
        return np.sum([(t-mean(y))**2 for t in y]) / (len(X)-1)

    def weighted_variance(self, X, y, d):
        if is_numeric_dtype(X[d]):
            df = pd.concat([X, y], axis=1)
            df.sort_values(by=[d], inplace=True)
            gain, optimal_threshold, rem = self.find_optimal_threshold(df, d)

            self.optimal_threshold[d] = optimal_threshold

            if debug:
                print(f"{d} = {self.metric(X, y):.3f} - {rem:.3f} = {gain:.3f}") 
            return gain

        weight = lambda t: len(X.loc[X[d]==t]) / len(X)
        return np.sum([weight(t) * self.metric(X.loc[X[d]==t], y.loc[X[d]==t]) for t in X[d].unique()])

    def make_tree(self, X, y, *, parent=None, branch=None, depth=0):
        """
        Performs the ID3 algorithm

        Base Cases
        ----------
        - all instances have the same target feature values
        - dataset is empty, return a leaf node labeled with the majority class
        - max_depth reached
        - max number of instances in partitioned dataset reached
        """
        make_node = lambda f, t: Node(feature=f, data=pd.concat([X, y], axis=1), branch=branch, parent=parent, depth=depth, leaf=t)

        if len(y.unique()) == 1:
            return make_node(y.iat[0], True)
        elif X.empty:
            return make_node(mean(y), True)
        if self.criterion.get("max_depth", float('inf')) <= depth:
            return make_node(mean(y), True)
        if self.criterion.get("partition_threshold", float('-inf')) >= len(X):
            return make_node(mean(y), True)

        min_var = np.argmin([self.weighted_variance(X, y, d) for d in X.columns])

        best_feature = X.columns[min_var]
        best_node = deepcopy(make_node(best_feature, False))

        if is_numeric_dtype(X[best_feature]):
            eps = self.optimal_threshold[best_feature]
            left = [X.loc[X[best_feature] < eps], y.loc[X[best_feature] < eps], eps]
            right = [X.loc[X[best_feature] >= eps], y.loc[X[best_feature] >= eps], eps]
            X_levels = [left, right]
        else:
            X_levels = [self.partition(X, y, best_feature, level) for level in self.n_levels[best_feature]]

        for *d, level in X_levels:
            best_node.children.append(self.make_tree(*d, parent=best_node, branch=level, depth=depth+1))
        return best_node

    def score(self, X, y):
        y_hat = super().score(X, y)
        return mean_squared_error(y, y_hat, squared=False)

In [38]:
# Testing Regression

data = {
    'Season': ['winter', 'winter', 'winter', 'spring', 'spring', 'spring', 'summer', 'summer', 'summer', 'autumn', 'autumn', 'autumn'],
    'Work Day': ['false', 'false', 'true', 'false', 'true', 'true', 'false', 'true', 'true', 'false', 'false', 'true'],
    'Rentals': [800, 826, 900, 2100, 4740, 4900, 3000, 5800, 6200, 2910, 2880, 2820],
}

df = pd.DataFrame(data)
A, b = df.iloc[:, :-1], df.iloc[:, -1]

dt_regr = DecisionTreeRegressor().fit(A, b)
dt_regr

Season (Branch: None)
	Work Day (Branch: winter)
		813 (Branch: false)
		900 (Branch: true)
	Work Day (Branch: spring)
		2100 (Branch: false)
		4820 (Branch: true)
	Work Day (Branch: summer)
		3000 (Branch: false)
		6000 (Branch: true)
	Work Day (Branch: autumn)
		2895 (Branch: false)
		2820 (Branch: true)

In [39]:
import pickle
from os.path import exists
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split

housing = fetch_california_housing(as_frame=True)

X_train, X_test, y_train, y_test = train_test_split(
    housing.data, housing.target, test_size=0.2, random_state=42
)

In [40]:
%%time

model = r'../models/housing.sav'

if exists(model):
    dt = pickle.load(open(model, 'rb'))
else:
    dt = DecisionTreeRegressor(criterion={"partition_threshold": len(X_train) * 5e-2})
    dt.fit(X_train, y_train)
    pickle.dump(dt, open(model, 'wb'))

dt.score(X_test, y_test)

# Random Forest Implementation

In [None]:
class RandomForest:
    def __init__(self, n_estimators=5, n_sample=0, eval="info_gain", criterion={}):
        self.n_estimators = n_estimators
        self.n_sample = n_sample
        self.forest = [DecisionTreeClassifier(eval=eval, criterion=criterion) for _ in range(n_estimators)]

    def sub_sample(self, X, n_sample=2):
        """Enforces feature randomness"""
        return np.random.choice(X.columns.to_numpy(), n_sample, replace=False)

    def bootstrap_sample(self, X, y, n_sample, key=True):
        feature_subset = self.sub_sample(X, int(np.log2(len(X))))
        d = pd.concat([X, y], axis=1)
        d = d.sample(n=n_sample, replace=key)
        return d.iloc[:, :-1][feature_subset], d.iloc[:, -1]

    def fit(self, X, y):
        for i, tree in enumerate(self.forest, start=1):
            tree.fit(*self.bootstrap_sample(X, y, self.n_sample))
            if debug:
                print(f"Decision Tree #{i} Complete")
        return self

    def predict(self, x):
        assert all(isinstance(model, DecisionTreeClassifier) for model in self.forest)
        return mode([dt.predict(x).feature for dt in self.forest])

    def score(self, X, y):
        y_hat = [self.predict(X.iloc[x].to_frame().T) for x in range(len(X))]
        return confusion_matrix(y, y_hat, labels=y.unique())

In [None]:
%%time
rf = RandomForest(n_estimators=5, n_sample=len(X_train), criterion={'max_depth': 4})
rf.fit(X_train, y_train)
rf.score(X_test, y_test)

===Information Gain===
pixel_7_4 = 3.316 - 3.227 = 0.283
pixel_2_5 = 3.316 - 3.199 = 0.459
pixel_4_0 = 3.316 - 3.316 = 0.000
pixel_4_4 = 3.316 - 3.156 = 0.420
pixel_3_0 = 3.316 - 3.314 = 0.002
pixel_4_7 = 3.316 - 3.316 = 0.000
pixel_3_4 = 3.316 - 3.167 = 0.429
pixel_7_1 = 3.316 - 3.310 = 0.133
pixel_6_6 = 3.316 - 3.294 = 0.394
pixel_7_0 = 3.316 - 3.314 = 0.002

===Best Feature===
pixel_2_5

===Partitioned Dataset (1.25)===
      pixel_7_4  pixel_2_5  pixel_4_0  pixel_4_4  pixel_3_0  pixel_4_7  \
1329       15.0        0.0        0.0       12.0        0.0        0.0   
598        15.0        0.0        0.0       13.0        0.0        0.0   
1190        9.0        0.0        0.0       16.0        0.0        0.0   
679         0.0        0.0        0.0       16.0        0.0        0.0   
711        15.0        0.0        0.0       16.0        0.0        0.0   

      pixel_3_4  pixel_7_1  pixel_6_6  pixel_7_0  target  
1329       11.0        0.0        0.0        0.0       1  
598       

array([[ 0,  0,  9, 13,  0,  1,  0,  0,  0, 17],
       [ 0,  1,  1,  0,  8,  0,  0, 10,  0, 13],
       [ 4, 19,  0,  0,  0, 17,  0,  0,  0,  1],
       [ 6,  3,  3,  0, 17,  0,  0, 10,  0,  0],
       [ 0, 21,  2,  2,  0,  0,  0,  0,  0,  5],
       [ 2, 13,  1,  0,  0,  0,  0,  5,  0,  8],
       [ 7,  3,  0,  0, 20,  1,  0,  3,  0,  4],
       [ 0,  1,  9,  8,  1,  0,  0,  1,  0, 22],
       [ 1, 15,  1,  0,  8,  0,  0,  6,  0,  2],
       [ 2,  9,  1,  0,  1,  9,  0, 13,  0,  0]])