In [1]:
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, ClassifierMixin

# Utility functions for different split criteria
def gini_impurity(y):
    hist = np.bincount(y)
    ps = hist / len(y)
    return 1 - np.sum(ps ** 2)

def entropy(y):
    hist = np.bincount(y)
    ps = hist / len(y)
    return -np.sum([p * np.log2(p) for p in ps if p > 0])

def misclassification_error(y):
    hist = np.bincount(y)
    return 1 - np.max(hist) / len(y)

class TreeNode:
    def __init__(self, feature=None, threshold=None, left=None, right=None, *, value=None):
        self.feature = feature
        self.threshold = threshold
        self.left = left
        self.right = right
        self.value = value

    def is_leaf_node(self):
        return self.value is not None

class DecisionTreeClassifierFromScratch:
    def __init__(self, criterion="gini", max_depth=100, min_samples_split=2, min_impurity_decrease=0.0):
        self.criterion = criterion
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.min_impurity_decrease = min_impurity_decrease
        self.root = None
    
    def _criterion_func(self, y):
        if self.criterion == "gini":
            return gini_impurity(y)
        elif self.criterion == "entropy":
            return entropy(y)
        elif self.criterion == "misclassification":
            return misclassification_error(y)
        else:
            raise ValueError("Invalid criterion")

    def fit(self, X, y):
        self.root = self._grow_tree(X, y)
    
    def _grow_tree(self, X, y, depth=0):
        n_samples, n_features = X.shape
        n_labels = len(np.unique(y))
    
        # Early stopping criteria
        if (self.max_depth is not None and depth >= self.max_depth) or n_labels == 1 or n_samples < self.min_samples_split:
            leaf_value = self._most_common_label(y)
            return TreeNode(value=leaf_value)
        
        feat_idx, threshold = self._best_split(X, y, n_features)
        if feat_idx is None:
            leaf_value = self._most_common_label(y)
            return TreeNode(value=leaf_value)
        
        # Calculate the potential gain from this split
        parent_impurity = self._criterion_func(y)
        left_idxs, right_idxs = self._split(X[:, feat_idx], threshold)
        n_left, n_right = len(left_idxs), len(right_idxs)
        if n_left == 0 or n_right == 0:
            leaf_value = self._most_common_label(y)
            return TreeNode(value=leaf_value)

        left_impurity = self._criterion_func(y[left_idxs])
        right_impurity = self._criterion_func(y[right_idxs])
        child_impurity = (n_left / n_samples) * left_impurity + (n_right / n_samples) * right_impurity

        # Stop growing if the impurity decrease is below the threshold
        if parent_impurity - child_impurity < self.min_impurity_decrease:
            leaf_value = self._most_common_label(y)
            return TreeNode(value=leaf_value)
        
        # Continue growing the tree
        left = self._grow_tree(X[left_idxs, :], y[left_idxs], depth + 1)
        right = self._grow_tree(X[right_idxs, :], y[right_idxs], depth + 1)
        return TreeNode(feat_idx, threshold, left, right)

    def _best_split(self, X, y, n_features):
        best_gain = -1
        split_idx, split_thresh = None, None
        
        for feat_idx in range(n_features):
            X_column = X[:, feat_idx]
            
            # Check if the column is numeric or categorical
            if np.issubdtype(X_column.dtype, np.number):
                thresholds = np.unique(X_column[~np.isnan(X_column)])  # Handle numerical columns
            else:
                thresholds = np.unique(X_column.astype(str))  # Handle categorical columns
            
            for threshold in thresholds:
                gain = self._information_gain(y, X_column, threshold)
                if gain > best_gain:
                    best_gain = gain
                    split_idx = feat_idx
                    split_thresh = threshold
        return split_idx, split_thresh

    def _information_gain(self, y, X_column, split_thresh):
        parent_loss = self._criterion_func(y)
        
        left_idxs, right_idxs = self._split(X_column, split_thresh)
        if len(left_idxs) == 0 or len(right_idxs) == 0:
            return 0
        
        n = len(y)
        n_left, n_right = len(left_idxs), len(right_idxs)
        e_left, e_right = self._criterion_func(y[left_idxs]), self._criterion_func(y[right_idxs])
        child_loss = (n_left / n) * e_left + (n_right / n) * e_right
        
        return parent_loss - child_loss
    
    def _split(self, X_column, split_thresh):
        if isinstance(split_thresh, str):
            left_idxs = np.argwhere(X_column == split_thresh).flatten()
            right_idxs = np.argwhere(X_column != split_thresh).flatten()
        else:
            left_idxs = np.argwhere(X_column <= split_thresh).flatten()
            right_idxs = np.argwhere(X_column > split_thresh).flatten()
        return left_idxs, right_idxs
    
    def _most_common_label(self, y):
        return np.bincount(y).argmax()
    
    def predict(self, X):
        return np.array([self._traverse_tree(x, self.root) for x in X])
    
    def _traverse_tree(self, x, node):
        if node.is_leaf_node():
            return node.value
        if isinstance(node.threshold, str):
            if x[node.feature] == node.threshold:
                return self._traverse_tree(x, node.left)
            return self._traverse_tree(x, node.right)
        else:
            if x[node.feature] <= node.threshold:
                return self._traverse_tree(x, node.left)
            return self._traverse_tree(x, node.right)

# Wrapper class for compatibility with Scikit-learn functions
class MyDecisionTreeWrapper(BaseEstimator, ClassifierMixin):
    def __init__(self, criterion="gini", max_depth=10, min_samples_split=4, min_impurity_decrease=0.0):
        self.criterion = criterion
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.min_impurity_decrease = min_impurity_decrease
        self.tree = DecisionTreeClassifierFromScratch(criterion=self.criterion, max_depth=self.max_depth, min_samples_split=self.min_samples_split, min_impurity_decrease=self.min_impurity_decrease)
        self.classes_ = None
    
    def fit(self, X, y):
        self.tree.fit(X, y)
        self.classes_ = np.unique(y)  # Set the classes_ attribute
        return self
    
    def predict(self, X):
        return self.tree.predict(X)


In [3]:
import numpy as np
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split, RandomizedSearchCV

# Load and preprocess the data
path = 'C:/Users/andre/Downloads/mushroom/secondary_data.csv'
df = pd.read_csv(path, delimiter=';')
df['class'] = df['class'].map({'p': 1, 'e': 0})

# Convert ranges to mean for numerical columns
numerical_columns = ['cap-diameter', 'stem-height', 'stem-width']

for column in numerical_columns:
    df[column] = df[column].apply(lambda x: float(x) if pd.notnull(x) else x)

# Impute missing values for numerical columns
imputer_numerical = SimpleImputer(strategy='mean')
df[numerical_columns] = imputer_numerical.fit_transform(df[numerical_columns])

# Define categorical columns
categorical_columns = ['cap-shape', 'cap-surface', 'cap-color', 'does-bruise-or-bleed', 
                       'gill-attachment', 'gill-spacing', 'gill-color', 'stem-root', 
                       'stem-surface', 'stem-color', 'veil-type', 'veil-color', 
                       'has-ring', 'ring-type', 'spore-print-color', 'habitat', 'season']

# Impute missing values for categorical columns with the most frequent value
imputer_categorical = SimpleImputer(strategy='most_frequent')
df[categorical_columns] = imputer_categorical.fit_transform(df[categorical_columns])

# Split the data into features and target variable
X = df.drop(columns=['class']).values
y = df['class'].values

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# (Optional) Use a subset of the data for faster tuning
X_sample, _, y_sample, _ = train_test_split(X_train, y_train, train_size=0.5, random_state=42)

# Define a reduced hyperparameter space
param_dist = {
    'criterion': ['gini', 'entropy'],
    'max_depth': [5, 10],
    'min_samples_split': [2, 4],
    'min_impurity_decrease': [0.0, 0.01]
}

# Use RandomizedSearchCV for faster hyperparameter tuning
randomized_search = RandomizedSearchCV(MyDecisionTreeWrapper(), param_dist, n_iter=10, cv=3, scoring='accuracy', random_state=42, n_jobs=-1)
randomized_search.fit(X_sample, y_sample)  # Use the sample for quicker tuning

best_params = randomized_search.best_params_
best_score = randomized_search.best_score_

print("Best parameters found: ", best_params)
print("Best cross-validation accuracy: ", best_score)

# Train the final model using the best-found parameters on the full dataset
final_model = MyDecisionTreeWrapper(
    criterion=best_params['criterion'],
    max_depth=best_params['max_depth'],
    min_samples_split=best_params['min_samples_split'],
    min_impurity_decrease=best_params['min_impurity_decrease']
)

final_model.fit(X_train, y_train)
final_predictions = final_model.predict(X_test)

# Evaluate the final model accuracy on the test data
final_accuracy = np.mean(final_predictions == y_test)
print(f"Final model accuracy on test data after tuning: {final_accuracy_pruned}")


Best parameters found:  {'min_samples_split': 2, 'min_impurity_decrease': 0.0, 'max_depth': 5, 'criterion': 'gini'}
Best cross-validation accuracy:  0.9256560092900123
Final model accuracy on test data after pruning and tuning: 0.7802521696413951
