# Imports & Environment Setup

In [2]:
import os, random, math, numpy as np, pandas as pd
from PIL import Image
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

from sklearn.metrics import classification_report, confusion_matrix

# Set Random Seeds

In [3]:
def seed_all(seed=42):
    """Seed every random number generator this notebook relies on.

    The same ``seed`` is handed, in order, to Python's ``random`` module,
    NumPy's global generator, ``torch.manual_seed`` and
    ``torch.cuda.manual_seed_all``.

    Args:
        seed: Integer seed shared by all generators.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

seed_all(seed=42)
# Use the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
device

device(type='cpu')

# Loading Dataset - Dataset Paths (Train / Valid / Test)

In [None]:
DATA_ROOT = "/kaggle/input/70-dog-breedsimage-data-set"

# One sub-directory per split lives directly under the dataset root.
train_dir, val_dir, test_dir = (
    os.path.join(DATA_ROOT, split) for split in ("train", "valid", "test")
)

# Report whether each split directory is actually present.
for _label, _path in (
    ("Train exists:", train_dir),
    ("Valid exists:", val_dir),
    ("Test exists :", test_dir),
):
    print(_label, os.path.isdir(_path))


# Number of Classes

In [None]:
def count_images(root):
    """Count image files per class folder under ``root``.

    Every immediate sub-directory of ``root`` is treated as a class. A file
    counts as an image when its lower-cased name ends in ``.jpg``, ``.jpeg``
    or ``.png``.

    Args:
        root: Directory holding one sub-directory per class.

    Returns:
        A ``(df, classes)`` tuple: ``df`` is a DataFrame with columns
        ``class`` and ``count`` sorted by ``count`` descending, and
        ``classes`` is the alphabetically sorted list of class names.
    """
    image_exts = (".jpg", ".jpeg", ".png")
    classes = sorted(
        entry
        for entry in os.listdir(root)
        if os.path.isdir(os.path.join(root, entry))
    )

    rows = []
    for name in classes:
        files = os.listdir(os.path.join(root, name))
        n_images = len([f for f in files if f.lower().endswith(image_exts)])
        rows.append((name, n_images))

    table = pd.DataFrame(rows, columns=["class", "count"])
    return table.sort_values("count", ascending=False), classes

# Per-class image counts for each split; the class list is taken from the
# training split only.
train_df, classes = count_images(train_dir)
val_df = count_images(val_dir)[0]
test_df = count_images(test_dir)[0]

print("Num classes:", len(classes))
for _title, _frame in (
    ("Train images:", train_df),
    ("Val images  :", val_df),
    ("Test images :", test_df),
):
    print(_title, _frame["count"].sum())

train_df.head()


# Class Distribution: Count Images per Breed

In [None]:
# Histogram of how many training images each class has.
fig, ax = plt.subplots(figsize=(12, 4))
ax.hist(train_df["count"], bins=20)
ax.set_title("Train: Distribution of images per class")
ax.set_xlabel("Images per class")
ax.set_ylabel("Number of classes")
plt.show()

train_df.describe()


# Visualize Random Training Samples

In [None]:
def show_samples(root, classes, n=12):
    """Display ``n`` randomly chosen images, each titled with its class name.

    For every panel a class is drawn uniformly from ``classes`` and then one
    directory entry is drawn uniformly from that class folder under ``root``.

    Args:
        root: Directory containing one sub-directory per class.
        classes: Sequence of class (sub-directory) names to sample from.
        n: Number of images to show. Panels are laid out four per row and the
            number of rows grows with ``n`` (the default of 12 gives 3 x 4).
    """
    cols = 4
    # ceil(n / cols), but never fewer than one row so the figure stays valid.
    rows = max(1, (n + cols - 1) // cols)
    # 12 x 8 inches for the default three rows; height scales with row count.
    plt.figure(figsize=(12, 8 * rows / 3))
    for i in range(n):
        c = random.choice(classes)
        cdir = os.path.join(root, c)
        img_name = random.choice(os.listdir(cdir))
        img_path = os.path.join(cdir, img_name)
        # The context manager closes the file once the pixels are copied out.
        with Image.open(img_path) as fh:
            img = fh.convert("RGB")
        plt.subplot(rows, cols, i + 1)
        plt.imshow(img)
        plt.title(c, fontsize=9)
        plt.axis("off")
    plt.tight_layout()
    plt.show()

show_samples(train_dir, classes, n=12)


# Check Image Size Variations

In [None]:
from collections import Counter

def sample_sizes(root, classes, k=300):
    """Return the ten most common image sizes among ``k`` random samples.

    Each sample picks a class uniformly from ``classes`` and then a directory
    entry uniformly from that class folder (sampling is with replacement).

    Args:
        root: Directory containing one sub-directory per class.
        classes: Sequence of class (sub-directory) names to sample from.
        k: Number of images to sample.

    Returns:
        Up to ten ``(size, count)`` pairs, most frequent first, where
        ``size`` is the PIL ``Image.size`` tuple of the sampled file.
    """
    size_counts = Counter()
    for _ in range(k):
        c = random.choice(classes)
        cdir = os.path.join(root, c)
        img_path = os.path.join(cdir, random.choice(os.listdir(cdir)))
        # Only the dimensions are needed, so skip the RGB conversion (which
        # forces a full decode) and close the file handle right away.
        with Image.open(img_path) as img:
            size_counts[img.size] += 1
    return size_counts.most_common(10)

sample_sizes(train_dir, classes, k=300)

# Compute Dataset Mean & Std for Normalization

In [None]:
def compute_mean_std(dataset, num_batches=50, batch_size=64):
    """Estimate per-channel mean and standard deviation from sampled batches.

    Up to ``num_batches`` shuffled batches are drawn from ``dataset`` and the
    statistics are pooled over every pixel of every sampled image. The std is
    derived from the pooled first and second moments
    (``sqrt(E[x^2] - E[x]^2)``), so it includes the variation *between*
    images; averaging per-image variances would leave that part out and
    underestimate the dataset std.

    Args:
        dataset: Dataset yielding ``(image_tensor, label)`` pairs. Batching
            relies on all images having the same shape — TODO confirm for
            the dataset in use.
        num_batches: Maximum number of batches to sample.
        batch_size: Number of images per batch.

    Returns:
        ``(mean, std)`` as float32 NumPy arrays with one entry per channel.

    Raises:
        ValueError: If no batch was processed (empty dataset or
            ``num_batches <= 0``).
    """
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=2, pin_memory=True)
    channel_sum = 0.
    channel_sq_sum = 0.
    pixel_count = 0
    for i, (x, _) in enumerate(loader):
        if i >= num_batches:
            break
        x = x.to(device)
        b = x.size(0)
        x = x.view(b, x.size(1), -1)
        # Reduce each image in the input dtype, then accumulate across images
        # and batches in float64 so long running sums do not lose precision.
        channel_sum = channel_sum + x.sum(dim=2).double().sum(dim=0)
        channel_sq_sum = channel_sq_sum + (x * x).sum(dim=2).double().sum(dim=0)
        pixel_count += b * x.size(2)
    if pixel_count == 0:
        raise ValueError("compute_mean_std: no batches were processed")
    mean = channel_sum / pixel_count
    # Clamp guards against tiny negative values caused by rounding.
    var = (channel_sq_sum / pixel_count - mean * mean).clamp_min(0)
    std = torch.sqrt(var)
    return mean.detach().float().cpu().numpy(), std.detach().float().cpu().numpy()

In [None]:
# Statistics are computed on raw pixel values scaled to [0, 1] by ToTensor.
# NOTE(review): there is no Resize here, so batching presumably relies on all
# training images sharing one size — confirm against the dataset.
stats_tf = transforms.Compose([
    transforms.ToTensor(),
])
tmp_ds = datasets.ImageFolder(root=train_dir, transform=stats_tf)
mean, std = compute_mean_std(dataset=tmp_ds)
mean, std

# Data Augmentation & Preprocessing Pipelines

In [None]:
IMG_SIZE = 256
BATCH_SIZE = 64

# Training pipeline: geometric and colour augmentation on the PIL image, then
# tensor conversion, normalization, and random erasing on the tensor.
_train_steps = [
    transforms.RandomResizedCrop(size=IMG_SIZE, scale=(0.7, 1.0)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=10),
    transforms.RandomAffine(degrees=0, translate=(0.05, 0.05)),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=mean, std=std),
    transforms.RandomErasing(p=0.25, scale=(0.02, 0.12), ratio=(0.3, 3.3)),
]
train_tf = transforms.Compose(_train_steps)

# Evaluation pipeline: deterministic resize + centre crop, same normalization.
_eval_steps = [
    transforms.Resize(size=IMG_SIZE),
    transforms.CenterCrop(size=IMG_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(mean=mean, std=std),
]
eval_tf = transforms.Compose(_eval_steps)

from torchvision.datasets import ImageFolder
import re

def canon(s: str) -> str:
    """Return ``s`` lower-cased, trimmed, and with whitespace runs collapsed.

    Leading and trailing whitespace is removed and every internal run of
    whitespace becomes a single space, so class names that differ only in
    spacing or letter case map to the same key.
    """
    words = s.split()
    return " ".join(words).lower()

class ImageFolderWithCanonicalMap(ImageFolder):
    """ImageFolder whose targets use the *training* set's class indices.

    Folder names in ``root`` are matched to the training classes through
    ``canon`` (case- and whitespace-insensitive). Samples whose class has no
    match in the training mapping are dropped and reported with a warning.

    After construction ``samples``, ``imgs``, ``targets``, ``classes`` and
    ``class_to_idx`` all describe the same training-index space, so
    ``ds.classes[target]`` names the right class.

    Args:
        root: Directory containing one sub-directory per class.
        train_class_to_idx: ``class name -> index`` mapping from the training
            dataset.
        transform: Optional transform applied to each loaded image.
    """

    def __init__(self, root, train_class_to_idx, transform=None):
        super().__init__(root=root, transform=transform)
        train_c2i = {canon(k): v for k, v in train_class_to_idx.items()}

        # Resolve each local class once rather than once per sample.
        local_to_train = {}
        missing = set()
        for local_target, class_name in enumerate(self.classes):
            key = canon(class_name)
            if key in train_c2i:
                local_to_train[local_target] = train_c2i[key]
            else:
                missing.add(class_name)

        fixed_samples = [
            (path, local_to_train[local_target])
            for path, local_target in self.samples
            if local_target in local_to_train
        ]
        if missing:
            print("WARNING: unmatched classes:", list(sorted(missing))[:10], "...")

        self.samples = fixed_samples
        # ImageFolder exposes ``imgs`` as an alias of ``samples``; keep it in sync.
        self.imgs = self.samples
        self.targets = [t for _, t in self.samples]
        # Targets are now training indices, so the name lookups must use the
        # training mapping as well; the local folder order may differ.
        self.class_to_idx = dict(train_class_to_idx)
        self.classes = [
            name for name, _ in sorted(train_class_to_idx.items(), key=lambda kv: kv[1])
        ]

# The training split defines the class -> index mapping; validation and test
# targets are remapped onto it.
train_ds = ImageFolder(root=train_dir, transform=train_tf)
val_ds, test_ds = (
    ImageFolderWithCanonicalMap(split_dir, train_ds.class_to_idx, transform=eval_tf)
    for split_dir in (val_dir, test_dir)
)

num_classes = len(train_ds.classes)
print("Classes:", num_classes)
print("Val samples:", len(val_ds))
print("Test samples:", len(test_ds))

# Settings shared by all three loaders; only the training loader shuffles.
_loader_kwargs = dict(batch_size=BATCH_SIZE, num_workers=2, pin_memory=True)
train_loader = DataLoader(train_ds, shuffle=True, **_loader_kwargs)
val_loader = DataLoader(val_ds, shuffle=False, **_loader_kwargs)
test_loader = DataLoader(test_ds, shuffle=False, **_loader_kwargs)

# Compute Class Weights

In [None]:
from collections import Counter

# Number of training samples per class index.
counts = Counter(train_ds.targets)

# Raw per-class frequencies, ordered by class index.
class_weights = torch.tensor([float(counts[idx]) for idx in range(num_classes)])

# inverse frequency weighting, then rescale so the weights average to 1
class_weights = class_weights.sum() / class_weights
class_weights = class_weights / class_weights.mean()

class_weights = class_weights.to(device)
print("Class weights computed")

In [None]:
class ReconWrapper(torch.utils.data.Dataset):
    """Turn a labelled dataset into a reconstruction dataset.

    Each item of the wrapped dataset is expected to be an ``(image, label)``
    pair; the label is discarded and the image is returned as both the input
    and the target.
    """

    def __init__(self, base_ds):
        # Underlying dataset; indexed lazily, never copied.
        self.base = base_ds

    def __len__(self):
        return len(self.base)

    def __getitem__(self, idx):
        image = self.base[idx][0]
        # The very same object serves as input and as reconstruction target.
        return image, image

# Reconstruction loaders over the same datasets; only training is shuffled.
_recon_kwargs = {"batch_size": BATCH_SIZE, "num_workers": 2, "pin_memory": True}
train_recon_loader = DataLoader(ReconWrapper(train_ds), shuffle=True, **_recon_kwargs)
val_recon_loader = DataLoader(ReconWrapper(val_ds), shuffle=False, **_recon_kwargs)

# Convolutional Autoencoder (CAE) Architecture

In [None]:
class CAE(nn.Module):
    """Convolutional autoencoder with a fully connected latent bottleneck.

    The encoder is five stride-2 convolutions (each halves the spatial size),
    followed by a linear projection to ``latent_dim``. The decoder mirrors it
    with a linear layer and five stride-2 transposed convolutions (each
    doubles the spatial size). The reconstruction therefore has the same
    height and width as the input only when ``img_size`` is a multiple of 32.

    Args:
        latent_dim: Size of the latent vector returned by ``encode``.
        img_size: Side length of the square input images; fixes the size of
            the linear layers.
    """

    def __init__(self, latent_dim=256, img_size=224):
        super().__init__()

        self.enc = nn.Sequential(
            nn.Conv2d(3, 32, 4, 2, 1), nn.BatchNorm2d(32), nn.ReLU(),     # /2
            nn.Conv2d(32, 64, 4, 2, 1), nn.BatchNorm2d(64), nn.ReLU(),    # /4
            nn.Conv2d(64, 128, 4, 2, 1), nn.BatchNorm2d(128), nn.ReLU(),  # /8
            nn.Conv2d(128, 256, 4, 2, 1), nn.BatchNorm2d(256), nn.ReLU(), # /16
            nn.Conv2d(256, 256, 4, 2, 1), nn.BatchNorm2d(256), nn.ReLU(), # /32
        )

        # Compute the encoder output shape dynamically with a dummy batch.
        # The probe runs in eval mode: in training mode BatchNorm would fold
        # the all-zero dummy into its running statistics before any real data
        # is seen, and would reject a batch of one when the map is 1x1.
        was_training = self.enc.training
        self.enc.eval()
        try:
            with torch.no_grad():
                h = self.enc(torch.zeros(1, 3, img_size, img_size))
        finally:
            self.enc.train(was_training)
        self.enc_shape = h.shape[1:]                # (C,H,W)
        self.flat_dim = h[0].numel()                # C*H*W

        self.to_latent = nn.Linear(self.flat_dim, latent_dim)
        self.from_latent = nn.Linear(latent_dim, self.flat_dim)

        self.dec = nn.Sequential(
            nn.ConvTranspose2d(256, 256, 4, 2, 1), nn.BatchNorm2d(256), nn.ReLU(),
            nn.ConvTranspose2d(256, 128, 4, 2, 1), nn.BatchNorm2d(128), nn.ReLU(),
            nn.ConvTranspose2d(128, 64, 4, 2, 1), nn.BatchNorm2d(64), nn.ReLU(),
            nn.ConvTranspose2d(64, 32, 4, 2, 1), nn.BatchNorm2d(32), nn.ReLU(),
            nn.ConvTranspose2d(32, 3, 4, 2, 1),
        )

    def encode(self, x):
        """Map an image batch ``(N, 3, H, W)`` to latent vectors ``(N, latent_dim)``."""
        x = self.enc(x)
        x = x.flatten(1)
        return self.to_latent(x)

    def decode(self, z):
        """Map latent vectors ``(N, latent_dim)`` back to image batches.

        The final layer has no activation, so the output is unbounded.
        """
        x = self.from_latent(z).view(-1, *self.enc_shape)
        return self.dec(x)

    def forward(self, x):
        """Encode ``x`` and return its reconstruction."""
        z = self.encode(x)
        return self.decode(z)

# CAE Evaluation Function (Reconstruction Loss)

In [None]:
def eval_recon(model, loader):
    """Return the mean per-sample reconstruction MSE of ``model`` over ``loader``.

    The model is switched to eval mode and gradients are disabled. Each
    batch's mean squared error is weighted by its batch size so a smaller
    final batch does not skew the average.

    Args:
        model: Module mapping an input batch to its reconstruction.
        loader: Iterable of ``(input, target)`` batches.

    Returns:
        The average loss as a Python float.
    """
    model.eval()
    loss_sum = 0.0
    seen = 0
    with torch.no_grad():
        for inputs, targets in loader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            batch = inputs.size(0)
            batch_loss = F.mse_loss(model(inputs), targets)
            loss_sum += batch_loss.item() * batch
            seen += batch
    return loss_sum / seen

# Build the autoencoder for the image size used by the data pipelines and
# move it to the selected device.
LATENT_DIM = 512
cae = CAE(latent_dim=LATENT_DIM, img_size=IMG_SIZE).to(device)

opt_ae = torch.optim.AdamW(cae.parameters(), lr=3e-4, weight_decay=1e-4)


AE_EPOCHS = 40
# Sentinel larger than any expected loss, so the first epoch always checkpoints.
best_val_recon = 1e9

for ep in range(1, AE_EPOCHS+1):
    # eval_recon() leaves the model in eval mode, so re-enable training mode
    # at the start of every epoch.
    cae.train()
    total_loss, n = 0.0, 0
    # Each batch is (input, target) with target identical to the input, i.e.
    # the already-augmented training image.
    for x, y in train_recon_loader:
        x, y = x.to(device), y.to(device)
        opt_ae.zero_grad(set_to_none=True)
        x_hat = cae(x)
        loss = F.mse_loss(x_hat, y)
        loss.backward()
        opt_ae.step()
        # Weight the batch-mean loss by batch size to get a per-sample average.
        total_loss += loss.item() * x.size(0)
        n += x.size(0)

    train_loss = total_loss / n
    val_loss = eval_recon(cae, val_recon_loader)
    print(f"[CAE] Epoch {ep:02d} | Train Recon Loss: {train_loss:.5f} | Val Recon Loss: {val_loss:.5f}")

    # Keep only the weights with the lowest validation reconstruction loss.
    if val_loss < best_val_recon:
        best_val_recon = val_loss
        torch.save(cae.state_dict(), "best_cae.pth")

print("Best Val Recon Loss:", best_val_recon)


In [None]:
# Restore the weights stored in the checkpoint file and switch to eval mode.
_best_state = torch.load("best_cae.pth", map_location=device)
cae.load_state_dict(_best_state)
cae.eval()

# Preview batch: at most the first eight images of the first validation batch.
x = next(iter(val_recon_loader))[0]
x = x.to(device)[:8]
with torch.no_grad():
    x_hat = cae(x)

# unnormalize for viewing
def unnorm(t):
    """Undo the dataset normalization so a tensor can be displayed as an image.

    Args:
        t: Normalized image batch; the ``(1, 3, 1, 1)`` broadcast implies a
            three-channel ``(N, 3, H, W)`` layout.

    Returns:
        ``t * std + mean`` clamped to ``[0, 1]``, on the same device as ``t``.
    """
    stat_shape = (1, 3, 1, 1)
    shift = torch.tensor(mean, device=t.device).view(*stat_shape)
    scale = torch.tensor(std, device=t.device).view(*stat_shape)
    restored = t * scale + shift
    return restored.clamp(0, 1)

x_vis = unnorm(x).cpu()
xh_vis = unnorm(x_hat).cpu()  # unnorm already clamps to [0, 1]

# Draw as many columns as there are images in the preview batch, so a batch
# with fewer than eight images does not raise an IndexError.
n_show = x_vis.size(0)

plt.figure(figsize=(12,4))
for i in range(n_show):
    # Top row: original image.
    plt.subplot(2, n_show, i+1)
    plt.imshow(x_vis[i].permute(1,2,0))  # CHW -> HWC for matplotlib
    plt.axis("off")
    # Bottom row: reconstruction of the image directly above.
    plt.subplot(2, n_show, n_show+i+1)
    plt.imshow(xh_vis[i].permute(1,2,0))
    plt.axis("off")
plt.suptitle("Top: Original | Bottom: Reconstructed (CAE)")
plt.show()