In [1]:
import os, json, time
from pathlib import Path

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models

import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay

# ===== Experiment settings =====
# Locations on disk
DATA_DIR = "dataset_split"  # dataset root folder; point this at your own data
OUT_DIR = "artifacts"       # directory created below for generated files

# Network and input resolution
MODEL_NAME = "efficientnet_b0"  # one of: efficientnet_b0, resnet50, resnet18
IMG_SIZE = 224

# Optimisation schedule
BATCH_SIZE = 32
EPOCHS = 10
LR = 3e-4
UNFREEZE_AT = 3  # epoch at which the backbone is unfrozen for fine-tuning
NUM_WORKERS = 2

# Seed both NumPy and PyTorch from one value so runs are repeatable
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

# Use the GPU when one is visible, otherwise stay on the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
os.makedirs(OUT_DIR, exist_ok=True)
print(f"Using device: {device}")

ModuleNotFoundError: No module named 'torch'

In [None]:
def get_transforms(img_size=224):
    """Create the preprocessing pipelines for training and evaluation.

    Both pipelines start by resizing to ``int(img_size * 1.15)`` and finish with
    tensor conversion followed by per-channel normalisation. In between, the
    training pipeline applies a random resized crop, a random horizontal flip
    and colour jitter, while the evaluation pipeline applies a centre crop.

    Args:
        img_size: Side length of the final crop.

    Returns:
        A ``(train_tfms, eval_tfms)`` tuple of ``transforms.Compose`` objects.
    """
    # Per-channel statistics (presumably the ImageNet values expected by the
    # pretrained backbones -- confirm if the weights change).
    channel_mean = [0.485, 0.456, 0.406]
    channel_std = [0.229, 0.224, 0.225]
    resize_to = int(img_size*1.15)

    def tensor_tail():
        # Fresh instances for each pipeline, as in a hand-written Compose list.
        return [transforms.ToTensor(), transforms.Normalize(channel_mean, channel_std)]

    augmenting_head = [
        transforms.Resize(resize_to),
        transforms.RandomResizedCrop(img_size, scale=(0.7, 1.0)),
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    ]
    deterministic_head = [
        transforms.Resize(resize_to),
        transforms.CenterCrop(img_size),
    ]

    train_tfms = transforms.Compose(augmenting_head + tensor_tail())
    eval_tfms = transforms.Compose(deterministic_head + tensor_tail())
    return train_tfms, eval_tfms

def build_loaders(data_dir, img_size, batch_size, workers):
    """Create the train/val ``ImageFolder`` datasets and their ``DataLoader`` objects.

    Args:
        data_dir: Root folder that contains ``train`` and ``val`` sub-folders.
        img_size: Crop size forwarded to ``get_transforms``.
        batch_size: Batch size used by both loaders.
        workers: ``num_workers`` value used by both loaders.

    Returns:
        ``(train_ds, val_ds, train_ld, val_ld)``. Only the training loader shuffles.
    """
    root = Path(data_dir)
    train_tfms, eval_tfms = get_transforms(img_size)

    train_ds = datasets.ImageFolder(root/"train", transform=train_tfms)
    val_ds = datasets.ImageFolder(root/"val", transform=eval_tfms)

    # Settings common to both loaders; only the shuffle flag differs.
    shared = dict(batch_size=batch_size, num_workers=workers, pin_memory=True)
    train_ld = DataLoader(train_ds, shuffle=True, **shared)
    val_ld = DataLoader(val_ds, shuffle=False, **shared)
    return train_ds, val_ds, train_ld, val_ld

train_ds, val_ds, train_ld, val_ld = build_loaders(
    data_dir=DATA_DIR,
    img_size=IMG_SIZE,
    batch_size=BATCH_SIZE,
    workers=NUM_WORKERS,
)
classes = train_ds.classes
print(f"Classes ({len(classes)}): {classes}")

# Write the index -> class-name lookup into the output directory
# (JSON object keys are strings, so the integer indices are stored as strings).
idx_to_class = {idx: label for label, idx in train_ds.class_to_idx.items()}
with open(Path(OUT_DIR)/"idx_to_class.json","w") as f:
    json.dump(idx_to_class, f, indent=2)


In [None]:
def build_model(model_name, num_classes, pretrained=True):
    """Build a torchvision classifier whose final layer outputs ``num_classes`` logits.

    Args:
        model_name: One of ``"efficientnet_b0"``, ``"resnet50"`` or ``"resnet18"``
            (matched case-insensitively).
        num_classes: Output size of the replacement ``nn.Linear`` head.
        pretrained: When true, load the architecture's ``DEFAULT`` weights;
            when false, pass ``weights=None``.

    Returns:
        The model with its last linear layer replaced.

    Raises:
        ValueError: If ``model_name`` is not one of the supported names. An
            unknown or misspelled name is rejected instead of silently
            producing a resnet18.
    """
    name = model_name.lower()
    if name == "efficientnet_b0":
        weights = models.EfficientNet_B0_Weights.DEFAULT if pretrained else None
        m = models.efficientnet_b0(weights=weights)
        # The head's linear layer sits at index 1 of ``classifier``.
        m.classifier[1] = nn.Linear(m.classifier[1].in_features, num_classes)
    elif name == "resnet50":
        weights = models.ResNet50_Weights.DEFAULT if pretrained else None
        m = models.resnet50(weights=weights)
        m.fc = nn.Linear(m.fc.in_features, num_classes)
    elif name == "resnet18":
        weights = models.ResNet18_Weights.DEFAULT if pretrained else None
        m = models.resnet18(weights=weights)
        m.fc = nn.Linear(m.fc.in_features, num_classes)
    else:
        raise ValueError(
            f"Unsupported model_name {model_name!r}; "
            "expected one of: efficientnet_b0, resnet50, resnet18"
        )
    return m

def freeze_backbone(model, model_name):
    """Freeze every parameter except those of the classification head.

    The head is ``model.classifier`` when ``model_name`` contains
    ``"efficientnet"`` and ``model.fc`` otherwise. The name is compared
    case-insensitively so that it agrees with ``build_model``, which lower-cases
    the name before choosing an architecture.

    Args:
        model: Module whose parameters' ``requires_grad`` flags are updated in place.
        model_name: Architecture name used to locate the head.
    """
    for p in model.parameters():
        p.requires_grad = False

    is_efficientnet = "efficientnet" in model_name.lower()
    head = model.classifier if is_efficientnet else model.fc
    for p in head.parameters():
        p.requires_grad = True

def unfreeze_all(model):
    """Mark every parameter of ``model`` as trainable (in place)."""
    for param in model.parameters():
        param.requires_grad_(True)

# Build the network and start with only the classification head trainable
model = build_model(MODEL_NAME, num_classes=len(classes)).to(device)
freeze_backbone(model, MODEL_NAME)

criterion = nn.CrossEntropyLoss()
# Hand the optimizer only the parameters that are currently trainable
optimizer = optim.AdamW(filter(lambda p: p.requires_grad, model.parameters()), lr=LR)
# Schedulers live in torch.optim.lr_scheduler; torch.optim has no CosineAnnealingLR attribute
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS)
# Gradient scaling is enabled only when running on CUDA
scaler = torch.cuda.amp.GradScaler(enabled=(device.type == "cuda"))

# Best validation accuracy seen so far and the checkpoint path it is saved to
best_acc, best_path = 0.0, str(Path(OUT_DIR)/"best.pt")


In [None]:
def train_one_epoch(model, loader, criterion, optimizer, device, scaler=None):
    """Run one training pass over ``loader``.

    Args:
        model: Module to train; switched to train mode.
        loader: Iterable yielding ``(images, targets)`` batches.
        criterion: Loss callable applied to ``(logits, targets)``.
        optimizer: Optimizer stepped once per batch.
        device: Device the batches are moved to.
        scaler: Optional gradient scaler. When given, the backward pass and the
            optimizer step go through it. Autocast is switched on only if the
            scaler reports itself as enabled, so a disabled scaler (e.g. on
            CPU) no longer requests CUDA autocast on every batch.

    Returns:
        ``(mean_loss, accuracy)`` over all samples seen, or ``(0.0, 0.0)`` when
        the loader yields no samples.
    """
    model.train()
    use_amp = scaler is not None and scaler.is_enabled()
    running_loss, correct, total = 0.0, 0, 0
    for images, targets in loader:
        images, targets = images.to(device), targets.to(device)
        optimizer.zero_grad(set_to_none=True)
        with torch.autocast(device_type="cuda", enabled=use_amp):
            logits = model(images)
            loss = criterion(logits, targets)
        if scaler is not None:
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
        else:
            loss.backward()
            optimizer.step()
        # Weight the batch loss by its size so the final value is a per-sample mean
        running_loss += loss.item() * images.size(0)
        preds = logits.argmax(1)
        correct += (preds == targets).sum().item()
        total += targets.size(0)
    if total == 0:
        # Nothing was processed; avoid dividing by zero
        return 0.0, 0.0
    return running_loss/total, correct/total

@torch.no_grad()
def evaluate(model, loader, criterion, device):
    """Evaluate ``model`` on ``loader`` without tracking gradients.

    Args:
        model: Module to evaluate; switched to eval mode.
        loader: Iterable yielding ``(images, targets)`` batches.
        criterion: Loss callable applied to ``(logits, targets)``.
        device: Device the batches are moved to.

    Returns:
        ``(avg_loss, accuracy, y_true, y_pred)`` where the last two are NumPy
        arrays of targets and arg-max predictions in loader order. When the
        loader yields no samples the result is ``(0.0, 0.0, empty, empty)``
        rather than a ``ZeroDivisionError``.
    """
    model.eval()
    loss_sum, correct, total = 0.0, 0, 0
    all_preds, all_targets = [], []
    for images, targets in loader:
        images, targets = images.to(device), targets.to(device)
        logits = model(images)
        loss = criterion(logits, targets)
        # Weight the batch loss by its size so the final value is a per-sample mean
        loss_sum += loss.item() * images.size(0)
        preds = logits.argmax(1)
        correct += (preds == targets).sum().item()
        total += targets.size(0)
        all_preds.append(preds.cpu().numpy())
        all_targets.append(targets.cpu().numpy())
    if total == 0:
        # No samples: return empty arrays and neutral metrics instead of dividing by zero
        return 0.0, 0.0, np.array([]), np.array([])
    avg_loss = loss_sum/total
    acc = correct/total
    y_pred = np.concatenate(all_preds)
    y_true = np.concatenate(all_targets)
    return avg_loss, acc, y_true, y_pred

# Per-epoch metrics, appended to once per epoch and plotted afterwards
history = {"train_loss":[], "train_acc":[], "val_loss":[], "val_acc":[]}

for epoch in range(1, EPOCHS+1):
    if epoch == UNFREEZE_AT:
        unfreeze_all(model)
        optimizer = optim.AdamW(model.parameters(), lr=LR*0.3)  # smaller LR for fine-tune
        # A scheduler only adjusts the optimizer it was constructed with, so the
        # previous one would keep stepping the discarded optimizer. Rebuild it
        # for the new optimizer, annealing over the epochs that remain
        # (including this one).
        scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS - epoch + 1)
        print(f"Unfroze backbone at epoch {epoch}")

    t0 = time.time()
    tr_loss, tr_acc = train_one_epoch(model, train_ld, criterion, optimizer, device, scaler)
    va_loss, va_acc, y_true, y_pred = evaluate(model, val_ld, criterion, device)
    scheduler.step()

    history["train_loss"].append(tr_loss); history["train_acc"].append(tr_acc)
    history["val_loss"].append(va_loss);   history["val_acc"].append(va_acc)

    print(f"Epoch {epoch:02d}/{EPOCHS} | "
          f"train loss {tr_loss:.4f} acc {tr_acc:.3f} | "
          f"val loss {va_loss:.4f} acc {va_acc:.3f} | "
          f"{(time.time()-t0):.1f}s")

    # Checkpoint only on a strict improvement in validation accuracy
    if va_acc > best_acc:
        best_acc = va_acc
        torch.save({"model":model.state_dict(),
                    "model_name":MODEL_NAME,
                    "classes":classes,
                    "img_size":IMG_SIZE}, best_path)

print(f"\nBest val acc: {best_acc:.3f}. Saved to {best_path}")


In [None]:
# One figure per metric, each showing the train and validation curves
epochs = range(1, len(history["train_acc"])+1)

for metric, title in (("acc", "Accuracy"), ("loss", "Loss")):
    plt.figure()
    for split in ("train", "val"):
        series = f"{split}_{metric}"
        plt.plot(epochs, history[series], label=series)
    plt.xlabel("Epoch")
    plt.ylabel(title)
    plt.title(title)
    plt.legend()
    plt.show()


In [None]:
# Reload best checkpoint for evaluation
ckpt = torch.load(best_path, map_location=device)
# pretrained=False: load_state_dict below replaces the weights, so fetching the
# pretrained ones first would only add a needless download.
best_model = build_model(ckpt["model_name"], len(ckpt["classes"]), pretrained=False).to(device)
best_model.load_state_dict(ckpt["model"])

val_loss, val_acc, y_true, y_pred = evaluate(best_model, val_ld, criterion, device)
print(f"Validation accuracy (best): {val_acc:.4f}\n")

# Detailed report
target_names = ckpt["classes"]
# Pass the full label list to both metrics so a class that is absent from the
# validation targets/predictions does not make the report fail on a
# label-count / target_names mismatch.
label_ids = list(range(len(target_names)))
print(classification_report(y_true, y_pred, labels=label_ids, target_names=target_names))

# Confusion matrix
cm = confusion_matrix(y_true, y_pred, labels=label_ids)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=target_names)
fig, ax = plt.subplots(figsize=(8, 8))
disp.plot(ax=ax, xticks_rotation=90, colorbar=False)
plt.tight_layout()
plt.show()


In [None]:
from PIL import Image

def get_infer_transform(img_size=224):
    """Return the deterministic preprocessing pipeline used for single-image inference.

    The pipeline resizes to ``int(img_size * 1.15)``, centre-crops to
    ``img_size``, converts to a tensor and normalises each channel.

    Args:
        img_size: Side length of the centre crop.

    Returns:
        A ``transforms.Compose`` object.
    """
    channel_mean = [0.485, 0.456, 0.406]
    channel_std = [0.229, 0.224, 0.225]
    steps = [
        transforms.Resize(int(img_size*1.15)),
        transforms.CenterCrop(img_size),
        transforms.ToTensor(),
        transforms.Normalize(channel_mean, channel_std),
    ]
    return transforms.Compose(steps)

@torch.no_grad()
def predict_image(img_path, ckpt_path=best_path, device=device):
    """Classify a single image with the model stored in a checkpoint.

    The checkpoint is expected to hold the keys ``"model"``, ``"model_name"``,
    ``"classes"`` and ``"img_size"``. The model is rebuilt and the checkpoint
    re-read on every call.

    Args:
        img_path: Path of the image to classify.
        ckpt_path: Checkpoint file to load (defaults to ``best_path``).
        device: Device used for loading and inference.

    Returns:
        ``(class_name, confidence)`` where ``confidence`` is the softmax
        probability of the predicted class as a Python float.
    """
    ckpt = torch.load(ckpt_path, map_location=device)
    # pretrained=False: load_state_dict replaces the weights, so fetching the
    # pretrained ones first would only add a needless download.
    model = build_model(ckpt["model_name"], len(ckpt["classes"]), pretrained=False).to(device)
    model.load_state_dict(ckpt["model"])
    model.eval()
    tfm = get_infer_transform(ckpt["img_size"])
    # Close the file handle as soon as the RGB copy has been made
    with Image.open(img_path) as source:
        img = source.convert("RGB")
    x = tfm(img).unsqueeze(0).to(device)
    logits = model(x)
    probs = torch.softmax(logits, dim=1)[0]
    conf, idx = probs.max(0)
    return ckpt["classes"][idx.item()], float(conf.item())

# Example:
# pred, score = predict_image("path/to/your_image.jpg")
# print(pred, score)
