In [1]:
import pandas as pd
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from utils.evaluation import eval_model

In [15]:
def entropy(y):
    _, counts = np.unique(y, return_counts=True)
    p = counts / counts.sum()
    return -np.sum(p * np.log2(p + 1e-12))   # add epsilon for numerical safety

def split_dataset(X, y, feature, threshold):
    left_mask  = X[:, feature] <= threshold
    right_mask = X[:, feature] > threshold

    return X[left_mask], y[left_mask], X[right_mask], y[right_mask]

class DecisionTree:
    def __init__(self, max_depth = 5, min_samples_split = 2, feature_subsample_size=None):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.feature_subsample_size = feature_subsample_size
        self.root = None

    class Node:
        def __init__(self, feature=None, threshold=None, left=None, right=None, *, value=None):
            self.feature = feature
            self.threshold = threshold
            self.left = left
            self.right = right
            self.value = value  # used for leaf nodes

    def __most_common_label(self, y):
        values, counts = np.unique(y, return_counts=True)
        return values[np.argmax(counts)]

    def __best_split(self, X, y):
        best_feature, best_threshold = None, None
        best_info_gain = -1

        current_entropy = entropy(y)
        n_samples, n_features = X.shape

        if self.feature_subsample_size:
            feature_indices = np.random.choice(
                n_features,
                self.feature_subsample_size,
                replace=False
            )
        else:
            feature_indices = range(n_features)

        for feature in feature_indices:
            thresholds = np.unique(X[:, feature])

            for threshold in thresholds:
                _, y_left, _, y_right = split_dataset(X, y, feature, threshold)

                if len(y_left) == 0 or len(y_right) == 0: continue

                left_entropy = entropy(y_left)
                right_entropy = entropy(y_right)

                child_entropy = (
                    len(y_left)/n_samples * left_entropy +
                    len(y_right)/n_samples * right_entropy
                )

                info_gain = current_entropy - child_entropy

                if info_gain > best_info_gain:
                    best_info_gain = info_gain
                    best_feature = feature
                    best_threshold = threshold

        return best_feature, best_threshold, best_info_gain
    
    def __build_tree(self, X, y, depth=0):
        n_samples, n_features = X.shape
        num_classes = len(np.unique(y))

        if ((self.max_depth is not None and depth >= self.max_depth) or n_samples < self.min_samples_split or num_classes == 1):
            leaf_value = self.__most_common_label(y)
            return self.Node(value=leaf_value)
        
        feature, threshold, gain = self.__best_split(X, y)

        if gain <= 0:
            return self.Node(value=self.__most_common_label(y))
        
        X_left, y_left, X_right, y_right = split_dataset(X, y, feature, threshold)

        # recursively find best split and build left and right subtree
        left_child = self.__build_tree(X_left, y_left, depth+1)
        right_child = self.__build_tree(X_right, y_right, depth+1)
        
        return self.Node(feature, threshold, left_child, right_child)
    
    def __predict_sample(self, x, node: Node):
        if node.value is not None:
            return node.value
        
        if x[node.feature] <= node.threshold:
            return self.__predict_sample(x, node.left)
        else:
            return self.__predict_sample(x, node.right)
        
    def plot_tree(self):
        G = nx.DiGraph()

        def add_nodes_edges(node, parent=None, edge_label=""):
            if node is None:
                return
            
            # label node
            if node.value is not None:
                label = f"Leaf\nClass={node.value}"
            else:
                label = f"X[{node.feature}] <= {node.threshold:.3f}"

            G.add_node(id(node), label=label)

            # connect to parent
            if parent is not None:
                G.add_edge(id(parent), id(node), label=edge_label)

            # recursively add children
            if node.left:
                add_nodes_edges(node.left, node, "True")
            if node.right:
                add_nodes_edges(node.right, node, "False")

        add_nodes_edges(self.root)

        pos = nx.nx_agraph.graphviz_layout(G, prog="dot")

        nx.draw(G, pos, with_labels=False, arrows=True)

        plt.show()

    
    def fit(self, X, y):
        self.root = self.__build_tree(X, y)

    def predict(self, X):
        return np.array([self.__predict_sample(x, self.root) for x in X])

In [6]:
class RandomForest:
    def __init__(self, n_trees=10, max_depth=5, min_samples_split=2, feature_subsample_size=None):
        self.n_trees = n_trees
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.feature_subsample_size = feature_subsample_size
        self.trees = []

    def __bootstrap_sample(self, X, y):
        n_samples = X.shape[0]
        indices = np.random.choice(n_samples, n_samples, replace=True)
        return X[indices], y[indices]
    
    def fit(self, X, y):
        self.trees = []

        for _ in range(self.n_trees):
            X_boot, y_boot = self.__bootstrap_sample(X,y)
            tree = DecisionTree(
                max_depth=self.max_depth,
                min_samples_split=self.min_samples_split,
                feature_subsample_size=self.feature_subsample_size
            )

            tree.fit(X_boot, y_boot)
            self.trees.append(tree)
    
    def predict(self, X):
        tree_predictions = np.array([tree.predict(X) for tree in self.trees])
        predictions = np.round(tree_predictions.mean(axis=0)).astype(int)
        return predictions


In [7]:
data = pd.read_csv("data/diabetes.csv")
data.head()

Unnamed: 0,Pregnancies,Glucose,BloodPressure,SkinThickness,Insulin,BMI,DiabetesPedigreeFunction,Age,Outcome
0,6,148,72,35,0,33.6,0.627,50,1
1,1,85,66,29,0,26.6,0.351,31,0
2,8,183,64,0,0,23.3,0.672,32,1
3,1,89,66,23,94,28.1,0.167,21,0
4,0,137,40,35,168,43.1,2.288,33,1


In [8]:
# split data into X and Y
X = data.drop("Outcome", axis=1).values
Y = data["Outcome"].values.reshape(-1,1)

print(f'X shape: {X.shape}\nY shape: {Y.shape}')

X shape: (768, 8)
Y shape: (768, 1)


In [18]:
# split X and Y into train and test with stratification for unbalanced dataset
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.3, random_state=42, stratify=Y)

# create model object and fit train data
RF = RandomForest(n_trees=20, max_depth=None, min_samples_split=5)
RF.fit(X_train, y_train)

In [19]:
# predict values on test data
y_pred = RF.predict(X_test)

# evaluate logistic regression model with precision, recall and f1 score
eval_model(y_test, y_pred);

Precision: 0.3506
Recall: 0.2814
F1: 0.3122
