In [None]:
# traffic classifier model creation (3 classes: -1=no light, 0=red, 1=green)
import os
import random
from pathlib import Path
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, Subset
from torchvision import transforms, models
from sklearn.model_selection import train_test_split

# Config
IMAGES_PATH = "../output/traffic_lights_images.npy"
LABELS_PATH = "../output/traffic_lights_label.npy"
SAVE_DIR = "../Models"
EPOCHS = 10
BATCH_SIZE = 32
LR = 3e-5
VAL_SPLIT = 0.2
RESIZE = 224
SEED = 42
USE_PRETRAINED = True
MEAN = (0.485, 0.456, 0.406)
STD = (0.229, 0.224, 0.225)

# Label mapping: {-1, 0, 1} -> {0, 1, 2} for training; decode back for output
LABEL_MAP = {-1: 0, 0: 1, 1: 2}
INV_LABEL_MAP = {v: k for k, v in LABEL_MAP.items()}
NUM_CLASSES = len(LABEL_MAP)

# Seed for reproducibility
def set_seed(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

# Dataset loader for .npy files
class NpyImageDataset(Dataset):
    def __init__(self, images_path, labels_path, transform=None, label_map=None):
        self.images = np.load(images_path, mmap_mode='r')
        self.labels = np.load(labels_path, mmap_mode='r')
        self.transform = transform
        self.label_map = label_map or (lambda x: x)
    def __len__(self):
        return len(self.labels)
    def __getitem__(self, idx):
        arr = self.images[idx]
        if arr.dtype != np.uint8:
            arr = np.clip(arr, 0, 255).astype(np.uint8)
        img = transforms.functional.to_pil_image(arr)
        x = self.transform(img) if self.transform else transforms.ToTensor()(img)

        raw_label = int(self.labels[idx])
        if raw_label not in LABEL_MAP:
            raise ValueError(f"Unexpected label {raw_label}; expected one of {list(LABEL_MAP.keys())}")
        y = LABEL_MAP[raw_label]  # map to {0,1,2}
        return x, y

# Build ResNet18
def build_model(num_classes=NUM_CLASSES, pretrained=True):
    try:
        weights = models.ResNet18_Weights.DEFAULT if pretrained else None
        model = models.resnet18(weights=weights)
    except:
        model = models.resnet18(pretrained=pretrained)
    model.fc = nn.Linear(model.fc.in_features, num_classes)
    return model

# Evaluate helper
def evaluate(model, loader, device):
    model.eval()
    losses, correct, total = [], 0, 0
    with torch.no_grad():
        for xb, yb in loader:
            xb, yb = xb.to(device), yb.to(device)
            logits = model(xb)
            losses.append(F.cross_entropy(logits, yb).item())
            correct += (logits.argmax(1) == yb).sum().item()
            total += yb.size(0)
    return float(np.mean(losses) if losses else 0.0), correct / max(total, 1)

# Training loop
def main():
    set_seed(SEED)
    if not Path(IMAGES_PATH).exists() or not Path(LABELS_PATH).exists():
        raise FileNotFoundError("Check IMAGES_PATH and LABELS_PATH")

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    print(f"Using device: {device}")  # <-- Added print here
    if device.type == 'cuda':
        print(f"Using device: {torch.cuda.get_device_name(0)} (CUDA)")
    else:
        print("Using device: CPU")

    train_tfms = transforms.Compose([
        transforms.Resize((RESIZE, RESIZE)),
        transforms.ColorJitter(0.2, 0.2, 0.2, 0.02),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(MEAN, STD),
    ])
    val_tfms = transforms.Compose([
        transforms.Resize((RESIZE, RESIZE)),
        transforms.ToTensor(),
        transforms.Normalize(MEAN, STD),
    ])

    # Split data (stratify on *mapped* labels)
    full_ds = NpyImageDataset(IMAGES_PATH, LABELS_PATH)
    raw_labels = np.array([int(x) for x in full_ds.labels])
    if not np.all(np.isin(raw_labels, list(LABEL_MAP.keys()))):
        bad = raw_labels[~np.isin(raw_labels, list(LABEL_MAP.keys()))]
        raise ValueError(f"Found unexpected labels: {np.unique(bad)}; expected {list(LABEL_MAP.keys())}")
    mapped_labels = np.vectorize(LABEL_MAP.get)(raw_labels)

    idx_train, idx_val = train_test_split(
        np.arange(len(full_ds)), test_size=VAL_SPLIT, stratify=mapped_labels, random_state=SEED
    )
    train_ds = Subset(NpyImageDataset(IMAGES_PATH, LABELS_PATH, transform=train_tfms), idx_train)
    val_ds = Subset(NpyImageDataset(IMAGES_PATH, LABELS_PATH, transform=val_tfms), idx_val)

    train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True, num_workers=2, pin_memory=(device.type=='cuda'))
    val_loader = DataLoader(val_ds, batch_size=BATCH_SIZE, num_workers=2, pin_memory=(device.type=='cuda'))

    model = build_model(pretrained=USE_PRETRAINED).to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=LR)
    scaler = torch.cuda.amp.GradScaler(enabled=(device.type == 'cuda'))

    best_acc = 0.0
    os.makedirs(SAVE_DIR, exist_ok=True)
    best_path = os.path.join(SAVE_DIR, 'traffic_lights_model.pt')

    for epoch in range(1, EPOCHS+1):
        model.train()
        losses, correct, total = [], 0, 0
        for xb, yb in train_loader:
            xb, yb = xb.to(device), yb.to(device)
            optimizer.zero_grad(set_to_none=True)
            with torch.cuda.amp.autocast(enabled=(device.type == 'cuda')):
                logits = model(xb)
                loss = F.cross_entropy(logits, yb)
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            losses.append(loss.item())
            correct += (logits.argmax(1) == yb).sum().item()
            total += yb.size(0)

        val_loss, val_acc = evaluate(model, val_loader, device)
        print(f"Epoch {epoch:02d} | Train Acc {correct/max(total,1):.4f} | Val Acc {val_acc:.4f} | Val Loss {val_loss:.4f}")
        if val_acc > best_acc:
            best_acc = val_acc
            torch.save({'model_state_dict': model.state_dict()}, best_path)
            print(f"  Saved new best model (val_acc={best_acc:.4f})")

    print(f"Training done. Best val_acc={best_acc:.4f}")

# Quick prediction helper (returns -1, 0, or 1)
def predict(image_array, weights_path):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = build_model(pretrained=False).to(device)
    state = torch.load(weights_path, map_location=device)
    model.load_state_dict(state['model_state_dict'])
    model.eval()

    if image_array.dtype != np.uint8:
        image_array = np.clip(image_array, 0, 255).astype(np.uint8)
    img = transforms.functional.to_pil_image(image_array)
    tfm = transforms.Compose([
        transforms.Resize((RESIZE, RESIZE)),
        transforms.ToTensor(),
        transforms.Normalize(MEAN, STD),
    ])
    x = tfm(img).unsqueeze(0).to(device)
    with torch.no_grad():
        logits = model(x)
        mapped_pred = int(logits.argmax(1).item())   # in {0,1,2}
        pred = int(INV_LABEL_MAP[mapped_pred])       # back to {-1,0,1}
    return pred

if __name__ == "__main__":
    main()


Using device: cuda
Using device: NVIDIA GeForce RTX 4060 Laptop GPU (CUDA)


  scaler = torch.cuda.amp.GradScaler(enabled=(device.type == 'cuda'))
