In [3]:
import os, random, numpy as np, torch, torch.nn as nn, torch.optim as optim
from pathlib import Path
from PIL import Image
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
from torchvision import datasets, transforms
from torchvision.models import convnext_tiny, ConvNeXt_Tiny_Weights
from torch.utils.data import DataLoader
# ---- CONFIGURATION ----
# Dataset root; the "train" and "val" sub-folders are read from it further down.
DATA_DIR = r"C:\Users\ASUS\OneDrive\Desktop\data_split"

# Optimisation settings.
BATCH_SIZE = 4        # RTX 3050 safe limit
EPOCHS = 80
BASE_LR = 5e-5
WEIGHT_DECAY = 2e-4
PATIENCE = 12         # epochs without val-loss improvement before early stop

# Regularisation knobs.
GAMMA_FOCAL = 2.0
LABEL_SMOOTH = 0.1
USE_MIXUP = True
MIXUP_ALPHA = 0.2

# Input resolution (reduced from 224 to fit 4 GB of VRAM).
IMG_SIZE = 192

# Prefer the GPU when one is visible to PyTorch, otherwise run on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Device:", device)

# Seed every random number generator used by the script, in a fixed order.
SEED = 123
for _seed_fn in (
    random.seed,
    np.random.seed,
    torch.manual_seed,
    torch.cuda.manual_seed_all,
):
    _seed_fn(SEED)
# ---- TRAIN-TIME AUGMENTATION ----
# Steps are applied in list order: the image is first resized to a square
# 32 px larger than IMG_SIZE, then a random crop covering 80-100 % of that
# area is resized to IMG_SIZE before the remaining augmentations run.
_train_steps = [
    transforms.Resize((IMG_SIZE + 32, IMG_SIZE + 32)),
    transforms.RandomResizedCrop(IMG_SIZE, scale=(0.8, 1.0)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(p=0.3),
    transforms.RandomRotation(25),
    transforms.GaussianBlur(3, sigma=(0.1, 2.0)),
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.25),
    transforms.ToTensor(),
    # Per-channel statistics conventionally paired with ImageNet-pretrained
    # torchvision models.
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
]
train_tf = transforms.Compose(_train_steps)
# ---- DATA ----
# Validation preprocessing: no random augmentation, only a resize to the
# training resolution followed by the same tensor conversion and
# normalisation as the training pipeline. It is defined here because the
# validation dataset below needs it.
val_tf = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])

# One sub-folder per class is expected under "train" and "val".
train_ds = datasets.ImageFolder(Path(DATA_DIR) / "train", transform=train_tf)
val_ds   = datasets.ImageFolder(Path(DATA_DIR) / "val", transform=val_tf)
class_names = train_ds.classes
print("Classes:", class_names)

# Only the training loader shuffles; validation order stays fixed.
train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True, num_workers=0)
val_loader   = DataLoader(val_ds,   batch_size=BATCH_SIZE, shuffle=False, num_workers=0)
dataloaders = {"train": train_loader, "val": val_loader}
from torch.optim.lr_scheduler import SequentialLR, LinearLR
from tqdm import tqdm

# ---- MODEL ----
# Start from the IMAGENET1K_V1 ConvNeXt-Tiny weights and swap the final
# classifier layer for a dropout + linear head sized to this dataset.
model = convnext_tiny(weights=ConvNeXt_Tiny_Weights.IMAGENET1K_V1)

_original_head = model.classifier[2]
num_ftrs = _original_head.in_features

# Positional Sequential keeps the head's parameters under "classifier.2.1.*".
_new_head = nn.Sequential(
    nn.Dropout(0.6),
    nn.Linear(num_ftrs, len(class_names)),
)
model.classifier[2] = _new_head

model = model.to(device)

# ---- LOSS & OPTIMIZER ----
# Use the LABEL_SMOOTH constant so the configured value is the one in effect.
criterion = nn.CrossEntropyLoss(label_smoothing=LABEL_SMOOTH)
optimizer = optim.AdamW(model.parameters(), lr=BASE_LR, weight_decay=WEIGHT_DECAY)

# ---- SCHEDULER (Warm-up + Cosine) ----
# The scheduler is stepped once per epoch, so all lengths here are in epochs:
# a linear ramp from 10 % of BASE_LR over WARMUP_EPOCHS, then cosine annealing
# over the remaining epochs.
WARMUP_EPOCHS = 5
warmup = LinearLR(optimizer, start_factor=0.1, total_iters=WARMUP_EPOCHS)
cosine = torch.optim.lr_scheduler.CosineAnnealingLR(
    optimizer,
    # Guard against a non-positive period when EPOCHS <= WARMUP_EPOCHS.
    T_max=max(1, EPOCHS - WARMUP_EPOCHS),
)
scheduler = SequentialLR(
    optimizer,
    schedulers=[warmup, cosine],
    milestones=[WARMUP_EPOCHS],
)

# ---- AMP (Automatic Mixed Precision) ----
# Gradient scaling is only enabled on CUDA; on CPU the scaler is a no-op.
scaler = torch.amp.GradScaler('cuda', enabled=(device.type == "cuda"))
# ---- TRAINING LOOP ----
# Runs up to EPOCHS epochs, each made of a "train" pass followed by a "val"
# pass. Whenever the validation loss reaches a new minimum the current weights
# are written to "best_convnext_tiny2.pth"; once the validation loss has not
# improved for PATIENCE consecutive epochs the loop stops early.
best_val_loss = np.inf
epochs_no_improve = 0
# Per-epoch metrics, appended once per phase and plotted after training.
history = {"train_loss": [], "val_loss": [], "train_acc": [], "val_acc": []}

for epoch in range(EPOCHS):
    print(f"\nEpoch {epoch+1}/{EPOCHS}")
    print("-"*40)

    for phase in ["train", "val"]:
        # Switch dropout / normalisation layers between training and inference mode.
        model.train() if phase == "train" else model.eval()
        running_loss, running_corrects, total = 0.0, 0, 0

        for inputs, labels in tqdm(dataloaders[phase], desc=f"{phase.upper()}"):
            inputs, labels = inputs.to(device), labels.to(device)
            # Executed in both phases; during "val" no gradients are produced.
            optimizer.zero_grad(set_to_none=True)

            # Autograd is only active during the training phase.
            with torch.set_grad_enabled(phase == "train"):
                # Mixed-precision forward pass (enabled on CUDA only).
                with torch.amp.autocast('cuda', enabled=(device.type == "cuda")):
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)

                # Predicted class = index of the largest logit per sample.
                _, preds = torch.max(outputs, 1)
                if phase == "train":
                    # Scaled backward pass, optimizer step, then scale update.
                    scaler.scale(loss).backward()
                    scaler.step(optimizer)
                    scaler.update()

            # Weight the batch loss by the batch size so that dividing by
            # `total` below yields a per-sample average (assumes the criterion
            # returns a batch mean, the CrossEntropyLoss default).
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)
            total += labels.size(0)

        # NOTE(review): an empty loader leaves total == 0 and this division fails.
        epoch_loss = running_loss / total
        epoch_acc = running_corrects.double() / total
        history[f"{phase}_loss"].append(epoch_loss)
        history[f"{phase}_acc"].append(epoch_acc.item())

        print(f"{phase} Loss: {epoch_loss:.4f} | Acc: {epoch_acc:.4f}")

        # The learning-rate schedule advances once per epoch, after training.
        if phase == "train":
            scheduler.step()

        if phase == "val":
            if epoch_loss < best_val_loss:
                # New best validation loss: reset the counter and checkpoint.
                best_val_loss = epoch_loss
                epochs_no_improve = 0
                torch.save(model.state_dict(), "best_convnext_tiny2.pth")
            else:
                epochs_no_improve += 1
                if epochs_no_improve >= PATIENCE:
                    print("\n Early stopping triggered.")
                    # Leaves the phase loop only; the check below ends the epoch loop.
                    break
    if epochs_no_improve >= PATIENCE:
        break

# NOTE(review): `model` still holds the weights of the last epoch that ran;
# the best checkpoint is not reloaded here.
print("\n Training complete! Best validation loss:", best_val_loss)
# ---- PLOTS ----
# One row, two panels: loss on the left, accuracy on the right. Each panel
# overlays the training and validation curves recorded in `history`.
_panels = (
    ("Loss", "train_loss", "val_loss"),
    ("Accuracy", "train_acc", "val_acc"),
)

plt.figure(figsize=(10, 4))
for _position, (_title, _train_key, _val_key) in enumerate(_panels, start=1):
    plt.subplot(1, 2, _position)
    plt.plot(history[_train_key], label="train")
    plt.plot(history[_val_key], label="val")
    plt.title(_title)
    plt.legend()

plt.tight_layout()
plt.show()

# ---- CONFUSION MATRIX ----
# Run the in-memory model over the validation loader in inference mode and
# gather predicted and true labels on the CPU for scikit-learn.
model.eval()
all_preds, all_labels = [], []
with torch.no_grad():
    for xb, yb in val_loader:
        batch_logits = model(xb.to(device))
        batch_preds = batch_logits.argmax(1).cpu().numpy()
        all_preds += list(batch_preds)
        all_labels += list(yb.numpy())

# Rows are true classes, columns are predicted classes.
cm = confusion_matrix(all_labels, all_preds)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
disp.plot(cmap="Blues", xticks_rotation=45)
plt.title("Confusion Matrix (Validation)")
plt.show()
# ============================================================
#   Ensemble + TTA Evaluation (Combine multiple checkpoints)
# ============================================================
import glob

# Discover the checkpoints to ensemble: every file in MODEL_DIR whose name
# matches "best_convnext_tiny*.pth" (e.g. one file per training run/seed).
# The matches are sorted so the ensemble order is reproducible.
MODEL_DIR = Path(".")
_checkpoint_pattern = str(MODEL_DIR / "best_convnext_tiny*.pth")
model_paths = glob.glob(_checkpoint_pattern)
model_paths.sort()
print(f"Found {len(model_paths)} models:", model_paths)

# Load each model
def load_models(paths):
    """Rebuild one ConvNeXt-Tiny network per checkpoint file in *paths*.

    Each checkpoint is treated as a ``state_dict``. The classifier head is
    reconstructed as either ``Dropout + Linear`` or a bare ``Linear``,
    depending on whether the checkpoint contains a ``classifier.2.1.weight``
    entry. Every model is put in eval mode and moved to the module-level
    ``device``.

    Args:
        paths: Iterable of checkpoint file paths.

    Returns:
        A list of models in the same order as *paths* (empty if *paths* is
        empty).
    """
    import warnings

    num_classes = len(class_names)
    models_list = []
    for p in paths:
        # weights_only=True restricts unpickling to tensors and plain
        # containers, which is all a state_dict needs, so a checkpoint file
        # cannot execute arbitrary code on load.
        state_dict = torch.load(p, map_location=device, weights_only=True)
        m = convnext_tiny(weights=None)
        num_ftrs = m.classifier[2].in_features

        # Detect which classifier structure the model used
        if any("classifier.2.1.weight" in key for key in state_dict):
            # Model trained with dropout + linear. Dropout has no parameters
            # and is inactive in eval mode, so its rate does not affect
            # loading or inference.
            m.classifier[2] = nn.Sequential(
                nn.Dropout(0.5),
                nn.Linear(num_ftrs, num_classes),
            )
        else:
            # Model trained with linear only
            m.classifier[2] = nn.Linear(num_ftrs, num_classes)

        # strict=False is kept so loading stays best-effort, but mismatched
        # keys are reported: a missing key means that layer keeps its random
        # initialisation, which would otherwise go unnoticed.
        load_result = m.load_state_dict(state_dict, strict=False)
        if load_result.missing_keys or load_result.unexpected_keys:
            warnings.warn(
                f"Checkpoint {p!r} did not match the model exactly: "
                f"missing keys={list(load_result.missing_keys)}, "
                f"unexpected keys={list(load_result.unexpected_keys)}",
                stacklevel=2,
            )

        m.eval().to(device)
        models_list.append(m)

    return models_list

# Instantiate every discovered checkpoint once; the resulting list is what the
# ensemble prediction further down iterates over.
models_list = load_models(model_paths)
print(f" Loaded {len(models_list)} ConvNeXt-Tiny models for ensemble.")

# Function: predict with ensemble + TTA
@torch.no_grad()
def ensemble_tta_predict(models, img, transforms_list):
    """Average class probabilities over every model and every TTA view.

    For each model, the softmax outputs of all transformed views of *img* are
    averaged; those per-model averages are then averaged across models.

    Args:
        models: Non-empty sequence of networks. Each is called on a batch
            holding a single transformed image.
        img: Input accepted by every callable in *transforms_list* (a PIL
            image in this file's example usage).
        transforms_list: Non-empty sequence of callables, each turning *img*
            into a tensor; a leading batch dimension is added here.

    Returns:
        A 1-D CPU tensor with ``len(class_names)`` averaged probabilities.

    Raises:
        ValueError: If *models* or *transforms_list* is empty. Without this
            check an empty model list would return NaN probabilities.
    """
    if not models:
        raise ValueError("ensemble_tta_predict needs at least one model")
    if not transforms_list:
        raise ValueError("ensemble_tta_predict needs at least one TTA transform")

    total_probs = torch.zeros(len(class_names), device=device)
    # `net` rather than `model`, to avoid shadowing the module-level model.
    for net in models:
        view_probs = [
            torch.softmax(net(t(img).unsqueeze(0).to(device)), dim=1)
            for t in transforms_list
        ]
        # Stack to (views, 1, classes), average over views, drop the batch dim.
        total_probs += torch.stack(view_probs).mean(0).squeeze(0)
    return (total_probs / len(models)).cpu()
from torchvision import transforms

IMG_SIZE = 192  # keep equal to the input size the checkpoints were trained with


def _tta_view(*extra_steps):
    """Build one TTA pipeline: resize, *extra_steps*, to-tensor, normalise."""
    return transforms.Compose([
        transforms.Resize((IMG_SIZE, IMG_SIZE)),
        *extra_steps,
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ])


# TTA (Test-Time Augmentation) views, in order: the plain image, an
# always-applied horizontal flip, and a random rotation of up to 10 degrees.
tta_transforms = [
    _tta_view(),
    _tta_view(transforms.RandomHorizontalFlip(p=1)),
    _tta_view(transforms.RandomRotation(10)),
]
# Example usage: classify a single image with the full ensemble and all TTA views.
img_path = r"C:\Users\ASUS\OneDrive\Desktop\test_dataset\rectangle\rectangle 9.png"
_opened = Image.open(img_path)
img = _opened.convert("RGB")  # force three channels for the normalisation step

probs = ensemble_tta_predict(models_list, img, tta_transforms)

# The winning class is the index of the largest averaged probability.
_best_index = probs.argmax().item()
pred_class = class_names[_best_index]
confidence = float(probs.max())

print(f" Ensemble+TTA Predicted Class: {pred_class} ({confidence*100:.2f}% confidence)")


ModuleNotFoundError: No module named 'torch'