In [None]:
import os
import time
import copy
import random
import numpy as np
import cv2 as cv
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, transforms, models
from torch.cuda.amp import autocast, GradScaler
from PIL import Image


In [None]:
# ---- Dataset locations ----
DATA_DIR = "/kaggle/input/brain-tumor-mri-dataset"
TRAIN_DIR, TEST_DIR = (os.path.join(DATA_DIR, split) for split in ("Training", "Testing"))

# ---- Input / loader settings ----
NUM_CLASSES = 4      # size of the replacement VGG16 output layer
IMG_SIZE = 224       # images are resized to IMG_SIZE x IMG_SIZE
BATCH_SIZE = 32
NUM_WORKERS = 2      # DataLoader worker processes

# ---- Stage-1 optimisation (only the final classifier layer is trained) ----
EPOCHS_STAGE1 = 50   # upper bound; early stopping may end the run sooner
LR_HEAD_S1 = 1e-4    # AdamW learning rate

# ---- Regularisation / early stopping ----
WEIGHT_DECAY = 1e-4  # AdamW weight decay
LABEL_SMOOTH = 0.05  # label_smoothing passed to CrossEntropyLoss
PATIENCE = 5         # epochs without a val-loss improvement before fit() stops

# ---- Checkpointing / mixed precision ----
SAVE_PATH = "/kaggle/working/best_vgg16_mri_4class.pth"
USE_AMP = True       # fit() only enables AMP when device == "cuda"

# Plain string rather than torch.device: fit() compares it with == "cuda".
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

In [None]:
def set_seed(seed=42):
    """Seed every random number generator this notebook draws from.

    Args:
        seed: Integer applied to Python's ``random``, NumPy's global RNG,
            the torch default generator and all CUDA generators.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)


set_seed(42)

In [None]:
# Per-channel mean / std handed to transforms.Normalize in the preprocessing
# pipeline below.
# NOTE(review): these look like the standard ImageNet statistics expected by
# the pretrained VGG16 weights — confirm against the torchvision weights docs.
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD  = [0.229, 0.224, 0.225]

# CLAHE: boosts the contrast of the MRI images
def clahe_pil(img: Image.Image, clipLimit=2.0, tileGridSize=(8, 8)) -> Image.Image:
    """Apply CLAHE to a PIL image and return the result as a 3-channel PIL image.

    Args:
        img: Input PIL image.
        clipLimit: Forwarded to ``cv.createCLAHE``.
        tileGridSize: Forwarded to ``cv.createCLAHE``.

    Returns:
        A PIL image built from the equalized single-channel result converted
        with ``cv.COLOR_GRAY2RGB``.
    """
    pixels = np.array(img)
    # A 3-D array is reduced to one channel first; anything else is used as-is.
    single_channel = cv.cvtColor(pixels, cv.COLOR_RGB2GRAY) if pixels.ndim == 3 else pixels
    equalizer = cv.createCLAHE(clipLimit=clipLimit, tileGridSize=tileGridSize)
    equalized = equalizer.apply(single_channel.astype(np.uint8))
    return Image.fromarray(cv.cvtColor(equalized, cv.COLOR_GRAY2RGB))

# One preprocessing pipeline is shared by the train, validation and test data:
# CLAHE -> resize -> 3-channel grayscale -> tensor -> normalisation.
_preprocess_steps = [
    clahe_pil,
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.Grayscale(num_output_channels=3),
    transforms.ToTensor(),
    transforms.Normalize(IMAGENET_MEAN, IMAGENET_STD),
]
transform = transforms.Compose(_preprocess_steps)

train_set, test_set = (
    datasets.ImageFolder(root, transform=transform) for root in (TRAIN_DIR, TEST_DIR)
)

# Hold out a fixed fraction of the training folder for validation.
val_ratio = 0.2
num_train = len(train_set)
num_val = int(val_ratio * num_train)
num_train_actual = num_train - num_val

# A dedicated, seeded generator keeps the split independent of the global RNG state.
_split_rng = torch.Generator().manual_seed(42)
train_subset, val_subset = random_split(
    train_set, [num_train_actual, num_val], generator=_split_rng
)

# Only the training loader shuffles; everything else is common to all three.
_loader_opts = dict(batch_size=BATCH_SIZE, num_workers=NUM_WORKERS, pin_memory=True)
train_loader = DataLoader(train_subset, shuffle=True, **_loader_opts)
val_loader = DataLoader(val_subset, shuffle=False, **_loader_opts)
test_loader = DataLoader(test_set, shuffle=False, **_loader_opts)

print("Classes:", train_set.classes)


In [None]:
def build_vgg16(num_classes=4):
    """Create an ImageNet-pretrained VGG16 with a new final classifier layer.

    Args:
        num_classes: Output size of the replacement ``classifier[6]`` layer.

    Returns:
        The torchvision VGG16 model whose last classifier layer is a freshly
        initialised ``nn.Linear`` with ``num_classes`` outputs.
    """
    net = models.vgg16(weights=models.VGG16_Weights.IMAGENET1K_V1)
    old_head = net.classifier[6]
    # Keep the incoming feature width; only the output width changes.
    net.classifier[6] = nn.Linear(old_head.in_features, num_classes)
    return net


model = build_vgg16(NUM_CLASSES)
model = model.to(device)

In [None]:
def train_only_classifier6(model):
    """Freeze every parameter of ``model`` except those of ``model.classifier[6]``.

    Args:
        model: Module exposing an indexable ``classifier`` with an element 6.
    """
    # Turn gradients off everywhere, then back on for the final layer only.
    model.requires_grad_(False)
    model.classifier[6].requires_grad_(True)

def count_trainable_params(model):
    """Return the total number of elements in parameters with ``requires_grad`` set.

    Args:
        model: Any object exposing ``parameters()`` like ``nn.Module``.

    Returns:
        Integer count of trainable scalar parameters (0 when none are trainable).
    """
    trainable = 0
    for param in model.parameters():
        if param.requires_grad:
            trainable += param.numel()
    return trainable

In [None]:
# Cross-entropy loss with label smoothing (LABEL_SMOOTH). It is read as a
# module-level global by both train_one_epoch and eval_one_epoch.
criterion = nn.CrossEntropyLoss(label_smoothing=LABEL_SMOOTH)

In [None]:
def train_one_epoch(model, loader, optimizer, scaler=None):
    """Run one optimisation pass over ``loader``.

    Uses the module-level ``device`` and ``criterion``.

    Args:
        model: Network to train; switched to train mode.
        loader: Iterable yielding ``(images, labels)`` batches.
        optimizer: Optimizer stepped once per batch.
        scaler: Optional ``GradScaler``. When given, the forward pass runs
            under ``autocast`` and the scaler drives backward/step/update.

    Returns:
        ``(mean_loss, accuracy)`` over all samples seen, where each batch loss
        is weighted by its batch size.
    """
    model.train()
    mixed_precision = scaler is not None
    loss_sum = 0.0
    hits = 0
    seen = 0

    for batch_x, batch_y in loader:
        batch_x = batch_x.to(device, non_blocking=True)
        batch_y = batch_y.to(device, non_blocking=True)
        n_batch = batch_x.size(0)

        optimizer.zero_grad(set_to_none=True)

        if not mixed_precision:
            logits = model(batch_x)
            batch_loss = criterion(logits, batch_y)
            batch_loss.backward()
            optimizer.step()
        else:
            with autocast():
                logits = model(batch_x)
                batch_loss = criterion(logits, batch_y)
            # The scaler scales the loss, unscales before stepping and adapts its scale.
            scaler.scale(batch_loss).backward()
            scaler.step(optimizer)
            scaler.update()

        # criterion returns a batch mean, so weight it by the batch size.
        loss_sum += batch_loss.item() * n_batch
        hits += (logits.argmax(dim=1) == batch_y).sum().item()
        seen += n_batch

    return loss_sum / seen, hits / seen

In [None]:
def eval_one_epoch(model, loader):
    model.eval()
    running_loss, running_correct, total = 0.0, 0, 0

    with torch.no_grad():
        for images, labels in loader:
            images = images.to(device, non_blocking=True)
            labels = labels.to(device, non_blocking=True)

            outputs = model(images)
            loss = criterion(outputs, labels)

            running_loss += loss.item() * images.size(0)
            preds = outputs.argmax(dim=1)
            running_correct += (preds == labels).sum().item()
            total += images.size(0)

    return running_loss / total, running_correct / total

In [None]:
def fit(model, train_loader, val_loader, optimizer, scheduler=None, epochs=10, use_amp=True,
        patience=None, save_path=None):
    """Train with early stopping on validation loss, checkpointing the best weights.

    Each epoch runs ``train_one_epoch`` then ``eval_one_epoch``. Whenever the
    validation loss improves, the weights are copied in memory and written to
    ``save_path``. Training stops after ``patience`` consecutive epochs without
    improvement. The best weights are loaded back into ``model`` before returning.

    Args:
        model: Network to train (modified in place).
        train_loader: Batches for ``train_one_epoch``.
        val_loader: Batches for ``eval_one_epoch``.
        optimizer: Optimizer passed to ``train_one_epoch``.
        scheduler: Optional LR scheduler. A ``ReduceLROnPlateau`` is stepped
            with the validation loss; any other scheduler is stepped without
            arguments once per epoch.
        epochs: Maximum number of epochs.
        use_amp: Enable mixed precision; only takes effect when the global
            ``device`` equals ``"cuda"``.
        patience: Early-stopping patience; ``None`` uses the global ``PATIENCE``.
        save_path: Checkpoint path; ``None`` uses the global ``SAVE_PATH``.

    Returns:
        The same ``model`` object with the best weights loaded.
    """
    # Resolve the defaults at call time so later changes to the globals are honoured.
    patience = PATIENCE if patience is None else patience
    save_path = SAVE_PATH if save_path is None else save_path

    scaler = GradScaler() if (use_amp and device == "cuda") else None

    # Only ReduceLROnPlateau takes the monitored metric in step(); passing the
    # loss to other schedulers would be interpreted as a (deprecated) epoch index.
    metric_driven = isinstance(scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau)

    best_val_loss = float("inf")
    best_wts = copy.deepcopy(model.state_dict())
    no_improve = 0

    for epoch in range(1, epochs + 1):
        t0 = time.time()

        tr_loss, tr_acc = train_one_epoch(model, train_loader, optimizer, scaler)
        va_loss, va_acc = eval_one_epoch(model, val_loader)

        if scheduler is not None:
            if metric_driven:
                scheduler.step(va_loss)
            else:
                scheduler.step()

        print(f"Epoch {epoch:02d}/{epochs} | "
              f"train_loss={tr_loss:.4f} train_acc={tr_acc:.4f} | "
              f"val_loss={va_loss:.4f} val_acc={va_acc:.4f} | "
              f"time={time.time()-t0:.1f}s")

        if va_loss < best_val_loss:
            best_val_loss = va_loss
            # state_dict() tensors alias the live parameters, hence the deep copy.
            best_wts = copy.deepcopy(model.state_dict())
            no_improve = 0
            torch.save(best_wts, save_path)
            print("  -> Saved best model")
        else:
            no_improve += 1
            if no_improve >= patience:
                print("  -> Early stopping")
                break

    model.load_state_dict(best_wts)
    return model

In [None]:
# Stage 1: freeze the whole network except classifier[6] and train that layer.
# NOTE: a later cell redefines fit() to return (model, history). Re-running this
# cell after that cell has executed would bind `model` to a tuple.
print("\n===== STAGE 1: Train ONLY classifier[6] (VGG16) =====")
train_only_classifier6(model)
print("Trainable params:", count_trainable_params(model))

# Only the unfrozen head is handed to the optimizer.
optimizer = torch.optim.AdamW(
    model.classifier[6].parameters(), lr=LR_HEAD_S1, weight_decay=WEIGHT_DECAY
)

# Halve the learning rate after 2 epochs without a validation-loss improvement.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode="min", factor=0.5, patience=2
)

model = fit(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    optimizer=optimizer,
    scheduler=scheduler,
    epochs=EPOCHS_STAGE1,
    use_amp=USE_AMP,
)

print("\nDone. Best model saved at:", SAVE_PATH)


In [None]:
# Reload the best checkpoint written by fit() and switch to inference mode.
# map_location=device remaps the stored tensors onto the current device, so a
# checkpoint saved from a CUDA run can still be restored in a CPU-only session
# instead of failing during deserialization.
model.load_state_dict(torch.load(SAVE_PATH, map_location=device))
model.eval()
print("Loaded best model from:", SAVE_PATH)


In [None]:
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
import matplotlib.pyplot as plt

# Evaluate the current `model` on the held-out test set.

# Tick labels and report names come from the training folder, so the test
# folder must expose the same classes in the same order.
class_names = train_set.classes
assert test_set.classes == class_names, "train/test class folders differ"
label_ids = list(range(len(class_names)))

# Make the cell self-contained: do not rely on an earlier cell having
# switched the model to inference mode (dropout must be disabled here).
model.eval()

y_true, y_pred = [], []

with torch.no_grad():
    for images, labels in test_loader:
        outputs = model(images.to(device))
        preds = outputs.argmax(dim=1)

        y_pred.extend(preds.cpu().tolist())
        # Labels never need to leave the CPU; they are only compared on the host.
        y_true.extend(labels.tolist())

# Confusion Matrix
# labels= pins the matrix to one row/column per class even when a class never
# appears in y_true/y_pred, keeping it aligned with the tick labels.
cm = confusion_matrix(y_true, y_pred, labels=label_ids)

plt.figure(figsize=(8,6))
sns.heatmap(
    cm,
    annot=True,
    fmt="d",
    cmap="Blues",
    xticklabels=class_names,
    yticklabels=class_names
)
plt.xlabel("Predicted")
plt.ylabel("True")
plt.title("Confusion Matrix - VGG16")
plt.show()

# Classification Report
print("\nClassification Report:\n")
# labels= keeps target_names matched to class indices (see note above).
print(classification_report(y_true, y_pred, labels=label_ids, target_names=class_names))


In [None]:
def fit(model, train_loader, val_loader, optimizer, scheduler=None, epochs=10, use_amp=True,
        patience=None, save_path=None):
    """Train with early stopping on validation loss and record per-epoch metrics.

    This definition replaces the earlier ``fit`` in the notebook namespace and,
    unlike it, returns ``(model, history)``.

    Each epoch runs ``train_one_epoch`` then ``eval_one_epoch``. Whenever the
    validation loss improves, the weights are copied in memory and written to
    ``save_path``. Training stops after ``patience`` consecutive epochs without
    improvement. The best weights are loaded back into ``model`` before returning.

    Args:
        model: Network to train (modified in place).
        train_loader: Batches for ``train_one_epoch``.
        val_loader: Batches for ``eval_one_epoch``.
        optimizer: Optimizer passed to ``train_one_epoch``.
        scheduler: Optional LR scheduler. A ``ReduceLROnPlateau`` is stepped
            with the validation loss; any other scheduler is stepped without
            arguments once per epoch.
        epochs: Maximum number of epochs.
        use_amp: Enable mixed precision; only takes effect when the global
            ``device`` equals ``"cuda"``.
        patience: Early-stopping patience; ``None`` uses the global ``PATIENCE``.
        save_path: Checkpoint path; ``None`` uses the global ``SAVE_PATH``.

    Returns:
        ``(model, history)`` where ``history`` maps ``"train_loss"``,
        ``"train_acc"``, ``"val_loss"`` and ``"val_acc"`` to lists with one
        entry per completed epoch.
    """
    # Resolve the defaults at call time so later changes to the globals are honoured.
    patience = PATIENCE if patience is None else patience
    save_path = SAVE_PATH if save_path is None else save_path

    scaler = GradScaler() if (use_amp and device == "cuda") else None

    # Only ReduceLROnPlateau takes the monitored metric in step(); passing the
    # loss to other schedulers would be interpreted as a (deprecated) epoch index.
    metric_driven = isinstance(scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau)

    history = {
        "train_loss": [],
        "train_acc": [],
        "val_loss": [],
        "val_acc": []
    }

    best_val_loss = float("inf")
    best_wts = copy.deepcopy(model.state_dict())
    no_improve = 0

    for epoch in range(1, epochs + 1):
        tr_loss, tr_acc = train_one_epoch(model, train_loader, optimizer, scaler)
        va_loss, va_acc = eval_one_epoch(model, val_loader)

        if scheduler is not None:
            if metric_driven:
                scheduler.step(va_loss)
            else:
                scheduler.step()

        history["train_loss"].append(tr_loss)
        history["train_acc"].append(tr_acc)
        history["val_loss"].append(va_loss)
        history["val_acc"].append(va_acc)

        print(f"Epoch {epoch:02d}/{epochs} | "
              f"train_loss={tr_loss:.4f} train_acc={tr_acc:.4f} | "
              f"val_loss={va_loss:.4f} val_acc={va_acc:.4f}")

        if va_loss < best_val_loss:
            best_val_loss = va_loss
            # state_dict() tensors alias the live parameters, hence the deep copy.
            best_wts = copy.deepcopy(model.state_dict())
            no_improve = 0
            torch.save(best_wts, save_path)
        else:
            no_improve += 1
            if no_improve >= patience:
                print("Early stopping")
                break

    model.load_state_dict(best_wts)
    return model, history


In [None]:
# Run the history-recording fit() to collect the curves plotted below.
# NOTE(review): `model`, `optimizer` and `scheduler` are the objects already
# used by the stage-1 run above, so this call continues training from that
# state rather than starting over. `history` therefore only covers these
# additional epochs, and the checkpoint at SAVE_PATH is rewritten on the first
# epoch of this run — confirm this is intended.
model, history = fit(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    optimizer=optimizer,
    scheduler=scheduler,
    epochs=EPOCHS_STAGE1,
    use_amp=USE_AMP,
)


In [None]:
# Learning curves from the `history` dict returned by fit():
# accuracy on the left panel, loss on the right.
train_acc  = history["train_acc"]
val_acc    = history["val_acc"]
train_loss = history["train_loss"]
val_loss   = history["val_loss"]


def _draw_curves(ax, train_curve, val_curve, train_label, val_label, metric):
    """Plot one train/validation pair on ``ax`` with shared labelling."""
    ax.plot(train_curve, label=train_label)
    ax.plot(val_curve, label=val_label)
    ax.set_xlabel("Epoch")
    ax.set_ylabel(metric)
    ax.legend()
    ax.set_title(metric)


fig, (acc_ax, loss_ax) = plt.subplots(1, 2, figsize=(12, 5))
_draw_curves(acc_ax, train_acc, val_acc, "Train Acc", "Val Acc", "Accuracy")
_draw_curves(loss_ax, train_loss, val_loss, "Train Loss", "Val Loss", "Loss")

fig.tight_layout()
plt.show()
