In [4]:
import cv2
import pandas as pd

# ---------------------------------------------------
# Read train/test image ID split
# ---------------------------------------------------
train_images_id = []
test_images_id = []

with open("/kaggle/input/cub2002011/CUB_200_2011/train_test_split.txt", "r") as f:
    for line in f:
        img_id, flag = line.strip().split()
        if flag == "1":
            train_images_id.append(img_id)
        else:
            test_images_id.append(img_id)

# ---------------------------------------------------
# Read image paths
# ---------------------------------------------------
images_path = {}

with open("/kaggle/input/cub2002011/CUB_200_2011/images.txt", "r") as f:
    for line in f:
        img_id, rel_path = line.strip().split()
        images_path[img_id] = rel_path

# ---------------------------------------------------
# Read image class labels
# ---------------------------------------------------
images_label = {}

with open("/kaggle/input/cub2002011/CUB_200_2011/image_class_labels.txt", "r") as f:
    for line in f:
        img_id, cls_id = line.strip().split()
        images_label[img_id] = int(cls_id)

# ---------------------------------------------------
# Load TRAIN images
# ---------------------------------------------------
train_images = []
train_classes = []

for img_id in train_images_id:
    path = "/kaggle/input/cub2002011/CUB_200_2011/images/" + images_path[img_id]
    img = cv2.imread(path)

    if img is None:
        continue

    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)   # convert BGR → RGB

    train_images.append(img)
    train_classes.append(images_label[img_id])

# ---------------------------------------------------
# Load TEST images
# ---------------------------------------------------
test_images = []
test_classes = []

for img_id in test_images_id:
    path = "/kaggle/input/cub2002011/CUB_200_2011/images/" + images_path[img_id]
    img = cv2.imread(path)

    if img is None:
        continue

    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    test_images.append(img)
    test_classes.append(images_label[img_id])

# ---------------------------------------------------
# Build DataFrames
# ---------------------------------------------------
train_df = pd.DataFrame({
    "image": train_images,
    "class": train_classes
})

test_df = pd.DataFrame({
    "image": test_images,
    "class": test_classes
})

print("Train samples:", len(train_df))
print("Test samples:", len(test_df))

In [15]:
import cv2
import matplotlib.pyplot as plt

plt.imshow(cv2.cvtColor(train_df.iloc[3]["image"], cv2.COLOR_BGR2RGB))
plt.axis('off')

error: OpenCV(4.12.0) :-1: error: (-5:Bad argument) in function 'cvtColor'
> Overload resolution failed:
>  - src is not a numpy array, neither a scalar
>  - Expected Ptr<cv::UMat> for argument 'src'


In [16]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# -------------------------------------------------------
# Simple Encoder + Decoder
# -------------------------------------------------------
class SimpleMetricAutoEncoder(nn.Module):
    def __init__(self, embed_dim=512):
        super().__init__()

        goog = models.googlenet(weights="IMAGENET1K_V1")

        self.backbone = nn.Sequential(
            goog.conv1, goog.maxpool1,
            goog.conv2, goog.conv3, goog.maxpool2,
            goog.inception3a, goog.inception3b, goog.maxpool3,
            goog.inception4a, goog.inception4b, goog.inception4c,
            goog.inception4d, goog.inception4e,
            goog.maxpool4,
            goog.inception5a, goog.inception5b,
            goog.avgpool
        )

        self.backbone_out = 1024
        self.fc_embed = nn.Linear(1024, embed_dim)

        self.decoder = nn.Sequential(
            nn.Linear(embed_dim, 512),
            nn.Tanh(),
            nn.Linear(512, 1024)
        )

    def extract_f(self, x):
        f = self.backbone(x)
        return f.view(f.size(0), -1)

    def forward(self, x):
        f = self.extract_f(x)
        z = self.fc_embed(f)
        f_hat = self.decoder(z)
        return z, f, f_hat


# -------------------------------------------------------
# N-Pair Loss
# -------------------------------------------------------
class NPairLoss(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, anchors, positives):
        anchors = F.normalize(anchors, dim=1)
        positives = F.normalize(positives, dim=1)

        sim = anchors @ positives.t()
        labels = torch.arange(sim.size(0)).to(sim.device)

        return F.cross_entropy(sim, labels)


def recon_loss(x, x_hat):
    return F.mse_loss(x_hat, x)


# -------------------------------------------------------
# Setup model for Kaggle T4x2 GPUs
# -------------------------------------------------------
model = SimpleMetricAutoEncoder(embed_dim=512).to(device)

if torch.cuda.device_count() > 1:
    print("Using", torch.cuda.device_count(), "GPUs")
    model = nn.DataParallel(model)

optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
npair = NPairLoss()


# -------------------------------------------------------
# Training Loop
# -------------------------------------------------------
for epoch in range(10):
    for imgs, labels in train_loader:

        imgs = imgs.to(device)
        labels = labels.to(device)

        z, f, f_hat = model(imgs)

        # Build anchor-positive pairs
        anchors, positives = [], []
        for c in labels.unique():
            idx = (labels == c).nonzero().flatten()
            if len(idx) < 2:
                continue
            anchors.append(z[idx[0]])
            positives.append(z[idx[1]])

        if len(anchors) < 2:
            continue

        anchors = torch.stack(anchors)
        positives = torch.stack(positives)

        metric = npair(anchors, positives)
        rec = recon_loss(f, f_hat)
        loss = metric + rec

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print("Epoch", epoch, "Loss:", float(loss))


# -------------------------------------------------------
# Extract Features
# -------------------------------------------------------
def extract_embeddings(model, loader):
    model.eval()
    E, L = [], []
    with torch.no_grad():
        for imgs, lbls in loader:
            imgs = imgs.to(device)
            z, _, _ = model(imgs)
            E.append(z.cpu())
            L.append(lbls)
    return torch.cat(E), torch.cat(L)


# -------------------------------------------------------
# Recall@K
# -------------------------------------------------------
def recall_at_k(E, L, K=1):
    E = F.normalize(E, dim=1)
    sim = E @ E.t()
    N = len(L)
    sim[range(N), range(N)] = -1
    _, idx = sim.topk(K, dim=1)
    correct = sum(L[i] in L[idx[i]] for i in range(N))
    return correct / N


test_emb, test_lab = extract_embeddings(model, test_loader)

for k in [1,2,4,8]:
    print("R@", k, "=", recall_at_k(test_emb, test_lab, k))

Downloading: "https://download.pytorch.org/models/googlenet-1378be20.pth" to /root/.cache/torch/hub/checkpoints/googlenet-1378be20.pth
100%|██████████| 49.7M/49.7M [00:00<00:00, 151MB/s]


RuntimeError: Found no NVIDIA driver on your system. Please check that you have an NVIDIA GPU and installed a driver from http://www.nvidia.com/Download/index.aspx