In [16]:
# Run in terminal or in a notebook cell prefixing ! if desired
!pip install torch torchvision timm opencv-python matplotlib scikit-learn grad-cam




In [17]:
import os, random, cv2, numpy as np, matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms, datasets
from torch.utils.data import DataLoader
import timm
from pytorch_grad_cam import GradCAM, GradCAMPlusPlus, ScoreCAM
from pytorch_grad_cam import GradCAM, GradCAMPlusPlus, ScoreCAM
from pytorch_grad_cam.utils.image import preprocess_image, show_cam_on_image


# ---- Notebook configuration -------------------------------------------------
# Seed every RNG the notebook draws from (sample picking, shuffling, augmentation,
# weight init) so a Restart & Run All gives comparable results.
# NOTE: DataLoader workers and some GPU kernels can still introduce small run-to-run
# differences; this seeds the generators, it does not force deterministic kernels.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

IMG_SIZE = 224  # images are resized to IMG_SIZE x IMG_SIZE pixels
# Per-channel normalisation statistics (the standard ImageNet values), shared by the
# torchvision transforms and by the CAM preprocessing helper.
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]
DATA_DIR = "trafficnet_dataset_v1"   # where zip extracted
RESULT_DIR = "results/cams"          # CAM visualisations are written here
os.makedirs(RESULT_DIR, exist_ok=True)


In [18]:
# Preprocessing pipelines: both resize to IMG_SIZE x IMG_SIZE, convert to tensors and
# normalise with `mean`/`std`; the training pipeline additionally applies a random
# horizontal flip and a mild colour jitter.
_target_hw = (IMG_SIZE, IMG_SIZE)
train_tf = transforms.Compose([
    transforms.Resize(_target_hw),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean, std),
])
eval_tf = transforms.Compose([
    transforms.Resize(_target_hw),
    transforms.ToTensor(),
    transforms.Normalize(mean, std),
])

# The dataset is expected to contain "train" and "test" sub-folders, each with one
# sub-directory per class (the layout ImageFolder reads).
train_folder, test_folder = (os.path.join(DATA_DIR, split) for split in ("train", "test"))

train_ds = datasets.ImageFolder(train_folder, transform=train_tf)
test_ds = datasets.ImageFolder(test_folder, transform=eval_tf)

# Only the training loader shuffles; the test loader keeps the dataset order.
batch_size = 32
_loader_opts = dict(batch_size=batch_size, num_workers=2, pin_memory=True)
train_loader = DataLoader(train_ds, shuffle=True, **_loader_opts)
test_loader = DataLoader(test_ds, shuffle=False, **_loader_opts)

print("Classes:", train_ds.classes)
print("Train / Test sizes:", len(train_ds), len(test_ds))


Classes: ['accident', 'dense_traffic', 'fire', 'sparse_traffic']
Train / Test sizes: 3600 800


In [19]:
# Select the compute device (GPU when available), then build a ResNet-50 with one
# output per dataset class and place it on that device.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
num_classes = len(train_ds.classes)
model = timm.create_model('resnet50', pretrained=True, num_classes=num_classes).to(DEVICE)


In [20]:
# Fine-tune the whole network, or reuse weights saved by a previous run.
do_train = True  # set False to skip training and load the saved checkpoint (if one exists)
ckpt = "models/resnet50_traffic.pth"  # fine-tuned weights are written to / read from here

if do_train:
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=1e-4)
    epochs = 3
    best_val_acc = 0.0  # NOTE: never updated below - no validation split is evaluated here

    for epoch in range(epochs):
        model.train()
        total, correct = 0, 0
        for xb, yb in train_loader:
            xb, yb = xb.to(DEVICE), yb.to(DEVICE)
            optimizer.zero_grad()
            out = model(xb)
            loss = criterion(out, yb)
            loss.backward()
            optimizer.step()
            # running accuracy over the (augmented) training batches
            preds = out.argmax(dim=1)
            total += yb.size(0); correct += (preds==yb).sum().item()
        # max(..., 1) avoids a ZeroDivisionError when the loader yields no batches
        train_acc = correct / max(total, 1)
        print(f"Epoch {epoch+1}/{epochs} — train_acc: {train_acc:.3f}")

    # Save checkpoint
    os.makedirs(os.path.dirname(ckpt), exist_ok=True)
    torch.save(model.state_dict(), ckpt)
    print("Saved model to", ckpt)
elif os.path.isfile(ckpt):
    # Reuse the weights written by an earlier training run instead of evaluating an
    # untrained classifier. weights_only=True restricts unpickling to tensor data.
    state_dict = torch.load(ckpt, map_location=DEVICE, weights_only=True)
    model.load_state_dict(state_dict)
    print("Skipping training; loaded fine-tuned weights from", ckpt)
else:
    # The classifier was created for `num_classes` outputs and has not been trained
    # on this dataset, so predictions (and CAMs) from this state are not meaningful.
    print("Skipping training; no checkpoint found at", ckpt,
          "- the classifier head is NOT trained for this dataset.")




Epoch 1/3 — train_acc: 0.686
Epoch 2/3 — train_acc: 0.896
Epoch 3/3 — train_acc: 0.933
Saved model to models/resnet50_traffic.pth


In [21]:
from sklearn.metrics import confusion_matrix, classification_report

# Evaluate on the test split: gather the arg-max prediction and the true label of every
# test image, in loader order, then print per-class metrics and the confusion matrix.
model.eval()
all_preds, all_labels = [], []
with torch.no_grad():
    for images, targets in test_loader:
        logits = model(images.to(DEVICE))
        all_preds.extend(logits.argmax(dim=1).cpu().numpy())
        all_labels.extend(targets.numpy())

report = classification_report(all_labels, all_preds, target_names=test_ds.classes)
print(report)
cm = confusion_matrix(all_labels, all_preds)
print("Confusion matrix:\n", cm)


                precision    recall  f1-score   support

      accident       0.92      0.89      0.91       200
 dense_traffic       0.93      0.93      0.93       200
          fire       0.98      0.96      0.97       200
sparse_traffic       0.85      0.91      0.87       200

      accuracy                           0.92       800
     macro avg       0.92      0.92      0.92       800
  weighted avg       0.92      0.92      0.92       800

Confusion matrix:
 [[178   3   1  18]
 [  3 185   0  12]
 [  5   0 192   3]
 [  7  10   2 181]]


In [22]:
def load_rgb_image_resized(path, size=IMG_SIZE):
    """Read an image file and return it as a square RGB float array.

    Args:
        path: Path of the image file, passed to ``cv2.imread``.
        size: Side length in pixels of the returned image (defaults to ``IMG_SIZE``).

    Returns:
        A float32 array with channels in RGB order, pixel values divided by 255 and
        resized to ``size`` x ``size``.

    Raises:
        FileNotFoundError: If ``cv2.imread`` returns ``None`` for ``path``.
    """
    bgr = cv2.imread(path)
    if bgr is None:
        raise FileNotFoundError(path)
    # OpenCV loads channels as BGR; swap to RGB and scale to floats before resizing.
    rgb_unit = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0
    return cv2.resize(rgb_unit, (size, size))

def prep_tensor(img_rgb):
    """Turn an RGB image into a normalised model input via ``preprocess_image``.

    Args:
        img_rgb: RGB image array, e.g. the output of ``load_rgb_image_resized``.

    Returns:
        The tensor produced by ``preprocess_image`` using the notebook's ``mean``/``std``
        (a batch of one image, [1,3,H,W]). This function performs no device transfer;
        move the result with ``.to(...)`` where needed.
        NOTE(review): an earlier comment said the tensor is on CUDA when available -
        ``preprocess_image`` appears to return a CPU tensor; confirm.
    """
    batched = preprocess_image(img_rgb, mean=mean, std=std)
    return batched


In [23]:
# Choose the images to explain:
#   1. one random image per class from the test set,
#   2. one misclassified test image (when the evaluation cell found any),
#   3. random extra test images until there are at least NUM_SAMPLES paths.
IMAGE_EXTS = ('.jpg', '.png', '.jpeg')
NUM_SAMPLES = 5

samples = []
class_dirs = [os.path.join(test_folder, c) for c in test_ds.classes]
for cdir in class_dirs:
    # os.listdir returns entries in arbitrary order; sorting keeps random.choice
    # reproducible for a fixed RNG seed.
    files = sorted(os.path.join(cdir, f) for f in os.listdir(cdir) if f.lower().endswith(IMAGE_EXTS))
    if files:
        samples.append(random.choice(files))

# Every image file below the test folder (sorted for the same reproducibility reason).
test_files = sorted(
    os.path.join(root, fname)
    for root, _, fnames in os.walk(test_folder)
    for fname in fnames
    if fname.lower().endswith(IMAGE_EXTS)
)
if not test_files:
    raise RuntimeError(f"No test images found under {test_folder}")

# Add one misclassified (hard) example. test_loader was created with shuffle=False, so
# the i-th entry of all_preds / all_labels belongs to test_ds.samples[i] (path, label).
misclassified = []
if len(all_preds) == len(all_labels) == len(test_ds.samples):
    misclassified = [
        path
        for (path, _), pred, true in zip(test_ds.samples, all_preds, all_labels)
        if pred != true and path not in samples
    ]
if misclassified:
    samples.append(random.choice(misclassified))

# ensure at least NUM_SAMPLES images; duplicates are used only once every test image
# has already been selected
unused = [f for f in test_files if f not in samples]
while len(samples) < NUM_SAMPLES:
    if unused:
        pick = random.choice(unused)
        unused.remove(pick)
    else:
        pick = random.choice(test_files)
    samples.append(pick)

print("Selected samples:")
for s in samples:
    print(s)


Selected samples:
trafficnet_dataset_v1/test/accident/images_025 (2).jpg
trafficnet_dataset_v1/test/dense_traffic/images_010.jpg
trafficnet_dataset_v1/test/fire/images_007 (2).jpg
trafficnet_dataset_v1/test/sparse_traffic/images_036 (3).jpg
trafficnet_dataset_v1/test/dense_traffic/images_101.jpg


In [None]:
from pytorch_grad_cam import GradCAM, GradCAMPlusPlus, ScoreCAM, HiResCAM
from pytorch_grad_cam.utils.image import show_cam_on_image
import torch



# Pick the layer whose activations the CAM methods will visualise: the last block of
# `layer4` when the model has that attribute, otherwise the last Conv2d module found.
import torch.nn as nn
if hasattr(model, 'layer4'):
    target_layer = model.layer4[-1]
else:
    # model.modules() is iterated in full, so the final match is the last Conv2d seen
    conv_layers = [mod for mod in model.modules() if isinstance(mod, nn.Conv2d)]
    last_conv = conv_layers[-1] if conv_layers else None
    if last_conv is None:
        raise RuntimeError("Could not find a Conv2d layer automatically; please pass the target layer manually.")
    target_layer = last_conv

print("Using target_layer:", target_layer)

# CAM runner: every explainer is built with target_layers=[...] and used as a context
# manager so the hooks it registers on the model are released after each method.
def _make_cam(method, model, target_layer):
    """Instantiate the pytorch_grad_cam explainer registered under `method`."""
    cam_classes = {
        'gradcam': GradCAM,
        'gradcam++': GradCAMPlusPlus,
        'scorecam': ScoreCAM,
        'hirescam': HiResCAM,
    }
    if method not in cam_classes:
        raise ValueError("Unknown cam: " + str(method))
    return cam_classes[method](model=model, target_layers=[target_layer])


def run_cams_and_save_correct(img_path, model, target_layer, methods=('gradcam','gradcam++','scorecam'), save_dir=RESULT_DIR):
    """Compute, display and save CAM overlays for one image.

    Args:
        img_path: Path of the image to explain.
        model: Classifier to explain. It is switched to eval mode by this function.
        target_layer: Module whose activations the CAM methods use.
        methods: Iterable of method names; supported values are 'gradcam',
            'gradcam++', 'scorecam' and 'hirescam'. A method that fails (including an
            unknown name) is reported with a printed message and skipped.
        save_dir: Directory the overlays are written to; created if missing.

    Returns:
        Tuple ``(pred_label, pred_conf)``: the predicted class name (or ``"idx=<n>"``
        when the class list is unavailable) and its softmax probability.

    Side effects:
        Shows a matplotlib figure (original image plus one panel per successful
        method) and writes one PNG per successful method, named
        ``<class folder>_<image stem>_<method>.png``. The class-folder prefix keeps
        identically named files from different class folders from overwriting each other.
    """
    img_rgb = load_rgb_image_resized(img_path)         # HxWx3 float 0..1
    input_tensor = prep_tensor(img_rgb).to(DEVICE)     # 1x3xHxW on DEVICE

    # Explanations and the displayed prediction must come from inference-mode
    # behaviour of layers such as BatchNorm, regardless of what ran before this call.
    model.eval()

    cams = {}
    # each method runs in its own try/except so one failing method doesn't stop the rest
    for m in methods:
        try:
            # Leaving the `with` block releases the hooks the CAM object registered.
            with _make_cam(m, model, target_layer) as cam_obj:
                # targets=None explains the highest-scoring class; result is [batch, H, W]
                grayscale = cam_obj(input_tensor=input_tensor, targets=None)[0]
                cams[m] = show_cam_on_image(img_rgb, grayscale, use_rgb=True)
        except Exception as err:
            print(f"{m} error on {os.path.basename(img_path)}: {err}")
        finally:
            if DEVICE.type == 'cuda':
                torch.cuda.empty_cache()

    # Predict label + confidence for display
    with torch.no_grad():
        probs = torch.softmax(model(input_tensor), dim=1).cpu().numpy()[0]
    pred_idx = int(probs.argmax())
    pred_conf = float(probs.max())
    try:
        pred_label = test_ds.classes[pred_idx]
    except (NameError, AttributeError, IndexError):
        # class names unavailable: fall back to the raw index
        pred_label = f"idx={pred_idx}"

    # Plot original + cams side-by-side
    n = 1 + len(cams)
    fig, axes = plt.subplots(1, n, figsize=(4*n, 4), squeeze=False)
    panels = axes[0]
    panels[0].imshow(img_rgb)
    panels[0].set_title(f"Original\nPred: {pred_label} ({pred_conf:.2f})")
    panels[0].axis('off')
    for ax, (name, vis) in zip(panels[1:], cams.items()):
        ax.imshow(vis)
        ax.set_title(name)
        ax.axis('off')
    plt.show()
    plt.close(fig)  # avoid accumulating open figures when called in a loop

    # Save images
    os.makedirs(save_dir, exist_ok=True)
    base = os.path.splitext(os.path.basename(img_path))[0]
    class_dir = os.path.basename(os.path.dirname(img_path))
    stem = f"{class_dir}_{base}" if class_dir else base
    for name, vis in cams.items():
        outpath = os.path.join(save_dir, f"{stem}_{name}.png")
        vis_u8 = vis.astype(np.uint8)
        # convert RGB->BGR for cv2.imwrite, which reports failure via its return value
        if not cv2.imwrite(outpath, cv2.cvtColor(vis_u8, cv2.COLOR_RGB2BGR)):
            print("Could not write", outpath)
    print("Saved cams for", os.path.basename(img_path))
    return pred_label, pred_conf

# Explain every selected sample with all four CAM methods. A failure on one image is
# printed and the loop moves on to the next image.
cam_methods = ['gradcam', 'gradcam++', 'scorecam', 'hirescam']
for sample_path in samples:
    try:
        run_cams_and_save_correct(sample_path, model, target_layer, methods=cam_methods)
    except Exception as exc:
        print("Error on", sample_path, ":", exc)

Using target_layer: Bottleneck(
  (conv1): Conv2d(2048, 512, kernel_size=(1, 1), stride=(1, 1), bias=False)
  (bn1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (act1): ReLU(inplace=True)
  (conv2): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  (bn2): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (drop_block): Identity()
  (act2): ReLU(inplace=True)
  (aa): Identity()
  (conv3): Conv2d(512, 2048, kernel_size=(1, 1), stride=(1, 1), bias=False)
  (bn3): BatchNorm2d(2048, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (act3): ReLU(inplace=True)
)


  9%|▊         | 11/128 [00:15<02:46,  1.43s/it]