# Chapter 5: Modeling Objectives


In [1]:
import torch
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, random_split, Subset
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim


In [2]:
# define the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [3]:
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])

train_dataset = torchvision.datasets.MNIST(root='./data', train=True, download=True, transform=transform)
test_dataset = torchvision.datasets.MNIST(root='./data', train=False, download=True, transform=transform)

# Hold out a few examples from each class from the training set
num_holdout_per_class = 50
holdout_indices = []
train_indices = []
class_counts = {i: 0 for i in range(10)}

for idx, (img, label) in enumerate(train_dataset):
    if class_counts[label] < num_holdout_per_class:
        holdout_indices.append(idx)
        class_counts[label] += 1
    else:
        train_indices.append(idx)

holdout_set = Subset(train_dataset, holdout_indices)
train_set = Subset(train_dataset, train_indices)

"""
train_loader = DataLoader(train_set, batch_size=64, shuffle=True)
holdout_loader = DataLoader(holdout_set, batch_size=num_holdout_per_class*10, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
"""


'\ntrain_loader = DataLoader(train_set, batch_size=64, shuffle=True)\nholdout_loader = DataLoader(holdout_set, batch_size=num_holdout_per_class*10, shuffle=False)\ntest_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)\n'

# Contrastive Loss

In [4]:
class ContrastiveLoss(nn.Module):
    def __init__(self, margin=1.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        euclidean_distance = nn.functional.pairwise_distance(output1, output2)
        loss = torch.mean((1-label) * torch.pow(euclidean_distance, 2) + (label) * torch.pow(torch.clamp(self.margin - euclidean_distance, min=0.0), 2))
        return loss




# Contrastive CNN Model

In [5]:
class ContrastiveCNN(nn.Module):
    def __init__(self):
        super(ContrastiveCNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=5)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=5)
        self.fc1 = nn.Linear(1024, 256)
        self.fc2 = nn.Linear(256, 64)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2(x), 2))
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x


# Train the Model

In [6]:
import random

class ContrastiveMNISTDataset(torch.utils.data.Dataset):
    def __init__(self, dataset):
        self.dataset = dataset
        self.labels = [label for _, label in dataset]
        self.label_to_indices = {label: np.where(np.array(self.labels) == label)[0] for label in set(self.labels)}

    def __getitem__(self, index):
        img1, label1 = self.dataset[index]
        if random.randint(0, 1):
            # Positive pair
            label2 = label1
            img2_index = index
            while img2_index == index:
                img2_index = random.choice(self.label_to_indices[label1])
        else:
            # Negative pair
            label2 = random.choice(list(set(self.labels) - {label1}))
            img2_index = random.choice(self.label_to_indices[label2])

        img2, _ = self.dataset[img2_index]

        label = torch.tensor(int(label1 == label2), dtype=torch.float32)
        return img1, img2, label

    def __len__(self):
        return len(self.dataset)

contrastive_train_dataset = ContrastiveMNISTDataset(train_set)

train_loader = DataLoader(contrastive_train_dataset, batch_size=64, shuffle=True)
holdout_loader = DataLoader(holdout_set, batch_size=num_holdout_per_class * 10, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)


In [7]:
import torch.optim as optim
import torch.nn as nn
from tqdm import tqdm
import torch.optim.lr_scheduler as lr_scheduler

model = ContrastiveCNN().to(device)
criterion = ContrastiveLoss().to(device)

optimizer = optim.Adam(model.parameters(), lr=0.001)
scheduler = lr_scheduler.StepLR(optimizer, step_size=2, gamma=0.1)


num_epochs = 10
model.train()

for epoch in tqdm(range(num_epochs)):
    running_loss = 0.0
    for batch_idx, (img1, img2, label) in enumerate(train_loader):
        img1, img2, label = img1.to(device), img2.to(device), label.to(device)

        optimizer.zero_grad()
        output1 = model(img1)
        output2 = model(img2)
        loss = criterion(output1, output2, label)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    scheduler.step()

    print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss / len(train_loader)}")


 10%|█         | 1/10 [00:56<08:31, 56.85s/it]

Epoch [1/10], Loss: 0.2693414052007019


 20%|██        | 2/10 [01:51<07:24, 55.51s/it]

Epoch [2/10], Loss: 0.2652581853731986


 30%|███       | 3/10 [02:46<06:27, 55.38s/it]

Epoch [3/10], Loss: 0.25896600672314246


 40%|████      | 4/10 [03:41<05:30, 55.06s/it]

Epoch [4/10], Loss: 0.2574675916984517


 50%|█████     | 5/10 [04:36<04:36, 55.22s/it]

Epoch [5/10], Loss: 0.25777752104625906


 60%|██████    | 6/10 [05:30<03:39, 54.83s/it]

Epoch [6/10], Loss: 0.25715777335628387


 70%|███████   | 7/10 [06:25<02:44, 54.90s/it]

Epoch [7/10], Loss: 0.25687490927596246


 80%|████████  | 8/10 [07:19<01:49, 54.55s/it]

Epoch [8/10], Loss: 0.257037169991001


 90%|█████████ | 9/10 [08:13<00:54, 54.27s/it]

Epoch [9/10], Loss: 0.2571556911032687


100%|██████████| 10/10 [09:07<00:00, 54.77s/it]

Epoch [10/10], Loss: 0.2575170864501307





# Evaluate Model

In [8]:
from sklearn.metrics.pairwise import cosine_similarity

model.eval()
holdout_embeddings = []
holdout_labels = []

#test
with torch.no_grad():
    for images, labels in holdout_loader:
        images, labels = images.to(device), labels.to(device)

        embeddings = model(images)
        holdout_embeddings.append(embeddings)
        holdout_labels.append(labels)

holdout_embeddings = torch.cat(holdout_embeddings)
holdout_labels = torch.cat(holdout_labels)

correct_per_class = [0] * 10
total_per_class = [0] * 10
correct_total = 0
total_total = 0


with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        test_embeddings = model(images)
        similarities = cosine_similarity(test_embeddings.cpu(), holdout_embeddings.cpu())

        # Compute average similarity for each class
        avg_similarities = np.zeros((test_embeddings.size(0), 10))
        for c in range(10):
            class_indices = (holdout_labels == c).nonzero(as_tuple=True)[0]
            class_similarities = similarities[:, class_indices.cpu()]
            avg_similarities[:, c] = class_similarities.mean(axis=1)


        predicted_labels = avg_similarities.argmax(axis=1)

        for i, label in enumerate(labels):
            if predicted_labels[i] == label:
                correct_per_class[label] += 1
                correct_total += 1
            total_per_class[label] += 1
            total_total += 1

accuracy_per_class = [correct_per_class[i] / total_per_class[i] if total_per_class[i] > 0 else 0 for i in range(10)]

best_class = np.argmax(accuracy_per_class)
best_accuracy = accuracy_per_class[best_class]

overall_accuracy = correct_total / total_total

for i in range(10):
    print(f"Class {i} Accuracy: {accuracy_per_class[i] * 100:.2f}%")

print(f"Best Class: {best_class} with Accuracy: {best_accuracy * 100:.2f}%")
print(f"Overall Accuracy: {overall_accuracy * 100:.2f}%")


Class 0 Accuracy: 32.86%
Class 1 Accuracy: 14.71%
Class 2 Accuracy: 2.71%
Class 3 Accuracy: 5.84%
Class 4 Accuracy: 9.78%
Class 5 Accuracy: 13.34%
Class 6 Accuracy: 30.38%
Class 7 Accuracy: 8.56%
Class 8 Accuracy: 5.85%
Class 9 Accuracy: 4.36%
Best Class: 0 with Accuracy: 32.86%
Overall Accuracy: 12.71%
