MaxViT training

In [None]:
import os
import shutil
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
import timm
from torch.utils.data import DataLoader
from sklearn.metrics import classification_report, confusion_matrix
import numpy as np

class CFG:
    """Static settings shared by the MaxViT data preparation and training code."""

    # Dataset root (one sub-folder per split) and the folder that receives
    # the per-class copy of the images.
    dataset_dir = "/content/drive/rdd-2022/RDD_SPLIT"
    work_dir = "/content/drive/working/rdd2022-class"

    # Input resolution and optimisation hyper-parameters.
    img_size = 224
    batch_size = 32
    epochs = 50
    lr = 1e-4
    # Number of consecutive epochs without a validation-accuracy gain that
    # triggers early stopping in the training loop.
    patience = 7
    num_workers = 2

    # Run on the GPU whenever one is visible to PyTorch.
    if torch.cuda.is_available():
        device = "cuda"
    else:
        device = "cpu"

    # Class id found in a label file -> name of the class folder.
    class_map = dict(enumerate((
        "longitudinal_crack",
        "transverse_crack",
        "alligator_crack",
        "other_damage",
        "pothole",
    )))


print(f"Using device: {CFG.device}")

def create_classification_folders(root, split):
    """Copy one split's labelled images into one folder per class.

    Reads ``<root>/<split>/images`` and ``<root>/<split>/labels`` and copies
    every image into ``<CFG.work_dir>/<split>/<class_name>/``.  The class is
    the first token of the first non-blank line of the ``.txt`` label file
    that shares the image's base name; ids that are not in
    ``CFG.class_map`` are filed under ``unknown``.  Images without a label
    file, or whose label file holds no annotation, are skipped.

    Args:
        root: Dataset root containing one sub-folder per split.
        split: Name of the split sub-folder, e.g. ``"train"`` or ``"val"``.

    Raises:
        ValueError: If the first token of the annotation is not an integer.
    """
    img_dir = os.path.join(root, split, "images")
    lbl_dir = os.path.join(root, split, "labels")
    new_root = os.path.join(CFG.work_dir, split)
    os.makedirs(new_root, exist_ok=True)
    print(f"Reorganizing {split} set...")
    for img_name in tqdm(os.listdir(img_dir)):
        # Compare case-insensitively so files such as ``IMG_01.JPG`` are
        # not silently left out of the dataset.
        if not img_name.lower().endswith((".jpg", ".png", ".jpeg")):
            continue
        img_path = os.path.join(img_dir, img_name)
        lbl_path = os.path.join(lbl_dir, os.path.splitext(img_name)[0] + ".txt")
        if not os.path.exists(lbl_path):
            continue
        with open(lbl_path, "r") as f:
            # Skip blank / whitespace-only lines: they carry no annotation
            # and ``"".split()[0]`` would raise IndexError.
            first_annotation = next((line for line in f if line.strip()), None)
        if first_annotation is None:
            continue
        cls_id = int(first_annotation.split()[0])
        cls_name = CFG.class_map.get(cls_id, "unknown")
        cls_folder = os.path.join(new_root, cls_name)
        os.makedirs(cls_folder, exist_ok=True)
        shutil.copy(img_path, os.path.join(cls_folder, img_name))
    print(f"{split} set ready at {new_root}")

# Build the per-class folder layout for both splits under CFG.work_dir.
create_classification_folders(CFG.dataset_dir, "train")
create_classification_folders(CFG.dataset_dir, "val")

# Training-time preprocessing with random augmentation.  The image is first
# resized to a square of CFG.img_size, then RandomResizedCrop takes a crop
# covering 80-100 % of that area and resizes it back to CFG.img_size.
train_transforms = transforms.Compose([
    transforms.Resize((CFG.img_size, CFG.img_size)),
    transforms.RandomResizedCrop(CFG.img_size, scale=(0.8, 1.0)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3),
    transforms.RandomRotation(10),
    transforms.ToTensor(),
    # NOTE(review): presumably the ImageNet channel mean/std matching the
    # in1k-pretrained backbone created further down - confirm.
    transforms.Normalize([0.485, 0.456, 0.406],
                         [0.229, 0.224, 0.225]),
])

# Validation preprocessing: same resize and normalisation, no randomness.
val_transforms = transforms.Compose([
    transforms.Resize((CFG.img_size, CFG.img_size)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406],
                         [0.229, 0.224, 0.225]),
])

# Each ImageFolder reads the class sub-folders written by
# create_classification_folders for its own split.
train_ds = datasets.ImageFolder(os.path.join(CFG.work_dir, "train"), transform=train_transforms)
val_ds = datasets.ImageFolder(os.path.join(CFG.work_dir, "val"), transform=val_transforms)

# ImageFolder assigns class indices per dataset from its own folder names, so
# a class folder that exists in only one split would shift the label ids
# between training and validation without any error.  Every class present in
# the validation split must therefore map to the same index as in training.
_mismatched_classes = sorted(
    name for name, idx in val_ds.class_to_idx.items()
    if train_ds.class_to_idx.get(name) != idx
)
if _mismatched_classes:
    raise ValueError(
        f"Class index mismatch between train and val splits for {_mismatched_classes}: "
        f"train={train_ds.class_to_idx}, val={val_ds.class_to_idx}"
    )

# Only the training loader shuffles; validation keeps a fixed order.
train_loader = DataLoader(train_ds, batch_size=CFG.batch_size, shuffle=True, num_workers=CFG.num_workers, pin_memory=True)
val_loader = DataLoader(val_ds, batch_size=CFG.batch_size, shuffle=False, num_workers=CFG.num_workers, pin_memory=True)

print(f"Dataset Loaded | Classes: {train_ds.classes}")

# Pretrained MaxViT from timm with its classifier sized to the number of
# class folders found in the training set.
model = timm.create_model("maxvit_tiny_tf_224.in1k", pretrained=True, num_classes=len(train_ds.classes))
model = model.to(CFG.device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=CFG.lr, weight_decay=1e-4)
# Cosine learning-rate schedule spanning the configured number of epochs;
# it is stepped once per epoch by the training loop.
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=CFG.epochs)
# Gradient scaler used by train_one_epoch together with autocast.
scaler = torch.cuda.amp.GradScaler()

def train_one_epoch():
    """Run one optimisation pass over ``train_loader``.

    Uses the module-level ``model``, ``criterion``, ``optimizer`` and
    ``scaler``; the forward pass and loss run under autocast.

    Returns:
        Tuple ``(mean_loss, accuracy_percent)`` over all samples seen.
    """
    model.train()
    loss_sum = 0
    n_correct = 0
    n_seen = 0
    for imgs, labels in tqdm(train_loader, desc="Train", leave=False):
        imgs = imgs.to(CFG.device)
        labels = labels.to(CFG.device)

        optimizer.zero_grad()
        with torch.cuda.amp.autocast():
            outputs = model(imgs)
            loss = criterion(outputs, labels)

        # Backward on the scaled loss, then let the scaler drive the
        # optimizer step and refresh its scale factor.
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # The criterion returns a batch mean, so weight it by batch size
        # to obtain a per-sample average at the end.
        loss_sum += loss.item() * imgs.size(0)
        preds = outputs.max(1)[1]
        n_correct += preds.eq(labels).sum().item()
        n_seen += labels.size(0)

    mean_loss = loss_sum / n_seen
    accuracy = 100. * n_correct / n_seen
    return mean_loss, accuracy

def validate():
    """Evaluate the module-level ``model`` on ``val_loader``.

    Runs without gradient tracking, with the forward pass and loss under
    autocast.

    Returns:
        Tuple ``(mean_loss, accuracy_percent, preds_all, labels_all)``; the
        two lists hold the predicted and true class index of every sample
        in loader order.
    """
    model.eval()
    loss_sum = 0
    n_correct = 0
    n_seen = 0
    preds_all = []
    labels_all = []
    with torch.no_grad():
        for imgs, labels in tqdm(val_loader, desc="Valid", leave=False):
            imgs = imgs.to(CFG.device)
            labels = labels.to(CFG.device)
            with torch.cuda.amp.autocast():
                outputs = model(imgs)
                loss = criterion(outputs, labels)

            # Batch-mean loss weighted by batch size for a per-sample mean.
            loss_sum += loss.item() * imgs.size(0)
            preds = outputs.max(1)[1]
            n_correct += preds.eq(labels).sum().item()
            n_seen += labels.size(0)

            # Keep per-sample results on the CPU for the final report.
            preds_all.extend(preds.cpu().numpy())
            labels_all.extend(labels.cpu().numpy())

    accuracy = 100. * n_correct / n_seen
    return loss_sum / n_seen, accuracy, preds_all, labels_all

# Early-stopping state: best validation accuracy so far and the number of
# consecutive epochs that failed to beat it.
best_acc = 0
patience_counter = 0
# Set once a checkpoint has been written during THIS run, so the final
# evaluation never loads a missing or stale file.
checkpoint_saved = False

for epoch in range(CFG.epochs):
    train_loss, train_acc = train_one_epoch()
    val_loss, val_acc, preds, labels = validate()
    scheduler.step()
    print(f"Epoch [{epoch+1}/{CFG.epochs}] Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.2f}% | Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.2f}%")
    if val_acc > best_acc:
        best_acc = val_acc
        patience_counter = 0
        torch.save(model.state_dict(), "best_maxvit_rdd2022.pth")
        checkpoint_saved = True
        print(f"New best model saved (Val Acc: {best_acc:.2f}%)")
    else:
        patience_counter += 1
    if patience_counter >= CFG.patience:
        print(f"Early stopping triggered at epoch {epoch+1}")
        break

print("Training Complete.")
print(f"Best Validation Accuracy: {best_acc:.2f}%")

# The checkpoint only exists if some epoch scored above 0 % validation
# accuracy; otherwise evaluate the weights currently in memory instead of
# crashing on a missing file.  map_location keeps the load valid on the
# device the model currently lives on.
if checkpoint_saved:
    model.load_state_dict(torch.load("best_maxvit_rdd2022.pth", map_location=CFG.device))
model.eval()
_, final_acc, preds, labels = validate()
print(f"Final Validation Accuracy: {final_acc:.2f}%")

# Pass the full list of class indices explicitly: without it scikit-learn
# infers the labels from the data and raises when the validation results do
# not cover every entry of target_names.  It also fixes the matrix shape.
class_indices = list(range(len(train_ds.classes)))
print("Classification Report:")
print(classification_report(labels, preds, labels=class_indices, target_names=train_ds.classes))
print("Confusion Matrix:")
print(confusion_matrix(labels, preds, labels=class_indices))


Swin Transformer training

In [None]:
import os, torch, timm
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from PIL import Image
from tqdm import tqdm
import pandas as pd
import numpy as np

# Image and label folders of the train / val splits used by the Swin run.
train_img_dir = "/content/drive/rdd-2022/RDD_SPLIT/train/images"
train_label_dir = "/content/drive/rdd-2022/RDD_SPLIT/train/labels"
val_img_dir = "/content/drive/rdd-2022/RDD_SPLIT/val/images"
val_label_dir = "/content/drive/rdd-2022/RDD_SPLIT/val/labels"

# Run on the GPU when PyTorch can see one, otherwise on the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
batch_size = 16
epochs = 50
lr = 1e-4
# Consecutive epochs without a validation-accuracy gain before the training
# loop stops early.
patience = 8

def load_yolo_labels(img_dir, label_dir):
    """Collect ``.jpg`` image paths and one class id per image.

    The class id is the first token of the first non-blank line of the label
    file that shares the image's base name (``<name>.txt`` in ``label_dir``).
    Images whose label file is missing or holds no annotation are kept and
    assigned class ``0``.

    Args:
        img_dir: Directory containing the images.
        label_dir: Directory containing the label text files.

    Returns:
        Tuple ``(paths, labels)`` of equal length, ordered by file name.

    Raises:
        ValueError: If the first token of an annotation is not an integer.
    """
    paths, labels = [], []
    # Sorted so the sample order does not depend on the filesystem.
    for img_file in sorted(os.listdir(img_dir)):
        if not img_file.endswith(".jpg"):
            continue
        # splitext swaps only the real extension; str.replace would also
        # rewrite a ".jpg" that occurs earlier in the file name and then
        # look up the wrong label file.
        lbl_file = os.path.join(label_dir, os.path.splitext(img_file)[0] + ".txt")
        # NOTE(review): unlabelled images fall back to class 0, whereas
        # create_classification_folders skips them - confirm this is wanted.
        cls_id = 0
        if os.path.exists(lbl_file):
            with open(lbl_file, "r") as f:
                # Blank / whitespace-only lines carry no annotation and
                # would raise IndexError on ``split()[0]``.
                first_annotation = next((line for line in f if line.strip()), None)
            if first_annotation is not None:
                cls_id = int(first_annotation.split()[0])
        paths.append(os.path.join(img_dir, img_file))
        labels.append(cls_id)
    return paths, labels

# Gather (image path, class id) pairs for both splits.
train_paths, train_labels = load_yolo_labels(train_img_dir, train_label_dir)
val_paths, val_labels = load_yolo_labels(val_img_dir, val_label_dir)
# The class count is inferred as the largest class id seen in either split
# plus one.
num_classes = max(train_labels + val_labels) + 1

print(f"Dataset loaded {len(train_paths)} train, {len(val_paths)} val, {num_classes} classes")

# Training-time preprocessing with random augmentation; the final
# RandomResizedCrop crops 80-100 % of the area and resizes back to 224.
transform_train = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.RandomRotation(20),
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3),
    transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
    transforms.ToTensor(),
    # NOTE(review): presumably the ImageNet channel mean/std expected by the
    # pretrained backbone - confirm.
    transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])
])

# Validation preprocessing: resize and normalise only, no randomness.
transform_val = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])
])

class CustomDataset(Dataset):
    """Dataset that pairs image file paths with their labels.

    Each item is ``(image, label)``: the image is opened from disk, converted
    to RGB and, when a transform is set, passed through it.
    """

    def __init__(self, paths, labels, transform):
        self.paths, self.labels, self.transform = paths, labels, transform

    def __len__(self):
        # One sample per image path.
        return len(self.paths)

    def __getitem__(self, idx):
        image = Image.open(self.paths[idx]).convert("RGB")
        target = self.labels[idx]
        if not self.transform:
            return image, target
        return self.transform(image), target

# Batch loaders over the path/label lists; only the training loader
# shuffles, validation keeps a fixed order.
train_loader = DataLoader(CustomDataset(train_paths, train_labels, transform_train),
                          batch_size=batch_size, shuffle=True, num_workers=2)
val_loader = DataLoader(CustomDataset(val_paths, val_labels, transform_val),
                        batch_size=batch_size, shuffle=False, num_workers=2)

class SwinWithDropout(nn.Module):
    """Pretrained Swin backbone followed by dropout and a linear classifier."""

    def __init__(self, num_classes):
        super().__init__()
        # num_classes=0 requests the timm model without its own classifier;
        # the linear layer below is sized from the backbone's num_features.
        backbone = timm.create_model('swin_base_patch4_window7_224', pretrained=True, num_classes=0)
        self.backbone = backbone
        self.dropout = nn.Dropout(p=0.5)
        self.fc = nn.Linear(in_features=backbone.num_features, out_features=num_classes)

    def forward(self, x):
        # backbone features -> dropout -> class logits
        return self.fc(self.dropout(self.backbone(x)))

model = SwinWithDropout(num_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=1e-4)
# 'max' mode: the scheduler is fed validation accuracy by the training loop
# and reacts when that value stops increasing.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'max', patience=3)

# Early-stopping state and per-epoch metric history.
best_acc = 0.0
wait = 0
train_acc_list, val_acc_list, train_loss_list, val_loss_list = [], [], [], []

# Create the checkpoint folder up front: torch.save does not create missing
# parent directories, and failing on the first save would throw away a full
# epoch of training.
checkpoint_path = "/content/drive/working/best_swin_transformer_full.pth"
os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)

for epoch in range(epochs):
    # ---- training pass ----
    model.train()
    train_loss, correct, total = 0, 0, 0
    for imgs, lbls in tqdm(train_loader, desc=f"Epoch [{epoch+1}/{epochs}]"):
        imgs, lbls = imgs.to(device), lbls.to(device)
        optimizer.zero_grad()
        out = model(imgs)
        loss = criterion(out, lbls)
        loss.backward()
        optimizer.step()
        # The criterion returns a batch mean; weight it by batch size so the
        # epoch value is a per-sample average.
        train_loss += loss.item() * imgs.size(0)
        _, preds = out.max(1)
        correct += preds.eq(lbls).sum().item()
        total += lbls.size(0)

    train_acc = 100 * correct / total
    train_loss /= total

    # ---- validation pass (no gradients) ----
    model.eval()
    val_loss, val_correct, val_total = 0, 0, 0
    with torch.no_grad():
        for imgs, lbls in val_loader:
            imgs, lbls = imgs.to(device), lbls.to(device)
            out = model(imgs)
            loss = criterion(out, lbls)
            val_loss += loss.item() * imgs.size(0)
            _, preds = out.max(1)
            val_correct += preds.eq(lbls).sum().item()
            val_total += lbls.size(0)

    val_acc = 100 * val_correct / val_total
    val_loss /= val_total
    # The plateau scheduler monitors validation accuracy.
    scheduler.step(val_acc)

    train_acc_list.append(train_acc)
    val_acc_list.append(val_acc)
    train_loss_list.append(train_loss)
    val_loss_list.append(val_loss)

    print(f"\nEpoch {epoch+1}/{epochs}")
    print(f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.2f}%")
    print(f"Val Loss:   {val_loss:.4f} | Val Acc:   {val_acc:.2f}%")

    # Checkpoint on improvement, otherwise count towards early stopping.
    if val_acc > best_acc:
        best_acc = val_acc
        torch.save(model.state_dict(), checkpoint_path)
        print("Best model saved.")
        wait = 0
    else:
        wait += 1
        print(f"No improvement for {wait} epoch(s)")
        if wait >= patience:
            print("Early stopping triggered.")
            break

print(f"Training complete : Best Validation Accuracy: {best_acc:.2f}%")

# Per-epoch metrics as a table, one row per completed epoch.
history = pd.DataFrame({
    "Epoch": range(1, len(train_acc_list)+1),
    "Train_Acc": train_acc_list,
    "Val_Acc": val_acc_list,
    "Train_Loss": train_loss_list,
    "Val_Loss": val_loss_list
})
