In [1]:
import numpy as np
import pandas as pd
import random

In [2]:
def custom_label_encode(series):
    unique_values = series.unique()
    mapping = {val: idx for idx, val in enumerate(unique_values)}
    return series.map(mapping), mapping


def data_preprocessing(primary_path, secondary_path=None, sample_ratio=0.1):
    primary_data = pd.read_csv(primary_path).fillna("N/A")
    X_primary = primary_data.drop(columns=["class"])
    y_primary, _ = custom_label_encode(primary_data["class"])

    if secondary_path:
        secondary_data = pd.read_csv(secondary_path).fillna("N/A")
        
        # Sample only a fraction of secondary data to reduce size
        sample_size = int(len(secondary_data) * sample_ratio)
        secondary_data = secondary_data.sample(n=sample_size, random_state=42)

        X_secondary = secondary_data.drop(columns=["class"])
        y_secondary, _ = custom_label_encode(secondary_data["class"])

        # Ensure both datasets have the same columns
        for col in X_primary.columns:
            if col not in X_secondary.columns:
                X_secondary[col] = "N/A"

        for col in X_secondary.columns:
            if col not in X_primary.columns:
                X_primary[col] = "N/A"

        # Align column order
        X_secondary = X_secondary[X_primary.columns]

        # Convert categorical columns to numeric
        for column in X_primary.columns:
            X_primary[column], _ = custom_label_encode(X_primary[column])
        for column in X_secondary.columns:
            X_secondary[column], _ = custom_label_encode(X_secondary[column])

        # Convert to NumPy arrays
        X_primary_np = X_primary.to_numpy()
        X_secondary_np = X_secondary.to_numpy()
        y_primary_np = y_primary.to_numpy()
        y_secondary_np = y_secondary.to_numpy()

        # Merge sampled dataset
        X_merged = np.vstack((X_primary_np, X_secondary_np))
        y_merged = np.concatenate((y_primary_np, y_secondary_np))

        return X_primary_np, y_primary_np, X_merged, y_merged

    return X_primary.to_numpy(), y_primary.to_numpy(), None, None

def train_test_split(X, y, test_size=0.2, random_state=None):
    if random_state is not None:
        np.random.seed(random_state)
    
    n_samples = len(X)
    n_test = int(n_samples * test_size)
    indices = np.random.permutation(n_samples)
    
    test_indices = indices[:n_test]
    train_indices = indices[n_test:]
    
    return X[train_indices], X[test_indices], y[train_indices], y[test_indices]

def run_mushroom_classification(X_merged, y_merged):
    """Run complete mushroom classification experiment"""
    # Split data
    X_train, X_test, y_train, y_test = train_test_split(X_merged, y_merged, test_size=0.2, random_state=42)
    X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=42)
    
    # Determine feature types
    feature_types = ['categorical' if isinstance(val, (str, np.str_)) else 'numerical' 
                    for val in X_merged[0]]
    
    # Perform grid search
    grid_search_results = perform_grid_search(X_train, y_train, X_val, y_val, feature_types)
    
    print("\nBest parameters:", grid_search_results['best_params'])
    
    # Train final model with best parameters
    best_model = DecisionTree(**grid_search_results['best_params'])
    best_model.fit(X_train, y_train, feature_types)
    
    # Evaluate on test set
    y_pred = best_model.predict(X_test)
    test_metrics = evaluate_model(y_test, y_pred)
    
    print("\nTest set metrics:")
    for metric, value in test_metrics.items():
        print(f"{metric}: {value:.4f}")
    
    # Feature importance analysis
    feature_importance_dict = {
        f"Feature {i}": importance 
        for i, importance in enumerate(best_model.feature_importances_)
    }
    print("\nTop 5 most important features:")
    sorted_features = sorted(feature_importance_dict.items(), key=lambda x: x[1], reverse=True)[:5]
    for feature, importance in sorted_features:
        print(f"{feature}: {importance:.4f}")

    return best_model, grid_search_results, test_metrics

def evaluate_model(y_true, y_pred):
    """Evaluate model performance including 0-1 loss"""
    accuracy = np.mean(y_true == y_pred)
    zero_one_loss = 1 - accuracy
    
    tp = np.sum((y_true == 1) & (y_pred == 1))
    fp = np.sum((y_true == 0) & (y_pred == 1))
    fn = np.sum((y_true == 1) & (y_pred == 0))
    tn = np.sum((y_true == 0) & (y_pred == 0))
    
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    
    return {
        'accuracy': accuracy,
        'zero_one_loss': zero_one_loss,
        'precision': precision,
        'recall': recall,
        'f1_score': f1
    }


## overfitting analysis

In [3]:
class Node:
    def __init__(self, feature=None, threshold=None, left=None, right=None, value=None, is_categorical=False):
        self.feature = feature
        self.threshold = threshold
        self.left = left
        self.right = right
        self.value = value
        self.is_leaf = value is not None
        self.is_categorical = is_categorical
        
    def decision_function(self, x):
        """Decision criterion/test function"""
        if self.is_categorical:
            return x[self.feature] == self.threshold
        return x[self.feature] <= self.threshold
        
class DecisionTree:
    def __init__(self, criterion='gini', max_depth=5, min_samples_leaf=1, 
                 min_impurity_decrease=0.0, min_weight_fraction_leaf=0.0, ccp_alpha=0.0):
        self.criterion = criterion
        self.max_depth = max_depth
        self.min_samples_leaf = min_samples_leaf
        self.min_impurity_decrease = min_impurity_decrease
        self.min_weight_fraction_leaf = min_weight_fraction_leaf
        self.ccp_alpha = ccp_alpha  # Added for pruning
        self.root = None
        self.feature_types = None
        self.n_classes = None
        self.feature_importances_ = None

    def _calculate_split_score(self, y, left_y, right_y):
        """
        Calculate the split score based on the selected criterion.
        """
        if self.criterion == 'gini':
            return self._gini_score(y, left_y, right_y)
        elif self.criterion == 'entropy':
            return self._entropy_score(y, left_y, right_y)
        else:  # 'misclassification'
            return self._misclassification_score(y, left_y, right_y)

    def _gini_score(self, y, left_y, right_y):
        """
        Calculate the weighted Gini impurity for a split.
        """
        def gini(y_subset):
            _, counts = np.unique(y_subset, return_counts=True)
            probabilities = counts / len(y_subset)
            return 1 - np.sum(probabilities ** 2)

        left_weight = len(left_y) / len(y)
        right_weight = len(right_y) / len(y)
        return left_weight * gini(left_y) + right_weight * gini(right_y)

    def _entropy_score(self, y, left_y, right_y):
        """
        Calculate the weighted entropy for a split.
        """
        def entropy(y_subset):
            _, counts = np.unique(y_subset, return_counts=True)
            probabilities = counts / len(y_subset)
            return -np.sum(probabilities * np.log2(probabilities + 1e-10))

        left_weight = len(left_y) / len(y)
        right_weight = len(right_y) / len(y)
        return left_weight * entropy(left_y) + right_weight * entropy(right_y)

    def _misclassification_score(self, y, left_y, right_y):
        """
        Calculate the weighted misclassification error for a split.
        """
        def misclassification(y_subset):
            if len(y_subset) == 0:
                return 0
            _, counts = np.unique(y_subset, return_counts=True)
            return 1 - np.max(counts) / len(y_subset)

        left_weight = len(left_y) / len(y)
        right_weight = len(right_y) / len(y)
        return left_weight * misclassification(left_y) + right_weight * misclassification(right_y)

    def fit(self, X, y, feature_types, prune=False):
        """Train the decision tree with optional pruning"""
        self.n_classes = len(np.unique(y))
        self.feature_types = feature_types
        self.feature_importances_ = np.zeros(X.shape[1])
        self.root = self._build_tree(X, y, depth=0)
        
        if prune:
            self._prune_tree(self.root, X, y)
            
        if np.sum(self.feature_importances_) > 0:
            self.feature_importances_ /= np.sum(self.feature_importances_)

    def _build_tree(self, X, y, depth):
        n_samples = len(y)
        
        # Check stopping criteria
        if (depth >= self.max_depth or 
            len(np.unique(y)) == 1 or 
            n_samples < self.min_samples_leaf or
            n_samples / len(y) < self.min_weight_fraction_leaf):
            return Node(value=np.argmax(np.bincount(y)))

        best_feature, best_threshold, best_score, left_indices, right_indices = self._find_best_split(X, y)
        
        if best_score == float('inf') or best_score < self.min_impurity_decrease:
            return Node(value=np.argmax(np.bincount(y)))

        # Record feature importance
        self.feature_importances_[best_feature] += (best_score * n_samples)
        
        # Build subtrees
        left_child = self._build_tree(X[left_indices], y[left_indices], depth + 1)
        right_child = self._build_tree(X[right_indices], y[right_indices], depth + 1)

        return Node(
            feature=best_feature,
            threshold=best_threshold,
            left=left_child,
            right=right_child,
            is_categorical=self.feature_types[best_feature] == 'categorical'
        )

    def _find_best_split(self, X, y):
        best_feature = None
        best_threshold = None
        best_score = float('inf')
        best_left_indices = None
        best_right_indices = None

        for feature in range(X.shape[1]):
            unique_values = np.unique(X[:, feature])
            
            if self.feature_types[feature] == 'categorical':
                for value in unique_values:
                    left_mask = X[:, feature] == value
                    right_mask = ~left_mask
                    
                    if np.sum(left_mask) < self.min_samples_leaf or np.sum(right_mask) < self.min_samples_leaf:
                        continue
                    
                    score = self._calculate_split_score(y, y[left_mask], y[right_mask])
                    
                    if score < best_score:
                        best_score = score
                        best_feature = feature
                        best_threshold = value
                        best_left_indices = np.where(left_mask)[0]
                        best_right_indices = np.where(right_mask)[0]
            else:
                for threshold in unique_values:
                    left_mask = X[:, feature] <= threshold
                    right_mask = ~left_mask
                    
                    if np.sum(left_mask) < self.min_samples_leaf or np.sum(right_mask) < self.min_samples_leaf:
                        continue
                    
                    score = self._calculate_split_score(y, y[left_mask], y[right_mask])
                    
                    if score < best_score:
                        best_score = score
                        best_feature = feature
                        best_threshold = threshold
                        best_left_indices = np.where(left_mask)[0]
                        best_right_indices = np.where(right_mask)[0]

        return best_feature, best_threshold, best_score, best_left_indices, best_right_indices

    def predict(self, X):
        """
        Predict class labels for samples in X.
        """
        return np.array([self._predict_single(x, self.root) for x in X])

    def _predict_single(self, x, node):
        """
        Predict the class label for a single data point.
        """
        if node.is_leaf:
            return node.value

        if node.decision_function(x):
            return self._predict_single(x, node.left)
        return self._predict_single(x, node.right)

    def __repr__(self):
        """
        String representation of the decision tree.
        """
        return self._print_tree(self.root)

    def _print_tree(self, node, depth=0):
        """
        Recursively print the tree structure.
        """
        if node.is_leaf:
            return f"{'  ' * depth}Leaf: Class {node.value}\n"
        else:
            return (f"{'  ' * depth}Node: Feature {node.feature}, Threshold {node.threshold}\n"
                    f"{self._print_tree(node.left, depth + 1)}"
                    f"{self._print_tree(node.right, depth + 1)}")


    def _prune_tree(self, node, X, y):
        """
        Perform cost-complexity pruning on the tree.
        """
        if node.is_leaf:
            return
            
        # Prune left and right subtrees
        if node.left:
            self._prune_tree(node.left, X, y)
        if node.right:
            self._prune_tree(node.right, X, y)
            
        # Calculate the error before pruning
        y_pred = self.predict(X)
        error_before = np.mean(y_pred != y)
        
        # Temporarily prune the node
        original_left = node.left
        original_right = node.right
        original_is_leaf = node.is_leaf
        original_value = node.value
        
        node.left = None
        node.right = None
        node.is_leaf = True
        node.value = np.argmax(np.bincount(y))
        
        # Calculate the error after pruning
        y_pred = self.predict(X)
        error_after = np.mean(y_pred != y)
        
        # Calculate the cost-complexity gain
        gain = error_after - error_before + self.ccp_alpha
        
        # If the gain is positive, revert the pruning
        if gain > 0:
            node.left = original_left
            node.right = original_right
            node.is_leaf = original_is_leaf
            node.value = original_value

def calculate_zero_one_loss(y_true, y_pred):
    """Calculate 0-1 loss (misclassification error)"""
    return np.mean(y_true != y_pred)
            

def perform_grid_search(X_train, y_train, X_val, y_val, feature_types):
    """Perform efficient grid search with selected promising parameter combinations"""
    # Reduced parameter combinations focusing on most impactful parameters
    selected_params = [
        # Basic configurations
        {'criterion': 'gini', 'max_depth': 5, 'min_samples_leaf': 1, 
         'min_impurity_decrease': 0.0, 'ccp_alpha': 0.0, 'prune': False},
        
        # Pruned version
        {'criterion': 'gini', 'max_depth': 5, 'min_samples_leaf': 1, 
         'min_impurity_decrease': 0.0, 'ccp_alpha': 0.01, 'prune': True},
        
        # More conservative depth
        {'criterion': 'entropy', 'max_depth': 3, 'min_samples_leaf': 5, 
         'min_impurity_decrease': 0.01, 'ccp_alpha': 0.0, 'prune': False},
        
        # More flexible depth
        {'criterion': 'entropy', 'max_depth': 7, 'min_samples_leaf': 1, 
         'min_impurity_decrease': 0.0, 'ccp_alpha': 0.0, 'prune': False},
        
        # Misclassification criterion
        {'criterion': 'misclassification', 'max_depth': 5, 'min_samples_leaf': 5, 
         'min_impurity_decrease': 0.0, 'ccp_alpha': 0.0, 'prune': False},
        
        # Heavy pruning
        {'criterion': 'gini', 'max_depth': 10, 'min_samples_leaf': 1, 
         'min_impurity_decrease': 0.0, 'ccp_alpha': 0.1, 'prune': True},
    ]
    
    best_loss = float('inf')
    best_params = None
    results = []
    
    
    
    
    for i, params in enumerate(selected_params, 1):
        
        # Separate pruning flag
        prune = params.pop('prune')
        
        # Train and evaluate model
        model = DecisionTree(**params)
        model.fit(X_train, y_train, feature_types, prune=prune)
        
        # Calculate losses
        y_train_pred = model.predict(X_train)
        y_val_pred = model.predict(X_val)
        
        train_loss = calculate_zero_one_loss(y_train, y_train_pred)
        val_loss = calculate_zero_one_loss(y_val, y_val_pred)
        
        # Add pruning flag back
        params['prune'] = prune
        
        results.append({
            'params': params,
            'train_loss': train_loss,
            'val_loss': val_loss
        })
        
        if val_loss < best_loss:
            best_loss = val_loss
            best_params = params
    
    # Sort results by validation loss
    sorted_results = sorted(results, key=lambda x: x['val_loss'])
    
    print("\nResults for All Configurations (Sorted by 0-1 Loss):")
    print("=" * 80)
    for i, result in enumerate(sorted_results, 1):
        print(f"\n{i}. Validation 0-1 Loss: {result['val_loss']:.4f}")
        print(f"   Training 0-1 Loss: {result['train_loss']:.4f}")
        print("Parameters:")
        for param, value in result['params'].items():
            print(f"  - {param}: {value}")
        print("-" * 40)
    
    return {
        'best_params': best_params,
        'best_loss': best_loss,
        'all_results': sorted_results
    }
    


def run_mushroom_classification(X_merged, y_merged):
    """Run complete mushroom classification experiment with pruning comparison"""
    # Split data
    X_train, X_test, y_train, y_test = train_test_split(X_merged, y_merged, test_size=0.2, random_state=42)
    X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=42)
    
    # Determine feature types
    feature_types = ['categorical' if isinstance(val, (str, np.str_)) else 'numerical' 
                    for val in X_merged[0]]
    
    # Perform grid search
    grid_search_results = perform_grid_search(X_train, y_train, X_val, y_val, feature_types)
    
    print("\nBest parameters:", grid_search_results['best_params'])
    
    # Train final model with best parameters (without pruning)
    print("\nTraining final models (with and without pruning)...")
    
    # Without pruning
    params_no_prune = grid_search_results['best_params'].copy()
    prune_flag = params_no_prune.pop('prune')
    
    model_no_prune = DecisionTree(**params_no_prune)
    model_no_prune.fit(X_train, y_train, feature_types, prune=False)
    
    # With pruning
    model_with_prune = DecisionTree(**params_no_prune)
    model_with_prune.fit(X_train, y_train, feature_types, prune=True)
    
    # Evaluate both models
    print("\nEvaluation Results:")
    print("=" * 80)
    
    # Without pruning
    y_pred_no_prune = model_no_prune.predict(X_test)
    test_metrics_no_prune = evaluate_model(y_test, y_pred_no_prune)
    
    print("\nTest set metrics (Without Pruning):")
    for metric, value in test_metrics_no_prune.items():
        print(f"{metric}: {value:.4f}")
    
    # With pruning
    y_pred_with_prune = model_with_prune.predict(X_test)
    test_metrics_with_prune = evaluate_model(y_test, y_pred_with_prune)
    
    print("\nTest set metrics (With Pruning):")
    for metric, value in test_metrics_with_prune.items():
        print(f"{metric}: {value:.4f}")
    
    # Feature importance analysis
    print("\nFeature Importance Analysis:")
    print("=" * 80)
    
    feature_importance_dict = {
        f"Feature {i}": importance 
        for i, importance in enumerate(model_no_prune.feature_importances_)
    }
    print("\nTop 5 most important features:")
    sorted_features = sorted(feature_importance_dict.items(), key=lambda x: x[1], reverse=True)[:5]
    for feature, importance in sorted_features:
        print(f"{feature}: {importance:.4f}")

    return model_no_prune, model_with_prune, grid_search_results, test_metrics_no_prune, test_metrics_with_prune



if __name__ == "__main__":
    primary_path = "C:/Users/ikry/Downloads/secondary+mushroom+dataset/MushroomDataset/MushroomDataset/primary_data.csv"
    secondary_path = "C:/Users/ikry/Downloads/secondary+mushroom+dataset/MushroomDataset/MushroomDataset/secondary_data.csv"

X_single, y_single, X_merged, y_merged = data_preprocessing(primary_path, secondary_path, sample_ratio=0.1)
X_train, X_test, y_train, y_test = train_test_split(X_merged, y_merged, test_size=0.2, random_state=42)

    
model_no_prune, model_with_prune, grid_results, metrics_no_prune, metrics_with_prune = run_mushroom_classification(X_merged, y_merged)




Results for All Configurations (Sorted by 0-1 Loss):

1. Validation 0-1 Loss: 0.0946
   Training 0-1 Loss: 0.0784
Parameters:
  - criterion: gini
  - max_depth: 10
  - min_samples_leaf: 1
  - min_impurity_decrease: 0.0
  - ccp_alpha: 0.1
  - prune: True
----------------------------------------

2. Validation 0-1 Loss: 0.2181
   Training 0-1 Loss: 0.2159
Parameters:
  - criterion: entropy
  - max_depth: 7
  - min_samples_leaf: 1
  - min_impurity_decrease: 0.0
  - ccp_alpha: 0.0
  - prune: False
----------------------------------------

3. Validation 0-1 Loss: 0.2371
   Training 0-1 Loss: 0.2495
Parameters:
  - criterion: misclassification
  - max_depth: 5
  - min_samples_leaf: 5
  - min_impurity_decrease: 0.0
  - ccp_alpha: 0.0
  - prune: False
----------------------------------------

4. Validation 0-1 Loss: 0.2380
   Training 0-1 Loss: 0.2527
Parameters:
  - criterion: gini
  - max_depth: 5
  - min_samples_leaf: 1
  - min_impurity_decrease: 0.0
  - ccp_alpha: 0.0
  - prune: False
---