# Safety-Aware Software Defect Prediction Framework
## GWO-Optimized KAN with SMOTE on NASA MDP Datasets

**Objective:** Maximize Recall (Safety) using:
- SMOTE for imbalance handling
- Grey Wolf Optimizer (GWO) for hyperparameter tuning
- Kolmogorov-Arnold Networks (KAN) for classification

In [None]:
# ============================================================================
# IMPORTS
# ============================================================================
import os
import numpy as np
import pandas as pd
import warnings
from pathlib import Path
from typing import Tuple, Dict, List

# Scientific Computing
from scipy.io import arff

# Machine Learning
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import MinMaxScaler, LabelEncoder
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, 
    f1_score, roc_auc_score, confusion_matrix
)
from imblearn.over_sampling import SMOTE

# Deep Learning
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

# Silence every library warning for the rest of the session.
warnings.filterwarnings('ignore')

# One seed shared by NumPy and PyTorch so repeated runs draw the same numbers.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)

# Use the GPU when one is visible (seeding its generator as well),
# otherwise stay on the CPU.
if torch.cuda.is_available():
    torch.cuda.manual_seed(RANDOM_SEED)
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")

In [None]:
# ============================================================================
# CUSTOM KAN IMPLEMENTATION (PyTorch)
# ============================================================================

class KANLinear(nn.Module):
    """
    Kolmogorov-Arnold Network linear layer.

    Each input feature is squashed with tanh, expanded into Gaussian
    radial-basis activations centred on a fixed grid over [-1, 1], and the
    activations are mixed into the outputs through a learnable coefficient
    tensor. The Gaussian bases stand in for true B-splines in the forward
    pass.

    Args:
        in_features: Number of input features per sample.
        out_features: Number of output features per sample.
        grid_size: Number of grid points (Gaussian centres) on [-1, 1].
        spline_order: Added to grid_size to size the coefficient tensor.
    """
    def __init__(self, in_features, out_features, grid_size=5, spline_order=3):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.grid_size = grid_size
        self.spline_order = spline_order

        # Size of the last coefficient axis
        self.num_bases = grid_size + spline_order

        # Learnable coefficients for each input-output connection.
        # Shape: (in_features, out_features, num_bases). Only the slots that
        # have a matching basis function contribute to the output.
        self.coefficients = nn.Parameter(
            torch.randn(in_features, out_features, self.num_bases) * 0.1
        )

        # Grid points / Gaussian centres. Registered as a buffer, so they move
        # with the module and are saved in the state dict but are NOT trained.
        grid = torch.linspace(-1, 1, grid_size)
        self.register_buffer('grid', grid)

    def b_spline_basis(self, x, i, k):
        """
        Compute a B-spline basis function using Cox-de Boor recursion.

        Not called by forward(), which uses Gaussian bases instead. The
        recursion reads self.grid[i + k + 1], so callers must keep
        i + k + 1 below grid_size.

        Args:
            x: Tensor of evaluation points.
            i: Index of the left knot in self.grid.
            k: Spline degree (0 gives the indicator of [grid[i], grid[i+1])).

        Returns:
            Tensor shaped like x holding the basis values.
        """
        if k == 0:
            return ((self.grid[i] <= x) & (x < self.grid[i + 1])).float()
        else:
            # The small epsilon avoids division by zero on repeated knots
            left_num = x - self.grid[i]
            left_den = self.grid[i + k] - self.grid[i] + 1e-8
            left = left_num / left_den * self.b_spline_basis(x, i, k - 1)

            right_num = self.grid[i + k + 1] - x
            right_den = self.grid[i + k + 1] - self.grid[i + 1] + 1e-8
            right = right_num / right_den * self.b_spline_basis(x, i + 1, k - 1)

            return left + right

    def forward(self, x):
        """
        Forward pass: apply the basis expansion and mix with the coefficients.

        x: (batch_size, in_features)
        output: (batch_size, out_features)
        """
        # Squash inputs into (-1, 1) so they fall inside the grid range
        x_normalized = torch.tanh(x)

        # Gaussian RBF centred at every grid point (practical spline stand-in)
        x_expanded = x_normalized.unsqueeze(-1)   # (batch, in_features, 1)
        grid_expanded = self.grid.view(1, 1, -1)  # (1, 1, grid_size)
        sigma = 2.0 / self.grid_size
        bases = torch.exp(-((x_expanded - grid_expanded) ** 2) / (2 * sigma ** 2))

        # Contract only over the basis slots that actually exist. Slicing
        # (instead of zero-padding the bases up to num_bases) is numerically
        # equivalent, allocates nothing extra, and stays shape-consistent for
        # every grid_size / spline_order pair, including spline_order > grid_size.
        n_active = min(bases.shape[-1], self.num_bases)

        # bases:        (batch, in_features, n_active)
        # coefficients: (in_features, out_features, n_active)
        return torch.einsum(
            'bin,ion->bo',
            bases[..., :n_active],
            self.coefficients[..., :n_active],
        )


class KAN(nn.Module):
    """
    Three-layer Kolmogorov-Arnold Network that outputs one probability per row.

    Layer widths are input_dim -> hidden_dim -> hidden_dim // 2 -> 1. The two
    hidden layers are followed by SiLU and dropout; the last layer's output is
    passed through a sigmoid.
    """
    def __init__(self, input_dim, hidden_dim=64, grid_size=5, spline_order=3, dropout=0.2):
        super().__init__()

        half_hidden = hidden_dim // 2
        spline_cfg = (grid_size, spline_order)

        self.layer1 = KANLinear(input_dim, hidden_dim, *spline_cfg)
        self.layer2 = KANLinear(hidden_dim, half_hidden, *spline_cfg)
        self.layer3 = KANLinear(half_hidden, 1, *spline_cfg)

        self.dropout = nn.Dropout(dropout)
        self.activation = nn.SiLU()  # a.k.a. Swish

    def forward(self, x):
        # Hidden stages share the same recipe: KAN layer -> SiLU -> dropout
        for hidden_layer in (self.layer1, self.layer2):
            x = self.dropout(self.activation(hidden_layer(x)))
        # Output stage: single unit squashed to (0, 1)
        return torch.sigmoid(self.layer3(x))


# Cell-completion marker (check mark restored from mis-decoded UTF-8)
print("✓ KAN Architecture Defined")

In [None]:
# ============================================================================
# GREY WOLF OPTIMIZER (GWO)
# ============================================================================

class GreyWolfOptimizer:
    """
    Grey Wolf Optimizer that maximizes a user-supplied fitness function.

    The three best positions seen so far (alpha, beta, delta) steer the rest
    of the pack; in this notebook the fitness is validation recall.
    """
    def __init__(self, objective_func, bounds, n_wolves=10, max_iter=20, verbose=True):
        """
        Args:
            objective_func: Function to maximize (returns fitness score)
            bounds: List of (min, max) tuples for each parameter
            n_wolves: Population size
            max_iter: Maximum iterations
            verbose: Print the leader fitness values after every iteration
        """
        self.objective_func = objective_func
        self.bounds = np.array(bounds)
        self.n_wolves = n_wolves
        self.max_iter = max_iter
        self.verbose = verbose
        self.n_dims = len(bounds)

        # Pack starts uniformly scattered inside the search box
        lower, upper = self.bounds[:, 0], self.bounds[:, 1]
        self.positions = np.random.uniform(lower, upper, (n_wolves, self.n_dims))

        # Latest fitness of every wolf
        self.fitness = np.zeros(n_wolves)

        # Leaders start at the origin with the worst possible fitness
        self.alpha_pos = np.zeros(self.n_dims)
        self.beta_pos = np.zeros(self.n_dims)
        self.delta_pos = np.zeros(self.n_dims)
        self.alpha_fitness = -np.inf
        self.beta_fitness = -np.inf
        self.delta_fitness = -np.inf

    def _update_leaders(self, wolf):
        """Insert wolf into the alpha/beta/delta ranking if it beats any of them."""
        score = self.fitness[wolf]
        beats_alpha = score > self.alpha_fitness
        beats_beta = score > self.beta_fitness
        beats_delta = score > self.delta_fitness

        if not (beats_alpha or beats_beta or beats_delta):
            return

        candidate = self.positions[wolf].copy()
        if beats_alpha or beats_beta:
            # Either way the current beta is pushed down to delta
            self.delta_fitness = self.beta_fitness
            self.delta_pos = self.beta_pos.copy()
            if beats_alpha:
                # Old alpha becomes beta, newcomer takes the top spot
                self.beta_fitness = self.alpha_fitness
                self.beta_pos = self.alpha_pos.copy()
                self.alpha_fitness = score
                self.alpha_pos = candidate
            else:
                self.beta_fitness = score
                self.beta_pos = candidate
        else:
            self.delta_fitness = score
            self.delta_pos = candidate

    def optimize(self):
        """
        Run the GWO loop for max_iter iterations.
        Returns: (best position found, its fitness)
        """
        for iteration in range(self.max_iter):
            # Score the pack and refresh the three leaders as we go
            for wolf in range(self.n_wolves):
                self.fitness[wolf] = self.objective_func(self.positions[wolf])
                self._update_leaders(wolf)

            # Exploration factor shrinks linearly from 2 towards 0
            a = 2 - iteration * (2 / self.max_iter)

            # Leaders stay fixed while the pack moves
            leaders = (self.alpha_pos, self.beta_pos, self.delta_pos)

            for wolf in range(self.n_wolves):
                for dim in range(self.n_dims):
                    current = self.positions[wolf, dim]
                    pulls = []
                    # Same draw order as alpha -> beta -> delta
                    for leader in leaders:
                        r1, r2 = np.random.rand(2)
                        A = 2 * a * r1 - a
                        C = 2 * r2
                        distance = abs(C * leader[dim] - current)
                        pulls.append(leader[dim] - A * distance)

                    x1, x2, x3 = pulls
                    # Average the three pulls and keep the result inside bounds
                    self.positions[wolf, dim] = np.clip(
                        (x1 + x2 + x3) / 3,
                        self.bounds[dim, 0],
                        self.bounds[dim, 1]
                    )

            if self.verbose:
                print(f"  Iter {iteration+1}/{self.max_iter} | "
                      f"Alpha Fitness (Recall): {self.alpha_fitness:.4f} | "
                      f"Beta: {self.beta_fitness:.4f} | "
                      f"Delta: {self.delta_fitness:.4f}")

        return self.alpha_pos, self.alpha_fitness


# Cell-completion marker (check mark restored from mis-decoded UTF-8)
print("✓ Grey Wolf Optimizer Implemented")

In [None]:
# ============================================================================
# DATA LOADER & PREPROCESSING
# ============================================================================

class NASADataLoader:
    """
    Handles loading and preprocessing of NASA MDP datasets (.arff format).
    """
    def __init__(self, dataset_dir='./dataset/'):
        self.dataset_dir = Path(dataset_dir)
        self.scaler = MinMaxScaler()
        self.label_encoder = LabelEncoder()

    def load_arff(self, file_path):
        """
        Load an .arff file into a pandas DataFrame.

        Nominal ARFF attributes arrive as byte strings; they are decoded to
        text where possible. Returns None (after printing the error) when the
        file cannot be loaded.
        """
        try:
            data, _meta = arff.loadarff(file_path)
            df = pd.DataFrame(data)

            # Decode byte strings if present
            for col in df.columns:
                if df[col].dtype == object:
                    try:
                        df[col] = df[col].str.decode('utf-8')
                    except (AttributeError, TypeError, UnicodeError):
                        # Best effort: leave columns that are not plain
                        # UTF-8 byte strings untouched.
                        pass

            return df
        except Exception as e:
            print(f"Error loading {file_path}: {e}")
            return None

    def preprocess(self, df, target_col='Defective'):
        """
        Split a dataset into a numeric feature matrix and integer labels.

        Steps: pick the target column, drop non-numeric feature columns, fill
        missing values with the column mean, and encode the labels.

        Args:
            df: DataFrame holding features and the target column.
            target_col: Preferred target column. If it is not in df, the
                common NASA MDP names are tried, then the last column.

        Returns:
            (X, y): X is a 2-D NumPy array of features, y a 1-D NumPy int
            array of labels.
        """
        # Caller's choice first, then the common target column names
        known_targets = ['Defective', 'defects', 'bug', 'Defect', 'class']
        candidates = ([target_col] if target_col is not None else []) + known_targets
        resolved = next((col for col in candidates if col in df.columns), None)

        if resolved is None:
            # Assume last column is target
            resolved = df.columns[-1]

        # Separate features and target
        X = df.drop(columns=[resolved])
        y = df[resolved]

        # Drop non-numeric columns (e.g., module names)
        X = X.select_dtypes(include=[np.number])

        # Handle missing values
        X = X.fillna(X.mean())

        # Encode labels (True/False or Y/N to 1/0). Any non-numeric dtype is
        # encoded, not just `object`, so string/categorical columns work too.
        if y.dtype == bool or not pd.api.types.is_numeric_dtype(y):
            y = self.label_encoder.fit_transform(y)

        # Always hand back a plain integer ndarray (never a pandas Series) so
        # downstream splitting and tensor conversion see one consistent type.
        y = np.asarray(y).astype(int)

        return X.values, y

    def apply_smote(self, X_train, y_train, random_state=42):
        """
        Oversample the minority class of the given training data with SMOTE.

        Only the arrays passed in are resampled; pass training data only so
        that held-out data never contains synthetic samples. Falls back to the
        unmodified inputs when SMOTE cannot be applied.
        """
        # Check if minority class has enough samples
        _unique, counts = np.unique(y_train, return_counts=True)
        min_samples = int(counts.min())

        # SMOTE requires at least 2 samples in minority class
        if min_samples < 2:
            print("  Warning: Not enough minority samples for SMOTE. Skipping.")
            return X_train, y_train

        # Determine k_neighbors (must be less than minority samples)
        k_neighbors = min(5, min_samples - 1)

        try:
            smote = SMOTE(random_state=random_state, k_neighbors=k_neighbors)
            X_resampled, y_resampled = smote.fit_resample(X_train, y_train)
            print(f"  SMOTE applied: {len(y_train)} -> {len(y_resampled)} samples")
            return X_resampled, y_resampled
        except Exception as e:
            # Deliberate best effort: training continues on the original data
            print(f"  SMOTE failed: {e}. Using original data.")
            return X_train, y_train

    def get_dataset_files(self):
        """
        Get all .arff files from the dataset directory, sorted by path so the
        processing order is the same on every run and platform.
        """
        return sorted(self.dataset_dir.glob('*.arff'))


# Cell-completion marker (check mark restored from mis-decoded UTF-8)
print("✓ Data Loader Implemented")

In [None]:
# ============================================================================
# TRAINING & EVALUATION FUNCTIONS
# ============================================================================

def train_kan(model, X_train, y_train, epochs=50, lr=0.01, batch_size=32, verbose=False):
    """
    Train a model that outputs probabilities, using binary cross-entropy.

    Args:
        model: torch module mapping (batch, features) to (batch, 1)
            probabilities in [0, 1].
        X_train: Array-like of shape (n_samples, n_features).
        y_train: Array-like of n_samples binary labels.
        epochs: Number of passes over the training data.
        lr: AdamW learning rate.
        batch_size: Mini-batch size (batches are reshuffled every epoch).
        verbose: Print the mean batch loss every 10 epochs.

    Returns:
        The same model instance, trained in place and left in train mode.
    """
    model.train()

    # Put the data on whatever device the model already lives on, so the
    # function works for any model placement.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device('cpu')

    # np.asarray accepts ndarrays, lists and pandas objects alike
    X_tensor = torch.tensor(np.asarray(X_train), dtype=torch.float32, device=target_device)
    y_tensor = torch.tensor(
        np.asarray(y_train), dtype=torch.float32, device=target_device
    ).reshape(-1, 1)

    # DataLoader
    dataset = TensorDataset(X_tensor, y_tensor)
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    # Optimizer and Loss (float() also accepts NumPy scalars from the optimizer)
    optimizer = optim.AdamW(model.parameters(), lr=float(lr), weight_decay=1e-4)
    criterion = nn.BCELoss()

    for epoch in range(epochs):
        epoch_loss = 0.0
        for batch_X, batch_y in loader:
            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()

        if verbose and (epoch + 1) % 10 == 0:
            print(f"    Epoch {epoch+1}/{epochs}, Loss: {epoch_loss/len(loader):.4f}")

    return model


def evaluate_kan(model, X_test, y_test, threshold=0.5):
    """
    Evaluate a probability-output model and return classification metrics.

    Args:
        model: torch module mapping (batch, features) to (batch, 1)
            probabilities.
        X_test: Array-like of shape (n_samples, n_features).
        y_test: Array-like of n_samples true binary labels.
        threshold: Probability above which a sample is predicted positive.
            Lowering it trades precision for recall.

    Returns:
        Dict with keys 'accuracy', 'precision', 'recall', 'f1' and 'auc'.
        'auc' is 0.0 when it cannot be computed (e.g. y_test has one class).
    """
    model.eval()

    # Run inference on the device the model already lives on
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device('cpu')

    with torch.no_grad():
        X_tensor = torch.tensor(np.asarray(X_test), dtype=torch.float32, device=target_device)
        # Flatten (n, 1) -> (n,) so labels and scores share one shape
        y_pred_prob = model(X_tensor).cpu().numpy().ravel()

    y_pred = (y_pred_prob > threshold).astype(int)

    # Calculate metrics
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred, zero_division=0)
    recall = recall_score(y_test, y_pred, zero_division=0)
    f1 = f1_score(y_test, y_pred, zero_division=0)

    # AUC (requires probabilities); undefined inputs raise ValueError
    try:
        auc = roc_auc_score(y_test, y_pred_prob)
    except ValueError:
        auc = 0.0

    return {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1': f1,
        'auc': auc
    }


# Cell-completion marker (check mark restored from mis-decoded UTF-8)
print("✓ Training & Evaluation Functions Ready")

In [None]:
# ============================================================================
# MAIN PIPELINE: GWO-OPTIMIZED KAN WITH SMOTE
# ============================================================================

class SafetyAwareDefectPredictor:
    """
    Complete pipeline for safety-aware defect prediction.

    For each .arff dataset: load and preprocess, make a stratified 80/20
    train/test split, min-max scale, balance the training data with SMOTE,
    search KAN hyperparameters with the Grey Wolf Optimizer (fitness =
    validation recall), retrain on the full balanced training set, and report
    test-set metrics. One result dict per dataset is appended to
    ``self.results``.
    """
    def __init__(self, dataset_dir='./dataset/'):
        self.data_loader = NASADataLoader(dataset_dir)
        self.results = []

    @staticmethod
    def _decode_params(params):
        """
        Convert a continuous GWO position into model hyperparameters.

        Integer parameters are rounded to the nearest value; truncating would
        bias them downward and make each upper bound reachable only when the
        position sits exactly on it.

        Returns:
            (grid_size, spline_order, hidden_dim, learning_rate)
        """
        grid_size = int(round(float(params[0])))
        spline_order = int(round(float(params[1])))
        hidden_dim = int(round(float(params[2])))
        lr = float(params[3])
        return grid_size, spline_order, hidden_dim, lr

    @staticmethod
    def _split_validation(X_train, y_train):
        """
        Hold out 20% of the training data for GWO fitness evaluation.

        The split is stratified whenever every class has at least two samples,
        so the validation fold contains defective modules to measure recall on.
        """
        _classes, counts = np.unique(y_train, return_counts=True)
        stratify = y_train if counts.min() >= 2 else None
        return train_test_split(
            X_train, y_train, test_size=0.2, random_state=RANDOM_SEED, stratify=stratify
        )

    def create_objective_function(self, X_train, y_train, X_val, y_val, input_dim):
        """
        Create the objective function for GWO.

        The returned callable trains a KAN for 30 epochs with the decoded
        hyperparameters and returns recall on the validation set. Recall alone
        is maximal for a model that flags everything as defective; an F2-score,
        (5 * p * r) / (4 * p + r), would weight recall higher while still
        penalizing that.
        """
        def objective(params):
            grid_size, spline_order, hidden_dim, lr = self._decode_params(params)

            try:
                # Create and train model
                model = KAN(
                    input_dim=input_dim,
                    hidden_dim=hidden_dim,
                    grid_size=grid_size,
                    spline_order=spline_order
                ).to(device)

                # Train for fewer epochs during optimization
                model = train_kan(model, X_train, y_train, epochs=30, lr=lr, verbose=False)

                # Evaluate on validation set
                metrics = evaluate_kan(model, X_val, y_val)
            except RuntimeError as exc:
                # A configuration the model cannot run (e.g. a shape mismatch)
                # gets the worst score instead of aborting the whole search.
                print(f"    Skipping infeasible configuration "
                      f"(grid_size={grid_size}, spline_order={spline_order}, "
                      f"hidden_dim={hidden_dim}): {exc}")
                return 0.0

            # Return Recall (our safety metric)
            return metrics['recall']

        return objective

    def process_dataset(self, dataset_path):
        """
        Process a single dataset through the complete pipeline.

        Returns the result dict (also appended to self.results), or None when
        the file could not be loaded.
        """
        dataset_name = dataset_path.stem
        print(f"\n{'='*80}")
        print(f"Processing: {dataset_name}")
        print(f"{'='*80}")

        # 1. Load Data
        print("[1/6] Loading dataset...")
        df = self.data_loader.load_arff(dataset_path)
        if df is None:
            return None

        X, y = self.data_loader.preprocess(df)
        y = np.asarray(y)  # plain ndarray regardless of the label dtype
        print(f"  Shape: {X.shape}, Defect Rate: {y.mean():.2%}")

        # 2. Train/Test Split (Stratified)
        print("[2/6] Splitting data (80/20)...")
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=RANDOM_SEED, stratify=y
        )

        # Normalize features using statistics of the real training rows only.
        # Scaling happens BEFORE SMOTE so its nearest-neighbour search is not
        # dominated by features with large raw ranges.
        scaler = MinMaxScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)

        # Hold out real (non-synthetic) samples for GWO validation BEFORE
        # oversampling; otherwise synthetic points interpolated from
        # validation rows leak into the training fold and inflate recall.
        X_train_gwo, X_val_gwo, y_train_gwo, y_val_gwo = self._split_validation(
            X_train_scaled, y_train
        )

        # 3. Apply SMOTE (Training Only)
        print("[3/6] Applying SMOTE...")
        X_train_smote, y_train_smote = self.data_loader.apply_smote(X_train_scaled, y_train)
        X_train_gwo, y_train_gwo = self.data_loader.apply_smote(X_train_gwo, y_train_gwo)

        # 4. GWO Hyperparameter Optimization
        print("[4/6] Running GWO for hyperparameter optimization...")
        print("  Optimizing: grid_size, spline_order, hidden_dim, learning_rate")
        print("  Objective: Maximize Recall (Safety Metric)")

        # Define search space
        bounds = [
            (3, 10),      # grid_size
            (2, 5),       # spline_order
            (16, 128),    # hidden_dim
            (0.001, 0.1)  # learning_rate
        ]

        objective_func = self.create_objective_function(
            X_train_gwo, y_train_gwo, X_val_gwo, y_val_gwo, X.shape[1]
        )

        gwo = GreyWolfOptimizer(
            objective_func=objective_func,
            bounds=bounds,
            n_wolves=8,
            max_iter=15,
            verbose=True
        )

        best_params, best_fitness = gwo.optimize()

        # Decode best parameters exactly as the objective function did
        best_grid_size, best_spline_order, best_hidden_dim, best_lr = (
            self._decode_params(best_params)
        )

        print(f"\n  ✓ Best Parameters Found:")
        print(f"    Grid Size: {best_grid_size}")
        print(f"    Spline Order: {best_spline_order}")
        print(f"    Hidden Dim: {best_hidden_dim}")
        print(f"    Learning Rate: {best_lr:.4f}")
        print(f"    Validation Recall: {best_fitness:.4f}")

        # 5. Train Final Model with Best Parameters
        print("\n[5/6] Training final model with optimized parameters...")
        final_model = KAN(
            input_dim=X.shape[1],
            hidden_dim=best_hidden_dim,
            grid_size=best_grid_size,
            spline_order=best_spline_order
        ).to(device)

        final_model = train_kan(
            final_model,
            X_train_smote,
            y_train_smote,
            epochs=100,
            lr=best_lr,
            verbose=True
        )

        # 6. Evaluate on Test Set
        print("\n[6/6] Evaluating on test set...")
        test_metrics = evaluate_kan(final_model, X_test_scaled, y_test)

        print(f"\n  📊 Test Results:")
        print(f"    Accuracy:  {test_metrics['accuracy']:.4f}")
        print(f"    Precision: {test_metrics['precision']:.4f}")
        print(f"    Recall:    {test_metrics['recall']:.4f} ⚠️  (Safety Metric)")
        print(f"    F1-Score:  {test_metrics['f1']:.4f}")
        print(f"    AUC:       {test_metrics['auc']:.4f}")

        # Store results
        result = {
            'Dataset': dataset_name,
            'Grid_Size': best_grid_size,
            'Spline_Order': best_spline_order,
            'Hidden_Dim': best_hidden_dim,
            'Learning_Rate': best_lr,
            'Accuracy': test_metrics['accuracy'],
            'Precision': test_metrics['precision'],
            'Recall': test_metrics['recall'],
            'F1_Score': test_metrics['f1'],
            'AUC': test_metrics['auc']
        }

        self.results.append(result)
        return result

    def run_all_datasets(self):
        """
        Process every .arff file in the dataset directory, then save results.

        A failure in one dataset is reported and skipped so the remaining
        datasets still run.
        """
        dataset_files = self.data_loader.get_dataset_files()

        if not dataset_files:
            # Report the directory actually searched, not a fixed path
            print(f"⚠️  No .arff files found in {self.data_loader.dataset_dir} directory!")
            print("Please add NASA MDP datasets (e.g., CM1.arff, JM1.arff, etc.)")
            return

        print(f"\n🚀 Found {len(dataset_files)} datasets to process\n")

        for dataset_path in dataset_files:
            try:
                self.process_dataset(dataset_path)
            except Exception as e:
                print(f"\n❌ Error processing {dataset_path.name}: {e}")
                continue

        # Save results
        self.save_results()

    def save_results(self, output_file='final_results.xlsx'):
        """
        Save consolidated results to an Excel file and print a summary.

        If pandas has no Excel writer installed, the results are written as
        CSV next to the requested path instead of being lost.
        """
        if not self.results:
            print("\n⚠️  No results to save.")
            return

        df_results = pd.DataFrame(self.results)
        saved_to = output_file
        try:
            df_results.to_excel(output_file, index=False, sheet_name='GWO-KAN Results')
        except ImportError as exc:
            saved_to = str(Path(output_file).with_suffix('.csv'))
            print(f"\n  Excel writer unavailable ({exc}); writing CSV instead.")
            df_results.to_csv(saved_to, index=False)

        print(f"\n{'='*80}")
        print(f"✅ Results saved to: {saved_to}")
        print(f"{'='*80}")
        print("\n📊 Summary Statistics:")
        print(df_results[['Dataset', 'Recall', 'F1_Score', 'AUC']].to_string(index=False))
        print(f"\nAverage Recall (Safety): {df_results['Recall'].mean():.4f}")
        print(f"Average F1-Score: {df_results['F1_Score'].mean():.4f}")


# Cell-completion marker (check mark restored from mis-decoded UTF-8)
print("✓ Main Pipeline Ready")

In [None]:
# ============================================================================
# EXECUTE PIPELINE
# ============================================================================

# Initialize predictor
predictor = SafetyAwareDefectPredictor(dataset_dir='./dataset/')

# Run on all datasets
predictor.run_all_datasets()

# Only claim success when at least one dataset actually produced results
if predictor.results:
    print("\n🎉 Pipeline completed successfully!")
else:
    print("\n⚠️  Pipeline finished without producing any results.")