In [2]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score, classification_report
from sklearn.ensemble import VotingClassifier
from sklearn.linear_model import LogisticRegression
import xgboost as xgb
import lightgbm as lgb
import warnings
warnings.filterwarnings('ignore')

# Seed NumPy's and PyTorch's global random number generators with a fixed
# value so that repeated runs draw the same random numbers from them.
np.random.seed(42)
torch.manual_seed(42)

class ImprovedTransformer(nn.Module):
    """Classifier that passes each sample through a Transformer encoder.

    Every input row is projected to ``d_model`` features, treated as a
    sequence of length one, encoded, and finally mapped to ``num_classes``
    logits by a small feed-forward head.

    Args:
        input_dim: Number of features per input row.
        num_classes: Number of output logits.
        d_model: Width of the encoder.
        nhead: Number of attention heads per encoder layer.
        num_layers: Number of stacked encoder layers.
        dropout: Dropout probability used in the encoder and the head.
    """

    def __init__(self, input_dim, num_classes=3, d_model=512, nhead=8, num_layers=6, dropout=0.2):
        super().__init__()

        # Lift the raw features to the encoder width, then normalise.
        self.input_projection = nn.Linear(input_dim, d_model)
        self.layer_norm_input = nn.LayerNorm(d_model)

        # Learned offset added to the single token of every sample.
        self.pos_encoding = nn.Parameter(torch.randn(1, 1, d_model))

        # Pre-norm encoder stack; the feed-forward width is twice d_model.
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=2 * d_model,
            dropout=dropout,
            batch_first=True,
            norm_first=True,
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        # Head narrows d_model -> d_model/2 -> d_model/4 -> num_classes.
        half_width = d_model // 2
        quarter_width = d_model // 4
        self.classifier = nn.Sequential(
            nn.LayerNorm(d_model),
            nn.Dropout(dropout),
            nn.Linear(d_model, half_width),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(half_width, quarter_width),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(quarter_width, num_classes),
        )

        # Re-initialise every linear layer in the module tree.
        self.apply(self._init_weights)

    def _init_weights(self, module):
        """Give linear layers Xavier-uniform weights and zero biases."""
        if not isinstance(module, nn.Linear):
            return
        nn.init.xavier_uniform_(module.weight)
        if module.bias is not None:
            nn.init.zeros_(module.bias)

    def forward(self, x):
        """Map a ``(batch_size, input_dim)`` tensor to ``(batch_size, num_classes)`` logits."""
        projected = self.layer_norm_input(self.input_projection(x))

        # Insert a length-one sequence axis and add the learned offset.
        tokens = projected.unsqueeze(1) + self.pos_encoding  # (batch_size, 1, d_model)

        encoded = self.transformer(tokens)  # (batch_size, 1, d_model)

        # Drop the sequence axis again before classifying.
        return self.classifier(encoded.squeeze(1))

class HybridEnsemble:
    def __init__(self, input_dim, num_classes=3, device='cpu'):
        self.input_dim = input_dim
        self.num_classes = num_classes
        self.device = device
        self.models = {}
        self.scalers = {}
        self.meta_model = None
        self.label_mapping = None

    def create_xgboost_model(self):
        """Build a new, unfitted XGBoost classifier with fixed hyperparameters.

        Returns:
            An ``xgb.XGBClassifier`` instance that has not been trained.
        """
        # Settings collected in one mapping so they are easy to scan and adjust.
        xgb_params = {
            'n_estimators': 200,
            'max_depth': 8,
            'learning_rate': 0.1,
            'subsample': 0.9,
            'colsample_bytree': 0.9,
            'random_state': 42,
            'eval_metric': 'mlogloss',
            'tree_method': 'hist',
        }
        return xgb.XGBClassifier(**xgb_params)

    def create_lightgbm_model(self):
        """Build a new, unfitted LightGBM classifier with fixed hyperparameters.

        Returns:
            An ``lgb.LGBMClassifier`` instance that has not been trained.
        """
        return lgb.LGBMClassifier(
            n_estimators=200,
            max_depth=8,
            learning_rate=0.1,
            subsample=0.9,
            # LightGBM only performs row subsampling when the bagging
            # frequency is positive; with the default of 0 the ``subsample``
            # value above would be silently ignored.
            subsample_freq=1,
            colsample_bytree=0.9,
            random_state=42,
            verbose=-1,
            force_col_wise=True
        )

    def create_transformer_model(self):
        """Build a new ImprovedTransformer on ``self.device``.

        Returns:
            An untrained ``ImprovedTransformer`` sized from ``self.input_dim``
            and ``self.num_classes``.
        """
        # Narrower, shallower and with more dropout than the class defaults,
        # chosen to curb overfitting.
        network = ImprovedTransformer(
            input_dim=self.input_dim,
            num_classes=self.num_classes,
            d_model=256,
            nhead=8,
            num_layers=4,
            dropout=0.3,
        )
        return network.to(self.device)

    def train_transformer_with_tuning(self, X_train, y_train, X_val=None, y_val=None, epochs=50):
        """Train transformer with improved techniques"""
        model = self.create_transformer_model()

        # Loss function with label smoothing
        criterion = nn.CrossEntropyLoss(label_smoothing=0.1)

        # AdamW optimizer with weight decay
        optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01)

        # Cosine annealing scheduler
        scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0=10, T_mult=2)

        # Convert to tensors
        X_train_tensor = torch.FloatTensor(X_train).to(self.device)
        y_train_tensor = torch.LongTensor(y_train).to(self.device)

        # Create validation tensors if provided
        if X_val is not None and y_val is not None:
            X_val_tensor = torch.FloatTensor(X_val).to(self.device)
            y_val_tensor = torch.LongTensor(y_val).to(self.device)

        # Create data loaders
        train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
        train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

        best_val_acc = 0
        best_model_state = None
        patience = 10
        patience_counter = 0

        model.train()
        for epoch in range(epochs):
            total_loss = 0
            correct_train = 0
            total_train = 0

            for batch_X, batch_y in train_loader:
                optimizer.zero_grad()
                outputs = model(batch_X)
                loss = criterion(outputs, batch_y)
                loss.backward()

                # Gradient clipping
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

                optimizer.step()
                scheduler.step()

                total_loss += loss.item()
                _, predicted = torch.max(outputs.data, 1)
                total_train += batch_y.size(0)
                correct_train += (predicted == batch_y).sum().item()

            # Evaluation
            train_acc = correct_train / total_train
            avg_loss = total_loss / len(train_loader)

            # Validation evaluation if validation data is provided
            if X_val is not None and y_val is not None:
                model.eval()
                with torch.no_grad():
                    val_outputs = model(X_val_tensor)
                    _, val_predicted = torch.max(val_outputs.data, 1)
                    val_acc = (val_predicted == y_val_tensor).sum().item() / y_val_tensor.size(0)

                if val_acc > best_val_acc:
                    best_val_acc = val_acc
                    best_model_state = model.state_dict().copy()
                    patience_counter = 0
                else:
                    patience_counter += 1

                if epoch % 5 == 0:
                    print(f'Epoch {epoch+1}/{epochs} - Loss: {avg_loss:.4f} - Train Acc: {train_acc:.4f} - Val Acc: {val_acc:.4f}')

                if patience_counter >= patience:
                    print(f'Early stopping at epoch {epoch+1}')
                    break

                model.train()
            else:
                # No validation data, just print training progress
                if epoch % 5 == 0:
                    print(f'Epoch {epoch+1}/{epochs} - Loss: {avg_loss:.4f} - Train Acc: {train_acc:.4f}')

        # Load best model if validation was used
        if best_model_state is not None:
            model.load_state_dict(best_model_state)
            print(f'Best transformer validation accuracy: {best_val_acc:.4f}')

        return model

    def train_base_models(self, X_train, y_train):
        """Fit XGBoost, LightGBM and the transformer and register them.

        The tree models are fitted on the full, unscaled training data. The
        transformer is fitted on a standardised 80% stratified split, with the
        remaining 20% used as its validation set. Fitted models are stored in
        ``self.models`` and the transformer's scaler in ``self.scalers``.

        Args:
            X_train: Training features.
            y_train: Training labels, used for fitting and for stratification.
        """
        from sklearn.model_selection import train_test_split

        print("Training base models on training dataset...")

        # Hold out a stratified fifth of the data for transformer validation.
        fit_X, holdout_X, fit_y, holdout_y = train_test_split(
            X_train, y_train, test_size=0.2, random_state=42, stratify=y_train
        )

        # The scaler is fitted on the transformer's training split only and
        # kept so later inputs can be transformed the same way.
        feature_scaler = StandardScaler()
        fit_X_scaled = feature_scaler.fit_transform(fit_X)
        holdout_X_scaled = feature_scaler.transform(holdout_X)
        self.scalers['transformer'] = feature_scaler

        # Gradient-boosted trees: full training set, no scaling.
        print("Training XGBoost...")
        booster = self.create_xgboost_model()
        booster.fit(X_train, y_train)
        self.models['xgboost'] = booster
        print("XGBoost training completed.")

        print("Training LightGBM...")
        booster = self.create_lightgbm_model()
        booster.fit(X_train, y_train)
        self.models['lightgbm'] = booster
        print("LightGBM training completed.")

        # Transformer: scaled split with early stopping on the hold-out part.
        print("Training Enhanced Transformer...")
        self.models['transformer'] = self.train_transformer_with_tuning(
            fit_X_scaled, fit_y, holdout_X_scaled, holdout_y, epochs=50
        )
        print("Enhanced Transformer training completed.")

    def create_stacking_ensemble(self, X_train, y_train):
        """Fit the logistic-regression meta-learner on out-of-fold base-model scores.

        A stratified 5-fold split of the training data is used. For each fold a
        fresh XGBoost, LightGBM and transformer model is trained on the other
        folds, and one score per held-out row becomes that model's
        meta-feature column. The fitted meta-learner is stored on
        ``self.meta_model``.

        Args:
            X_train: Unscaled training features; must support integer-array
                row indexing.
            y_train: Labels aligned with ``X_train``, indexable the same way.
        """
        print("Creating stacking ensemble...")

        cv_folds = 5
        splitter = StratifiedKFold(n_splits=cv_folds, shuffle=True, random_state=42)

        # One column per entry in self.models; rows are filled fold by fold.
        oof_scores = np.zeros((len(X_train), len(self.models)))

        # Tree models contribute P(class 1) for two-class problems and the
        # largest class probability otherwise.
        # NOTE(review): the largest probability does not say which class was
        # predicted -- confirm this is the intended meta-feature.
        two_class = len(np.unique(y_train)) == 2
        tree_builders = (self.create_xgboost_model, self.create_lightgbm_model)

        for fold, (train_idx, val_idx) in enumerate(splitter.split(X_train, y_train)):
            print(f"Processing fold {fold + 1}/{cv_folds}")

            X_fit, X_held = X_train[train_idx], X_train[val_idx]
            y_fit, y_held = y_train[train_idx], y_train[val_idx]

            # Columns 0 and 1: XGBoost, then LightGBM.
            for column, build in enumerate(tree_builders):
                tree_model = build()
                tree_model.fit(X_fit, y_fit)
                held_probs = tree_model.predict_proba(X_held)
                if two_class:
                    oof_scores[val_idx, column] = held_probs[:, 1]
                else:
                    oof_scores[val_idx, column] = held_probs.max(axis=1)

            # Column 2: transformer, with a scaler fitted on this fold only.
            fold_scaler = StandardScaler()
            X_fit_scaled = fold_scaler.fit_transform(X_fit)
            X_held_scaled = fold_scaler.transform(X_held)

            # NOTE(review): the held-out rows also drive early stopping here.
            fold_net = self.train_transformer_with_tuning(
                X_fit_scaled, y_fit, X_held_scaled, y_held, epochs=30
            )
            fold_net.eval()
            with torch.no_grad():
                held_tensor = torch.FloatTensor(X_held_scaled).to(self.device)
                held_softmax = torch.softmax(fold_net(held_tensor), dim=1).cpu().numpy()
                oof_scores[val_idx, 2] = held_softmax.max(axis=1)

        # Meta-learner maps the per-model scores to the final label.
        meta_learner = LogisticRegression(random_state=42, max_iter=1000)
        meta_learner.fit(oof_scores, y_train)
        self.meta_model = meta_learner

        print("Stacking ensemble training completed.")

    def evaluate_on_test_set(self, X_test, y_test):
        """Score every base model and every ensemble on held-out data.

        Prints accuracy and weighted F1 per method, a comparison table, and a
        classification report for the method with the highest F1.

        Args:
            X_test: Unscaled test features; the stored transformer scaler is
                applied internally.
            y_test: True labels for ``X_test``.

        Returns:
            A pandas DataFrame with columns ``Method``, ``Accuracy`` and
            ``F1-Score``, one row per evaluated method.
        """
        rule = "=" * 60
        print("\n" + rule)
        print("EVALUATING ON TEST DATASET")
        print(rule)

        # Only the transformer needs standardised inputs.
        scaled_inputs = self.scalers['transformer'].transform(X_test)

        print("Getting individual model predictions...")

        # --- XGBoost ---
        xgb_pred = self.models['xgboost'].predict(X_test)
        xgb_acc = accuracy_score(y_test, xgb_pred)
        xgb_f1 = f1_score(y_test, xgb_pred, average='weighted')
        print(f"XGBoost Test - Accuracy: {xgb_acc:.4f}, F1: {xgb_f1:.4f}")

        # --- LightGBM ---
        lgb_pred = self.models['lightgbm'].predict(X_test)
        lgb_acc = accuracy_score(y_test, lgb_pred)
        lgb_f1 = f1_score(y_test, lgb_pred, average='weighted')
        print(f"LightGBM Test - Accuracy: {lgb_acc:.4f}, F1: {lgb_f1:.4f}")

        # --- Transformer ---
        net = self.models['transformer']
        net.eval()
        with torch.no_grad():
            logits = net(torch.FloatTensor(scaled_inputs).to(self.device))
            transformer_pred = torch.max(logits, 1)[1].cpu().numpy()

        transformer_acc = accuracy_score(y_test, transformer_pred)
        transformer_f1 = f1_score(y_test, transformer_pred, average='weighted')
        print(f"Enhanced Transformer Test - Accuracy: {transformer_acc:.4f}, F1: {transformer_f1:.4f}")

        # --- Ensembles (each helper prints its own summary line) ---
        voting_pred, voting_acc, voting_f1 = self.create_voting_ensemble(X_test, y_test)
        weighted_pred, weighted_acc, weighted_f1 = self.create_weighted_ensemble(X_test, y_test)
        stacking_pred, stacking_acc, stacking_f1 = self.evaluate_stacking_on_test(X_test, y_test)

        # Side-by-side comparison table.
        results_df = pd.DataFrame({
            'Method': ['XGBoost', 'LightGBM', 'Transformer', 'Voting', 'Weighted', 'Stacking'],
            'Accuracy': [xgb_acc, lgb_acc, transformer_acc, voting_acc, weighted_acc, stacking_acc],
            'F1-Score': [xgb_f1, lgb_f1, transformer_f1, voting_f1, weighted_f1, stacking_f1],
        })
        print("\nTest Set Results Comparison:")
        print(results_df.to_string(index=False))

        # Pick the row with the highest F1.
        best_method_idx = np.argmax(results_df['F1-Score'])
        best_row = results_df.iloc[best_method_idx]
        best_method = best_row['Method']
        best_f1 = best_row['F1-Score']

        print(f"\nBest method on test set: {best_method} (F1: {best_f1:.4f})")

        # Look up the winner's predictions for the detailed report.
        predictions_by_method = {
            'XGBoost': xgb_pred,
            'LightGBM': lgb_pred,
            'Transformer': transformer_pred,
            'Voting': voting_pred,
            'Weighted': weighted_pred,
            'Stacking': stacking_pred,
        }
        best_pred = predictions_by_method[best_method]

        print(f"\nDetailed Classification Report for {best_method} on Test Set:")
        print(classification_report(y_test, best_pred))

        return results_df

    def evaluate_stacking_on_test(self, X_test, y_test):
        """Score the stacking ensemble on held-out data.

        Builds one meta-feature column per base model from the fitted models
        in ``self.models``, feeds them to ``self.meta_model`` and prints the
        resulting accuracy and weighted F1.

        Args:
            X_test: Unscaled test features.
            y_test: True labels for ``X_test``.

        Returns:
            Tuple ``(predictions, accuracy, weighted_f1)``.
        """
        print("Evaluating stacking ensemble on test set...")

        scaled_inputs = self.scalers['transformer'].transform(X_test)

        meta_features_test = np.zeros((len(X_test), len(self.models)))

        # Columns 0 and 1: tree models. P(class 1) when there are two
        # probability columns, otherwise the largest class probability.
        for column, model_key in enumerate(('xgboost', 'lightgbm')):
            class_probs = self.models[model_key].predict_proba(X_test)
            if class_probs.shape[1] == 2:
                meta_features_test[:, column] = class_probs[:, 1]
            else:
                meta_features_test[:, column] = class_probs.max(axis=1)

        # Column 2: transformer's largest softmax probability.
        net = self.models['transformer']
        net.eval()
        with torch.no_grad():
            logits = net(torch.FloatTensor(scaled_inputs).to(self.device))
            softmax_probs = torch.softmax(logits, dim=1).cpu().numpy()
            meta_features_test[:, 2] = softmax_probs.max(axis=1)

        # The meta-learner turns the per-model scores into final labels.
        stacking_pred = self.meta_model.predict(meta_features_test)
        stacking_acc = accuracy_score(y_test, stacking_pred)
        stacking_f1 = f1_score(y_test, stacking_pred, average='weighted')

        print(f"Stacking Ensemble Test - Accuracy: {stacking_acc:.4f}, F1: {stacking_f1:.4f}")

        return stacking_pred, stacking_acc, stacking_f1

    def create_voting_ensemble(self, X_test, y_test):
        """Score a majority vote of the three base models on held-out data.

        Args:
            X_test: Unscaled test features.
            y_test: True labels for ``X_test``.

        Returns:
            Tuple ``(predictions, accuracy, weighted_f1)``.
        """
        print("Evaluating voting ensemble on test set...")

        scaled_inputs = self.scalers['transformer'].transform(X_test)

        # Hard label predictions from the tree models.
        xgb_votes = self.models['xgboost'].predict(X_test)
        lgb_votes = self.models['lightgbm'].predict(X_test)

        # Hard label predictions from the transformer.
        net = self.models['transformer']
        net.eval()
        with torch.no_grad():
            logits = net(torch.FloatTensor(scaled_inputs).to(self.device))
            transformer_votes = torch.max(logits, 1)[1].cpu().numpy()

        def _most_common(column):
            # bincount + argmax returns the smallest label among tied counts.
            return np.bincount(column).argmax()

        # Rows are models, columns are samples; vote down each column.
        ballot = np.array([xgb_votes, lgb_votes, transformer_votes])
        voting_pred = np.apply_along_axis(_most_common, axis=0, arr=ballot)

        voting_acc = accuracy_score(y_test, voting_pred)
        voting_f1 = f1_score(y_test, voting_pred, average='weighted')

        print(f"Voting Ensemble Test - Accuracy: {voting_acc:.4f}, F1: {voting_f1:.4f}")

        return voting_pred, voting_acc, voting_f1

    def create_weighted_ensemble(self, X_test, y_test):
        """Score a fixed-weight average of the base models' class probabilities.

        Args:
            X_test: Unscaled test features.
            y_test: True labels for ``X_test``.

        Returns:
            Tuple ``(predictions, accuracy, weighted_f1)``.
        """
        print("Evaluating weighted ensemble on test set...")

        scaled_inputs = self.scalers['transformer'].transform(X_test)

        # Class-probability matrices from the tree models.
        xgb_probs = self.models['xgboost'].predict_proba(X_test)
        lgb_probs = self.models['lightgbm'].predict_proba(X_test)

        # Softmax probabilities from the transformer.
        net = self.models['transformer']
        net.eval()
        with torch.no_grad():
            logits = net(torch.FloatTensor(scaled_inputs).to(self.device))
            transformer_probs = torch.softmax(logits, dim=1).cpu().numpy()

        # Hand-picked weights in the order XGBoost, LightGBM, transformer;
        # LightGBM gets slightly more weight. These can be tuned.
        weights = [0.3, 0.4, 0.3]

        blended = (weights[0] * xgb_probs +
                   weights[1] * lgb_probs +
                   weights[2] * transformer_probs)

        # The class with the highest blended probability wins.
        weighted_pred = np.argmax(blended, axis=1)

        weighted_acc = accuracy_score(y_test, weighted_pred)
        weighted_f1 = f1_score(y_test, weighted_pred, average='weighted')

        print(f"Weighted Ensemble Test - Accuracy: {weighted_acc:.4f}, F1: {weighted_f1:.4f}")

        return weighted_pred, weighted_acc, weighted_f1

    def predict(self, X, method='weighted'):
        """Make predictions using specified ensemble method"""
        if method == 'stacking' and self.meta_model is None:
            raise ValueError("Stacking model not trained. Use fit method first.")

        X_scaled = self.scalers['transformer'].transform(X)

        if method == 'voting':
            # Get predictions from all models
            xgb_pred = self.models['xgboost'].predict(X)
            lgb_pred = self.models['lightgbm'].predict(X)

            self.models['transformer'].eval()
            with torch.no_grad():
                X_tensor = torch.FloatTensor(X_scaled).to(self.device)
                transformer_outputs = self.models['transformer'](X_tensor)
                _, transformer_pred = torch.max(transformer_outputs, 1)
                transformer_pred = transformer_pred.cpu().numpy()

            predictions = np.array([xgb_pred, lgb_pred, transformer_pred])
            return np.apply_along_axis(lambda x: np.bincount(x).argmax(), axis=0, arr=predictions)

        elif method == 'weighted':
            xgb_probs = self.models['xgboost'].predict_proba(X)
            lgb_probs = self.models['lightgbm'].predict_proba(X)

            self.models['transformer'].eval()
            with torch.no_grad():
                X_tensor = torch.FloatTensor(X_scaled).to(self.device)
                transformer_outputs = self.models['transformer'](X_tensor)
                transformer_probs = torch.softmax(transformer_outputs, dim=1).cpu().numpy()

            weights = [0.3, 0.4, 0.3]
            weighted_probs = (weights[0] * xgb_probs +
                             weights[1] * lgb_probs +
                             weights[2] * transformer_probs)

            return np.argmax(weighted_probs, axis=1)

        elif method == 'stacking':
            # Generate meta-features
            meta_features = np.zeros((len(X), len(self.models)))

            xgb_probs = self.models['xgboost'].predict_proba(X)
            meta_features[:, 0] = xgb_probs[:, 1] if xgb_probs.shape[1] == 2 else xgb_probs.max(axis=1)

            lgb_probs = self.models['lightgbm'].predict_proba(X)
            meta_features[:, 1] = lgb_probs[:, 1] if lgb_probs.shape[1] == 2 else lgb_probs.max(axis=1)

            self.models['transformer'].eval()
            with torch.no_grad():
                X_tensor = torch.FloatTensor(X_scaled).to(self.device)
                transformer_outputs = self.models['transformer'](X_tensor)
                transformer_probs = torch.softmax(transformer_outputs, dim=1).cpu().numpy()
                meta_features[:, 2] = transformer_probs.max(axis=1)

            return self.meta_model.predict(meta_features)

def main():
    """Load the CSV datasets, train every model and evaluate on the test set.

    Reads ``train_set_fu.csv`` and ``test_set_fu.csv`` from the working
    directory; both must contain a ``label`` column, and every other column
    is used as a feature. String labels are encoded as integers, and the
    mapping is stored on the returned model so predictions can be decoded.

    Returns:
        Tuple ``(hybrid_model, results_df)``: the trained ``HybridEnsemble``
        and the comparison table from ``evaluate_on_test_set``.
    """
    # Load your datasets
    print("Loading training dataset...")
    train_df = pd.read_csv('train_set_fu.csv')  # Replace with your training file path

    print("Loading testing dataset...")
    test_df = pd.read_csv('test_set_fu.csv')   # Replace with your testing file path

    # Split features from the target column
    X_train = train_df.drop('label', axis=1).values
    y_train = train_df['label'].values

    X_test = test_df.drop('label', axis=1).values
    y_test = test_df['label'].values

    # Convert labels to numeric if they're strings; the mapping is built from
    # the labels of both sets so either one can be encoded.
    label_mapping = None
    if y_train.dtype == 'object':
        unique_labels = np.unique(np.concatenate([y_train, y_test]))
        label_mapping = {label: idx for idx, label in enumerate(unique_labels)}
        y_train = np.array([label_mapping[label] for label in y_train])
        y_test = np.array([label_mapping[label] for label in y_test])
        print(f"Label mapping: {label_mapping}")

    # Computed once and reused for the report and the model constructor
    num_classes = len(np.unique(np.concatenate([y_train, y_test])))

    print(f"Training set size: {X_train.shape}")
    print(f"Test set size: {X_test.shape}")
    print(f"Number of classes: {num_classes}")

    # Set device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    # Create hybrid ensemble
    hybrid_model = HybridEnsemble(
        input_dim=X_train.shape[1],
        num_classes=num_classes,
        device=device
    )
    # Keep the label encoding with the model (None when labels were already
    # numeric) so integer predictions can be mapped back to the original names.
    hybrid_model.label_mapping = label_mapping

    # Train base models on training data only
    hybrid_model.train_base_models(X_train, y_train)

    # Train stacking ensemble on training data only
    hybrid_model.create_stacking_ensemble(X_train, y_train)

    # Evaluate on test dataset
    results_df = hybrid_model.evaluate_on_test_set(X_test, y_test)

    return hybrid_model, results_df

# Script entry point: run the full train/evaluate pipeline when this file is
# executed directly, then print a short usage reminder.
if __name__ == "__main__":
    hybrid_model, results = main()

    # Example of how to use the trained model for new predictions
    print("\nModel training completed! You can now use:")
    print("- hybrid_model.predict(X_new, method='weighted') for new predictions")
    print("- hybrid_model.predict(X_new, method='voting') for voting ensemble")
    print("- hybrid_model.predict(X_new, method='stacking') for stacking ensemble")

Loading training dataset...
Loading testing dataset...
Label mapping: {'Benign': 0, 'Cancer': 1, 'Control': 2}
Training set size: (1281, 1620)
Test set size: (427, 1620)
Number of classes: 3
Using device: cuda
Training base models on training dataset...
Training XGBoost...
XGBoost training completed.
Training LightGBM...
LightGBM training completed.
Training Enhanced Transformer...
Epoch 1/50 - Loss: 1.0632 - Train Acc: 0.5498 - Val Acc: 0.7082
Epoch 6/50 - Loss: 0.6300 - Train Acc: 0.8105 - Val Acc: 0.7821
Epoch 11/50 - Loss: 0.5451 - Train Acc: 0.8789 - Val Acc: 0.7510
Epoch 16/50 - Loss: 0.4071 - Train Acc: 0.9531 - Val Acc: 0.7549
Early stopping at epoch 18
Best transformer validation accuracy: 0.7938
Enhanced Transformer training completed.
Creating stacking ensemble...
Processing fold 1/5
Epoch 1/30 - Loss: 1.0865 - Train Acc: 0.5625 - Val Acc: 0.7315
Epoch 6/30 - Loss: 0.6199 - Train Acc: 0.8232 - Val Acc: 0.7432
Epoch 11/30 - Loss: 0.5525 - Train Acc: 0.8691 - Val Acc: 0.7588
E