pre-pro

In [None]:
!pip install wfdb
# Import necessary libraries
import os
import numpy as np
import pandas as pd
import wfdb

# Root directory holding the WFDB record files (edit for your environment).
dataset_path = '/content/ECG_dataset/ECG_dataset/'

# One entry per annotated record: take the text before the first '.' of every
# '.atr' file name, drop duplicates, and order the names alphabetically.
records = sorted({
    file_name.split('.')[0]
    for file_name in os.listdir(dataset_path)
    if file_name.endswith('.atr')
})

# Gather every annotation symbol that occurs anywhere in the dataset.
unique_labels = set()
for record_name in records:
    record_base = os.path.join(dataset_path, record_name)
    try:
        unique_labels.update(wfdb.rdann(record_base, 'atr').symbol)
    except Exception as e:
        # Best effort: report the unreadable record and keep scanning the rest.
        print(f"Exception occurred while reading {record_name}: {e}")

print(f"Unique labels: {unique_labels}")

# Window size setup
# Every extracted beat is a fixed window of `window_size` samples; the
# extraction loop starts the window `half_window` samples before the
# annotated sample index.
window_size = 187
half_window = window_size // 2

# Mapping of heartbeat types to labels
# Annotation symbols that are absent from this dict are counted as "invalid"
# by the extraction loop and are never written to the output.
# NOTE(review): the AAMI EC57 grouping is usually quoted with 'e' and 'j' in
# the normal class; here 'j' is in group 4, 'e' is unmapped and 'B' is in
# group 0 - confirm this is the intended grouping.
mapping = {
    'N': 0, 'L': 0, 'R': 0, 'B': 0,   # Normal beats
    'A': 1, 'a': 1, 'J': 1, 'S': 1,   # Supraventricular beats
    'V': 2, 'E': 2,                   # Ventricular beats
    'F': 3,                           # Fusion beats
    '/': 4, 'f': 4, 'Q': 4, 'j': 4    # Unknown beats
}

# Accumulators filled by the extraction loop below.
heartbeats = []   # one fixed-length signal window per accepted beat
labels = []       # class id for the window at the same position
skip = 0          # beats dropped because the window is too close to an edge
invalid = 0       # beats dropped because their symbol is not in `mapping`

# Walk every record and cut one window around each annotated beat.
for record_num in records:
    set_path = os.path.join(dataset_path, record_num)

    try:
        record = wfdb.rdrecord(set_path)
        ann = wfdb.rdann(set_path, "atr")

        ecg_signal = record.p_signal[:, 0]  # first channel only
        r_peaks = ann.sample
        label = ann.symbol
        last_allowed_end = len(ecg_signal) - 10

        for idx, peak in enumerate(r_peaks):
            start = peak - half_window
            end = start + window_size

            # Keep a 10-sample margin at both ends of the recording.
            if not (start >= 10 and end <= last_allowed_end):
                skip += 1
                continue

            symbol = label[idx]
            if symbol not in mapping:
                invalid += 1
                continue

            heartbeats.append(ecg_signal[start:end])
            labels.append(mapping[symbol])

    except Exception as e:
        # Best effort: report the failing record and continue with the next.
        print(f"Exception occurred at {record_num}: {e}")

# One row per beat: the window samples first, the class id as the last column.
df = pd.DataFrame(heartbeats).assign(label=labels)

# Persist the table without the pandas index.
csv_path = '/content/mitbih_ecg_processed.csv'
df.to_csv(csv_path, index=False)

print(f"The data has been stored successfully to {csv_path}")
print(f"Total heartbeats: {len(heartbeats)}, Skipped: {skip}, Invalid: {invalid}")

gan

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.decomposition import PCA
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim
# Fix both RNGs up front so weight initialisation, noise sampling and the
# shuffles below are repeatable from run to run.
torch.manual_seed(42)
np.random.seed(42)

# Load and shuffle the dataset
# NOTE(review): this is a relative path, while the pre-processing cell writes
# '/content/mitbih_ecg_processed.csv' - it only resolves when the working
# directory is /content.
df = pd.read_csv("mitbih_ecg_processed.csv")
df = df.sample(frac=1, random_state=42).reset_index(drop=True)

# Drop rows with missing labels
df = df.dropna(subset=['label'])

# Split features and labels
# The label is the last column; everything before it is a feature.
X = df.iloc[:, :-1]
y = df.iloc[:, -1]

# Train-test split
# 80/20 split, stratified so both parts keep the class proportions of `y`.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

# Normalize data
# The scaler is fitted on the training part only and then reused for the
# test part; both results are NumPy arrays, not DataFrames.
scaler = MinMaxScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Convert training data to DataFrame for GAN input
# `y_train` still carries the shuffled index, so it is re-indexed to line up
# with the fresh 0..n-1 index of `train_df`.
train_df = pd.DataFrame(X_train)
train_df['label'] = y_train.reset_index(drop=True)

# Filter minority classes
minority_classes = [1, 2, 3, 4]
minority_df = train_df[train_df['label'].isin(minority_classes)]

# Prepare GAN training data
# Plain arrays: the scaled features and the matching class ids.
X_minority = minority_df.drop('label', axis=1).values
y_minority = minority_df['label'].values

# Torch Dataset and Dataloader
import torch
from torch.utils.data import Dataset, DataLoader

class ECGDataset(Dataset):
    """In-memory dataset pairing each feature row with its class label.

    Args:
        X: Array-like of feature rows; copied once into a ``float32`` tensor.
        y: Array-like of labels; copied once into an ``int64`` tensor.
    """

    def __init__(self, X, y):
        # Convert eagerly so indexing later is a cheap tensor lookup.
        self.y = torch.tensor(y, dtype=torch.long)
        self.X = torch.tensor(X, dtype=torch.float32)

    def __len__(self):
        """Return the number of stored feature rows."""
        n_rows = len(self.X)
        return n_rows

    def __getitem__(self, idx):
        """Return the ``(features, label)`` pair stored at position ``idx``."""
        features = self.X[idx]
        target = self.y[idx]
        return features, target

# Shuffled mini-batches of 64 over all minority-class beats.
minority_dataset = ECGDataset(X_minority, y_minority)
data_loader = DataLoader(minority_dataset, batch_size=64, shuffle=True)

# GAN Model Definition
import torch.nn as nn

class Generator(nn.Module):
    """Two-layer perceptron that maps a noise vector to a synthetic sample.

    Args:
        noise_dim: Width of the input noise vector.
        output_dim: Width of the generated sample.

    The final sigmoid keeps every generated value inside ``[0, 1]``.
    """

    def __init__(self, noise_dim, output_dim):
        super().__init__()
        hidden_units = 128
        stages = [
            nn.Linear(noise_dim, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, output_dim),
            nn.Sigmoid(),
        ]
        self.model = nn.Sequential(*stages)

    def forward(self, z):
        """Return the generated batch for the noise batch ``z``."""
        generated = self.model(z)
        return generated

class Discriminator(nn.Module):
    """Two-layer perceptron that scores a sample as real (towards 1) or fake (towards 0).

    Args:
        input_dim: Width of the samples to be judged.

    The final sigmoid yields one value in ``[0, 1]`` per sample, which is the
    form ``nn.BCELoss`` expects.
    """

    def __init__(self, input_dim):
        super().__init__()
        hidden_units = 128
        stages = [
            nn.Linear(input_dim, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, 1),
            nn.Sigmoid(),
        ]
        self.model = nn.Sequential(*stages)

    def forward(self, x):
        """Return one realness score per row of ``x``."""
        realness = self.model(x)
        return realness

noise_dim = 100
input_dim = X_minority.shape[1]

criterion = nn.BCELoss()


def train_class_gan(class_samples, n_epochs=1000, batch_size=64, lr=0.0002,
                    log_every=200, tag=""):
    """Train a fresh generator/discriminator pair on the beats of ONE class.

    A single unconditional GAN trained on all minority classes at once cannot
    be asked for "a class-2 beat": whatever label is attached to its output
    afterwards is arbitrary. Training one GAN per class makes every synthetic
    sample come from the distribution of the class it is labelled with.

    Args:
        class_samples: 2-D NumPy array of scaled beats belonging to one class.
        n_epochs: Number of passes over ``class_samples``.
        batch_size: Mini-batch size.
        lr: Adam learning rate used for both networks.
        log_every: Print the last batch losses every ``log_every`` epochs.
        tag: Prefix for the progress lines (e.g. ``"class 2"``).

    Returns:
        ``(generator, discriminator)`` after training.
    """
    # ECGDataset wants labels; the GAN ignores them, so zeros are enough.
    placeholder_labels = np.zeros(len(class_samples), dtype=np.int64)
    loader = DataLoader(ECGDataset(class_samples, placeholder_labels),
                        batch_size=batch_size, shuffle=True)

    generator = Generator(noise_dim, class_samples.shape[1])
    discriminator = Discriminator(class_samples.shape[1])
    g_opt = torch.optim.Adam(generator.parameters(), lr=lr)
    d_opt = torch.optim.Adam(discriminator.parameters(), lr=lr)
    generator.train()
    discriminator.train()

    for epoch in range(n_epochs):
        for real_data, _ in loader:
            n_batch = real_data.size(0)
            real_labels = torch.ones(n_batch, 1)
            fake_labels = torch.zeros(n_batch, 1)

            # Discriminator step: real beats -> 1, generated beats -> 0.
            d_opt.zero_grad()
            d_loss_real = criterion(discriminator(real_data), real_labels)
            fake_data = generator(torch.randn(n_batch, noise_dim))
            # detach(): this step must not push gradients into the generator.
            d_loss_fake = criterion(discriminator(fake_data.detach()), fake_labels)
            d_loss = d_loss_real + d_loss_fake
            d_loss.backward()
            d_opt.step()

            # Generator step: try to make the discriminator answer "real".
            g_opt.zero_grad()
            fake_data = generator(torch.randn(n_batch, noise_dim))
            g_loss = criterion(discriminator(fake_data), real_labels)
            g_loss.backward()
            g_opt.step()

        if (epoch + 1) % log_every == 0:
            print(f"[{tag}] Epoch {epoch+1}/{n_epochs}, "
                  f"D Loss: {d_loss.item():.4f}, G Loss: {g_loss.item():.4f}")

    return generator, discriminator


# Every minority class is topped up to the size of the real majority class.
labels_to_generate = [1, 2, 3, 4]
real_majority_df = train_df[train_df['label'] == 0]
target_per_class = len(real_majority_df)

samples_per_class = {}
synthetic_samples = []

for label in labels_to_generate:
    class_real = X_minority[y_minority == label]
    n_samples = max(target_per_class - len(class_real), 0)
    samples_per_class[label] = n_samples

    if len(class_real) == 0 or n_samples == 0:
        # Nothing to learn from, or the class is already large enough.
        print(f"Class {label}: no synthetic samples generated "
              f"({len(class_real)} real samples available)")
        continue

    gen, disc = train_class_gan(class_real, tag=f"class {label}")

    # Sampling needs no gradients; no_grad avoids building a huge graph.
    gen.eval()
    with torch.no_grad():
        synth = gen(torch.randn(n_samples, noise_dim)).numpy()

    synth_df = pd.DataFrame(synth)
    synth_df['label'] = label
    synthetic_samples.append(synth_df)

# Combine all synthetic samples
if synthetic_samples:
    synthetic_df = pd.concat(synthetic_samples, axis=0).reset_index(drop=True)
else:
    synthetic_df = pd.DataFrame(columns=train_df.columns)

# Real majority beats + real minority beats + synthetic top-up. The real
# minority beats are kept: they are the only ground truth for those classes.
balanced_df = pd.concat(
    [real_majority_df, minority_df, synthetic_df], axis=0
).reset_index(drop=True)

# Sanity checks on the balanced table before anything is written out.
print(f"Balanced training data shape: {balanced_df.shape}")
column_count = balanced_df.shape[1]
assert column_count == 188, "Expected 187 features and 1 label column."
missing_cells = balanced_df.isnull().sum().sum()
assert missing_cells == 0, "Missing values found in balanced data."

# Keep a copy of the full-width balanced table (before PCA).
balanced_df.to_csv("Balanced_Training_Data_Pre_PCA.csv", index=False)
print("Balanced training data saved as 'Balanced_Training_Data_Pre_PCA.csv'")

# Separate the label column from the feature columns.
y_balanced = balanced_df['label']
X_balanced = balanced_df.iloc[:, :-1]

# Labelled copy of the scaled test features.
X_test_df = pd.DataFrame(X_test)
X_test_df['label'] = y_test.reset_index(drop=True)

# PCA is fitted on the balanced training features and then applied,
# unchanged, to the test features.
pca = PCA(n_components=30)
X_bal_pca = pca.fit_transform(X_balanced)
X_test_pca = pca.transform(X_test)

# Write the reduced features with the label appended as the last column.
pca_exports = (
    ("Training_PCA.csv", X_bal_pca, y_balanced),
    ("Testing_PCA.csv", X_test_pca, y_test),
)
for out_name, components, targets in pca_exports:
    reduced = pd.DataFrame(components).assign(label=targets.reset_index(drop=True))
    reduced.to_csv(out_name, index=False)

print("PCA reduced training and test data saved as 'Training_PCA.csv' and 'Testing_PCA.csv'")
# Plot label distribution
def plot_label_distribution(y, title):
    """Draw a bar chart of how many samples each class 0-4 has.

    Args:
        y: Iterable of class labels.
        title: Title placed above the chart.

    Draws on the current matplotlib axes; classes that never occur are shown
    with a count of zero.
    """
    class_ids = [0, 1, 2, 3, 4]
    per_class = pd.Series(y).value_counts().reindex(class_ids, fill_value=0)
    heights = per_class.values

    plt.bar(per_class.index, heights, color='lightblue', edgecolor='black')

    # Write the exact count slightly above every bar.
    for position in range(len(heights)):
        n_samples = heights[position]
        plt.text(position, n_samples + 5, str(n_samples), ha='center')

    plt.xlabel("Class Labels")
    plt.ylabel("Samples")
    plt.title(title)

# Two panels side by side: balanced training labels on the left, test labels
# on the right.
plt.figure(figsize=(12, 5))
panels = [
    (y_balanced, "Training Labels Distribution"),
    (y_test, "Test Labels Distribution"),
]
for panel_no, (panel_labels, panel_title) in enumerate(panels, start=1):
    plt.subplot(1, 2, panel_no)
    plot_label_distribution(panel_labels, panel_title)
plt.tight_layout()
plt.show()

smote

In [None]:
#SMOTE data preparation
import pandas as pd
# Load the normalised train/test tables.
# NOTE(review): these two files are not written by any code shown here -
# confirm which step produces them.
train_data = pd.read_csv("Training_normalized.csv")
test_data = pd.read_csv("Testing_normalized.csv")

# Split features and labels (the label is the last column of each table)
X_train = train_data.iloc[:, :-1]  # All columns except the last one
y_train = train_data.iloc[:, -1]   # Last column as target
X_test = test_data.iloc[:, :-1]
y_test = test_data.iloc[:, -1]
print(f"Train:{X_train.shape}")
# Fixed: this line used to print the test shape under the label "Train:".
print(f"Test:{X_test.shape}")
from imblearn.over_sampling import SMOTE
from sklearn.decomposition import PCA

# Step 1: oversample with SMOTE, fitted on the training split only so the
# test split stays untouched.
smote = SMOTE(random_state=42)
X_train_oversampled, y_train_oversampled = smote.fit_resample(X_train, y_train)

print("SMOTE oversampling done!")
print(f"Oversampled training data shape: {X_train_oversampled.shape}....{y_train_oversampled.shape}")

# Step 2: fit PCA on the oversampled training features, then apply the same
# projection to the test features.
pca = PCA(n_components=30)
train_pca = pca.fit_transform(X_train_oversampled)
test_pca = pca.transform(X_test)

print("Data dimensionality reduced with PCA!")
print(f"Training data: {train_pca.shape}....{y_train_oversampled.shape}")
print(f"Testing data: {test_pca.shape}....{y_test.shape}")

# Step 3: write both reduced tables with the label as the last column.
training = pd.DataFrame(train_pca).assign(
    label=pd.Series(y_train_oversampled).reset_index(drop=True)
)
training.to_csv("Training_PCA_oversampled.csv", index=False)
print("Training data saved")

testing = pd.DataFrame(test_pca).assign(
    label=pd.Series(y_test).reset_index(drop=True)
)
testing.to_csv("Testing_PCA_oversampled.csv", index=False)
print("Testing data saved")
#histogram representation of SMOTE
import matplotlib.pyplot as plt

def plot_label_distribution(y, title):
    """Plot the number of samples per class (0-4) as a bar chart.

    Args:
        y: Iterable of class labels.
        title: Chart title.

    Draws on the current matplotlib axes. Classes missing from ``y`` appear
    as zero-height bars.
    """
    # Counts in fixed class order, zero-filled for absent classes.
    count = pd.Series(y).value_counts().reindex([0, 1, 2, 3, 4], fill_value=0)
    bar_heights = count.values

    plt.bar(count.index, bar_heights, color='lightblue', edgecolor='black')

    # Annotate each bar with its exact count, just above the bar top.
    for bar_pos, bar_height in zip(range(len(bar_heights)), bar_heights):
        plt.text(bar_pos, bar_height + 5, str(bar_height), ha='center')

    plt.xlabel("Class Labels")
    plt.ylabel("Samples")
    plt.xticks(rotation=0)
    plt.title(title)

# Create subplots for Training and Test labels
# Fixed: only panel 1 of the 1x2 grid used to be drawn, leaving half of the
# figure empty even though the layout and comment call for both splits.
plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
plot_label_distribution(y_train_oversampled, "Training Labels Distribution")

plt.subplot(1, 2, 2)
plot_label_distribution(y_test, "Test Labels Distribution")

plt.tight_layout()
plt.show()

CLASS WEIGHT

In [None]:
import numpy as np
import sklearn
import scipy
import pandas as pd
import pandas as pd
# Load the PCA-reduced train/test tables.
train_data = pd.read_csv("Training_PCA.csv")
test_data = pd.read_csv("Testing_PCA.csv")

# Split features and labels (the label is the last column of each table)
X_train = train_data.iloc[:, :-1]  # All columns except the last one
y_train = train_data.iloc[:, -1]   # Last column as target
X_test = test_data.iloc[:, :-1]
y_test = test_data.iloc[:, -1]
print(f"Train:{X_train.shape}")
# Fixed: this line used to print the test shape under the label "Train:".
print(f"Test:{X_test.shape}")
from sklearn.utils.class_weight import compute_class_weight
import numpy as np

# 'balanced' class weights: one weight per distinct label in y_train.
classes = np.unique(y_train)
weights = compute_class_weight(class_weight='balanced', classes=classes, y=y_train)
class_weights = dict(zip(classes, weights))

# Per-sample weights: `classes` is sorted, so searchsorted gives each label's
# position in it, which is also the position of its weight.
label_positions = np.searchsorted(classes, y_train)
sample_weights = weights[label_positions]
print(sample_weights)

XG+OPTUNA

In [None]:
import optuna
import xgboost as xgb
import numpy as np
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import f1_score

def objective(trial):
    """Optuna objective: 3-fold cross-validated weighted F1 of a weighted XGBoost model.

    Args:
        trial: Optuna trial used to sample the hyper-parameters.

    Returns:
        Mean weighted F1 over the three validation folds. The mean macro F1
        of the same folds is stored on the trial as user attribute
        ``"macro_f1"``.

    Reads the notebook globals ``X_train``, ``y_train`` and
    ``sample_weights``.
    """
    fixed_settings = {
        "objective": "multi:softmax",
        "num_class": len(np.unique(y_train)),
        "eval_metric": "mlogloss",
    }
    sampled_settings = {
        "n_estimators": trial.suggest_int("n_estimators", 50, 300),
        "max_depth": trial.suggest_int("max_depth", 3, 10),
        "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3, log=True),
        "subsample": trial.suggest_float("subsample", 0.5, 1.0),
        "colsample_bytree": trial.suggest_float("colsample_bytree", 0.5, 1.0),
        "gamma": trial.suggest_float("gamma", 0, 10),
        "reg_lambda": trial.suggest_float("reg_lambda", 1e-2, 10.0, log=True),
        "reg_alpha": trial.suggest_float("reg_alpha", 1e-2, 10.0, log=True),
    }
    param = {**fixed_settings, **sampled_settings}

    splitter = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)
    fold_scores = {"macro": [], "weighted": []}

    for fit_rows, holdout_rows in splitter.split(X_train, y_train):
        booster = xgb.XGBClassifier(**param)
        booster.fit(
            X_train.iloc[fit_rows],
            y_train.iloc[fit_rows],
            sample_weight=sample_weights[fit_rows],
        )

        holdout_true = y_train.iloc[holdout_rows]
        holdout_pred = booster.predict(X_train.iloc[holdout_rows])
        for averaging, scores in fold_scores.items():
            scores.append(f1_score(holdout_true, holdout_pred, average=averaging))

    # Macro F1 is recorded for reporting; the study optimises weighted F1.
    trial.set_user_attr("macro_f1", np.mean(fold_scores["macro"]))
    return np.mean(fold_scores["weighted"])

# Run Optuna optimization
# At most 50 trials or 600 seconds, whichever limit is reached first.
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=50, timeout=600)

# Print best results
print("Best parameters:", study.best_params)
print("Best Weighted F1 Score:", study.best_value)
print("Corresponding Macro F1 Score:", study.best_trial.user_attrs["macro_f1"])
from sklearn.metrics import classification_report, f1_score
from xgboost import XGBClassifier
# NOTE(review): these values are hard-coded rather than read from
# `study.best_params` - presumably pasted from an earlier run; confirm they
# still match the study above.
best_params = {'n_estimators': 300,
               'max_depth': 6,
               'learning_rate': 0.2567451389613149,
              'subsample': 0.8400092987532728,
              'colsample_bytree': 0.8464805154502861,
              'gamma': 0.046440916333720494,
              'reg_lambda': 1.6414868645406089,
              'reg_alpha': 0.49211449804115437}
# Final model: fitted on the whole training table with per-sample weights.
model = XGBClassifier(**best_params)
model.fit(X_train, y_train, sample_weight=sample_weights)

y_train_pred = model.predict(X_train)
y_test_pred = model.predict(X_test)

# NOTE(review): the heading says "SMOTE applied", but the preceding cell
# loads "Training_PCA.csv" (the GAN-balanced export) - confirm which
# training table is actually in scope when this runs.
print("📊 Classification Report - Train (SMOTE applied):")
print(classification_report(y_train, y_train_pred))
print("\n📊 Classification Report - Test:")
print(classification_report(y_test, y_test_pred))

# Macro F1 on both splits, to make the train/test gap visible.
train_macro_f1 = f1_score(y_train, y_train_pred, average='macro')
test_macro_f1 = f1_score(y_test, y_test_pred, average='macro')
print(f"\n Train Macro F1 Score: {train_macro_f1:.4f}")
print(f" Test Macro F1 Score: {test_macro_f1:.4f}")

RANDOM FOREST

In [None]:
!pip install optuna
import optuna
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import f1_score,classification_report
from sklearn.utils.class_weight import compute_class_weight
import numpy as np

# Define the objective function for Optuna
def objective(trial):
    """Optuna objective: cross-validated weighted F1 of a class-weighted random forest.

    The score is computed with stratified 3-fold cross-validation on the
    TRAINING data only. Scoring trials on ``X_test`` (as this function used
    to do) tunes the hyper-parameters to the test set, so the test metrics
    reported afterwards would be optimistically biased.

    Args:
        trial: Optuna trial used to sample the hyper-parameters.

    Returns:
        Mean weighted F1 over the validation folds.

    Reads the notebook globals ``X_train``, ``y_train`` and ``class_weights``.
    """
    # Local import: this cell does not import the model-selection helpers.
    from sklearn.model_selection import StratifiedKFold, cross_val_score

    rf_model = RandomForestClassifier(
        n_estimators=trial.suggest_int("n_estimators", 50, 150),
        max_depth=trial.suggest_int("max_depth", 10, 40),
        min_samples_split=trial.suggest_int("min_samples_split", 2, 20),
        min_samples_leaf=trial.suggest_int("min_samples_leaf", 1, 10),
        class_weight=class_weights,
        random_state=42,
        n_jobs=-1
    )

    # Same fold layout as the XGBoost objective; the fixed seed gives every
    # trial identical folds, so trial scores are directly comparable.
    folds = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)
    fold_scores = cross_val_score(rf_model, X_train, y_train, cv=folds, scoring='f1_weighted')

    return float(fold_scores.mean())


# Search: at most 50 trials or 600 seconds, whichever comes first.
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=50, timeout=600)

# Winning hyper-parameters of the study.
best_params = study.best_params
print(" Best Parameters:", best_params)

# One weight per training sample, looked up from the per-class weights.
sample_weights = np.array([class_weights[sample_label] for sample_label in y_train])

# Refit on the whole training table with the winning parameters.
best_rf = RandomForestClassifier(**best_params, random_state=42, n_jobs=-1)
best_rf.fit(X_train, y_train, sample_weight=sample_weights)
y_test_pred = best_rf.predict(X_test)

# Test-set F1 under three averaging schemes.
for heading, averaging in (("Weighted", "weighted"), ("Macro", "macro"), ("Micro", "micro")):
    print(f"{heading} F1-score:", f1_score(y_test, y_test_pred, average=averaging))

print("\n Classification Report - Test:")
print(classification_report(y_test, y_test_pred))

LinearSVC

In [None]:
!pip install optuna
import optuna
from sklearn.svm import LinearSVC
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import cross_val_score
from sklearn.metrics import classification_report
from sklearn.utils.class_weight import compute_class_weight
import numpy as np

def objective(trial):
    """Optuna objective: 5-fold cross-validated macro F1 of a scaled LinearSVC.

    Args:
        trial: Optuna trial; samples ``linearsvc__C`` log-uniformly from
            ``[0.01, 10]``.

    Returns:
        Mean macro F1 over the five folds.

    Reads the notebook globals ``X_train``, ``y_train`` and ``class_weights``.
    """
    reg_strength = trial.suggest_float("linearsvc__C", 0.01, 10.0, log=True)

    # The scaler lives inside the pipeline, so it is refitted on each fold's
    # training part by cross_val_score.
    classifier = LinearSVC(C=reg_strength, max_iter=10000, dual=False, class_weight=class_weights)
    pipeline = make_pipeline(StandardScaler(), classifier)

    fold_scores = cross_val_score(
        pipeline, X_train, y_train, cv=5, scoring="f1_macro", n_jobs=-1
    )
    return fold_scores.mean()

# Search: at most 30 trials or 300 seconds, whichever comes first.
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=30, timeout=300)

# Rebuild the pipeline with the best C and fit it on the full training table.
best_C = study.best_params["linearsvc__C"]
tuned_svc = LinearSVC(C=best_C, max_iter=10000, dual=False, class_weight=class_weights)
best_pipeline = make_pipeline(StandardScaler(), tuned_svc)
best_pipeline.fit(X_train, y_train)

# Evaluate on the test table.
linear_preds = best_pipeline.predict(X_test)

print("Best Params for LinearSVC:", study.best_params)
class_names = np.unique(y_test).astype(str)
report = classification_report(y_test, linear_preds, target_names=class_names)
print("Classification Report:\n", report)

svc

In [None]:
from sklearn.svm import SVC
from sklearn.metrics import f1_score, classification_report

# Hand-entered SVC settings (not read from a study in this cell).
best_params = dict(C=0.6811111135767808, kernel='rbf', gamma='scale')
print("Best Params for SVC:", best_params)

# Fit the final classifier on the full training table.
best_svc = SVC(random_state=42, **best_params)
best_svc.fit(X_train, y_train)

# Predict both splits so the train/test gap can be inspected.
train_preds = best_svc.predict(X_train)
test_preds = best_svc.predict(X_test)

train_f1, test_f1 = (
    f1_score(truth, guess, average='macro')
    for truth, guess in ((y_train, train_preds), (y_test, test_preds))
)

print(f"\n Macro F1-score on Training Data: {train_f1:.4f}")
print(f" Macro F1-score on Test Data: {test_f1:.4f}")

# Per-class precision/recall/F1 on the test split.
print("\n📋 Classification Report on Test Data:\n")
print(classification_report(y_test, test_preds))

levyja

In [None]:
import pandas as pd
# Fixed: the alias was misspelled `npa`, which leaves `np` undefined when this
# cell runs in a fresh kernel although the code below uses `np` throughout.
import numpy as np
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import classification_report, f1_score
from xgboost import XGBClassifier
import random
import scipy.stats as stats

# Load the PCA-reduced tables; the label is the last column of each.
train_data = pd.read_csv("Training_PCA.csv")
test_data = pd.read_csv("Testing_PCA.csv")

X_train = train_data.iloc[:, :-1]
y_train = train_data.iloc[:, -1]
X_test = test_data.iloc[:, :-1]
y_test = test_data.iloc[:, -1]

# Compute sample weights
# 'balanced' class weights, expanded to one weight per training sample.
classes = np.unique(y_train)
weights = compute_class_weight(class_weight='balanced', classes=classes, y=y_train)
class_weights = dict(zip(classes, weights))
sample_weights = np.array([class_weights[label] for label in y_train])

# ----------------------- Optuna Best Params (Initial Point) -----------------------
# Hard-coded centre of the search region used to build `param_bounds` below.
# NOTE(review): presumably copied from an earlier Optuna run - confirm.
best_params = {
    'n_estimators': 300,
    'max_depth': 6,
    'learning_rate': 0.2567451389613149,
    'subsample': 0.8400092987532728,
    'colsample_bytree': 0.8464805154502861,
    'gamma': 0.046440916333720494,
    'reg_lambda': 1.6414868645406089,
    'reg_alpha': 0.49211449804115437
}

# ----------------------- Define Bounds Based on Best Params -----------------------
# Each entry is a (low, high) pair derived from the value above:
#   - learning_rate, reg_lambda, reg_alpha: 0.5x to 1.5x (the regularisers
#     are floored at 1e-3)
#   - max_depth: +/-2, kept inside [3, 15]
#   - subsample, colsample_bytree: 0.8x up to 1.0
#   - gamma: 0 up to 1.5x
#   - n_estimators: 0.8x up to the value itself
param_bounds = {
    'learning_rate': (best_params['learning_rate'] * 0.5, best_params['learning_rate'] * 1.5),
    'max_depth': (max(3, best_params['max_depth'] - 2), min(15, best_params['max_depth'] + 2)),
    'subsample': (best_params['subsample'] * 0.8, 1.0),
    'colsample_bytree': (best_params['colsample_bytree'] * 0.8, 1.0),
    'gamma': (0.0, best_params['gamma'] * 1.5),
    'reg_lambda': (max(1e-3, best_params['reg_lambda'] * 0.5), best_params['reg_lambda'] * 1.5),
    'reg_alpha': (max(1e-3, best_params['reg_alpha'] * 0.5), best_params['reg_alpha'] * 1.5),
    'n_estimators': (int(best_params['n_estimators'] * 0.8), best_params['n_estimators'])
}

# ----------------------- Levy Flight Generator -----------------------
def levy_flight(beta=1.5, size=1):
    """Draw Lévy-flight step lengths with Mantegna's algorithm.

    A step is ``u / |v| ** (1 / beta)`` with ``u ~ N(0, sigma_u**2)`` and
    ``v ~ N(0, 1)``, where::

        sigma_u = ( Γ(1 + beta) * sin(pi * beta / 2)
                    / ( Γ((1 + beta) / 2) * beta * 2 ** ((beta - 1) / 2) ) ) ** (1 / beta)

    Γ is the gamma FUNCTION. The previous code used
    ``scipy.stats.gamma(a).pdf(1)``, which is the density of the gamma
    DISTRIBUTION at 1 (``exp(-1) / Γ(a)``); that inverted the Γ ratio and
    produced a wrong step scale (about 0.42 instead of about 0.70 for
    ``beta=1.5``).

    Args:
        beta: Stability index of the Lévy distribution.
        size: Number of steps to draw (any shape ``numpy.random.normal``
            accepts).

    Returns:
        NumPy array of ``size`` step lengths; positive and negative values
        are equally likely.
    """
    # Local import keeps the function usable without touching the cell's
    # import list.
    from math import gamma

    sigma_u = (
        gamma(1 + beta) * np.sin(np.pi * beta / 2)
        / (gamma((1 + beta) / 2) * beta * 2 ** ((beta - 1) / 2))
    ) ** (1 / beta)
    u = np.random.normal(0, sigma_u, size=size)
    v = np.random.normal(0, 1, size=size)
    step = u / (np.abs(v) ** (1 / beta))
    return step

# ----------------------- Fitness Function (macro F1) -----------------------
def fitness_function(params):
    """Score one XGBoost hyper-parameter set by cross-validated macro F1.

    The score is the mean macro F1 over a stratified 3-fold split of the
    TRAINING data. Scoring candidates on ``X_test`` (as this function used
    to do) lets the optimiser fit the test set, so the test metrics printed
    after the search would be optimistically biased.

    Args:
        params: Dict of ``XGBClassifier`` keyword arguments to evaluate.

    Returns:
        Mean macro F1 over the validation folds, as a float.

    Reads the notebook globals ``X_train``, ``y_train`` and
    ``sample_weights``.
    """
    # Local import: this cell does not import the model-selection helpers.
    from sklearn.model_selection import StratifiedKFold

    n_classes = len(np.unique(y_train))
    # Fixed seed -> identical folds for every candidate, so scores compare.
    folds = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)

    fold_scores = []
    for fit_rows, val_rows in folds.split(X_train, y_train):
        model = XGBClassifier(**params,
                              objective='multi:softmax',
                              num_class=n_classes,
                              eval_metric='mlogloss',
                              use_label_encoder=False,
                              verbosity=0)
        model.fit(X_train.iloc[fit_rows], y_train.iloc[fit_rows],
                  sample_weight=sample_weights[fit_rows])
        val_pred = model.predict(X_train.iloc[val_rows])
        fold_scores.append(f1_score(y_train.iloc[val_rows], val_pred, average='macro'))

    return float(np.mean(fold_scores))

# ----------------------- Lévy JA Optimizer -----------------------
def levy_ja(fitness_func, param_bounds, num_agents=10, max_iter=20):
    """Maximise ``fitness_func`` with a Lévy-flight driven population search.

    Every agent is a dict of hyper-parameters. In each iteration an agent is
    moved, per parameter, by ``step * (agent - best_agent)`` with ``step``
    drawn from ``levy_flight``; the result is clipped to its bounds and
    replaces the agent only if it scores strictly better.

    Fitness values are cached per agent. The previous version re-evaluated
    the initial best agent and re-evaluated ``agents[i]`` on every
    comparison; each evaluation is a full model fit, so this roughly halves
    the run time without changing the search.

    Args:
        fitness_func: Callable taking an agent dict and returning a score
            (higher is better).
        param_bounds: Dict ``name -> (low, high)``.
        num_agents: Population size.
        max_iter: Number of iterations.

    Returns:
        ``(best_agent, best_score)``.

    Raises:
        ValueError: If ``num_agents`` is 0 (no agent to pick a best from).
    """
    integer_keys = ('max_depth', 'n_estimators')

    def is_integer_param(key, low):
        # Same rule as before: integer lower bound, 'int' in the name, or one
        # of the two known integer hyper-parameters.
        return isinstance(low, int) or 'int' in key or key in integer_keys

    # Random initial population inside the bounds.
    agents = []
    for _ in range(num_agents):
        agent = {k: np.random.uniform(low, high) for k, (low, high) in param_bounds.items()}
        for key in integer_keys:
            # Only cast keys that are actually being optimised.
            if key in agent:
                agent[key] = int(agent[key])
        agents.append(agent)

    # Evaluate every agent exactly once and remember the scores.
    scores = [fitness_func(agent) for agent in agents]
    best_idx = max(range(num_agents), key=scores.__getitem__)
    best_agent = agents[best_idx]
    best_score = scores[best_idx]

    for iteration in range(1, max_iter + 1):
        for i in range(num_agents):
            new_agent = {}
            for key, (low, high) in param_bounds.items():
                step = levy_flight(beta=1.5, size=1)[0]
                val = agents[i][key] + step * (agents[i][key] - best_agent[key])
                clipped = np.clip(val, low, high)
                new_agent[key] = int(clipped) if is_integer_param(key, low) else float(clipped)

            new_score = fitness_func(new_agent)
            if new_score > scores[i]:
                agents[i] = new_agent
                scores[i] = new_score
                if new_score > best_score:
                    best_score = new_score
                    best_agent = new_agent

        print(f"Iteration {iteration} | Best Macro F1 Score: {best_score:.4f}")

    return best_agent, best_score

# ----------------------- Search -----------------------
print("Running Lévy JA optimization (objective: macro F1)...")
best_levy_params, best_macro_f1 = levy_ja(fitness_function, param_bounds, num_agents=10, max_iter=20)

# ----------------------- Final model -----------------------
print("\nTraining final model with optimized parameters...")
# Settings that are not searched; the same values the fitness function uses.
fixed_xgb_settings = {
    'objective': 'multi:softmax',
    'num_class': len(np.unique(y_train)),
    'eval_metric': 'mlogloss',
    'use_label_encoder': False,
    'verbosity': 0,
}
model = XGBClassifier(**best_levy_params, **fixed_xgb_settings)
model.fit(X_train, y_train, sample_weight=sample_weights)

# ----------------------- Predictions on both splits -----------------------
y_train_pred = model.predict(X_train)
y_test_pred = model.predict(X_test)

# ----------------------- Reports -----------------------
report_sections = (
    ("\n Classification Report - Train:", y_train, y_train_pred),
    ("\nClassification Report - Test:", y_test, y_test_pred),
)
for section_index, (heading, truth, guess) in enumerate(report_sections):
    if section_index:
        print()
    print(heading)
    print(classification_report(truth, guess))

train_macro_f1 = f1_score(y_train, y_train_pred, average='macro')
test_macro_f1 = f1_score(y_test, y_test_pred, average='macro')

print(f"\n Train Macro F1 Score: {train_macro_f1:.4f}")
print(f" Test Macro F1 Score: {test_macro_f1:.4f}")

print("\n Best Parameters Found by Lévy JA:")
print(best_levy_params)
print(f"Best Macro F1 Score: {best_macro_f1:.4f}")

enhanced AEO

In [None]:
pip install mealpy==3.0.1
import pandas as pd
# Load the PCA-reduced train/test tables.
train_data = pd.read_csv("Training_PCA.csv")
test_data = pd.read_csv("Testing_PCA.csv")

# Split features and labels (the label is the last column of each table)
X_train = train_data.iloc[:, :-1]  # All columns except the last one
y_train = train_data.iloc[:, -1]   # Last column as target
X_test = test_data.iloc[:, :-1]
y_test = test_data.iloc[:, -1]
print(f"Train:{X_train.shape}")
# Fixed: this line used to print the test shape under the label "Train:".
print(f"Test:{X_test.shape}")
from sklearn.utils.class_weight import compute_class_weight
import numpy as np

# 'balanced' class weights: one weight per distinct label in y_train.
classes = np.unique(y_train)
weights = compute_class_weight(class_weight='balanced', classes=classes, y=y_train)
class_weights = dict(zip(classes, weights))

# Expand to one weight per training sample by looking each label up.
sample_weights = np.fromiter(
    (class_weights[sample_label] for sample_label in y_train),
    dtype=float,
    count=len(y_train),
)
print(sample_weights)


#enhanced AEO
import numpy as np
from sklearn.metrics import f1_score, make_scorer
from xgboost import XGBClassifier
from mealpy import FloatVar, IntegerVar
from sklearn.model_selection import cross_val_score
from mealpy.system_based.AEO import EnhancedAEO
def objective_func(solution):
    """Objective for the optimiser: negated cross-validated macro F1 of XGBoost.

    The score is the mean macro F1 over a stratified 3-fold split of the
    TRAINING data. The previous version scored every candidate on ``X_test``
    (while calling it a "validation set"), which tunes the hyper-parameters
    to the test set and makes later test metrics optimistically biased.

    Args:
        solution: Sequence ``[n_estimators, max_depth, learning_rate]`` in
            the order of the problem's ``bounds``.

    Returns:
        ``-mean_macro_f1``; negated because the problem is a minimisation.

    Reads the notebook globals ``X_train``, ``y_train`` and
    ``sample_weights``.
    """
    # Local import: this cell only imports cross_val_score.
    from sklearn.model_selection import StratifiedKFold

    n_estimators = int(solution[0])
    max_depth = int(solution[1])
    learning_rate = float(solution[2])

    # Fixed seed -> identical folds for every candidate, so scores compare.
    folds = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)

    fold_scores = []
    for fit_rows, val_rows in folds.split(X_train, y_train):
        model = XGBClassifier(
            n_estimators=n_estimators,
            max_depth=max_depth,
            learning_rate=learning_rate,
        )
        # Fit on this fold's training part, with the matching sample weights.
        model.fit(X_train.iloc[fit_rows], y_train.iloc[fit_rows],
                  sample_weight=sample_weights[fit_rows])

        val_pred = model.predict(X_train.iloc[val_rows])
        fold_scores.append(f1_score(y_train.iloc[val_rows], val_pred, average='macro'))

    return -float(np.mean(fold_scores))
# Problem definition handed to the optimiser. The order of "bounds" must
# match the positions objective_func reads: solution[0], [1], [2].
problem_dict = {
    "obj_func": objective_func,
    "bounds": [
        IntegerVar(lb=250, ub=350),    # n_estimators
        IntegerVar(lb=4, ub=8),      # max_depth
        FloatVar(lb=0.2067, ub=0.3067),    # learning_rate
    ],
    # objective_func returns the NEGATED macro F1, hence minimisation.
    "minmax": "min",
}
# NOTE(review): this rebinds the global name `model`, which other cells use
# for a fitted classifier.
model = EnhancedAEO(epoch=20, pop_size=10)
model.solve(problem_dict)

best_params = model.g_best.solution
best_macro_f1 = -model.g_best.target.fitness  # flip sign back to a positive F1

print("Best Parameters:", best_params)
print("Best Macro F1 Score:", best_macro_f1)

import matplotlib.pyplot as plt

# Best macro F1 per epoch, transcribed from the optimiser log (already
# converted from the negated objective to positive scores).
f1_scores = (
    [0.9512974517681467] * 2
    + [0.9514630839626074, 0.9514912950542224]
    + [0.9517214667287547] * 16
)

epochs = list(range(1, len(f1_scores) + 1))

# Convergence curve, saved to disk and shown inline.
plt.figure(figsize=(10, 6))
plt.plot(epochs, f1_scores, marker='o', color='green')
plt.title('Macro F1 Score over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Macro F1 Score')
plt.ylim(0.951, 0.952)
plt.xticks(epochs)
plt.grid(True)
plt.tight_layout()
plt.savefig("EnhancedAEO.png")
plt.show()



jade

In [None]:


#XGboost+JADE
import numpy as np
import pandas as pd
from sklearn.metrics import f1_score, make_scorer
from xgboost import XGBClassifier
from mealpy import FloatVar, IntegerVar
from sklearn.model_selection import cross_val_score
from mealpy.evolutionary_based.DE import JADE


def objective_func(solution):
    """Objective for the optimiser: cross-validated macro F1 of XGBoost.

    The score is the mean macro F1 over a stratified 3-fold split of the
    TRAINING data. The previous version scored every candidate on ``X_test``
    (while calling it a "validation set"), which tunes the hyper-parameters
    to the test set and makes later test metrics optimistically biased.

    Args:
        solution: Sequence ``[n_estimators, max_depth, learning_rate]`` in
            the order of the problem's ``bounds``.

    Returns:
        Mean macro F1 over the validation folds (higher is better; the
        problem is declared as a maximisation).

    Reads the notebook globals ``X_train``, ``y_train`` and
    ``sample_weights``.
    """
    # Local import: this cell only imports cross_val_score.
    from sklearn.model_selection import StratifiedKFold

    n_estimators = int(solution[0])
    max_depth = int(solution[1])
    learning_rate = float(solution[2])

    # Fixed seed -> identical folds for every candidate, so scores compare.
    folds = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)

    fold_scores = []
    for fit_rows, val_rows in folds.split(X_train, y_train):
        model = XGBClassifier(
            n_estimators=n_estimators,
            max_depth=max_depth,
            learning_rate=learning_rate,
        )
        # Fit on this fold's training part, with the matching sample weights.
        model.fit(X_train.iloc[fit_rows], y_train.iloc[fit_rows],
                  sample_weight=sample_weights[fit_rows])

        val_pred = model.predict(X_train.iloc[val_rows])
        fold_scores.append(f1_score(y_train.iloc[val_rows], val_pred, average='macro'))

    return float(np.mean(fold_scores))

# Problem definition handed to the optimiser. The order of "bounds" must
# match the positions objective_func reads: solution[0], [1], [2].
problem_dict = {
    "obj_func": objective_func,
    "bounds": [
        IntegerVar(lb=250, ub=350),    # n_estimators
        IntegerVar(lb=4, ub=8),      # max_depth
        FloatVar(lb=0.2067, ub=0.3067),    # learning_rate
    ],
    # objective_func returns the macro F1 itself, hence maximisation.
    "minmax": "max",
}
# NOTE(review): this rebinds the global name `model`, which other cells use
# for a fitted classifier.
model = JADE(epoch=20, pop_size=10)
model.solve(problem_dict)

best_params = model.g_best.solution
best_macro_f1 = model.g_best.target.fitness  # already a positive F1: no sign flip in this cell

print("Best Parameters:", best_params)
print("Best Macro F1 Score:", best_macro_f1)
import matplotlib.pyplot as plt

# Best macro F1 per epoch, transcribed from the JADE log.
f1_scores = (
    [0.9493475848498999] * 2
    + [0.9516667449690784] * 4
    + [0.9516811576171575] * 13
    + [0.952600968671377]
)

epochs = list(range(1, len(f1_scores) + 1))

# Pad the y-range slightly so the first and last points are not on the frame.
y_margin = 0.0005
lowest, highest = min(f1_scores), max(f1_scores)

# Convergence curve, saved to disk and shown inline.
plt.figure(figsize=(10, 6))
plt.plot(epochs, f1_scores, marker='o', color='blue')
plt.title('JADE: Macro F1 Score over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Macro F1 Score')
plt.ylim(lowest - y_margin, highest + y_margin)
plt.xticks(epochs)
plt.grid(True)
plt.tight_layout()
plt.savefig("JADE.png")
plt.show()

OrgJA

In [None]:
#orgJA
import optuna
import xgboost as xgb
import numpy as np
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import f1_score

def objective(trial):
    """Optuna objective: cross-validated weighted F1 of an XGBoost classifier.

    Samples one hyperparameter set from ``trial``, trains a classifier on each
    of 3 stratified, shuffled folds (``random_state=42``) of the module-level
    ``X_train`` / ``y_train`` using the matching slice of ``sample_weights``,
    and scores it on the held-out fold.

    ``X_train`` and ``y_train`` must support ``.iloc`` (pandas objects).
    ``sample_weights`` may be any 1-D array-like aligned row-for-row with
    ``X_train``.

    Args:
        trial: the Optuna trial used to draw hyperparameters.

    Returns:
        The mean weighted F1 across folds (the value the study optimizes).
        The mean macro F1 is stored on the trial as user attribute
        ``"macro_f1"``.
    """
    param = {
        "objective": "multi:softmax",
        "num_class": len(np.unique(y_train)),
        "eval_metric": "mlogloss",
        "n_estimators": trial.suggest_int("n_estimators", 50, 300),
        "max_depth": trial.suggest_int("max_depth", 3, 10),
        "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3, log=True),
        "subsample": trial.suggest_float("subsample", 0.5, 1.0),
        "colsample_bytree": trial.suggest_float("colsample_bytree", 0.5, 1.0),
        "gamma": trial.suggest_float("gamma", 0, 10),
        "reg_lambda": trial.suggest_float("reg_lambda", 1e-2, 10.0, log=True),
        "reg_alpha": trial.suggest_float("reg_alpha", 1e-2, 10.0, log=True),
    }

    skf = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)

    # The fold indices are positional. X_train / y_train are sliced with .iloc,
    # so the weights must be sliced positionally too. Plain `[]` on a pandas
    # Series is label-based and would pick the wrong rows (or raise) when its
    # index is not 0..n-1; converting to an ndarray removes that ambiguity.
    weights = np.asarray(sample_weights)

    macro_f1s = []
    weighted_f1s = []

    for train_idx, val_idx in skf.split(X_train, y_train):
        X_tr, X_val = X_train.iloc[train_idx], X_train.iloc[val_idx]
        y_tr, y_val = y_train.iloc[train_idx], y_train.iloc[val_idx]
        sw_tr = weights[train_idx]

        model = xgb.XGBClassifier(**param)
        model.fit(X_tr, y_tr, sample_weight=sw_tr)

        preds = model.predict(X_val)
        macro_f1s.append(f1_score(y_val, preds, average="macro"))
        weighted_f1s.append(f1_score(y_val, preds, average="weighted"))

    avg_macro_f1 = np.mean(macro_f1s)
    avg_weighted_f1 = np.mean(weighted_f1s)

    # Keep the macro score alongside the optimized (weighted) one for reporting
    trial.set_user_attr("macro_f1", avg_macro_f1)
    return avg_weighted_f1

# Launch the search: maximize the objective's return value, with a budget of
# 50 trials and a 600-second timeout.
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=50, timeout=600)

# Report the winning trial: its parameters, its objective value (weighted F1)
# and the macro F1 that the objective stored as a user attribute.
best_trial = study.best_trial
print("Best parameters:", best_trial.params)
print("Best Weighted F1 Score:", best_trial.value)
print("Corresponding Macro F1 Score:", best_trial.user_attrs["macro_f1"])

import numpy as np
from xgboost import XGBClassifier
from sklearn.metrics import f1_score

# Define param_bounds (param_ranges): hyperparameter name -> (low, high).
# Key order is significant: candidates are built and clipped by iterating this
# dict, and fitness_func unpacks them positionally in exactly this order.
# Whether the low bound is an int literal also decides whether the final
# reported value is cast to int when the best parameters are printed.
param_bounds = {
    'max_depth': (4, 8),          # ±2 around 6
    'learning_rate': (0.2067, 0.3067),  # ±0.05 around 0.2567
    'n_estimators': (250, 350),  # ±50 around 300
    'gamma': (0.0, 0.096),       # ±0.05 around 0.046, with the lower end floored at 0
    'subsample': (0.74, 0.94),    # ±0.1 around 0.84
    'colsample_bytree': (0.746, 0.946),  # ±0.1 around 0.846
}

# Cost of one candidate hyperparameter vector (lower is better)
def fitness_func(params):
    """Train an XGBoost classifier for one candidate and return ``1 - macro F1``.

    Args:
        params: six values ordered as (max_depth, learning_rate, n_estimators,
            gamma, subsample, colsample_bytree). ``max_depth`` and
            ``n_estimators`` are truncated with ``int()``.

    Returns:
        ``1 - macro_f1``, where the macro F1 is computed on the module-level
        ``X_test`` / ``y_test`` after fitting on ``X_train`` / ``y_train``
        weighted by ``sample_weights``.

    NOTE(review): candidates are scored on X_test, so the selected
    hyperparameters are tuned against that split.
    """
    depth, eta, n_trees, min_split_loss, row_fraction, col_fraction = params

    xgb_settings = {
        "max_depth": int(depth),
        "learning_rate": eta,
        "n_estimators": int(n_trees),
        "gamma": min_split_loss,
        "subsample": row_fraction,
        "colsample_bytree": col_fraction,
        "objective": 'multi:softmax',
        "num_class": len(np.unique(y_train)),
        "eval_metric": 'mlogloss',
    }
    classifier = XGBClassifier(**xgb_settings)
    classifier.fit(X_train, y_train, sample_weight=sample_weights)

    score = f1_score(y_test, classifier.predict(X_test), average='macro')
    # The Jaya loop minimizes, so turn the score into a cost
    return 1 - score

# Draw the starting population uniformly inside the search box
def initialize_population():
    """Return ``population_size`` random candidates as a 2-D array.

    Each row holds one value per entry of ``param_bounds``, in the dict's
    iteration order, sampled with ``np.random.uniform(low, high)``. Values are
    drawn row by row, left to right. Both ``population_size`` and
    ``param_bounds`` are read from module scope at call time.
    """
    ranges = list(param_bounds.values())
    return np.array([
        [np.random.uniform(lower, upper) for lower, upper in ranges]
        for _ in range(population_size)
    ])

# One Jaya move for every member of the population
def jaya_update(population, best_solution, worst_solution):
    """Move each candidate toward the best solution and away from the worst.

    For a candidate ``x`` the proposal is
    ``x + r1 * (best - |x|) - r2 * (worst - |x|)``, where ``r1`` and ``r2`` are
    two scalars drawn from ``np.random.rand()`` per candidate (shared by all of
    its dimensions). Each coordinate is then clipped to the matching
    ``(low, high)`` pair of the module-level ``param_bounds``.

    Args:
        population: iterable of 1-D arrays, one per candidate, with one entry
            per key of ``param_bounds``.
        best_solution: 1-D array, the current best candidate.
        worst_solution: 1-D array, the current worst candidate.

    Returns:
        A new array with the moved, clipped candidates; inputs are not modified.
    """
    lower = np.array([low for low, _ in param_bounds.values()])
    upper = np.array([high for _, high in param_bounds.values()])

    moved = []
    for candidate in population:
        r1 = np.random.rand()
        r2 = np.random.rand()
        magnitude = np.abs(candidate)
        proposal = candidate + r1 * (best_solution - magnitude) - r2 * (worst_solution - magnitude)
        # Keep every coordinate inside its allowed range
        moved.append(np.clip(proposal, lower, upper))
    return np.array(moved)

# Original Jaya Algorithm loop
def original_jaya_algorithm():
    """Minimize ``fitness_func`` with the Jaya algorithm and report the best result.

    Reads ``population_size`` (via ``initialize_population``) and ``max_iter``
    from module scope. Every iteration:

    1. takes the best and worst members of the *current* population,
    2. proposes a move for every member with ``jaya_update``,
    3. keeps a proposal only if its fitness is strictly lower than that of the
       member it would replace (greedy acceptance).

    Because members are never replaced by worse ones, the population's best
    fitness can only improve, and the best/worst references always describe
    the same population the moves are computed from.

    Returns:
        ``(best_solution, best_macro_f1)`` where ``best_solution`` is the
        parameter vector with the lowest fitness and ``best_macro_f1`` is
        ``1 - fitness`` for it (``fitness_func`` returns ``1 - macro F1``).

    Raises:
        ValueError: from ``np.argmin`` if the population is empty.
    """
    population = initialize_population()
    fitness = np.array([fitness_func(ind) for ind in population])

    for t in range(max_iter):
        best_solution = population[np.argmin(fitness)]
        worst_solution = population[np.argmax(fitness)]

        candidates = jaya_update(population, best_solution, worst_solution)
        candidate_fitness = np.array([fitness_func(ind) for ind in candidates])

        # Greedy acceptance: a member only moves if the move lowers its cost.
        improved = candidate_fitness < fitness
        population = np.where(improved[:, None], candidates, population)
        fitness = np.where(improved, candidate_fitness, fitness)

        best_score = fitness.min()

        # Print the progress for each iteration
        print(f"Iteration {t+1} | Best Macro F1 Score: {1 - best_score:.4f}")

    best_idx = np.argmin(fitness)
    return population[best_idx].copy(), 1 - fitness[best_idx]

# Set the population size and maximum iterations for OriginalJA
# (both are read as module-level names by the Jaya functions)
population_size = 10
max_iter = 20

# Run OriginalJA optimization
best_params_ja, best_f1_ja = original_jaya_algorithm()

# Show Final Best Parameters
# Pair each optimized value with its name (param_bounds order). Parameters
# whose lower bound is an integer are truncated with int(), matching how
# fitness_func feeds them to the model; the rest are rounded to 4 decimals.
# isinstance replaces the former substring test on the type's repr, which
# would also match any type whose name merely contains "int".
param_names = list(param_bounds.keys())
final_params = {
    name: (int(value)
           if isinstance(param_bounds[name][0], (int, np.integer))
           and not isinstance(param_bounds[name][0], bool)
           else round(value, 4))
    for name, value in zip(param_names, best_params_ja)
}

print("\n✅ Best Parameters Found by OriginalJA:")
print(final_params)
print(f"Best Macro F1 Score: {best_f1_ja:.4f}")
import matplotlib.pyplot as plt

# Sample data - replace with your actual F1 scores from Original JA.
# Hard-coded macro F1 per iteration, written as (value * repeat count) runs:
# iterations 1-2, 3-4 and 5-20.
macro_f1_scores = [0.9498] * 2 + [0.9516] * 2 + [0.9524] * 16
iterations = list(range(1, 21))  # 20 iterations

# Draw the curve
plt.figure(figsize=(10, 6))
plt.plot(iterations, macro_f1_scores, marker='o', color='blue', label='Original JA')

# Labels, legend and grid
plt.xlabel('Iteration Number')
plt.ylabel('Macro F1 Score')
plt.title('Original JA Optimization - Macro F1 Score per Iteration')
plt.legend()
plt.grid(True)

# Save at 300 dpi, then display
plt.tight_layout()
plt.savefig("OrginalJA.png",dpi=300)
plt.show()