In [5]:
import numpy as np
import pandas as pd
from collections import Counter

In [9]:
#creating sample data
X = np.random.rand(10, 4) * 10; #10 samples with 4 features
y = np.random.randint(0, 2, size=10) #2 labels of 10 samples

In [95]:
def euclidean_distance(a, b):
    return np.sqrt(np.sum((a-b)**2))

class KNN:
    def __init__(self, k=3):
        self.k = k

    def fit(self, x, y):
        self.x_train = x
        self.y_train = y

    def predict(self, x):
        #compute distance
        distances = [euclidean_distance(x, x_train) for x_train in self.x_train]

        #get k neighbors
        k_indices = np.argsort(distances)[:self.k]

        # majority vote
        k_neigbors_label = [self.y_train[i] for i in k_indices]
        most_common = Counter(k_neigbors_label).most_common(1)[0][0]
        return most_common

    def _predict(self, test):
        return [self.predict(x) for x in test]
        

In [109]:
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

iris = load_iris()
X = iris.data
y = iris.target

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20, shuffle=True, random_state=42)
knn = KNN()
knn.fit(X_train, y_train)
y_pred = knn._predict(X_test)

accuracy = sum(1 for a, b in zip(y_pred, y_test) if a == b) / len(y_test)
print("Train-Test Split Accuracy:", accuracy)

Train-Test Split Accuracy: 1.0


In [112]:
from sklearn.model_selection import LeaveOneOut

loo = LeaveOneOut()

for train_index, test_index in loo.split(X):
    X_train, X_test = X[train_index], X[test_index]
    y_train, y_test = y[train_index], y[test_index]

    knn.fit(X_train, y_train)
    y_pred = knn._predict(X_test)

    accuracies.append(int(y_pred[0] == y_test[0]))

print("LOOCV Accuracy:", sum(accuracies) / len(accuracies))

LOOCV Accuracy: 0.9600000000000001


In [114]:
from sklearn.model_selection import KFold

kf = KFold(n_splits=5, shuffle=True, random_state=42)

for train_index, test_index in kf.split(X):
    X_train, X_test = X[train_index], X[test_index]
    y_train, y_test = y[train_index], y[test_index]

    knn.fit(X_train, y_train)
    y_pred = knn._predict(X_test)

    acc = sum(1 for a, b in zip(y_pred, y_test) if a == b) / len(y_test)
    accuracies.append(acc)

print("5-Fold Cross Validation Accuracy:", sum(accuracies) / len(accuracies))

5-Fold Cross Validation Accuracy: 0.9602083333333334


In [7]:
def entropy(feature):
    counts = Counter(feature) #frequency of each label
    total = len(feature)
    entropy_ = 0.0
    for c in counts.values():
        p = c/total
        if p > 0:
            entropy_ -= p * np.log2(p)
    return entropy_

'''
def info_gain(X_column, y, split_value=None): #X_column are the values of a feature, y is the label of each value, and split_value defines whether the feature is categorical or numerical
    parent_entropy = entropy(y)
    n = len(y)
    total_entropy = 0.0
    if split_value is None: #categorical feature
        values = set(X_column) #it gives us the unique values eg feature has_job=['yes', 'no', 'no', 'yes', 'yes'] ---> with set: values=['yes', 'no'], removes duplicates
        for v in values:
            y_subset = [y[i] for i in range(len(y)) if X_column[i] == v] 
            total_entropy += (len(y_subset)/n) * entropy(y_subset)
    else: #binary split when split_value is given
        if isinstance(split_value, (int, float)): #if true then numerical feature
            left_y = [y[i] for i in range(n) if X_column[i] <= split_value]
            right_y = [y[i] for i in range(n) if X_column[i] > split_value]
        else: #catagorical value
            left_y = [y[i] for i in range(n) if X_column[i] == split_value]
            right_y = [y[i] for i in range(n) if X_column[i] != split_value]

        #if one side is empty
        if len(left_y) == 0 or len(right_y) == 0:
            return 0.0
            
        total_entropy += ((len(left_y)/n) * entropy(left_y)) + ((len(right_y)/n) * entropy(right_y))

    return parent_entropy - total_entropy
'''

def info_gain(X_column, y, split_value):
    parent_entropy = entropy(y)
    n = len(y)

    if isinstance(split_value, (int, float)):
        # numeric split
        left_y = [y[i] for i in range(n) if X_column[i] <= split_value]
        right_y = [y[i] for i in range(n) if X_column[i] > split_value]
    else:
        # categorical split
        left_y = [y[i] for i in range(n) if X_column[i] == split_value]
        right_y = [y[i] for i in range(n) if X_column[i] != split_value]

    # if one branch is empty, no information gain
    if len(left_y) == 0 or len(right_y) == 0:
        return 0.0

    weighted_entropy = (len(left_y)/n) * entropy(left_y) + (len(right_y)/n) * entropy(right_y)
    return parent_entropy - weighted_entropy


#  -----Decision Tree-----
class Node:
    def __init__(self, feature=None, threshold=None, right=None, left=None, *, value=None):
        self.right = right
        self.left = left
        self.value = value
        self.feature = feature
        self.threshold = threshold

    def is_leaf(self):
        return self.value is not None


class DecisionTree:
    def __init__(self, max_depth=5):
        self.max_depth = max_depth
        self.root = None

    def fit(self, x, y):
        self.root = self.grow_tree(x, y)

    '''
    def grow_tree(self, x, y, depth=0): #recursive function
        #if empty dataset
        if len(x) == 0:
            return Node(value=None)
        
        #stopping condition
        if len(set(y)) == 1: #all labels are the same i.e pure node, one unique label
            return Node(value=y[0]) 

        num_features = len(x[0]) if len(x) > 0 else 0
        if depth >= self.max_depth or num_features == 0: #if we reached max_depth or no more features to split
            return Node(value=Counter(y).most_common(1)[0][0]) #returning leaf node with the majority class

        # ---- choosing best feature to split -----
        best_gain = -1.0
        split_index, split_threshold = None, None

         #split_index --> stores the index of the feature ;feature on which split has to be done eg:has job splits into yes and no
         #Example: if your data x = [[Age, Color, Student], …], and the best split is on "Color", then split_index = 1 (the column number).
         #split_threshold --> stores actual value of the feature on which we are splitting
         #Example: If the feature is numeric (e.g., Age=30), then threshold is that number → split into ≤30 vs >30.
                   If the feature is categorical (e.g., Color="Red"), then threshold is that category → split into "Red" vs "Not Red".

        for feature in range(num_features):
            #col = [row[feature] for row in x] --> list of features
            #value = set(col) --> values of each features
            values = set(row[feature] for row in x) #equivalent to value=sorted(set(col)
            for val in values:
                gain = info_gain([row[feature] for row in x], y, val if isinstance(val, (int, float)) else val) #check
                if gain > best_gain:
                    best_gain = gain
                    split_index = feature
                    split_threshold = val
                    
        if best_gain <= 0:
            return Node(value=Counter(y).most_common(1)[0][0])

        #splitting dataset
        if isinstance(split_threshold, (int, float)):
            left_index = [i for i in range(len(x)) if x[i][split_index] <= split_threshold]
            right_index = [i for i in range(len(x)) if x[i][split_index] > split_threshold]

        else: #categorical
            left_index = [i for i in range(len(x)) if x[i][split_index] == split_threshold]
            right_index = [i for i in range(len(x)) if x[i][split_index] != split_threshold]

        left = self.grow_tree([x[i] for i in left_index], [y[i] for i in left_index], depth+1)
        right = self.grow_tree([x[i] for i in right_index], [y[i] for i in right_index], depth+1)

        #this is equivalent to the above left and right recursive calls
        #X_left  = [X[i] for i in left_idx]
        #y_left  = [y[i] for i in left_idx]
        #X_right = [X[i] for i in right_idx]
        #y_right = [y[i] for i in right_idx]

        # recursive calls
        #left_child  = self.grow_tree(X_left, y_left, depth + 1)
        #right_child = self.grow_tree(X_right, y_right, depth + 1)
        
        return Node(split_index, split_threshold, left, right)

        '''


    def grow_tree(self, x, y, depth=0):
        # base cases
        if len(x) == 0:
            return Node(value=None)
        if len(set(y)) == 1:
            return Node(value=y[0])
        if depth >= self.max_depth:
            return Node(value=Counter(y).most_common(1)[0][0])
    
        num_features = len(x[0])
        best_gain = -1.0
        split_index, split_threshold = None, None
    
        # loop through features
        for feature in range(num_features):
            values = sorted(set(row[feature] for row in x))
    
            # decide candidate thresholds
            if all(isinstance(v, (int, float)) for v in values):
                # numeric -> use midpoints
                candidates = [(values[i] + values[i+1]) / 2 for i in range(len(values)-1)]
            else:
                # categorical -> use unique values
                candidates = values
    
            # test each candidate
            for val in candidates:
                gain = info_gain([row[feature] for row in x], y, val)
                if gain > best_gain:
                    best_gain = gain
                    split_index = feature
                    split_threshold = val
    
        # if no useful split, make leaf
        if best_gain <= 0:
            return Node(value=Counter(y).most_common(1)[0][0])
    
        # split dataset
        if isinstance(split_threshold, (int, float)):
            left_index = [i for i in range(len(x)) if x[i][split_index] <= split_threshold]
            right_index = [i for i in range(len(x)) if x[i][split_index] > split_threshold]
        else:
            left_index = [i for i in range(len(x)) if x[i][split_index] == split_threshold]
            right_index = [i for i in range(len(x)) if x[i][split_index] != split_threshold]
    
        left = self.grow_tree([x[i] for i in left_index], [y[i] for i in left_index], depth+1)
        right = self.grow_tree([x[i] for i in right_index], [y[i] for i in right_index], depth+1)
    
        return Node(feature=split_index, threshold=split_threshold, left=left, right=right)


    def predict_one(self, x):
            node = self.root
            while not node.is_leaf():
                if isinstance(node.threshold, (int, float)):
                    if x[node.feature] <= node.threshold:
                        node = node.left
                    else:
                        node = node.right
                else: #categorical
                    if x[node.feature] == node.threshold:
                        node = node.left
                    else:
                        node = node.right
            return node.value

    def predict(self, X):
            return [self.predict_one(x) for x in X]
                

In [61]:
# Small dataset: [Age, Has_Job, Owns_House] -> Buy?
X = [
    [25, "No", "No"],
    [30, "Yes", "No"],
    [40, "Yes", "Yes"],
    [35, "Yes", "Yes"],
    [22, "No", "No"]
]
y = ["No", "Yes", "Yes", "Yes", "No"]

tree = DecisionTree(max_depth=3)
tree.fit(X, y)

print(tree.predict([[28, "Yes", "No"], [23, "No", "No"]]))
# Output: ['Yes', 'No']


['Yes', 'No']


In [71]:
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split, LeaveOneOut
import pandas as pd

# load iris
iris = load_iris()
X = iris.data        # numeric features
y = iris.target      # labels (0,1,2)

# convert to list of lists (since your tree uses Python lists)
X = X.tolist()
y = y.tolist()

# train your custom tree
tree = DecisionTree(max_depth=3)
tree.fit(X, y)

# predict a few samples
print(tree.predict(X[30:]))
print("Actual:", y[30:])


[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2]
Actual: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2]


In [81]:
from sklearn.model_selection import train_test_split
iris = load_iris()
x = iris.data
y = iris.target

x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, shuffle=True, random_state=42)

tree = DecisionTree(max_depth=3)
tree.fit(x_train.tolist(), y_train.tolist())

#predict
y_pred = tree.predict(x_test.tolist())

accuracy = sum(1 for a, b in zip(y_pred, y_test) if a == b) / len(y_test)
print("Train-Test Split Accuracy:", accuracy)

print(y_pred)
print("Actual:", y_test[:])

Train-Test Split Accuracy: 0.9666666666666667
[1, 0, 2, 1, 2, 0, 1, 2, 1, 1, 2, 0, 0, 0, 0, 1, 2, 1, 1, 2, 0, 2, 0, 2, 2, 2, 2, 2, 0, 0]
Actual: [1 0 2 1 1 0 1 2 1 1 2 0 0 0 0 1 2 1 1 2 0 2 0 2 2 2 2 2 0 0]


In [87]:
from sklearn.model_selection import LeaveOneOut

X = iris.data
Y = iris.target

loo = LeaveOneOut()
accuracies = []

for train_index, test_index in loo.split(X):
    X_train, X_test = X[train_index], X[test_index]
    Y_train, Y_test = Y[train_index], Y[test_index]

    tree = DecisionTree(max_depth=3)
    tree.fit(X_train.tolist(), Y_train.tolist())
    y_pred = tree.predict(X_test.tolist())
    accuracies.append(int(y_pred[0] == Y_test[0]))

print("LOOCV Accuracy:", sum(accuracies) / len(accuracies))

LOOCV Accuracy: 0.9533333333333334


In [89]:
from sklearn.model_selection import KFold

kf = KFold(n_splits=5, shuffle=True, random_state=42)
accuracies = []

for train_index, test_index in kf.split(X):
    X_train, X_test = X[train_index], X[test_index]
    y_train, y_test = Y[train_index], Y[test_index]

    tree = DecisionTree(max_depth=3)
    tree.fit(X_train.tolist(), y_train.tolist())

    y_pred = tree.predict(X_test.tolist())
    
    acc = sum(1 for a, b in zip(y_pred, y_test) if a == b) / len(y_test)
    accuracies.append(acc)

print("5-Fold Cross Validation Accuracy:", sum(accuracies) / len(accuracies))

5-Fold Cross Validation Accuracy: 0.96


## Another implementation of DT

In [30]:
import numpy as np
import pandas as pd
from collections import Counter
from sklearn.model_selection import train_test_split

class Node:
    def __init__(self, feature=None, threshold=None, left=None, right=None, *, value=None):
        self.feature = feature
        self.threshold = threshold
        self.left = left
        self.right = right
        self.value = value
        
    def is_leaf_node(self):
        return self.value is not None


class DecisionTree:
    def __init__(self, min_samples_split=2, max_depth=100, n_features=None):
        self.min_samples_split = min_samples_split
        self.max_depth = max_depth
        self.n_features = n_features
        self.root = None
        self.feature_types = {}  # Store feature types for prediction

    def fit(self, X, y):
        # Convert to numpy arrays and handle mixed data types
        X = self._convert_to_array(X)
        y = np.array(y)
        
        # Determine feature types
        self._determine_feature_types(X)
        
        self.n_features = X.shape[1] if not self.n_features else min(X.shape[1], self.n_features)
        self.root = self._grow_tree(X, y)

    def _convert_to_array(self, X):
        """Convert input to numpy array while preserving data types"""
        if isinstance(X, pd.DataFrame):
            # Convert each column separately to handle mixed types
            arrays = []
            for col in X.columns:
                col_data = X[col].values
                # Try to convert to numeric, if fails keep as object
                try:
                    arrays.append(pd.to_numeric(col_data).values.reshape(-1, 1))
                except:
                    arrays.append(col_data.reshape(-1, 1))
            return np.hstack(arrays)
        else:
            return np.array(X)

    def _determine_feature_types(self, X):
        """Determine if each feature is numeric or categorical"""
        for i in range(X.shape[1]):
            try:
                # Try to convert to float to check if numeric
                _ = X[:, i].astype(float)
                self.feature_types[i] = 'numeric'
            except (ValueError, TypeError):
                self.feature_types[i] = 'categorical'

    def _grow_tree(self, X, y, depth=0):
        n_samples, n_feats = X.shape
        n_labels = len(np.unique(y))

        # check the stopping criteria
        if (depth >= self.max_depth or n_labels == 1 or n_samples < self.min_samples_split):
            leaf_value = self._most_common_label(y)
            return Node(value=leaf_value)

        feat_idxs = np.random.choice(n_feats, self.n_features, replace=False)

        # find the best split
        best_feature, best_thresh = self._best_split(X, y, feat_idxs)
        
        # If no good split found
        if best_feature is None:
            leaf_value = self._most_common_label(y)
            return Node(value=leaf_value)

        # create child nodes
        left_idxs, right_idxs = self._split(X[:, best_feature], best_thresh, best_feature)
        left = self._grow_tree(X[left_idxs, :], y[left_idxs], depth + 1)
        right = self._grow_tree(X[right_idxs, :], y[right_idxs], depth + 1)
        return Node(best_feature, best_thresh, left, right)

    def _best_split(self, X, y, feat_idxs):
        best_gain = -1
        split_idx, split_threshold = None, None

        for feat_idx in feat_idxs:
            X_column = X[:, feat_idx]
            thresholds = np.unique(X_column)

            for thr in thresholds:
                # calculate the information gain
                gain = self._information_gain(y, X_column, thr, feat_idx)

                if gain > best_gain:
                    best_gain = gain
                    split_idx = feat_idx
                    split_threshold = thr

        return split_idx, split_threshold

    def _information_gain(self, y, X_column, threshold, feat_idx):
        # parent entropy
        parent_entropy = self._entropy(y)

        # create children
        left_idxs, right_idxs = self._split(X_column, threshold, feat_idx)

        if len(left_idxs) == 0 or len(right_idxs) == 0:
            return 0
        
        # calculate the weighted avg. entropy of children
        n = len(y)
        n_l, n_r = len(left_idxs), len(right_idxs)
        e_l, e_r = self._entropy(y[left_idxs]), self._entropy(y[right_idxs])
        child_entropy = (n_l / n) * e_l + (n_r / n) * e_r

        # calculate the IG
        information_gain = parent_entropy - child_entropy
        return information_gain

    def _split(self, X_column, split_thresh, feat_idx):
        """Split data based on feature type"""
        if self.feature_types.get(feat_idx, 'numeric') == 'numeric':
            # Numeric split
            try:
                X_column_float = X_column.astype(float)
                split_thresh_float = float(split_thresh)
                left_idxs = np.where(X_column_float <= split_thresh_float)[0]
                right_idxs = np.where(X_column_float > split_thresh_float)[0]
            except:
                # Fallback to categorical if conversion fails
                left_idxs = np.where(X_column == split_thresh)[0]
                right_idxs = np.where(X_column != split_thresh)[0]
        else:
            # Categorical split
            left_idxs = np.where(X_column == split_thresh)[0]
            right_idxs = np.where(X_column != split_thresh)[0]
        return left_idxs, right_idxs

    def _entropy(self, y):
        """Calculate entropy that works with any data type"""
        if len(y) == 0:
            return 0
        
        # Use Counter instead of bincount for mixed data types
        counts = Counter(y)
        total = len(y)
        entropy_val = 0.0
        
        for count in counts.values():
            p = count / total
            if p > 0:
                entropy_val -= p * np.log(p)
                
        return entropy_val

    def _most_common_label(self, y):
        if len(y) == 0:
            return None
        counter = Counter(y)
        value = counter.most_common(1)[0][0]
        return value

    def predict(self, X):
        # Convert to numpy array for consistent indexing
        X = self._convert_to_array(X)
        return np.array([self._traverse_tree(x, self.root) for x in X])

    def _traverse_tree(self, x, node):
        if node.is_leaf_node():
            return node.value

        feature_value = x[node.feature]
        feature_type = self.feature_types.get(node.feature, 'numeric')
        
        if feature_type == 'numeric':
            # Numeric comparison
            try:
                feature_value_float = float(feature_value)
                threshold_float = float(node.threshold)
                if feature_value_float <= threshold_float:
                    return self._traverse_tree(x, node.left)
                else:
                    return self._traverse_tree(x, node.right)
            except:
                # Fallback to categorical comparison
                if str(feature_value) == str(node.threshold):
                    return self._traverse_tree(x, node.left)
                else:
                    return self._traverse_tree(x, node.right)
        else:
            # Categorical comparison
            if str(feature_value) == str(node.threshold):
                return self._traverse_tree(x, node.left)
            else:
                return self._traverse_tree(x, node.right)

In [32]:
# Test the code
dt_data = pd.read_csv('dt_scratch_data.csv')
x = dt_data.iloc[:, :-1]
y = dt_data.iloc[:, -1]

# If your target variable is categorical, encode it
from sklearn.preprocessing import LabelEncoder
if y.dtype == 'object':
    le = LabelEncoder()
    y = le.fit_transform(y)

X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=0.2, shuffle=True, random_state=42)

dt = DecisionTree(max_depth=3)
dt.fit(X_train, y_train)
y_pred = dt.predict(X_test)

def accuracy(y_test, y_pred):
    return np.sum(y_test == y_pred) / len(y_test)

acc = accuracy(y_test, y_pred)
print(f"Accuracy: {acc}")

Accuracy: 1.0


In [36]:
from sklearn import datasets
from sklearn.model_selection import train_test_split
import numpy as np

data = datasets.load_breast_cancer()
X, y = data.data, data.target

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=1234
)

clf = DecisionTree(max_depth=10)
clf.fit(X_train, y_train)
predictions = clf.predict(X_test)

def accuracy(y_test, y_pred):
    return np.sum(y_test == y_pred) / len(y_test)

acc = accuracy(y_test, predictions)
print(acc)

0.9473684210526315


## CART

In [39]:
import numpy as np
import pandas as pd
from collections import Counter
from sklearn.model_selection import train_test_split

class Node:
    def __init__(self, feature=None, threshold=None, left=None, right=None, *, value=None):
        self.feature = feature
        self.threshold = threshold
        self.left = left
        self.right = right
        self.value = value
        
    def is_leaf_node(self):
        return self.value is not None


class DecisionTree:
    def __init__(self, min_samples_split=2, max_depth=100, n_features=None):
        self.min_samples_split = min_samples_split
        self.max_depth = max_depth
        self.n_features = n_features
        self.root = None
        self.feature_types = {}  # Store feature types for prediction

    def fit(self, X, y):
        # Convert to numpy arrays and handle mixed data types
        X = self._convert_to_array(X)
        y = np.array(y)
        
        # Determine feature types
        self._determine_feature_types(X)
        
        self.n_features = X.shape[1] if not self.n_features else min(X.shape[1], self.n_features)
        self.root = self._grow_tree(X, y)

    def _convert_to_array(self, X):
        """Convert input to numpy array while preserving data types"""
        if isinstance(X, pd.DataFrame):
            # Convert each column separately to handle mixed types
            arrays = []
            for col in X.columns:
                col_data = X[col].values
                # Try to convert to numeric, if fails keep as object
                try:
                    arrays.append(pd.to_numeric(col_data).values.reshape(-1, 1))
                except:
                    arrays.append(col_data.reshape(-1, 1))
            return np.hstack(arrays)
        else:
            return np.array(X)

    def _determine_feature_types(self, X):
        """Determine if each feature is numeric or categorical"""
        for i in range(X.shape[1]):
            try:
                # Try to convert to float to check if numeric
                _ = X[:, i].astype(float)
                self.feature_types[i] = 'numeric'
            except (ValueError, TypeError):
                self.feature_types[i] = 'categorical'

    def _grow_tree(self, X, y, depth=0):
        n_samples, n_feats = X.shape
        n_labels = len(np.unique(y))

        # check the stopping criteria
        if (depth >= self.max_depth or n_labels == 1 or n_samples < self.min_samples_split):
            leaf_value = self._most_common_label(y)
            return Node(value=leaf_value)

        feat_idxs = np.random.choice(n_feats, self.n_features, replace=False)

        # find the best split
        best_feature, best_thresh = self._best_split(X, y, feat_idxs)
        
        # If no good split found
        if best_feature is None:
            leaf_value = self._most_common_label(y)
            return Node(value=leaf_value)

        # create child nodes
        left_idxs, right_idxs = self._split(X[:, best_feature], best_thresh, best_feature)
        left = self._grow_tree(X[left_idxs, :], y[left_idxs], depth + 1)
        right = self._grow_tree(X[right_idxs, :], y[right_idxs], depth + 1)
        return Node(best_feature, best_thresh, left, right)

    def _best_split(self, X, y, feat_idxs):
        best_gini = float('inf')  # Changed: minimize Gini instead of maximize Gain
        split_idx, split_threshold = None, None

        for feat_idx in feat_idxs:
            X_column = X[:, feat_idx]
            thresholds = np.unique(X_column)

            for thr in thresholds:
                # calculate the gini impurity
                gini = self._gini_impurity(y, X_column, thr, feat_idx)

                if gini < best_gini:  # Changed: look for minimum Gini
                    best_gini = gini
                    split_idx = feat_idx
                    split_threshold = thr

        return split_idx, split_threshold

    def _gini_impurity(self, y, X_column, threshold, feat_idx):
        """Calculate Gini impurity for a split (CART algorithm)"""
        # create children
        left_idxs, right_idxs = self._split(X_column, threshold, feat_idx)

        if len(left_idxs) == 0 or len(right_idxs) == 0:
            return float('inf')  # Return worst case if split is invalid
        
        # calculate the weighted gini impurity of children
        n = len(y)
        n_l, n_r = len(left_idxs), len(right_idxs)
        
        # Calculate Gini for left and right children
        gini_l = self._gini(y[left_idxs])
        gini_r = self._gini(y[right_idxs])
        
        # Weighted average of children Gini impurities
        weighted_gini = (n_l / n) * gini_l + (n_r / n) * gini_r
        
        return weighted_gini

    def _gini(self, y):
        """Calculate Gini impurity for a node"""
        if len(y) == 0:
            return 0
        
        # Use Counter to handle any data type
        counts = Counter(y)
        total = len(y)
        gini = 1.0
        
        for count in counts.values():
            p = count / total
            gini -= p ** 2
            
        return gini

    def _split(self, X_column, split_thresh, feat_idx):
        """Split data based on feature type"""
        if self.feature_types.get(feat_idx, 'numeric') == 'numeric':
            # Numeric split
            try:
                X_column_float = X_column.astype(float)
                split_thresh_float = float(split_thresh)
                left_idxs = np.where(X_column_float <= split_thresh_float)[0]
                right_idxs = np.where(X_column_float > split_thresh_float)[0]
            except:
                # Fallback to categorical if conversion fails
                left_idxs = np.where(X_column == split_thresh)[0]
                right_idxs = np.where(X_column != split_thresh)[0]
        else:
            # Categorical split
            left_idxs = np.where(X_column == split_thresh)[0]
            right_idxs = np.where(X_column != split_thresh)[0]
        return left_idxs, right_idxs

    def _most_common_label(self, y):
        if len(y) == 0:
            return None
        counter = Counter(y)
        value = counter.most_common(1)[0][0]
        return value

    def predict(self, X):
        # Convert to numpy array for consistent indexing
        X = self._convert_to_array(X)
        return np.array([self._traverse_tree(x, self.root) for x in X])

    def _traverse_tree(self, x, node):
        if node.is_leaf_node():
            return node.value

        feature_value = x[node.feature]
        feature_type = self.feature_types.get(node.feature, 'numeric')
        
        if feature_type == 'numeric':
            # Numeric comparison
            try:
                feature_value_float = float(feature_value)
                threshold_float = float(node.threshold)
                if feature_value_float <= threshold_float:
                    return self._traverse_tree(x, node.left)
                else:
                    return self._traverse_tree(x, node.right)
            except:
                # Fallback to categorical comparison
                if str(feature_value) == str(node.threshold):
                    return self._traverse_tree(x, node.left)
                else:
                    return self._traverse_tree(x, node.right)
        else:
            # Categorical comparison
            if str(feature_value) == str(node.threshold):
                return self._traverse_tree(x, node.left)
            else:
                return self._traverse_tree(x, node.right)


# Test the code
dt_data = pd.read_csv('dt_scratch_data.csv')
x = dt_data.iloc[:, :-1]
y = dt_data.iloc[:, -1]

# If your target variable is categorical, encode it
from sklearn.preprocessing import LabelEncoder
if y.dtype == 'object':
    le = LabelEncoder()
    y = le.fit_transform(y)

X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=0.2, shuffle=True, random_state=42)

dt = DecisionTree(max_depth=3)
dt.fit(X_train, y_train)
y_pred = dt.predict(X_test)

def accuracy(y_test, y_pred):
    return np.sum(y_test == y_pred) / len(y_test)

acc = accuracy(y_test, y_pred)
print(f"Accuracy: {acc}")

Accuracy: 0.6666666666666666


In [41]:
import numpy as np
from collections import Counter

# Node class for the tree
class Node:
    def __init__(self, feature=None, threshold=None, left=None, right=None, *, value=None):
        self.feature = feature
        self.threshold = threshold
        self.left = left
        self.right = right
        self.value = value

    def is_leaf_node(self):
        return self.value is not None


class DecisionTreeCART:
    def __init__(self, min_samples_split=2, n_features=None):
        self.min_samples_split = min_samples_split
        self.n_features = n_features
        self.root = None

    def fit(self, X, y):
        # convert to numpy
        X = np.array(X)
        y = np.array(y)
        self.n_features = X.shape[1] if not self.n_features else min(X.shape[1], self.n_features)
        self.root = self._grow_tree(X, y)

    def _grow_tree(self, X, y):
        n_samples, n_feats = X.shape
        n_labels = len(np.unique(y))

        # stopping conditions
        if (n_labels == 1) or (n_samples < self.min_samples_split):
            leaf_value = self._most_common_label(y)
            return Node(value=leaf_value)

        # choose random features
        feat_idxs = np.random.choice(n_feats, self.n_features, replace=False)

        # find the best split
        best_feat, best_thresh = self._best_split(X, y, feat_idxs)

        if best_feat is None:  # if no split improves
            leaf_value = self._most_common_label(y)
            return Node(value=leaf_value)

        # split data
        left_idxs, right_idxs = self._split(X[:, best_feat], best_thresh)
        left = self._grow_tree(X[left_idxs, :], y[left_idxs])
        right = self._grow_tree(X[right_idxs, :], y[right_idxs])
        return Node(best_feat, best_thresh, left, right)

    def _best_split(self, X, y, feat_idxs):
        best_gain = -1
        split_idx, split_threshold = None, None

        for feat_idx in feat_idxs:
            X_column = X[:, feat_idx]
            thresholds = np.unique(X_column)

            for thr in thresholds:
                # calculate gini gain
                gain = self._information_gain(y, X_column, thr)

                if gain > best_gain:
                    best_gain = gain
                    split_idx = feat_idx
                    split_threshold = thr

        return split_idx, split_threshold

    def _information_gain(self, y, X_column, threshold):
        # parent gini
        parent_gini = self._gini(y)

        # create children
        left_idxs, right_idxs = self._split(X_column, threshold)
        if len(left_idxs) == 0 or len(right_idxs) == 0:
            return 0

        # weighted avg. gini
        n = len(y)
        n_l, n_r = len(left_idxs), len(right_idxs)
        gini_l, gini_r = self._gini(y[left_idxs]), self._gini(y[right_idxs])
        child_gini = (n_l / n) * gini_l + (n_r / n) * gini_r

        # information gain
        ig = parent_gini - child_gini
        return ig

    def _split(self, X_column, split_thresh):
        left_idxs = np.argwhere(X_column <= split_thresh).flatten()
        right_idxs = np.argwhere(X_column > split_thresh).flatten()
        return left_idxs, right_idxs

    def _gini(self, y):
        hist = np.bincount(y)
        ps = hist / len(y)
        return 1 - np.sum(ps ** 2)

    def _most_common_label(self, y):
        counter = Counter(y)
        return counter.most_common(1)[0][0]

    def predict(self, X):
        X = np.array(X)
        return np.array([self._traverse_tree(x, self.root) for x in X])

    def _traverse_tree(self, x, node):
        if node.is_leaf_node():
            return node.value

        if x[node.feature] <= node.threshold:
            return self._traverse_tree(x, node.left)
        return self._traverse_tree(x, node.right)


# ---------------------------
# Example usage (Iris dataset)
# ---------------------------
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

iris = load_iris()
X, y = iris.data, iris.target

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

tree = DecisionTreeCART(min_samples_split=2)
tree.fit(X_train, y_train)
y_pred = tree.predict(X_test)

acc = np.sum(y_pred == y_test) / len(y_test)
print("Accuracy:", acc)


Accuracy: 1.0
