In [None]:

import os, glob, random
from google.colab import drive

# Mount Google Drive so the raw slides stored under MyDrive become readable.
drive.mount('/content/drive')

# SOURCE: root of the raw dataset on Drive; TARGET: local root of the split dataset.
SOURCE = "/content/drive/MyDrive/archive (1)/BreaKHis_v1/BreaKHis_v1/histology_slides/breast"
TARGET = "/content/BreakHis_8class_fast"

# Lower-case subtype name (matched against source paths) -> class-folder label.
# Every label is simply the upper-cased subtype name.
subtypes = {
    subtype_name: subtype_name.upper()
    for subtype_name in (
        "adenosis",
        "fibroadenoma",
        "phyllodes_tumor",
        "tubular_adenoma",
        "ductal_carcinoma",
        "lobular_carcinoma",
        "mucinous_carcinoma",
        "papillary_carcinoma",
    )
}

# Create one <TARGET>/<split>/<label> folder per split and class.
splits = ["train", "val", "test"]
for class_dir in (
    f"{TARGET}/{split_name}/{class_label}"
    for split_name in splits
    for class_label in subtypes.values()
):
    os.makedirs(class_dir, exist_ok=True)

# File extensions (lower-case) that count as images when scanning SOURCE.
exts = (".png",".jpg",".jpeg",".tif",".tiff",".bmp")

# Class label -> list of absolute source paths found for that class.
subtype_files = {label: [] for label in subtypes.values()}

for root, dirs, files in os.walk(SOURCE):
    # Sorting in place makes os.walk visit sub-directories in a stable order.
    dirs.sort()

    # The class is decided by the directory path, so resolve it once per
    # directory. Taking only the first matching key guarantees a file can
    # never be filed under two classes.
    root_lower = root.lower()
    label = next(
        (lab for key, lab in subtypes.items() if key in root_lower),
        None,
    )
    if label is None:
        continue

    for file in sorted(files):
        # Skip anything that is not an image (the extension tuple was
        # defined for this purpose but previously never applied).
        if file.lower().endswith(exts):
            subtype_files[label].append(os.path.join(root, file))

def fast_symlink(paths, cls, seed=42):
    """Split ``paths`` 70/15/15 into train/val/test and symlink them under TARGET.

    Args:
        paths: iterable of source file paths belonging to one class.
        cls: class label; links are created in ``{TARGET}/<split>/{cls}``.
        seed: seed for the shuffle. The paths are sorted before shuffling, so
            a given seed always yields the same split. Pass ``None`` for a
            different split on every call.

    The caller's list is left untouched. Symlinks already present in the
    destination folders are removed first, so the function can be re-run
    without ``FileExistsError`` and without stale links from an earlier split
    leaking between train/val/test.

    Raises:
        FileExistsError: if two source files of the same split share a basename.
    """
    shuffled = sorted(paths)
    random.Random(seed).shuffle(shuffled)

    n = len(shuffled)
    n_train = int(0.7 * n)
    n_val = int(0.15 * n)

    partitions = {
        "train": shuffled[:n_train],
        "val": shuffled[n_train:n_train + n_val],
        "test": shuffled[n_train + n_val:],
    }

    for split, members in partitions.items():
        dest_dir = f"{TARGET}/{split}/{cls}"
        os.makedirs(dest_dir, exist_ok=True)

        # Collect first, delete afterwards: do not mutate the directory while
        # it is being scanned.
        stale_links = [e.path for e in os.scandir(dest_dir) if e.is_symlink()]
        for link in stale_links:
            os.remove(link)

        for src in members:
            os.symlink(src, os.path.join(dest_dir, os.path.basename(src)))

    print(
        f"{cls}: train={len(partitions['train'])}, "
        f"val={len(partitions['val'])}, test={len(partitions['test'])}"
    )

# Build the train/val/test links one class at a time.
for class_label in subtype_files:
    fast_symlink(subtype_files[class_label], class_label)

print("\n✅ 8-class dataset ready at:", TARGET)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
ADENOSIS: train=310, val=66, test=68
FIBROADENOMA: train=709, val=152, test=153
PHYLLODES_TUMOR: train=317, val=67, test=69
TUBULAR_ADENOMA: train=398, val=85, test=86
DUCTAL_CARCINOMA: train=2415, val=517, test=519
LOBULAR_CARCINOMA: train=438, val=93, test=95
MUCINOUS_CARCINOMA: train=554, val=118, test=120
PAPILLARY_CARCINOMA: train=392, val=84, test=84

✅ 8-class dataset ready at: /content/BreakHis_8class_fast


In [None]:
import shutil, os

TARGET = "/content/BreakHis_8class_fast"

# Opt-in switch. Run top to bottom, this cell sits between the cell that
# builds the dataset and the cell that trains on it, so deleting
# unconditionally would wipe the freshly built split. Set to True (and re-run
# the build cell afterwards) only when a clean rebuild is wanted.
RESET_DATASET = False

if not RESET_DATASET:
    print("Dataset reset skipped (RESET_DATASET is False).")
elif os.path.exists(TARGET):
    shutil.rmtree(TARGET)
    print("Old dataset removed.")
else:
    # Report honestly instead of claiming a removal that did not happen.
    print("No existing dataset found; nothing to remove.")


Old dataset removed.


In [None]:


import os, random, copy
import numpy as np
from tqdm import tqdm
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models
from torch.amp import autocast, GradScaler

from sklearn.metrics import classification_report, confusion_matrix, f1_score
import matplotlib.pyplot as plt
import seaborn as sns


# ---- Data and schedule ----
DATA_ROOT   = "/content/BreakHis_8class_fast"   # root holding train/ val/ test/
IMG_SIZE    = 456      # side length used by the crop / resize transforms
BATCH_SIZE  = 24
EPOCHS      = 20       # upper bound; early stopping may end training sooner
BASE_LR     = 8e-4     # learning rate handed to the optimizer
PATIENCE    = 5        # epochs without validation-F1 improvement before stopping
NUM_WORKERS = 2        # DataLoader worker processes
SEED        = 42

# ---- Regularisation / inference switches ----
USE_EMA        = True    # keep and evaluate an exponential moving average copy
MIXUP_ALPHA    = 0.15    # Beta(alpha, alpha) parameter; mixup is used when > 0
LABEL_SMOOTH   = 0.03    # label_smoothing for the cross-entropy loss
USE_TTA        = True    # test-time-augmentation flag
MAX_GRAD_NORM  = 3.0     # gradient-norm clipping threshold

# Seed the three RNG families in the same order: Python, NumPy, PyTorch.
for _seed_fn in (random.seed, np.random.seed, torch.manual_seed):
    _seed_fn(SEED)

_has_cuda = torch.cuda.is_available()
device = torch.device("cuda") if _has_cuda else torch.device("cpu")
non_block = _has_cuda

if _has_cuda:
    # Let cuDNN auto-tune kernels and allow faster float32 matmul precision.
    torch.backends.cudnn.benchmark = True
    torch.set_float32_matmul_precision("high")

# Channel statistics shared by the training and evaluation pipelines.
_normalize = transforms.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225])

# Training pipeline: random crop/flip/rotation/colour augmentation, then
# tensor conversion and normalisation.
_train_steps = [
    transforms.RandomResizedCrop(IMG_SIZE, scale=(0.75, 1.0)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.RandomRotation(degrees=20),
    transforms.ColorJitter(brightness=0.25, contrast=0.25, saturation=0.25),
    transforms.ToTensor(),
    _normalize,
]
train_tf = transforms.Compose(_train_steps)

# Evaluation pipeline: deterministic resize to a square, no augmentation.
_eval_steps = [
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    _normalize,
]
test_tf = transforms.Compose(_eval_steps)

# One ImageFolder per split; only the training split uses the augmenting transform.
train_set, val_set, test_set = (
    datasets.ImageFolder(f"{DATA_ROOT}/{split_name}", transform=split_tf)
    for split_name, split_tf in (
        ("train", train_tf),
        ("val", test_tf),
        ("test", test_tf),
    )
)

classes = train_set.classes
num_classes = len(classes)

# Seeded generator so the training shuffle order is repeatable.
g = torch.Generator().manual_seed(SEED)
pin = torch.cuda.is_available()

# Settings common to all three loaders.
_loader_kwargs = dict(
    batch_size=BATCH_SIZE,
    num_workers=NUM_WORKERS,
    pin_memory=pin,
    persistent_workers=NUM_WORKERS > 0,
)

train_loader = DataLoader(train_set, shuffle=True, generator=g, **_loader_kwargs)
val_loader = DataLoader(val_set, shuffle=False, **_loader_kwargs)
test_loader = DataLoader(test_set, shuffle=False, **_loader_kwargs)


# Per-class sample counts of the training split.
_train_labels = np.asarray(
    [class_idx for _path, class_idx in train_set.samples], dtype=np.int64
)
counts = np.bincount(_train_labels, minlength=num_classes)

# Inverse-frequency weights (counts floored at 1 to avoid dividing by zero),
# rescaled so that the weights sum to num_classes.
_inv_freq = 1.0 / np.maximum(counts, 1)
w = _inv_freq / _inv_freq.sum() * num_classes
class_weights = torch.tensor(w, dtype=torch.float32, device=device)


# ImageNet-pretrained EfficientNet-B5 backbone.
_pretrained = models.EfficientNet_B5_Weights.IMAGENET1K_V1
model = models.efficientnet_b5(weights=_pretrained)

# Freeze every parameter, then re-enable gradients for the last feature block only.
model.requires_grad_(False)
model.features[-1].requires_grad_(True)

# Swap the stock classifier for a two-layer head sized to our classes.
in_feats = model.classifier[1].in_features
_head_layers = [
    nn.Linear(in_feats, 512),
    nn.ReLU(inplace=True),
    nn.Dropout(0.35),
    nn.Linear(512, num_classes),
]
model.classifier = nn.Sequential(*_head_layers)

model = model.to(device)
if torch.cuda.is_available():
    # channels_last memory layout on GPU.
    model = model.to(memory_format=torch.channels_last)


class EMA:
    """Exponential moving average of a model's weights.

    Holds a frozen, eval-mode deep copy of ``model`` whose parameters follow
    ``ema = decay * ema + (1 - decay) * current`` on every ``update`` call.

    Args:
        model: the network being trained; it is deep-copied, not referenced.
        decay: weight given to the previous average (default 0.999).
    """

    def __init__(self, model, decay=0.999):
        self.decay = decay
        self.model = copy.deepcopy(model).eval()
        for p in self.model.parameters():
            p.requires_grad = False

    @torch.no_grad()
    def update(self, model):
        """Fold the current weights of ``model`` into the running average.

        Parameters are averaged with ``decay``. Buffers (for example
        normalisation running statistics and step counters) are copied
        verbatim from ``model``; otherwise the averaged copy would keep the
        buffer values it had at construction time while its weights move.
        """
        for ema_p, p in zip(self.model.parameters(), model.parameters()):
            ema_p.mul_(self.decay).add_(p.detach(), alpha=1 - self.decay)

        for ema_b, b in zip(self.model.buffers(), model.buffers()):
            ema_b.copy_(b)

# Averaged copy of the model, only when EMA is switched on.
ema = None
if USE_EMA:
    ema = EMA(model)

# Class-weighted cross-entropy with label smoothing.
criterion = nn.CrossEntropyLoss(weight=class_weights, label_smoothing=LABEL_SMOOTH)

# Trainable parameters: the new classifier head first, then the last feature block.
params = [
    *model.classifier.parameters(),
    *model.features[-1].parameters(),
]

optimizer = optim.AdamW(params, lr=BASE_LR, weight_decay=1e-4)
scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0=4)

# Gradient scaling is active only when CUDA is available.
_amp_enabled = torch.cuda.is_available()
scaler = GradScaler(enabled=_amp_enabled)

def mixup(x, y, alpha):
    """Blend each sample of a batch with a randomly chosen partner.

    Draws one coefficient ``lam`` from Beta(alpha, alpha) for the whole batch
    and one random permutation of the batch indices.

    Returns:
        (mixed inputs, (original labels, partner labels), lam)
    """
    lam = np.random.beta(alpha, alpha)
    partner = torch.randperm(x.size(0), device=x.device)
    blended = lam * x + (1 - lam) * x[partner]
    label_pair = (y, y[partner])
    return blended, label_pair, lam

def mixup_loss(pred, y, lam):
    """Loss for a mixed batch: the same ``lam`` blend applied to both label sets.

    ``y`` is the ``(labels, partner_labels)`` pair; the module-level
    ``criterion`` is evaluated against each and the results are combined as
    ``lam * loss_a + (1 - lam) * loss_b``.
    """
    labels_a, labels_b = y
    loss_a = criterion(pred, labels_a)
    loss_b = criterion(pred, labels_b)
    return lam * loss_a + (1 - lam) * loss_b


def evaluate(loader, model):
    """Run ``model`` over ``loader`` in eval mode and collect metrics.

    Args:
        loader: iterable yielding ``(inputs, labels)`` batches.
        model: network to evaluate; it is switched to eval mode.

    Returns:
        ``(mean_loss, accuracy, macro_f1, y_true, y_pred)`` where ``y_true``
        and ``y_pred`` are plain Python lists of class indices.

    Raises:
        ValueError: if the loader yields no samples.

    The mean loss weights every batch by its size, so a short final batch
    does not count as much as a full one.
    """
    model.eval()
    loss_sum, correct, n = 0.0, 0, 0
    y_true, y_pred = [], []

    with torch.no_grad():
        for x, y in loader:
            x = x.to(device, non_blocking=True)
            y = y.to(device, non_blocking=True)

            if torch.cuda.is_available():
                x = x.to(memory_format=torch.channels_last)

            with autocast("cuda", enabled=torch.cuda.is_available()):
                out = model(x)
                loss = criterion(out, y)

            batch_size = y.size(0)
            loss_sum += loss.item() * batch_size
            pred = out.argmax(1)

            correct += (pred == y).sum().item()
            n += batch_size
            y_true.extend(y.cpu().tolist())
            y_pred.extend(pred.cpu().tolist())

    if n == 0:
        raise ValueError("evaluate() received a loader with no samples")

    return (
        loss_sum / n,
        correct / n,
        f1_score(y_true, y_pred, average="macro"),
        y_true,
        y_pred
    )

# Training driver: up to EPOCHS epochs of mixed-precision training with
# optional mixup and EMA, validation after every epoch, checkpointing on the
# best macro-F1, and early stopping after PATIENCE epochs without improvement.
best_f1 = -1.0   # below any achievable F1, so the first epoch always saves
patience = 0     # consecutive epochs without a validation-F1 improvement

for epoch in range(EPOCHS):
    # NOTE(review): train() applies to the whole network, including the layers
    # frozen earlier — confirm that is intended for their normalisation layers.
    model.train()
    running = 0.0   # sum of per-step training losses for this epoch

    pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{EPOCHS}")
    for step, (x, y) in enumerate(pbar):
        x = x.to(device, non_blocking=True)
        y = y.to(device, non_blocking=True)

        if torch.cuda.is_available():
            x = x.to(memory_format=torch.channels_last)

        optimizer.zero_grad(set_to_none=True)

        # Forward pass under autocast (a no-op when CUDA is unavailable).
        with autocast("cuda", enabled=torch.cuda.is_available()):
            if MIXUP_ALPHA > 0:
                # Mixup path: blend the batch and combine the two label losses.
                xm, ym, lam = mixup(x, y, MIXUP_ALPHA)
                out = model(xm)
                loss = mixup_loss(out, ym, lam)
            else:
                out = model(x)
                loss = criterion(out, y)

        # Scaled backward; gradients are unscaled before clipping so the
        # threshold is compared against true gradient magnitudes.
        scaler.scale(loss).backward()
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(params, MAX_GRAD_NORM)
        scaler.step(optimizer)
        scaler.update()

        # Fold the freshly updated weights into the averaged copy.
        if USE_EMA:
            ema.update(model)

        # Fractional epoch index: the schedule advances every step.
        scheduler.step(epoch + step / len(train_loader))
        running += loss.item()
        pbar.set_postfix(loss=loss.item())

    train_loss = running / len(train_loader)
    # Validate the EMA copy when enabled, otherwise the live model.
    val_loss, val_acc, val_f1, _, _ = evaluate(
        val_loader, ema.model if USE_EMA else model
    )

    print(
        f"Train {train_loss:.4f} | "
        f"Val {val_loss:.4f} | "
        f"Acc {val_acc:.4f} | "
        f"F1 {val_f1:.4f}"
    )

    if val_f1 > best_f1:
        # New best: reset the counter and save the evaluated model's weights.
        best_f1 = val_f1
        patience = 0
        torch.save(
            (ema.model if USE_EMA else model).state_dict(),
            "best_breakhis_effb5_fast.pth"
        )
        print("✓ Saved best model")
    else:
        patience += 1
        if patience >= PATIENCE:
            print("Early stopping")
            break


# Reload the best checkpoint. The file holds a plain state_dict, so
# weights_only=True restricts unpickling to tensors and basic containers.
ckpt = torch.load(
    "best_breakhis_effb5_fast.pth", map_location=device, weights_only=True
)

# The checkpoint was written from the EMA copy when EMA is enabled, so load it
# back into the same object that produced it.
if USE_EMA:
    ema.model.load_state_dict(ckpt)
    test_model = ema.model
else:
    model.load_state_dict(ckpt)
    test_model = model

test_loss, test_acc, test_f1, y_true, y_pred = evaluate(test_loader, test_model)

print(f"\nTest | Loss {test_loss:.4f} | Acc {test_acc:.4f} | F1 {test_f1:.4f}")
print(classification_report(y_true, y_pred, target_names=classes, digits=4))

# Confusion matrix: rows are the true classes, columns the predicted classes.
cm = confusion_matrix(y_true, y_pred)
plt.figure(figsize=(6,5))
sns.heatmap(cm, annot=True, fmt="d",
            xticklabels=classes, yticklabels=classes,
            cmap="Blues")
plt.xlabel("Predicted class")
plt.ylabel("True class")
plt.title("Test confusion matrix")
plt.tight_layout()
plt.show()


# Test-time-augmentation views: the plain evaluation pipeline, then the same
# pipeline preceded by an always-applied horizontal flip and by an
# always-applied vertical flip.
_forced_flips = (
    transforms.RandomHorizontalFlip(p=1.0),
    transforms.RandomVerticalFlip(p=1.0),
)
tta_transforms = [test_tf] + [
    transforms.Compose([flip, *test_tf.transforms]) for flip in _forced_flips
]

def predict_image(path):
    """Predict the class name of the image stored at ``path``.

    The image is converted to RGB and passed through ``test_model``. When
    ``USE_TTA`` is true, the softmax outputs of every view in
    ``tta_transforms`` are averaged; otherwise only the first (unflipped)
    view is used.

    Returns:
        The entry of ``classes`` with the highest (averaged) probability.
    """
    # The context manager closes the file handle; convert() returns an
    # independent in-memory image that outlives it.
    with Image.open(path) as handle:
        img = handle.convert("RGB")

    views = tta_transforms if USE_TTA else tta_transforms[:1]

    test_model.eval()
    probs = 0

    with torch.no_grad():
        for tfm in views:
            x = tfm(img).unsqueeze(0).to(device)
            if torch.cuda.is_available():
                x = x.to(memory_format=torch.channels_last)
            probs += torch.softmax(test_model(x), dim=1)

    probs /= len(views)
    return classes[probs.argmax(1).item()]


# Per-channel statistics shaped (3, 1, 1) so they broadcast over H and W.
IMNET_MEAN = torch.tensor([0.485, 0.456, 0.406]).reshape(3, 1, 1)
IMNET_STD  = torch.tensor([0.229, 0.224, 0.225]).reshape(3, 1, 1)

def denorm(x):
    """Invert the mean/std normalisation and clip the result to [0, 1]."""
    restored = x * IMNET_STD + IMNET_MEAN
    return torch.clamp(restored, min=0, max=1)

class GradCAM:
    """Grad-CAM helper that records one layer's activation and output gradient.

    Usage: construct it on the layer of interest, run a forward pass, call
    ``backward()`` on the score to explain, then call ``generate``.

    Args:
        model: the network that owns ``layer`` (kept for interface
            compatibility; the hooks are attached to ``layer`` itself).
        layer: module whose output feature map is visualised.
    """

    def __init__(self, model, layer):
        self.grad = None
        self.act = None
        # register_full_backward_hook replaces the deprecated
        # register_backward_hook. The handles are kept so that the hooks can
        # be detached again with remove().
        self._handles = [
            layer.register_forward_hook(self._fwd),
            layer.register_full_backward_hook(self._bwd),
        ]

    def _fwd(self, m, i, o):
        # Detach so the stored map does not keep the autograd graph alive and
        # can later be converted to NumPy.
        self.act = o.detach()

    def _bwd(self, m, gi, go):
        self.grad = go[0].detach()

    def remove(self):
        """Detach both hooks from the layer."""
        for handle in self._handles:
            handle.remove()
        self._handles = []

    def generate(self, idx=0):
        """Return the normalised heat map for sample ``idx`` of the last batch.

        Each channel is weighted by the spatial mean of its gradient, the
        weighted channels are summed, negatives are clipped to zero, and the
        map is divided by its maximum (plus 1e-8).

        Returns:
            A 2-D NumPy array at the resolution of the hooked feature map.

        Raises:
            RuntimeError: if no forward and backward pass has been recorded.
        """
        if self.act is None or self.grad is None:
            raise RuntimeError(
                "GradCAM.generate() needs a forward pass followed by backward()"
            )
        w = self.grad[idx].mean(dim=(1,2), keepdim=True)
        cam = (w * self.act[idx]).sum(0).clamp(min=0)
        cam = cam / (cam.max() + 1e-8)
        return cam.cpu().numpy()

# Grad-CAM demo on the first test image, hooked on the last feature block.
target_layer = test_model.features[-1]
cam = GradCAM(test_model, target_layer)

img_tensor, _ = test_set[0]
# The evaluated model may have every parameter frozen (the EMA copy does), in
# which case nothing in the graph would require grad and backward() would
# fail. Marking the input as requiring grad guarantees a graph is built.
# clone() keeps img_tensor itself untouched.
x = img_tensor.unsqueeze(0).to(device).clone().requires_grad_(True)

test_model.eval()
test_model.zero_grad()
out = test_model(x)
pred = out.argmax(1).item()
out[0, pred].backward()

# no_grad: the heat map is pure post-processing and must be convertible to NumPy.
with torch.no_grad():
    heat = cam.generate(0)

# The map has the feature-map resolution; upsample it to the image size so the
# overlay lines up with the picture underneath.
heat = torch.nn.functional.interpolate(
    torch.as_tensor(heat, dtype=torch.float32)[None, None],
    size=tuple(img_tensor.shape[-2:]),
    mode="bilinear",
    align_corners=False,
)[0, 0].numpy()

plt.imshow(denorm(img_tensor).permute(1,2,0))
plt.imshow(heat, cmap="jet", alpha=0.45)
plt.title(f"Grad-CAM: {classes[pred]}")
plt.axis("off")
plt.show()


Epoch 1/20:   1%|▏         | 3/231 [01:44<2:10:46, 34.41s/it, loss=2.13]