In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input/kaggle/input/collection-of-textures-in-colorectal-cancer/'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

In [None]:
# Importing required libraries

import os
import random
import time
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Subset
from torchvision import datasets
from torchvision.models import efficientnet_b0, EfficientNet_B0_Weights
from transformers import BeitModel
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, recall_score, precision_score, f1_score, roc_curve, auc
from sklearn.preprocessing import label_binarize
import matplotlib.pyplot as plt
import seaborn as sns
from itertools import cycle
from pathlib import Path
from typing import Tuple, Dict, List
from pathlib import Path
from typing import Tuple, Dict, List
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
import cv2
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed

In [None]:
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)

ORIGINAL_BASE = Path("/kaggle/input/collection-of-textures-in-colorectal-cancer/Kather_texture_2016_image_tiles_5000")
WORKING_BASE = Path("/kaggle/working/Kather_texture_working")       
PROCESSED_BASE = Path("/kaggle/working/Kather_texture_processed")   
SPLIT_BASE     = Path("/kaggle/working/Kather_texture_split")     

LABEL_MAP = {
    "01_TUMOR": "Tumor",
    "02_STROMA": "Stroma",
    "03_COMPLEX": "Complex",
    "04_LYMPHO": "Lympho",
    "05_DEBRIS": "Debris",
    "06_MUCOSA": "Mucosa",
    "07_ADIPOSE": "Adipose",
    "08_EMPTY": "Empty"
}

BEST_DROPOUT = 0.3           
BEST_LR = 1e-4              
LABEL_SMOOTHING = 0.1        
WEIGHT_DECAY = 1e-5          
EPOCHS = 20                  
PATIENCE = 5                
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
IMG_SIZE = 224
BLUR_KERNEL = (5, 5)
PREPROCESS_WORKERS = min(8, os.cpu_count() or 4)
BATCH_SIZE = 64
NUM_WORKERS = 4

In [None]:
# training hparams
EPOCHS = 100
PATIENCE = 4

# logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
log = logging.getLogger("kather_pipeline")

def preprocess_image(in_path: Path, out_path: Path, img_size: int = IMG_SIZE) -> Tuple[Path, bool, str]:
    try:
        img = cv2.imread(str(in_path))  # BGR
        if img is None:
            return in_path, False, "unreadable"

        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        # Per-channel histogram equalization
        r, g, b = cv2.split(img)
        r = cv2.equalizeHist(r)
        g = cv2.equalizeHist(g)
        b = cv2.equalizeHist(b)
        img = cv2.merge([r, g, b])

        # Blur & resize
        img = cv2.GaussianBlur(img, BLUR_KERNEL, 0)
        img = cv2.resize(img, (img_size, img_size), interpolation=cv2.INTER_AREA)

        out_path.parent.mkdir(parents=True, exist_ok=True)
        saved = cv2.imwrite(str(out_path), cv2.cvtColor(img, cv2.COLOR_RGB2BGR))
        return out_path, bool(saved), "ok" if saved else "save_failed"
    except Exception as e:
        return in_path, False, f"error:{e}"

In [None]:
def preprocess_folder(src_folder: Path, dst_folder: Path, label: str, limit_per_class: int | None = None) -> int:
    files = sorted(src_folder.glob("*.tif"))
    if not files:
        log.warning("No .tif images found in %s", src_folder)
        return 0
    if limit_per_class:
        files = files[:limit_per_class]

    dst_folder.mkdir(parents=True, exist_ok=True)
    futures, ok_count = [], 0
    with ThreadPoolExecutor(max_workers=PREPROCESS_WORKERS) as ex:
        for idx, img_path in enumerate(files):
            new_name = f"{label}_{idx:05d}.tif"
            out_path = dst_folder / new_name
            futures.append(ex.submit(preprocess_image, img_path, out_path, IMG_SIZE))
        for fut in tqdm(as_completed(futures), total=len(futures), desc=f"Preprocessing {label}"):
            _, ok, _ = fut.result()
            ok_count += int(ok)

    log.info("Processed %d/%d for %s -> %s", ok_count, len(files), label, dst_folder)
    return ok_count

In [None]:
def preprocess_all(src_base: Path, dst_base: Path, labels: Dict[str, str], limit_per_class: int | None = None):
    total = 0
    for new_label in labels.values():
        src_folder = src_base / new_label
        dst_folder = dst_base / new_label
        if src_folder.exists():
            total += preprocess_folder(src_folder, dst_folder, new_label, limit_per_class=limit_per_class)
        else:
            log.warning("Missing folder: %s", src_folder)
    log.info("Total processed images: %d", total)
    show_tree(dst_base, depth=2)

In [None]:
def display_samples(base: Path, labels: List[str], samples_per_class: int = 3):
    log.info("Displaying sample images...")
    n_rows = len(labels)
    n_cols = samples_per_class
    fig, axs = plt.subplots(n_rows, n_cols, figsize=(4 * n_cols, 3 * n_rows))
    if n_rows == 1:
        axs = np.array([axs])
    for i, label in enumerate(sorted(labels)):
        folder = base / label
        imgs = list(folder.glob("*.tif"))
        if not imgs:
            continue
        sample = random.sample(imgs, min(samples_per_class, len(imgs)))
        for j in range(n_cols):
            ax = axs[i, j]
            if j < len(sample):
                img = cv2.imread(str(sample[j]))
                img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                ax.imshow(img)
                ax.set_title(f"{label}\n{sample[j].name}", fontsize=9)
            ax.axis("off")
    plt.tight_layout()
    plt.show()

In [None]:
# Data Loading

def ensure_rgb(img):
    if img.mode != 'RGB':
        img = img.convert('RGB')
    return img

def load_data(data_path, batch_size=32):
    dataset = datasets.ImageFolder(root=data_path, transform=None)
    labels = np.array([label for _, label in dataset.imgs])

    # Split train/val/test
    sss1 = StratifiedShuffleSplit(n_splits=1, test_size=0.15, random_state=42)
    train_val_idx, test_idx = next(sss1.split(np.zeros(len(labels)), labels))

    train_val_labels = labels[train_val_idx]
    sss2 = StratifiedShuffleSplit(n_splits=1, test_size=0.1765, random_state=42)
    train_idx, val_idx = next(sss2.split(np.zeros(len(train_val_labels)), train_val_labels))

    train_idx = train_val_idx[train_idx]
    val_idx = train_val_idx[val_idx]

    train_data = Subset(dataset, train_idx)
    val_data = Subset(dataset, val_idx)
    test_data = Subset(dataset, test_idx)

    # Transforms
    train_transform = transforms.Compose([
        transforms.Lambda(ensure_rgb),
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225])
    ])
    val_transform = train_transform

    train_data.dataset.transform = train_transform
    val_data.dataset.transform = val_transform
    test_data.dataset.transform = val_transform

    # Loaders
    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=4)
    val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False, num_workers=4)
    test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, num_workers=4)

    return dataset.classes, train_loader, val_loader, test_loader

In [None]:
# Model Definition

class HybridBeitEfficientNet(nn.Module):
    def __init__(self, num_classes, dropout=0.3, fc_units=512):
        super(HybridBeitEfficientNet, self).__init__()

        self.beit_backbone = BeitModel.from_pretrained('microsoft/beit-base-patch16-224-pt22k')
        for param in self.beit_backbone.parameters():
            param.requires_grad = False

        weights = EfficientNet_B0_Weights.IMAGENET1K_V1
        self.efficientnet_backbone = efficientnet_b0(weights=weights)
        self.efficientnet_backbone.classifier = nn.Identity()
        for param in self.efficientnet_backbone.parameters():
            param.requires_grad = False

        beit_feature_dim = self.beit_backbone.config.hidden_size
        efficientnet_feature_dim = 1280
        combined_feature_dim = beit_feature_dim + efficientnet_feature_dim

        self.fc = nn.Sequential(
            nn.Linear(combined_feature_dim, fc_units),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(fc_units, num_classes)
        )

    def forward(self, x):
        beit_outputs = self.beit_backbone(pixel_values=x).last_hidden_state
        beit_cls = beit_outputs[:, 0, :]
        efficientnet_out = self.efficientnet_backbone(x)
        combined = torch.cat((beit_cls, efficientnet_out), dim=1)
        return self.fc(combined)

In [None]:
# Training

def train_model(model, train_loader, val_loader, criterion, optimizer, device, num_epochs=10, patience=4, model_name="model"):
    history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': [], 'lr': []}
    best_val_acc = -1.0
    epochs_no_improve = 0
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5,
                                                           patience=int(patience/2), verbose=True)
    best_model_path = f'{model_name}_best_model.pth'
    best_model_wts = model.state_dict()

    for epoch in range(num_epochs):
        # Train
        model.train()
        running_loss, correct_train, total_train = 0.0, 0, 0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total_train += labels.size(0)
            correct_train += (predicted == labels).sum().item()

        train_accuracy = correct_train / total_train
        avg_train_loss = running_loss / len(train_loader)

        # Validation
        model.eval()
        val_loss, correct_val, total_val = 0.0, 0, 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                _, predicted = torch.max(outputs.data, 1)
                total_val += labels.size(0)
                correct_val += (predicted == labels).sum().item()

        val_accuracy = correct_val / total_val
        avg_val_loss = val_loss / len(val_loader)

        history['train_loss'].append(avg_train_loss)
        history['train_acc'].append(train_accuracy)
        history['val_loss'].append(avg_val_loss)
        history['val_acc'].append(val_accuracy)
        history['lr'].append(optimizer.param_groups[0]['lr'])

        print(f"Epoch {epoch+1}/{num_epochs} | Train Loss: {avg_train_loss:.4f} | "
              f"Train Acc: {train_accuracy*100:.2f}% | Val Loss: {avg_val_loss:.4f} | "
              f"Val Acc: {val_accuracy*100:.2f}% | LR: {optimizer.param_groups[0]['lr']:.6f}")

        scheduler.step(avg_val_loss)

        if val_accuracy > best_val_acc:
            best_val_acc = val_accuracy
            best_model_wts = model.state_dict()
            torch.save(best_model_wts, best_model_path)
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1
            if epochs_no_improve >= patience:
                print(f"Early stopping at epoch {epoch+1}")
                break

    model.load_state_dict(best_model_wts)
    return model, history

In [None]:
# Evaluation

def evaluate_model(model, test_loader, criterion, device, class_names, model_name="Model"):
    model.eval()
    all_labels, all_predictions = [], []
    total_loss = 0.0
    start_time = time.time()

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            total_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs.data, 1)
            all_labels.extend(labels.cpu().numpy())
            all_predictions.extend(predicted.cpu().numpy())

    end_time = time.time()
    accuracy = accuracy_score(all_labels, all_predictions)
    avg_loss = total_loss / len(test_loader.dataset)
    sensitivity = recall_score(all_labels, all_predictions, average='weighted')
    precision = precision_score(all_labels, all_predictions, average='weighted')
    f1 = f1_score(all_labels, all_predictions, average='weighted')
    report_dict = classification_report(all_labels, all_predictions, target_names=class_names, output_dict=True)
    per_class_accuracy = [report_dict[name]['recall'] for name in class_names]
    std_dev = np.std(per_class_accuracy)

    print(f"Accuracy: {accuracy:.4f} | Loss: {avg_loss:.4f} | Sensitivity: {sensitivity:.4f} | "
          f"Precision: {precision:.4f} | F1: {f1:.4f} | Time: {end_time-start_time:.2f}s")

    cm = confusion_matrix(all_labels, all_predictions)
    plt.figure(figsize=(len(class_names)+2, len(class_names)+2))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names, yticklabels=class_names)
    plt.title(f'Confusion Matrix - {model_name}')
    plt.show()

    return {
        'accuracy': accuracy, 'loss': avg_loss, 'sensitivity': sensitivity,
        'precision': precision, 'f1_score': f1, 'std_dev_accuracy_per_class': std_dev
    }

In [None]:
# Plotting

def plot_training_curves(history, model_name="Model"):
    epochs = range(1, len(history['train_loss']) + 1)
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(epochs, history['train_loss'], label='Training Loss', marker='o')
    plt.plot(epochs, history['val_loss'], label='Validation Loss', marker='o')
    plt.legend(), plt.grid(True)
    plt.subplot(1, 2, 2)
    plt.plot(epochs, history['train_acc'], label='Training Accuracy', marker='o')
    plt.plot(epochs, history['val_acc'], label='Validation Accuracy', marker='o')
    plt.legend(), plt.grid(True)
    plt.show()

def plot_multiclass_roc_auc(model, test_loader, class_names, device, model_name="Model"):
    model.eval()
    all_labels, all_probs = [], []
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            outputs = model(inputs)
            probs = F.softmax(outputs, dim=1).cpu().numpy()
            all_probs.extend(probs)
            all_labels.extend(labels.cpu().numpy())
    all_labels = np.array(all_labels)
    y_true_bin = label_binarize(all_labels, classes=list(range(len(class_names))))
    fpr, tpr, roc_auc = {}, {}, {}
    for i in range(len(class_names)):
        fpr[i], tpr[i], _ = roc_curve(y_true_bin[:, i], np.array(all_probs)[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])
    plt.figure(figsize=(8, 6))
    colors = cycle(plt.cm.tab10.colors)
    for i, color in zip(range(len(class_names)), colors):
        plt.plot(fpr[i], tpr[i], color=color, lw=2, label=f'{class_names[i]} (AUC = {roc_auc[i]:.2f})')
    plt.plot([0, 1], [0, 1], 'k--', lw=1)
    plt.legend(), plt.grid(True)
    plt.show()

In [None]:
from torch.utils.data import Subset
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Subset
import random, time, copy


def run_pso_optimization(train_loader, val_loader, test_loader, class_names, device):
    import copy, time, random
    # ---- PSO (your original structure) ----
    class PSO:
        def __init__(self, fitness_func, bounds, num_particles, max_iter, c1=2.0, c2=2.0, w=0.7):
            self.fitness_func = fitness_func
            if isinstance(bounds, dict):
                self.hp_keys = list(bounds.keys())
                self.bounds = [bounds[k] for k in self.hp_keys]
            else:
                self.bounds = bounds
                self.hp_keys = [f'HP_{i}' for i in range(len(bounds))]
            self.num_particles = num_particles
            self.max_iter = max_iter
            self.c1, self.c2, self.w = c1, c2, w
            self.dim = len(self.bounds)
            self.minb = np.array([b[0] for b in self.bounds])
            self.maxb = np.array([b[1] for b in self.bounds])
            self.pos = np.random.uniform(self.minb, self.maxb, (self.num_particles, self.dim))
            self.vel = np.random.uniform(-1, 1, (self.num_particles, self.dim))
            self.pbest_pos = self.pos.copy()
            self.pbest_score = np.full(self.num_particles, -np.inf)
            self.gbest_pos = None
            self.gbest_score = -np.inf
            self.best_hps = None

        def _eval(self, hp_dict, tr_loader, va_loader, num_classes, device):
            return self.fitness_func(hp_dict, tr_loader, va_loader, num_classes, device)

        def optimize(self, tr_loader, va_loader, num_classes, device, patience=7, min_delta=0.005):
            best_hist, epochs_no_improve = [], 0
            self.gbest_score, self.gbest_pos, self.best_hps = -np.inf, None, None
            print(f"\n--- Starting PSO: particles={self.num_particles}, iters={self.max_iter} ---")
            for it in range(self.max_iter):
                t0 = time.time()
                for i in range(self.num_particles):
                    hp = {
                        self.hp_keys[0]: self.pos[i, 0],
                        self.hp_keys[1]: self.pos[i, 1],
                        self.hp_keys[2]: self.pos[i, 2],
                        self.hp_keys[3]: int(self.pos[i, 3])
                    }
                    s = self._eval(hp, tr_loader, va_loader, num_classes, device)
                    if s > self.pbest_score[i]:
                        self.pbest_score[i] = s
                        self.pbest_pos[i] = self.pos[i].copy()
                    if s > self.gbest_score:
                        self.gbest_score = s
                        self.gbest_pos = self.pos[i].copy()
                        self.best_hps = copy.deepcopy(hp)

                for i in range(self.num_particles):
                    r1 = np.random.rand(self.dim); r2 = np.random.rand(self.dim)
                    cog = self.c1 * r1 * (self.pbest_pos[i] - self.pos[i])
                    soc = self.c2 * r2 * (self.gbest_pos    - self.pos[i])
                    self.vel[i] = self.w * self.vel[i] + cog + soc
                    max_step = (self.maxb - self.minb) * 0.1
                    self.vel[i] = np.clip(self.vel[i], -max_step, max_step)
                    self.pos[i] = np.clip(self.pos[i] + self.vel[i], self.minb, self.maxb)

                dt = time.time() - t0
                print(f"PSO Iter {it+1}/{self.max_iter} | {dt:.2f}s | gbest={self.gbest_score:.4f} | hps={self.best_hps}")
                if not best_hist or (self.gbest_score - max(best_hist) > min_delta):
                    best_hist.append(self.gbest_score); epochs_no_improve = 0
                else:
                    epochs_no_improve += 1
                if epochs_no_improve >= patience:
                    print("Early stopping PSO."); break

            if self.best_hps is None:
                # fallback
                return {
                    self.hp_keys[0]: self.minb[0],
                    self.hp_keys[1]: self.minb[1],
                    self.hp_keys[2]: self.minb[2],
                    self.hp_keys[3]: int(self.minb[3]),
                }, 0.0
            return self.best_hps, self.gbest_score

    # fitness uses your HybridBeitEffNet; FC_Units is ignored (head is fixed at 512)
    def fitness_function_for_pso(hps, tr_loader, va_loader, num_classes, device):
        lr, wd = hps['LR'], hps['WD']
        dropout = hps['Dropout']
        model = HybridBeitEffNet(dropout=dropout, num_classes=num_classes).to(device)
        for p in model.beit_backbone.parameters(): p.requires_grad = False
        for p in model.efficientnet_backbone.parameters(): p.requires_grad = False
        crit = nn.CrossEntropyLoss(label_smoothing=0.1)
        opt  = optim.AdamW(model.parameters(), lr=lr, weight_decay=wd)
        model.train()
        for _ in range(7):
            for x, y in tr_loader:
                x, y = x.to(device), y.to(device)
                opt.zero_grad()
                loss = crit(model(x), y)
                loss.backward(); opt.step()
        # val accuracy
        model.eval()
        correct = total = 0
        with torch.no_grad():
            for x, y in va_loader:
                x, y = x.to(device), y.to(device)
                pred = model(x).argmax(1)
                total += y.size(0)
                correct += (pred == y).sum().item()
        return correct / total

    # bounds (FC_Units present but unused by model; OK to keep)
    hp_bounds = {'LR': (1e-5, 1e-3), 'WD': (1e-5, 1e-3), 'Dropout': (0.1, 0.5), 'FC_Units': (256, 1024)}

    # Create subset loaders from the original loaders' datasets
    base_train_ds = train_loader.dataset
    base_val_ds   = val_loader.dataset
    frac = 0.5
    pso_train_idx = random.sample(range(len(base_train_ds)), int(len(base_train_ds)*frac))
    pso_val_idx   = random.sample(range(len(base_val_ds)),   int(len(base_val_ds)*frac))
    pso_train_loader = DataLoader(Subset(base_train_ds, pso_train_idx), batch_size=32, shuffle=True,  num_workers=4, pin_memory=True)
    pso_val_loader   = DataLoader(Subset(base_val_ds,   pso_val_idx),   batch_size=32, shuffle=False, num_workers=4, pin_memory=True)

    # optimize
    start = time.time()
    opt = PSO(fitness_func=fitness_function_for_pso, bounds=hp_bounds, num_particles=15, max_iter=30, c1=2.0, c2=2.0, w=0.7)
    best_hps, best_score = opt.optimize(pso_train_loader, pso_val_loader, num_classes=len(class_names), device=device, patience=7, min_delta=0.005)
    print(f"\n--- PSO Finished in {time.time() - start:.2f}s ---")
    print("Best PSO HPs:", best_hps, " | Best score:", best_score)

    # final train (full loaders)
    pso_model = HybridBeitEffNet(dropout=best_hps['Dropout'], num_classes=len(class_names)).to(device)
    for p in pso_model.parameters(): p.requires_grad = True
    crit = nn.CrossEntropyLoss(label_smoothing=0.1)
    opti = optim.AdamW(pso_model.parameters(), lr=best_hps['LR']/10, weight_decay=best_hps['WD'])
    train_model(pso_model, train_loader, val_loader, crit, opti,
            device=device, num_epochs=50, patience=PATIENCE, model_name='pso_model')
    torch.save(pso_model.state_dict(), 'pso_model.pth')
    pso_model.load_state_dict(torch.load('pso_model.pth', map_location=device))
    pso_metrics = evaluate_model(pso_model, test_loader, crit, device, class_names, model_name="PSO Model")
    return best_hps, best_score, pso_metrics

In [None]:
from torch.utils.data import Subset
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Subset
from typing import Dict
import copy, time, random

def run_woa_optimization(train_loader, val_loader, test_loader, class_names, device):
    import copy, time, random
    # reuse the same class body as PSO (your code), just rename to WOA for now
    class WOA:
        def __init__(self, fitness_func, bounds, num_particles, max_iter, c1=2.0, c2=2.0, w=0.7):
            self.fitness_func = fitness_func
            if isinstance(bounds, dict):
                self.hp_keys = list(bounds.keys())
                self.bounds = [bounds[k] for k in self.hp_keys]
            else:
                self.bounds = bounds
                self.hp_keys = [f'HP_{i}' for i in range(len(bounds))]
            self.num_particles = num_particles
            self.max_iter = max_iter
            self.c1, self.c2, self.w = c1, c2, w
            self.dim = len(self.bounds)
            self.minb = np.array([b[0] for b in self.bounds])
            self.maxb = np.array([b[1] for b in self.bounds])
            self.pos = np.random.uniform(self.minb, self.maxb, (self.num_particles, self.dim))
            self.vel = np.random.uniform(-1, 1, (self.num_particles, self.dim))
            self.pbest_pos = self.pos.copy()
            self.pbest_score = np.full(self.num_particles, -np.inf)
            self.gbest_pos = None
            self.gbest_score = -np.inf
            self.best_hps = None
        def _eval(self, hp_dict, tr_loader, va_loader, num_classes, device):
            return self.fitness_func(hp_dict, tr_loader, va_loader, num_classes, device)
        def optimize(self, tr_loader, va_loader, num_classes, device, patience=7, min_delta=0.005):
            best_hist, epochs_no_improve = [], 0
            self.gbest_score, self.gbest_pos, self.best_hps = -np.inf, None, None
            print(f"\n--- Starting WOA: particles={self.num_particles}, iters={self.max_iter} ---")
            for it in range(self.max_iter):
                t0 = time.time()
                for i in range(self.num_particles):
                    hp = {
                        self.hp_keys[0]: self.pos[i, 0],
                        self.hp_keys[1]: self.pos[i, 1],
                        self.hp_keys[2]: self.pos[i, 2],
                        self.hp_keys[3]: int(self.pos[i, 3])
                    }
                    s = self._eval(hp, tr_loader, va_loader, num_classes, device)
                    if s > self.pbest_score[i]:
                        self.pbest_score[i] = s
                        self.pbest_pos[i] = self.pos[i].copy()
                    if s > self.gbest_score:
                        self.gbest_score = s
                        self.gbest_pos = self.pos[i].copy()
                        self.best_hps = copy.deepcopy(hp)
                for i in range(self.num_particles):
                    r1 = np.random.rand(self.dim); r2 = np.random.rand(self.dim)
                    cog = self.c1 * r1 * (self.pbest_pos[i] - self.pos[i])
                    soc = self.c2 * r2 * (self.gbest_pos    - self.pos[i])
                    self.vel[i] = self.w * self.vel[i] + cog + soc
                    max_step = (self.maxb - self.minb) * 0.1
                    self.vel[i] = np.clip(self.vel[i], -max_step, max_step)
                    self.pos[i] = np.clip(self.pos[i] + self.vel[i], self.minb, self.maxb)
                print(f"WOA Iter {it+1}/{self.max_iter} | gbest={self.gbest_score:.4f} | hps={self.best_hps}")
                if not best_hist or (self.gbest_score - max(best_hist) > min_delta):
                    best_hist.append(self.gbest_score); epochs_no_improve = 0
                else:
                    epochs_no_improve += 1
                if epochs_no_improve >= patience: 
                    print("Early stopping WOA."); break
            if self.best_hps is None:
                return {
                    self.hp_keys[0]: self.minb[0],
                    self.hp_keys[1]: self.minb[1],
                    self.hp_keys[2]: self.minb[2],
                    self.hp_keys[3]: int(self.minb[3]),
                }, 0.0
            return self.best_hps, self.gbest_score

    def fitness_function_for_woa(hps, tr_loader, va_loader, num_classes, device):
        lr, wd = hps['LR'], hps['WD']
        dropout = hps['Dropout']
        model = HybridBeitEfficientNet(dropout=dropout, num_classes=num_classes).to(device)
        for p in model.beit_backbone.parameters(): p.requires_grad = False
        for p in model.efficientnet_backbone.parameters(): p.requires_grad = False
        crit = nn.CrossEntropyLoss(label_smoothing=0.1)
        opt  = optim.AdamW(model.parameters(), lr=lr, weight_decay=wd)
        model.train()
        for _ in range(7):
            for x, y in tr_loader:
                x, y = x.to(device), y.to(device)
                opt.zero_grad(); loss = crit(model(x), y); loss.backward(); opt.step()
        model.eval()
        correct = total = 0
        with torch.no_grad():
            for x, y in va_loader:
                x, y = x.to(device), y.to(device)
                total += y.size(0)
                correct += (model(x).argmax(1) == y).sum().item()
        return correct / total

    hp_bounds = {'LR': (1e-5, 1e-3), 'WD': (1e-5, 1e-3), 'Dropout': (0.1, 0.5), 'FC_Units': (256, 1024)}

    base_train_ds = train_loader.dataset
    base_val_ds   = val_loader.dataset
    frac = 0.5
    woa_train_idx = random.sample(range(len(base_train_ds)), int(len(base_train_ds)*frac))
    woa_val_idx   = random.sample(range(len(base_val_ds)),   int(len(base_val_ds)*frac))
    woa_train_loader = DataLoader(Subset(base_train_ds, woa_train_idx), batch_size=32, shuffle=True,  num_workers=4, pin_memory=True)
    woa_val_loader   = DataLoader(Subset(base_val_ds,   woa_val_idx),   batch_size=32, shuffle=False, num_workers=4, pin_memory=True)

    start = time.time()
    opt = WOA(fitness_func=fitness_function_for_woa, bounds=hp_bounds, num_particles=15, max_iter=30, c1=2.0, c2=2.0, w=0.7)
    best_hps, best_score = opt.optimize(woa_train_loader, woa_val_loader, num_classes=len(class_names), device=device, patience=7, min_delta=0.005)
    print(f"\n--- WOA Finished in {time.time() - start:.2f}s ---")
    print("Best WOA HPs:", best_hps, " | Best score:", best_score)

    woa_model = HybridBeitEffNet(dropout=best_hps['Dropout'], num_classes=len(class_names)).to(device)
    for p in woa_model.parameters(): p.requires_grad = True
    crit = nn.CrossEntropyLoss(label_smoothing=0.1)
    opti = optim.AdamW(woa_model.parameters(), lr=best_hps['LR']/10, weight_decay=best_hps['WD'])
    train_model(woa_model, train_loader, val_loader, crit, opti,
            device=device, num_epochs=50, patience=PATIENCE, model_name='woa_model')
    torch.save(woa_model.state_dict(), 'woa_model.pth')
    woa_model.load_state_dict(torch.load('woa_model.pth', map_location=device))
    woa_metrics = evaluate_model(woa_model, test_loader, crit, device, class_names, model_name="WOA Model")
    return best_hps, best_score, woa_metrics

In [None]:
def main():
    # Step 1: Data Preparation
    run_step("copy")
    run_step("rename")
    run_step("preprocess")
    run_step("verify")
    run_step("display")
    run_step("split")

    # Load data
    train_loader, val_loader, test_loader, class_names = get_dataloaders(SPLIT_BASE)

    # Step 2: Baseline Model 
    print("\n BASE MODEL TRAINING")
    base_model = HybridBeitEffNet(dropout=BEST_DROPOUT, num_classes=len(class_names)).to(DEVICE)
    for p in base_model.beit.parameters():
        p.requires_grad = False
    for p in base_model.efficientnet.parameters():
        p.requires_grad = False

    criterion = nn.CrossEntropyLoss(label_smoothing=LABEL_SMOOTHING)
    optimizer = optim.Adam(
        filter(lambda p: p.requires_grad, base_model.parameters()),
        lr=BEST_LR,
        weight_decay=WEIGHT_DECAY
    )

    train_model(
        base_model, train_loader, val_loader, criterion, optimizer,
        epochs=EPOCHS, device=DEVICE, patience=PATIENCE, save_path="base_model.pth"
    )
    base_model.load_state_dict(torch.load("base_model.pth", map_location=DEVICE))

    print("\n Base Model Evaluation")
    base_metrics = evaluate_model_full(
        base_model, train_loader, val_loader, test_loader,
        criterion, class_names, device=DEVICE, model_name="Base Model"
    )
    print(base_metrics)

    # Step 3: PSO Optimization 
    print("\n PSO OPTIMIZATION")
    best_pso_hps, best_pso_acc, pso_metrics = run_pso_optimization(
        train_loader, val_loader, test_loader, class_names, DEVICE
    )
    print(pso_metrics)

    # Step 4: WOA Optimization
    print("\n WOA OPTIMIZATION")
    best_woa_hps, best_woa_acc, woa_metrics = run_woa_optimization(
        train_loader, val_loader, test_loader, class_names, DEVICE
    )
    print(woa_metrics)

    # Step 5: Final Summary 
    print("\n FINAL SUMMARY")
    print("Base Model:", base_metrics)
    print("PSO Model :", pso_metrics)
    print("WOA Model :", woa_metrics)


if __name__ == "__main__":
    main()
