# Convolutional Neural Network Cats vs. Dogs Classifier

Goal:
- Use the Oxford-IIIT Pet dataset (same dataset source often used in fastai examples)
- Convert breed labels into a binary task: cat vs dog
- Train a CNN with transfer learning (ResNet18)
- Inspect learning curves and discuss underfitting / overfitting
- Experiment with learning rates and regularization

In [None]:
import copy
import time
import random
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split

import torchvision
from torchvision import transforms, models
from torchvision.datasets import OxfordIIITPet

from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay

In [None]:
# Seed every random number generator this notebook imports so that repeated
# runs start from the same state.
SEED = 42
random.seed(SEED)        # Python's built-in RNG (imported above, previously left unseeded)
np.random.seed(SEED)     # NumPy's global RNG
torch.manual_seed(SEED)  # torch's default generator

# Use the GPU when torch reports one as available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

### 1) Dataset: Oxford-IIIT Pet (PyTorch / torchvision)

We use the same dataset source often seen in fastai tutorials, but load it in PyTorch.
The original task is 37-class breed classification; in the following sections we collapse it into a binary cat-vs-dog label.

In [None]:
DATA_ROOT = "./data"  # download target; the dataset is fetched here when missing

# Untransformed copy of the "trainval" split with breed-category targets.
# It is used below to inspect the dataset size and the breed names.
base_ds = OxfordIIITPet(
    root=DATA_ROOT,
    split="trainval",
    target_types="category",
    download=True,
)

print(f"Total samples: {len(base_ds)}")

In [None]:
# %%
# Breed names in torchvision Oxford-IIIT Pet (37 classes).
#
# NOTE: the "uppercase first letter = cat, lowercase = dog" convention only holds
# for the raw image file names. torchvision title-cases every entry of `.classes`
# (e.g. "american_bulldog" -> "American Bulldog"), so testing `c[0].isupper()` on
# these names would label all 37 breeds as cats. We therefore match the breed
# names against an explicit list of the dataset's cat breeds.
CAT_BREED_NAMES = frozenset({
    "abyssinian",
    "bengal",
    "birman",
    "bombay",
    "british shorthair",
    "egyptian mau",
    "maine coon",
    "persian",
    "ragdoll",
    "russian blue",
    "siamese",
    "sphynx",
})


def is_cat_breed(breed_name):
    """Return True if `breed_name` is a cat breed.

    The comparison ignores case and treats '_' and ' ' as the same separator,
    so both raw names ("Maine_Coon") and title-cased names ("Maine Coon") match.
    """
    return breed_name.replace("_", " ").strip().casefold() in CAT_BREED_NAMES


class_names = base_ds.classes
print("Number of breed classes:", len(class_names))
print("First 10 classes:", class_names[:10])

cat_breeds = [c for c in class_names if is_cat_breed(c)]
dog_breeds = [c for c in class_names if not is_cat_breed(c)]

print("Cat breeds:", len(cat_breeds))
print("Dog breeds:", len(dog_breeds))
print("Example cat breeds:", cat_breeds[:5])
print("Example dog breeds:", dog_breeds[:5])

### 2) Transforms (augmentation + normalization)

We use ImageNet normalization because ResNet18 pretrained weights expect it.


In [None]:
IMG_SIZE = 224
BATCH_SIZE = 32

# Per-channel statistics handed to Normalize; identical for training and evaluation.
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]


def _resize_step():
    """Return a fresh Resize to (IMG_SIZE, IMG_SIZE)."""
    return transforms.Resize((IMG_SIZE, IMG_SIZE))


def _tensor_steps():
    """Return fresh ToTensor + Normalize steps that close every pipeline."""
    return [
        transforms.ToTensor(),
        transforms.Normalize(mean=list(_NORM_MEAN), std=list(_NORM_STD)),
    ]


# Training pipeline: resize, random flip/rotation augmentation, then tensor + normalize.
train_tfms = transforms.Compose(
    [
        _resize_step(),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomRotation(degrees=10),
    ]
    + _tensor_steps()
)

# Evaluation pipeline: the same steps without the random augmentation.
eval_tfms = transforms.Compose([_resize_step()] + _tensor_steps())

### 3) Create a binary cats-vs-dogs wrapper dataset

`OxfordIIITPet` returns breed category labels (0..36).
We map them to:
- 0 = cat
- 1 = dog

In [None]:
class OxfordPetCatsDogsBinary(Dataset):
    """Oxford-IIIT Pet exposed as a binary cat-vs-dog dataset.

    Each item is ``(image, label)`` with ``label == 0`` for a cat breed and
    ``label == 1`` for a dog breed. The wrapped ``OxfordIIITPet`` is always
    created with ``transform=None``; the optional ``transform`` passed here is
    applied to the image inside ``__getitem__``.

    Args:
        root: directory handed to ``OxfordIIITPet`` as ``root``.
        split: dataset split handed to ``OxfordIIITPet`` (default "trainval").
        transform: optional callable applied to each image.
        download: forwarded to ``OxfordIIITPet``.
    """

    # The breed names in `OxfordIIITPet.classes` are title-cased by torchvision
    # ("american_bulldog" -> "American Bulldog"), so the raw-filename convention
    # "uppercase first letter = cat" cannot be used on them: every breed would be
    # labelled as a cat. The cat breeds are therefore listed explicitly
    # (lower-case, space-separated).
    CAT_BREEDS = frozenset({
        "abyssinian",
        "bengal",
        "birman",
        "bombay",
        "british shorthair",
        "egyptian mau",
        "maine coon",
        "persian",
        "ragdoll",
        "russian blue",
        "siamese",
        "sphynx",
    })

    def __init__(self, root, split="trainval", transform=None, download=False):
        self.ds = OxfordIIITPet(
            root=root,
            split=split,
            target_types="category",
            transform=None,   # apply transform manually in __getitem__
            download=download
        )
        self.transform = transform
        self.classes_breeds = self.ds.classes  # breed names

    @classmethod
    def breed_to_binary(cls, breed_name):
        """Map a breed name to 0 (cat) or 1 (dog).

        Matching ignores case and treats '_' and ' ' as the same separator.
        Any name not listed in ``CAT_BREEDS`` is treated as a dog.
        """
        key = breed_name.replace("_", " ").strip().casefold()
        return 0 if key in cls.CAT_BREEDS else 1

    def __len__(self):
        return len(self.ds)

    def __getitem__(self, idx):
        image, breed_idx = self.ds[idx]  # breed_idx indexes into classes_breeds

        binary_label = self.breed_to_binary(self.classes_breeds[breed_idx])  # 0=cat, 1=dog

        if self.transform is not None:
            image = self.transform(image)

        return image, binary_label

### 4) Train/validation split and transform assignment

We split `trainval` into train + validation, then assign transforms separately.


In [None]:
# Untransformed binary wrappers for both official splits; "trainval" is divided
# into a training and a validation part below.
full_trainval_ds = OxfordPetCatsDogsBinary(
    root=DATA_ROOT, split="trainval", transform=None, download=True
)
test_ds_full = OxfordPetCatsDogsBinary(
    root=DATA_ROOT, split="test", transform=None, download=True
)

print(f"Trainval samples: {len(full_trainval_ds)}")
print(f"Test samples: {len(test_ds_full)}")

# 80% of trainval goes to training, the remainder to validation.
n_total = len(full_trainval_ds)
n_train = int(0.8 * n_total)
n_val = n_total - n_train

# A dedicated, seeded generator keeps the split identical across runs.
split_generator = torch.Generator().manual_seed(SEED)
train_subset, val_subset = random_split(
    full_trainval_ds, [n_train, n_val], generator=split_generator
)

print(f"Train subset: {len(train_subset)}")
print(f"Val subset: {len(val_subset)}")

# %%
# Helper wrapper to apply different transforms to subsets created by random_split
class TransformSubset(Dataset):
    """Dataset view that applies `transform` to the image of every item it serves.

    Args:
        subset: any indexable object with a length whose items are
            ``(image, label)`` pairs.
        transform: optional callable applied to the image; when None, items are
            returned unchanged.
    """

    def __init__(self, subset, transform=None):
        self.subset, self.transform = subset, transform

    def __len__(self):
        return len(self.subset)

    def __getitem__(self, idx):
        sample, target = self.subset[idx]
        if self.transform is None:
            return sample, target
        return self.transform(sample), target

# NOTE:
# `full_trainval_ds` was built with transform=None, so the two subsets yield
# untransformed images; each wrapper below adds the pipeline for its split.
train_ds = TransformSubset(train_subset, transform=train_tfms)
val_ds = TransformSubset(val_subset, transform=eval_tfms)
test_ds = OxfordPetCatsDogsBinary(
    root=DATA_ROOT, split="test", transform=eval_tfms, download=True
)

# %%
# Options shared by all three loaders; only the training loader shuffles.
_loader_opts = {"batch_size": BATCH_SIZE, "num_workers": 2}
train_loader = DataLoader(train_ds, shuffle=True, **_loader_opts)
val_loader = DataLoader(val_ds, shuffle=False, **_loader_opts)
test_loader = DataLoader(test_ds, shuffle=False, **_loader_opts)

binary_class_names = ["cat", "dog"]

for _split_name, _split_loader in (
    ("Train", train_loader),
    ("Val", val_loader),
    ("Test", test_loader),
):
    print(f"{_split_name} batches: {len(_split_loader)}")

### 5) Visualize a few images (sanity check)

In [None]:
def denormalize(img_tensor):
    """Undo the mean/std normalization of an image tensor for display.

    Computes ``img_tensor * std + mean`` per channel, using the same mean/std
    values that the transform pipelines in this file pass to ``Normalize``,
    and clamps the result to ``[0, 1]``.

    Args:
        img_tensor: tensor whose channel axis broadcasts against shape
            ``(3, 1, 1)``, e.g. ``(3, H, W)`` or ``(B, 3, H, W)``.

    Returns:
        Tensor with the same shape as ``img_tensor``, on the same device.
    """
    # Build the statistics on the input's device so that CUDA tensors can be
    # passed directly instead of failing with a device-mismatch error.
    mean = torch.tensor([0.485, 0.456, 0.406], device=img_tensor.device).view(3, 1, 1)
    std = torch.tensor([0.229, 0.224, 0.225], device=img_tensor.device).view(3, 1, 1)
    return (img_tensor * std + mean).clamp(0, 1)

# Pull one batch from the training loader and show up to eight of its images
# (after undoing the normalization) with their binary label as the title.
images, labels = next(iter(train_loader))

plt.figure(figsize=(10, 6))
for i, (img_t, lbl) in enumerate(zip(images[:8], labels[:8])):
    plt.subplot(2, 4, i + 1)
    img = denormalize(img_t).permute(1, 2, 0).numpy()  # CHW -> HWC for imshow
    plt.imshow(img)
    plt.title(binary_class_names[lbl.item()])
    plt.axis("off")
plt.tight_layout()
plt.show()

### 6) Model definition (separate class)

We use transfer learning with ResNet18 and replace the final layer for 2 classes.
Participants can easily swap the backbone or change dropout / freezing.


In [None]:
class ResNetBinaryClassifier(nn.Module):
    """torchvision ResNet with its final `fc` layer replaced by a new classifier head.

    Args:
        backbone: one of "resnet18", "resnet34", "resnet50".
        pretrained: when true, load the architecture's ``DEFAULT`` weights;
            otherwise build the network with ``weights=None``.
        dropout: when > 0, a ``Dropout(dropout)`` layer is placed before the
            new linear head.
        freeze_backbone: when true, all parameters that exist before the head
            is replaced get ``requires_grad = False``; the new head is created
            afterwards.
        num_classes: number of outputs of the new linear head.

    Raises:
        ValueError: if ``backbone`` is not one of the supported names.
    """

    # (backbone name, constructor name, weights-enum name) inside torchvision.models
    _BACKBONES = (
        ("resnet18", "resnet18", "ResNet18_Weights"),
        ("resnet34", "resnet34", "ResNet34_Weights"),
        ("resnet50", "resnet50", "ResNet50_Weights"),
    )

    def __init__(self, backbone="resnet18", pretrained=True, dropout=0.0, freeze_backbone=False, num_classes=2):
        super().__init__()

        spec = next((entry for entry in self._BACKBONES if entry[0] == backbone), None)
        if spec is None:
            raise ValueError("Unsupported backbone. Choose from: resnet18, resnet34, resnet50")

        _, builder_name, weights_name = spec
        weights = getattr(models, weights_name).DEFAULT if pretrained else None
        self.backbone = getattr(models, builder_name)(weights=weights)
        in_features = self.backbone.fc.in_features

        # Freeze first, so the head created below is not affected.
        if freeze_backbone:
            self.backbone.requires_grad_(False)

        head_layers = [nn.Dropout(dropout)] if dropout > 0 else []
        head_layers.append(nn.Linear(in_features, num_classes))
        # A bare Linear is used when there is no dropout, a Sequential otherwise.
        self.backbone.fc = nn.Sequential(*head_layers) if len(head_layers) > 1 else head_layers[0]

    def forward(self, x):
        return self.backbone(x)

In [None]:
# Workshop defaults -- change a value here and re-run the cells below to compare.
model_kwargs = {
    "backbone": "resnet18",      # easy to explain and fast enough for workshop
    "pretrained": True,
    "dropout": 0.2,
    "freeze_backbone": False,    # try True for feature-extractor mode
    "num_classes": 2,
}
model = ResNetBinaryClassifier(**model_kwargs).to(device)

print(model)

### 7) Loss, optimizer, scheduler

- `CrossEntropyLoss` for 2-class classification
- Adam optimizer
- optional scheduler to reduce LR on plateaus

In [None]:
criterion = nn.CrossEntropyLoss()

LEARNING_RATE = 1e-4  # try 1e-5, 1e-4, 1e-3 for the learning-rate experiment
optimizer = optim.Adam(
    model.parameters(),
    lr=LEARNING_RATE,
    weight_decay=1e-4,
)

# Plateau scheduler in "min" mode with factor 0.5 and patience 2; it is stepped
# with the validation loss by the training function.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min", factor=0.5, patience=2)

print(f"Initial LR: {optimizer.param_groups[0]['lr']}")

### 8) Training / validation functions

In [None]:
def run_epoch(model, loader, criterion, optimizer=None, device="cpu"):
    """Run one full pass over `loader`, training or evaluating.

    When `optimizer` is given the model is put in train mode and updated after
    every batch; otherwise it is put in eval mode and gradients are disabled.

    Args:
        model: module called as ``model(xb)`` to produce per-class logits.
        loader: iterable of ``(inputs, targets)`` tensor batches.
        criterion: loss called as ``criterion(logits, targets)``.
        optimizer: optimizer to step, or None for evaluation.
        device: device the batches are moved to.

    Returns:
        ``(mean_loss, accuracy, predictions, targets)``: the loss averaged over
        samples, the fraction of correct argmax predictions, and two NumPy
        arrays with the predictions and targets of all batches in order.
    """
    training = optimizer is not None
    if training:
        model.train()
    else:
        model.eval()

    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    pred_chunks, target_chunks = [], []

    for inputs, targets in loader:
        inputs, targets = inputs.to(device), targets.to(device)

        if training:
            optimizer.zero_grad()

        # Gradients are only tracked while training.
        with torch.set_grad_enabled(training):
            logits = model(inputs)
            loss = criterion(logits, targets)
            preds = logits.argmax(dim=1)

            if training:
                loss.backward()
                optimizer.step()

        n_batch = inputs.size(0)
        # Weight the batch loss by its size so the epoch value is a per-sample mean.
        loss_sum += loss.item() * n_batch
        n_correct += (preds == targets).sum().item()
        n_seen += n_batch

        pred_chunks.append(preds.detach().cpu())
        target_chunks.append(targets.detach().cpu())

    return (
        loss_sum / n_seen,
        n_correct / n_seen,
        torch.cat(pred_chunks).numpy(),
        torch.cat(target_chunks).numpy(),
    )

In [None]:
def train_model(
    model,
    train_loader,
    val_loader,
    criterion,
    optimizer,
    scheduler=None,
    device="cpu",
    num_epochs=10,
    early_stopping_patience=None
):
    """Train `model`, record per-epoch metrics and restore the best weights.

    Each epoch runs one training pass and one validation pass via `run_epoch`.
    The weights with the lowest validation loss seen so far are kept and loaded
    back into the model before returning.

    Args:
        model: module to train.
        train_loader: batches used for the training pass.
        val_loader: batches used for the validation pass.
        criterion: loss function forwarded to `run_epoch`.
        optimizer: optimizer forwarded to `run_epoch` for the training pass.
        scheduler: optional LR scheduler. A ``ReduceLROnPlateau`` is stepped
            with the validation loss; any other scheduler is stepped without
            arguments once per epoch.
        device: device forwarded to `run_epoch`.
        num_epochs: maximum number of epochs.
        early_stopping_patience: stop once this many consecutive epochs have
            not improved the validation loss; None disables early stopping.

    Returns:
        ``(model, history)`` where ``history`` maps "train_loss", "train_acc",
        "val_loss", "val_acc" and "lr" to lists with one entry per completed
        epoch. "lr" holds the learning rate of the first parameter group that
        was in effect during that epoch.
    """
    history = {
        "train_loss": [],
        "train_acc": [],
        "val_loss": [],
        "val_acc": [],
        "lr": []
    }

    best_val_loss = float("inf")
    best_state = copy.deepcopy(model.state_dict())
    epochs_without_improvement = 0

    start_time = time.time()

    for epoch in range(1, num_epochs + 1):
        # Read the LR before the scheduler can change it, so the value logged
        # for this epoch is the one the epoch was actually trained with.
        current_lr = optimizer.param_groups[0]["lr"]

        train_loss, train_acc, _, _ = run_epoch(model, train_loader, criterion, optimizer=optimizer, device=device)
        val_loss, val_acc, _, _ = run_epoch(model, val_loader, criterion, optimizer=None, device=device)

        if scheduler is not None:
            # Only ReduceLROnPlateau takes the monitored metric; other
            # schedulers would misinterpret a positional argument.
            if isinstance(scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):
                scheduler.step(val_loss)
            else:
                scheduler.step()

        history["train_loss"].append(train_loss)
        history["train_acc"].append(train_acc)
        history["val_loss"].append(val_loss)
        history["val_acc"].append(val_acc)
        history["lr"].append(current_lr)

        print(
            f"Epoch {epoch:02d}/{num_epochs} | "
            f"train_loss={train_loss:.4f}, train_acc={train_acc:.4f} | "
            f"val_loss={val_loss:.4f}, val_acc={val_acc:.4f} | "
            f"lr={current_lr:.2e}"
        )

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            # Deep copy: state_dict() returns references to the live tensors.
            best_state = copy.deepcopy(model.state_dict())
            epochs_without_improvement = 0
        else:
            epochs_without_improvement += 1

        if early_stopping_patience is not None and epochs_without_improvement >= early_stopping_patience:
            print(f"Early stopping triggered after {epoch} epochs.")
            break

    elapsed = time.time() - start_time
    print(f"Training finished in {elapsed/60:.1f} min")

    # Return the model with the best validation-loss weights, not the last ones.
    model.load_state_dict(best_state)
    return model, history

### 9) Training loop

In [None]:
num_epochs = 8  # try 3, 8, 15, 25 to show under/overfitting behavior

# Train with the objects configured above. `model` is rebound to the model
# returned by train_model; `history` holds the per-epoch metrics plotted below.
model, history = train_model(
    model,
    train_loader,
    val_loader,
    criterion,
    optimizer,
    scheduler=scheduler,
    device=device,
    num_epochs=num_epochs,
    early_stopping_patience=5,
)

### 10) Plot training curves

In [None]:
epochs = np.arange(1, len(history["train_loss"]) + 1)

# One figure per entry: (y-axis label, title, [(history key, legend label), ...]).
_curve_specs = [
    ("Loss", "Loss curves", [("train_loss", "Train loss"), ("val_loss", "Validation loss")]),
    ("Accuracy", "Accuracy curves", [("train_acc", "Train accuracy"), ("val_acc", "Validation accuracy")]),
    ("LR", "Learning rate during training", [("lr", "Learning rate")]),
]

for _ylabel, _title, _series in _curve_specs:
    plt.figure(figsize=(7, 4))
    for _key, _legend in _series:
        plt.plot(epochs, history[_key], label=_legend)
    plt.xlabel("Epoch")
    plt.ylabel(_ylabel)
    plt.title(_title)
    plt.legend()
    plt.show()

### 11) Final evaluation on validation and test sets

In [None]:
# Pass the label set explicitly: with `target_names` but no `labels`,
# classification_report raises if a class is missing from both targets and
# predictions, and the confusion matrix would no longer be 2x2.
class_ids = list(range(len(binary_class_names)))

# --- Validation split ---
val_loss, val_acc, val_preds, val_targets = run_epoch(
    model, val_loader, criterion, optimizer=None, device=device
)

print(f"Validation loss: {val_loss:.4f}")
print(f"Validation accuracy: {val_acc:.4f}")
print()
print(classification_report(val_targets, val_preds, labels=class_ids, target_names=binary_class_names))

cm = confusion_matrix(val_targets, val_preds, labels=class_ids)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=binary_class_names)
disp.plot()
plt.show()

# --- Test split ---
test_loss, test_acc, test_preds, test_targets = run_epoch(
    model, test_loader, criterion, optimizer=None, device=device
)

print(f"Test loss: {test_loss:.4f}")
print(f"Test accuracy: {test_acc:.4f}")
print()
print(classification_report(test_targets, test_preds, labels=class_ids, target_names=binary_class_names))


### 12) Visualize predictions

In [None]:
def predict_batch(model, loader, class_names, device="cpu", n_show=8):
    """Plot images from the first batch of `loader` with true and predicted labels.

    Args:
        model: module called as ``model(xb)`` to produce per-class logits; it is
            switched to eval mode.
        loader: iterable of ``(images, labels)`` batches; only the first batch
            is used.
        class_names: sequence indexed by label to get the displayed name.
        device: device the image batch is moved to before the forward pass.
        n_show: maximum number of images to plot. The grid has four columns
            and at least two rows, and grows by rows when more than eight
            images are shown.
    """
    model.eval()
    xb, yb = next(iter(loader))
    xb = xb.to(device)

    with torch.no_grad():
        logits = model(xb)
        preds = torch.argmax(logits, dim=1).cpu()

    n_plot = min(n_show, len(xb))
    n_cols = 4
    # Keep the 2x4 layout for up to eight images; a fixed 2x4 grid would make
    # plt.subplot raise for a ninth image, so add rows as needed.
    n_rows = max(2, -(-n_plot // n_cols))

    plt.figure(figsize=(10, 3 * n_rows))
    for i in range(n_plot):
        plt.subplot(n_rows, n_cols, i + 1)
        # Move to CPU and CHW -> HWC before handing the image to imshow.
        img = denormalize(xb[i].cpu()).permute(1, 2, 0).numpy()
        true_label = class_names[yb[i].item()]
        pred_label = class_names[preds[i].item()]
        plt.imshow(img)
        plt.title(f"T: {true_label}\nP: {pred_label}")
        plt.axis("off")
    plt.tight_layout()
    plt.show()

predict_batch(model, val_loader, binary_class_names, device=device)

### (Optional) Feature extraction from the penultimate layer

We extract embeddings from the layer before the final classifier (fc) and visualize them in 2D using PCA.

In [None]:
from sklearn.decomposition import PCA

@torch.no_grad()
def extract_resnet_features(model, loader, device="cpu", max_samples=500):
    """Collect flattened outputs of the backbone with its last child removed.

    Args:
        model: object with a ``backbone`` module; it is switched to eval mode.
        loader: iterable of ``(inputs, labels)`` tensor batches.
        device: device the inputs and the feature extractor are moved to.
        max_samples: iteration stops once at least this many samples have been
            processed, and the outputs are truncated to this length.

    Returns:
        features: np.ndarray of shape (N, D)
        labels: np.ndarray of shape (N,)
    """
    model.eval()

    # All children of the backbone except the last one (the final fc layer of a
    # ResNet), chained in order.
    trunk = list(model.backbone.children())[:-1]
    feature_extractor = nn.Sequential(*trunk)
    feature_extractor = feature_extractor.to(device)
    feature_extractor.eval()

    feature_chunks, label_chunks = [], []
    remaining = max_samples

    for inputs, targets in loader:
        inputs = inputs.to(device)

        # Flatten everything after the batch axis, e.g. (B, 512, 1, 1) -> (B, 512) for ResNet18.
        embedded = torch.flatten(feature_extractor(inputs), 1)

        feature_chunks.append(embedded.cpu())
        label_chunks.append(targets.cpu())

        remaining -= inputs.size(0)
        if remaining <= 0:
            break

    # The last batch may overshoot max_samples, so trim both outputs.
    features = torch.cat(feature_chunks, dim=0)[:max_samples].numpy()
    labels = torch.cat(label_chunks, dim=0)[:max_samples].numpy()

    return features, labels

# Embed up to 400 samples from the validation loader with the current model.
features, feat_labels = extract_resnet_features(
    model,
    val_loader,
    device=device,
    max_samples=400,
)

print(f"Feature matrix shape: {features.shape}")
print(f"Labels shape: {feat_labels.shape}")

In [None]:
# %%
# Project the extracted features onto two principal components.
pca = PCA(n_components=2, random_state=42)
features_2d = pca.fit_transform(features)

# One scatter series per binary class so each gets its own colour and legend entry.
plt.figure(figsize=(7, 5))
for class_idx in range(len(binary_class_names)):
    class_points = features_2d[feat_labels == class_idx]
    plt.scatter(
        class_points[:, 0],
        class_points[:, 1],
        label=binary_class_names[class_idx],
        alpha=0.7,
        s=20,
    )

plt.xlabel("PC1")
plt.ylabel("PC2")
plt.title("PCA of CNN features (penultimate layer)")
plt.legend()
plt.show()

print("Explained variance ratio (PC1 + PC2):", pca.explained_variance_ratio_.sum().round(3))