In [355]:
import numpy as np
import math
import sklearn.datasets

In [356]:


class DecisionTreeBase:
    def __init__(self, depth=0, max_depth=15, min_samples=1, max_features=None):
        self.l_child = None
        self.r_child = None
        self.max_depth = max_depth
        self.min_samples = min_samples
        self.depth = depth
        self.max_features = max_features
        self.split_col = None
        self.split_val = None
        self.is_leaf = False
        self.y_vals = []
        
    def find_best_split(self, data, target):
        min_loss = math.inf
        best_col = None
        best_split = None

        cols = range(len(data[0]))
        if self.max_features:
            cols = np.random.choice((cols), size=self.max_features, replace=False)

        for col_idx in cols: 

            for row_idx in range(len(data) - 1):
                lesser_half = target[data[:, col_idx] <= data[row_idx][col_idx]]
                greater_half = target[data[:, col_idx] > data[row_idx][col_idx]]
                current_loss = self.loss(lesser_half, greater_half)
                if current_loss < min_loss:
                    best_col = col_idx
                    best_split = data[row_idx][col_idx]
                    min_loss = current_loss 
                if min_loss == 0:
                    break
            if min_loss == 0:
                break
                
        return best_col, best_split 
    
    def fit(self, X, y):
        split_col, split_val = self.find_best_split(X, y)
        self.split_col = split_col
        self.split_val = split_val

        if not split_val:
            self.is_leaf = True
            self.y_vals = y
            return

        lesser_criteria = X[:, split_col] <= split_val
        greater_criteria = X[:, split_col] > split_val

        if len(y[lesser_criteria]) < self.min_samples or len(y[greater_criteria]) < self.min_samples:
            self.is_leaf = True
            self.y_vals = y
            
        else:
            self.l_child = type(self)(self.depth + 1)
            self.l_child.fit(X[lesser_criteria], y[lesser_criteria])
            self.r_child = type(self)(self.depth + 1)
            self.r_child.fit(X[greater_criteria], y[greater_criteria])
            
    def predict(self, X):
        if self.is_leaf:
            return self.leaf_predict(self.y_vals)
        if X[self.split_col] <= self.split_val:
            return self.l_child.predict(X)
        else:
            return self.r_child.predict(X)

    # to be implemented by child classes
    def loss(self, less_half, greater_half):
        return None

    def leaf_predict(self, y):
        return None
    

class DecisionTreeClassifier(DecisionTreeBase):
    
    def loss(self, less, greater):
        def get_impurity(vals):
            # get count of each unique value in the dataset
            _, counts = np.unique(vals, return_counts=True)

            # gini impurity formula
            impurity = 1 - np.sum((counts / len(vals))**2)
            return impurity

        return (get_impurity(less) + get_impurity(greater)) / 2

    def leaf_predict(self, y):
        return np.bincount(y).argmax()


class DecisionTreeRegressor(DecisionTreeBase):

    def loss(self, less, greater):
        def sum_sq_error(vals):
            return np.sum((vals - np.mean(vals))**2)
        
        return sum_sq_error(less) + sum_sq_error(greater)

    def leaf_predict(self, y):
        return np.mean(y)



# Testing Classification Tree

In [357]:
# note: all iris data is numeric

iris_df = sklearn.datasets.load_iris()
data = iris_df["data"]
target = iris_df["target"]

tree = DecisionTreeClassifier()
tree.fit(data, target)
a = 10
print(tree.predict(iris_df["data"][a]))
print(iris_df["target"][a])

0
0


In [358]:
correct = 0
for i in range(len(iris_df["data"])):
    pred = tree.predict(iris_df["data"][i])
    true = iris_df["target"][i]
    if pred == true:
        correct += 1        
print(f"{correct / len(iris_df['data']) * 100}% accurate")

100.0% accurate


# Testing Regression Tree

In [359]:
def rmse(predicted, target):
    return np.sqrt(np.sum(np.power(predicted - target, 2)))

In [360]:
diabetes_df = sklearn.datasets.load_diabetes()
data = diabetes_df["data"]
target = diabetes_df["target"]
tree = DecisionTreeRegressor()
tree.fit(data, target)
pred = [tree.predict(diabetes_df["data"][i]) for i in range(len(target))]
rmse(pred, diabetes_df["target"])


  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)


0.0

In [361]:
from sklearn.tree import DecisionTreeRegressor
regressor = DecisionTreeRegressor()

# Train the regressor
regressor.fit(data, target)

y_pred = regressor.predict(data)
rmse(y_pred, target)

0.0

# Random Forest

In [362]:

def bootstrap(X):
    bootstrap_indices = np.random.randint(len(X), size=len(X))
    oob_indices = set(range(len(X))).difference(set(bootstrap_indices))
    return bootstrap_indices, np.array(oob_indices)


class RandomForestClassifier:

    def __init__(self, n_estimators=20, max_features=True):
        self.n_estimators = n_estimators
        self.max_features = int(math.sqrt(max_features))
        self.trees = []

    def fit(self, X, y):
        for _ in range(self.n_estimators):
            bootstrap_indices, oob_indices = bootstrap(X)
            tree = DecisionTreeClassifier(max_features=self.max_features)
            tree.fit(X[bootstrap_indices], y[bootstrap_indices])
            self.trees.append(tree)

    def predict(self, X):
        return np.argmax(np.bincount([tree.predict(X) for tree in self.trees]))

iris_df = sklearn.datasets.load_iris()
data = iris_df["data"]
target = iris_df["target"]
rf = RandomForestClassifier()
rf.fit(data, target)
rf.predict(data[50])

1

In [377]:

class RandomForestRegressor:

    def __init__(self, n_estimators=20, max_features=False):
        self.n_estimators = n_estimators
        self.max_features = max_features
        self.trees = []

    def fit(self, X, y):
        for _ in range(self.n_estimators):
            bootstrap_indices, oob_indices = bootstrap(X)
            tree = DecisionTreeRegressor()
            tree.fit(X[bootstrap_indices], y[bootstrap_indices])
            self.trees.append(tree)

    def predict(self, X):
        predictions= []
        for sample in X:
            predictions.append(np.mean([tree.predict([sample]) for tree in self.trees]))
        return np.array(predictions) 


diabetes_df = sklearn.datasets.load_diabetes()
data = diabetes_df["data"]
target = diabetes_df["target"]
rf = RandomForestRegressor()
rf.fit(data, target)
y_pred = rf.predict(data)

rmse(y_pred, target)


473.8720238629835

In [382]:
from sklearn.ensemble import RandomForestRegressor
regressor = RandomForestRegressor()

diabetes_df = sklearn.datasets.load_diabetes()
data = diabetes_df["data"]
target = diabetes_df["target"]

# Train the regressor
regressor.fit(data, target)

y_pred = regressor.predict(data)
rmse(y_pred, target)

457.3278052994373