In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import KFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (accuracy_score, precision_score, recall_score, f1_score,
                           balanced_accuracy_score, confusion_matrix, 
                           matthews_corrcoef, cohen_kappa_score)
from sklearn.impute import SimpleImputer
from sklearn.tree import DecisionTreeClassifier
from sklearn.base import BaseEstimator, ClassifierMixin
import time
import os
import warnings
warnings.filterwarnings('ignore')
from imblearn.over_sampling import ADASYN

# Output folders used by the rest of the script (plots, CSV results, rule dumps).
for _output_dir in ("confusion_matrices", "visualizations", "results", "rules"):
    os.makedirs(_output_dir, exist_ok=True)

# Number of cross-validation folds
K_FOLDS = 2

# Helper function for confusion matrix metrics
def confusion_matrix_metrics(cm, classes):
    """Compute one-vs-rest rates for each class of a multiclass confusion matrix.

    Args:
        cm: Square confusion matrix (rows = true class, columns = predicted
            class). Anything ``np.asarray`` accepts, e.g. a NumPy array or a
            nested list.
        classes: Class labels, in the same order as the rows/columns of ``cm``.

    Returns:
        dict mapping each class label to a dict with the keys ``'TPR'``,
        ``'TNR'``, ``'FPR'`` and ``'FNR'``. A rate whose denominator is zero
        is reported as 0.0.
    """
    cm = np.asarray(cm)
    total = cm.sum()

    def _rate(numerator, denominator):
        # The zero-denominator guard makes a smoothing epsilon unnecessary;
        # dividing exactly keeps perfect scores at exactly 1.0.
        return float(numerator) / float(denominator) if denominator > 0 else 0.0

    metrics = {}
    for idx, class_label in enumerate(classes):
        tp = cm[idx, idx]              # predicted as this class and truly this class
        fp = cm[:, idx].sum() - tp     # predicted as this class, truly another
        fn = cm[idx, :].sum() - tp     # truly this class, predicted as another
        tn = total - (tp + fp + fn)    # everything not involving this class

        metrics[class_label] = {
            'TPR': _rate(tp, tp + fn),
            'TNR': _rate(tn, tn + fp),
            'FPR': _rate(fp, fp + tn),
            'FNR': _rate(fn, fn + tp),
        }
    return metrics

# Custom Ridor-like Classifier implementation using Decision Trees
class RidorLikeClassifier(BaseEstimator, ClassifierMixin):
    """Default-class-plus-exceptions classifier built from one-vs-rest trees.

    The most frequent training label becomes the default prediction. For every
    other class a shallow ``DecisionTreeClassifier`` is trained on the binary
    problem "this class vs. everything else"; its positive leaves are rendered
    as IF/THEN rule text. At prediction time the trees are applied in the
    order of ``classes_``, so a later tree overrides an earlier one wherever
    both fire.

    Parameters:
        max_rules: Stored on the estimator; not used by ``fit`` or ``predict``.
        min_samples_leaf: Passed to every tree, and also the minimum number of
            samples a class needs before a tree is trained for it.
        max_depth: Maximum depth of every tree.
        random_state: Seed passed to every tree.

    Attributes set by ``fit``:
        classes_: Sorted unique training labels.
        default_class: Index into ``classes_`` of the majority class.
        rule_trees: List of ``(class_label, fitted_tree)`` pairs.
        rule_texts: Human-readable rule text, one entry per tree.
    """

    def __init__(self, max_rules=6, min_samples_leaf=2, max_depth=4, random_state=42):
        self.max_rules = max_rules
        self.min_samples_leaf = min_samples_leaf
        self.max_depth = max_depth
        self.random_state = random_state
        self.rules = {}
        self.default_class = None
        self.classes_ = None
        self.rule_trees = []
        self.rule_texts = []

    def fit(self, X, y):
        """Train one exception tree per non-default class and dump the rules.

        Args:
            X: Feature matrix (DataFrame or 2-D array). Column names, when
                present, are used in the rule text.
            y: Class labels, one per row of ``X``.

        Returns:
            self

        Side effects:
            Writes ``rules/ridor_like_rules_<idx>.txt`` for every tree,
            creating the ``rules`` directory if needed.
        """
        y = np.asarray(y)

        # Start from a clean slate so that refitting does not keep trees and
        # rule text from a previous call.
        self.rule_trees = []
        self.rule_texts = []

        # Counts are aligned with classes_, so argmax is always a valid index
        # into classes_ regardless of the label values (non-contiguous,
        # negative, float or string labels).
        self.classes_, counts = np.unique(y, return_counts=True)
        self.default_class = int(np.argmax(counts))

        feature_names = X.columns if hasattr(X, 'columns') else range(X.shape[1])

        for class_idx, class_label in enumerate(self.classes_):
            if class_idx == self.default_class:
                continue

            # Binary problem: this class vs. all others
            y_binary = (y == class_label).astype(int)

            # Skip classes too small to ever fill a leaf
            if y_binary.sum() < self.min_samples_leaf:
                continue

            tree = DecisionTreeClassifier(
                max_depth=self.max_depth,
                min_samples_leaf=self.min_samples_leaf,
                random_state=self.random_state
            )
            tree.fit(X, y_binary)
            self.rule_trees.append((class_label, tree))

            rule_text = f"Class {class_label}:\n"
            rule_text += self._extract_rules_from_tree(tree, feature_names)
            self.rule_texts.append(rule_text)

        # Save final rule set
        os.makedirs("rules", exist_ok=True)
        for idx, rule_text in enumerate(self.rule_texts):
            with open(f"rules/ridor_like_rules_{idx}.txt", "w") as f:
                f.write(rule_text)

        return self

    def _extract_rules_from_tree(self, tree, feature_names):
        """Render the positive leaves of a fitted binary tree as IF/THEN lines.

        Args:
            tree: Fitted ``DecisionTreeClassifier`` trained on 0/1 targets.
            feature_names: Indexable giving a display name per feature index.

        Returns:
            One line per leaf where class 1 outweighs class 0, each ending in
            ``(positives/total)`` training samples for that leaf, joined by
            newlines and terminated by a newline.
        """
        tree_ = tree.tree_
        rules = []

        def recurse(node, path):
            feature_idx = tree_.feature[node]
            if feature_idx != -2:  # Not a leaf
                name = feature_names[feature_idx]
                threshold = tree_.threshold[node]

                # Left branch (<=)
                recurse(tree_.children_left[node],
                        path + [f"{name} <= {threshold:.4f}"])

                # Right branch (>)
                recurse(tree_.children_right[node],
                        path + [f"{name} > {threshold:.4f}"])
                return

            # Leaf node
            value = tree_.value[node][0]
            if len(value) > 1 and value[1] > value[0]:  # More positive than negative
                # Depending on the scikit-learn release, tree_.value holds
                # either per-class counts or per-class fractions. Normalising
                # and scaling by the node's sample count yields counts in both
                # cases (the trees here are fit without sample weights).
                n_samples = int(tree_.n_node_samples[node])
                positives = int(round(n_samples * value[1] / value.sum()))
                coverage = f"{positives}/{n_samples}"
                rules.append("  IF " + " AND ".join(path) + f" THEN class ({coverage})")

        recurse(0, [])
        return "\n".join(rules) + "\n"

    def predict(self, X):
        """Predict a class label for every row of ``X``.

        Every row starts as the default (majority) class; each rule tree then
        overwrites the rows it classifies as positive, in training order.

        Raises:
            NotFittedError: if ``fit`` has not been called.
        """
        if self.classes_ is None or self.default_class is None:
            from sklearn.exceptions import NotFittedError
            raise NotFittedError("RidorLikeClassifier must be fitted before calling predict.")

        # Use the label (not the index) of the default class, and the dtype of
        # classes_ so that longer string labels are not truncated on assignment.
        predictions = np.full(X.shape[0], self.classes_[self.default_class],
                              dtype=self.classes_.dtype)

        # Apply rules in order
        for class_label, tree in self.rule_trees:
            predictions[tree.predict(X) == 1] = class_label

        return predictions

# Function for exploratory data analysis
def perform_eda(df):
    """Save exploratory plots of ``df`` under ``visualizations/``.

    Produces histograms of all int64/float64 columns, and box plots, a pair
    plot and violin plots of the leading numeric features grouped by the
    ``label`` column.

    Args:
        df: DataFrame containing a ``label`` column with the class of each row.

    Returns:
        The same DataFrame, unmodified.
    """
    print("Performing Exploratory Data Analysis...")

    numeric_cols = df.select_dtypes(include=['int64', 'float64']).columns
    # Features shown "by class": the label itself is excluded so it is never
    # plotted against itself or duplicated in the pair-plot frame.
    feature_cols = [col for col in numeric_cols if col != 'label']

    # 1. Create histograms for numeric features
    if len(numeric_cols) > 0:
        hist_cols = 5
        # Keep the 5x5 grid for up to 25 columns, grow it beyond that
        hist_rows = max(5, -(-len(numeric_cols) // hist_cols))
        df[numeric_cols].hist(figsize=(15, 15), bins=20, layout=(hist_rows, hist_cols))
        plt.suptitle('Histograms of Numeric Features', y=0.92)
        plt.tight_layout()
        plt.savefig("visualizations/histograms.png")
        plt.close()

    # Limit to first 5 features for clarity
    leading_features = feature_cols[:5]

    # 2. Create boxplots to visualize the distribution by class
    if leading_features:
        # squeeze=False keeps `axes` two-dimensional even for a single feature
        fig, axes = plt.subplots(nrows=len(leading_features), figsize=(12, 15), squeeze=False)
        for ax, feature in zip(axes[:, 0], leading_features):
            sns.boxplot(x='label', y=feature, data=df, ax=ax)
            ax.set_title(f'Distribution of {feature} by Class')
        plt.tight_layout()
        plt.savefig("visualizations/box_plots_by_class.png")
        plt.close(fig)

    # 3. Create pair plot for visualizing relationships between features
    if leading_features:
        plot_columns = leading_features + ['label']
        pair_plot = sns.pairplot(df[plot_columns], hue='label', diag_kind='kde',
                                 plot_kws={'alpha': 0.6, 's': 30, 'edgecolor': 'k', 'linewidth': 0.2})
        pair_plot.fig.suptitle('Pair Plot of Features by Class', y=1.02)
        plt.tight_layout()
        plt.savefig("visualizations/pair_plot.png")
        plt.close()

    # 4. Create violin plots for distribution comparison
    violin_features = feature_cols[:4]  # First 4 numeric features
    if violin_features:
        plt.figure(figsize=(14, 10))
        for i, feature in enumerate(violin_features):
            plt.subplot(2, 2, i + 1)
            sns.violinplot(x='label', y=feature, data=df)
            plt.title(f'Violin Plot of {feature} by Class')
        plt.tight_layout()
        plt.savefig("visualizations/violin_plots.png")
        plt.close()

    print("EDA visualizations created successfully!")
    return df


# Main processing function
def _plot_class_distribution(labels, title, output_path):
    """Save a bar chart of per-class sample counts, annotated with the counts."""
    plt.figure(figsize=(10, 6))
    class_counts = pd.Series(labels).value_counts().sort_index()
    ax = sns.barplot(x=class_counts.index, y=class_counts.values)
    plt.title(title)
    plt.xlabel("Class")
    plt.ylabel("Count")
    plt.xticks(rotation=45)
    for i, v in enumerate(class_counts.values):
        ax.text(i, v + 5, str(v), ha='center')
    plt.tight_layout()
    plt.savefig(output_path)
    plt.close()


def _plot_confusion_matrix(cm, class_labels, fold_idx):
    """Save the confusion-matrix heatmap of one fold."""
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_labels, yticklabels=class_labels)
    plt.title(f"Ridor-like Classifier - Fold {fold_idx} Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.tight_layout()
    plt.savefig(f"confusion_matrices/fold_{fold_idx}.png")
    plt.close()


def _plot_feature_importance(rule_trees, feature_names, fold_idx):
    """Save the mean feature importance over one fold's rule trees (top 15)."""
    if not rule_trees:
        return

    feature_importance = np.zeros(len(feature_names))
    for _, tree in rule_trees:
        feature_importance += tree.feature_importances_

    if np.sum(feature_importance) <= 0:
        return

    # Average over the trees
    feature_importance = feature_importance / len(rule_trees)

    plt.figure(figsize=(12, 6))
    importance_df = pd.DataFrame({
        'Feature': feature_names,
        'Importance': feature_importance
    }).sort_values('Importance', ascending=False)

    sns.barplot(x='Importance', y='Feature', data=importance_df[:15])  # Top 15 features
    plt.title(f'Feature Importance (Fold {fold_idx})')
    plt.tight_layout()
    plt.savefig(f"visualizations/feature_importance_fold_{fold_idx}.png")
    plt.close()


def _per_class_metrics(y_test, y_pred, class_label, rates):
    """Return the one-vs-rest metric values of ``class_label`` for one fold."""
    y_test_binary = (y_test == class_label).astype(int)
    y_pred_binary = (y_pred == class_label).astype(int)
    return {
        'Accuracy': accuracy_score(y_test_binary, y_pred_binary),
        # zero_division=0 everywhere: a class that is never predicted (or
        # never present) scores 0 instead of triggering a warning path.
        'Precision': precision_score(y_test_binary, y_pred_binary, zero_division=0),
        'Recall': recall_score(y_test_binary, y_pred_binary, zero_division=0),
        'F1 Score': f1_score(y_test_binary, y_pred_binary, zero_division=0),
        'Balanced Accuracy': balanced_accuracy_score(y_test_binary, y_pred_binary),
        'Matthews Correlation Coefficient': matthews_corrcoef(y_test_binary, y_pred_binary),
        'Cohen Kappa Score': cohen_kappa_score(y_test_binary, y_pred_binary),
        'True Positive Rate (TPR)': rates['TPR'],
        'True Negative Rate (TNR)': rates['TNR'],
        'False Positive Rate (FPR)': rates['FPR'],
        'False Negative Rate (FNR)': rates['FNR'],
    }


# Main processing function
def main(data_path='C:/Users/ddihora1604/Downloads/IIT Patna/Darshan_Dihora_ID_17_Task_2/Dataset 1/Student_performance_data.csv'):
    """Run the full experiment: load data, EDA, K-fold CV, and save all outputs.

    Args:
        data_path: CSV file to load. The last column is treated as the class
            label. Defaults to the path the script was originally written for.

    Side effects:
        Writes plots to ``visualizations/`` and ``confusion_matrices/``, rule
        text to ``rules/`` and CSV summaries to ``results/``; prints progress.
        Returns early (after printing the error) if the CSV cannot be loaded.
    """
    # Load dataset
    print("Loading dataset...")
    try:
        df = pd.read_csv(data_path)
        print(f"Dataset loaded successfully with shape: {df.shape}")
    except Exception as e:
        print(f"Error loading dataset: {e}")
        return

    # Take 20% of the data for faster processing (if needed)
    df = df.sample(frac=0.2, random_state=42)
    print(f"Sampled dataset shape: {df.shape}")

    # Rename the last column as 'label' if not already named
    df.rename(columns={df.columns[-1]: 'label'}, inplace=True)

    # Add EDA step here
    df = perform_eda(df)

    # Data preprocessing
    print("Preprocessing data...")

    X = df.drop(columns=['label']).reset_index(drop=True)
    y = df['label'].values
    X_columns = X.columns

    # Add visualization of class distribution before balancing
    _plot_class_distribution(
        y,
        "Class Distribution Before ADASYN",
        "visualizations/class_distribution_before_adasyn.png",
    )

    # Initialize results storage
    results = []
    timing_results = []
    rules_list = []

    # Set up KFold cross-validation
    kf = KFold(n_splits=K_FOLDS, shuffle=True, random_state=42)

    # Cross-validation process
    print(f"Starting {K_FOLDS}-fold cross-validation with Ridor-like Classifier...")

    for fold_idx, (train_index, test_index) in enumerate(kf.split(X), 1):
        print(f"Processing fold {fold_idx}/{K_FOLDS}...")

        # Handle missing values. The imputer is fit on the training split
        # only, so test-fold statistics never leak into training.
        imputer = SimpleImputer(strategy='mean')
        X_train = pd.DataFrame(imputer.fit_transform(X.iloc[train_index]), columns=X_columns)
        X_test = pd.DataFrame(imputer.transform(X.iloc[test_index]), columns=X_columns)
        y_train, y_test = y[train_index], y[test_index]

        # Apply ADASYN SMOTE to balance classes in the training set
        print(f"Applying ADASYN for class balancing in fold {fold_idx}...")
        try:
            adasyn = ADASYN(random_state=42, n_neighbors=5)
            X_train_resampled, y_train_resampled = adasyn.fit_resample(X_train, y_train)
        except Exception as e:
            # Best effort: resampling can fail (e.g. too few samples of a
            # class); train on the original split rather than abort the fold.
            print(f"ADASYN error: {e}. Using original imbalanced data.")
            X_train_resampled, y_train_resampled = X_train, y_train
        else:
            # Visualize class distribution after ADASYN
            _plot_class_distribution(
                y_train_resampled,
                f"Class Distribution After ADASYN (Fold {fold_idx})",
                f"visualizations/class_distribution_after_adasyn_fold_{fold_idx}.png",
            )
            print(f"Original training set shape: {X_train.shape}, Resampled: {X_train_resampled.shape}")

        # Train Ridor-like classifier with balanced data
        ridor = RidorLikeClassifier(
            max_rules=6,
            min_samples_leaf=2,
            max_depth=4,
            random_state=42
        )

        # Record training time
        start_train_time = time.time()
        ridor.fit(X_train_resampled, y_train_resampled)  # Use resampled data here
        train_time = time.time() - start_train_time

        # Get the rules as text
        rules_text = "\n".join(ridor.rule_texts)
        rules_list.append({"Fold": fold_idx, "Rules": rules_text})

        # Save rules to file
        with open(f"rules/fold_{fold_idx}.txt", "w") as f:
            f.write(rules_text)

        # Record prediction time
        start_test_time = time.time()
        y_pred = ridor.predict(X_test)
        test_time = time.time() - start_test_time

        # Record timing info
        timing_results.append({
            'Classifier': 'Ridor-like',
            'Fold': fold_idx,
            'Training Time (s)': train_time,
            'Testing Time (s)': test_time,
            'Total Time (s)': train_time + test_time
        })

        # Classes that occur in this fold, either as truth or as prediction
        unique_classes = np.unique(np.concatenate([y_test, y_pred]))

        # Compute confusion matrix
        cm = confusion_matrix(y_test, y_pred, labels=unique_classes)
        cm_metrics = confusion_matrix_metrics(cm, unique_classes)

        # Calculate metrics for each class
        for class_label in unique_classes:
            class_metrics = {
                'Classifier': 'Ridor-like',
                'Fold': fold_idx,
                'Class': class_label,
            }
            class_metrics.update(
                _per_class_metrics(y_test, y_pred, class_label, cm_metrics[class_label])
            )
            class_metrics['Training Time (s)'] = train_time
            class_metrics['Testing Time (s)'] = test_time
            results.append(class_metrics)

        # Plot and save confusion matrix
        _plot_confusion_matrix(cm, unique_classes, fold_idx)

        # Feature importance for this fold's model
        _plot_feature_importance(ridor.rule_trees, X_columns, fold_idx)

    # Create DataFrames for results and save to CSV
    timing_df = pd.DataFrame(timing_results)
    timing_df.to_csv("results/timing.csv", index=False)

    results_df = pd.DataFrame(results)
    print("Classification Metrics Across Folds:")
    print(results_df.head())

    # Save results to CSV
    results_df.to_csv("results/metrics.csv", index=False)

    # Save rules to CSV
    rules_df = pd.DataFrame(rules_list)
    rules_df.to_csv("results/rules.csv", index=False)

    # Calculate and display average metrics across folds
    avg_metrics = results_df.groupby(['Classifier', 'Class']).mean().reset_index()
    avg_metrics.to_csv("results/avg_metrics.csv", index=False)
    print("\nAverage Metrics Across Folds:")
    print(avg_metrics[['Classifier', 'Class', 'Accuracy', 'Precision', 'Recall', 'F1 Score']])

    print("\nRidor-like Classifier implementation completed successfully!")

# Run the experiment only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

Loading dataset...
Dataset loaded successfully with shape: (2392, 15)
Sampled dataset shape: (478, 15)
Performing Exploratory Data Analysis...
EDA visualizations created successfully!
Preprocessing data...
Starting 2-fold cross-validation with Ridor-like Classifier...
Processing fold 1/2...
Applying ADASYN for class balancing in fold 1...
Original training set shape: (239, 14), Resampled: (608, 14)
Processing fold 2/2...
Applying ADASYN for class balancing in fold 2...
Original training set shape: (239, 14), Resampled: (572, 14)
Classification Metrics Across Folds:
   Classifier  Fold  Class  Accuracy  Precision    Recall  F1 Score  \
0  Ridor-like     1    0.0  0.941423   0.400000  0.333333  0.363636   
1  Ridor-like     1    1.0  0.949791   0.928571  0.541667  0.684211   
2  Ridor-like     1    2.0  0.882845   0.630769  0.911111  0.745455   
3  Ridor-like     1    3.0  0.945607   0.829787  0.886364  0.857143   
4  Ridor-like     1    4.0  0.945607   0.990291  0.894737  0.940092   

 