<a href="https://colab.research.google.com/github/Dd1235/LearnAI/blob/main/mini_projects/Siamese_Network_OmniglotSiameseNetwork.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!git clone https://github.com/Dd1235/LearnAI.git
%cd LearnAI/mini_projects/Siamese_Network_Omniglot

Cloning into 'LearnAI'...
remote: Enumerating objects: 34242, done.[K
remote: Counting objects: 100% (34242/34242), done.[K
remote: Compressing objects: 100% (34224/34224), done.[K
remote: Total 34242 (delta 24), reused 34225 (delta 10), pack-reused 0 (from 0)[K
Receiving objects: 100% (34242/34242), 34.50 MiB | 14.40 MiB/s, done.
Resolving deltas: 100% (24/24), done.
Updating files: 100% (32489/32489), done.
/content/LearnAI/mini_projects/Siamese_Network_Omniglot


In [2]:
!ls
!ls "Omniglot Dataset"

'Omniglot Dataset'   oneshot1.pdf   README.md
images_background  images_evaluation


In [3]:
# it is bad practice to put entire 133MB of dataset in a folder like done here, will change later
# this is simplified to make the training faster for demonstration purposes

In [4]:
import os
import random
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)


data_path = "Omniglot Dataset"
background_path = os.path.join(data_path, 'images_background')
evaluation_path = os.path.join(data_path, 'images_evaluation')

print("Background path:", background_path)
print("Evaluation path:", evaluation_path)

Using device: cuda
Background path: Omniglot Dataset/images_background
Evaluation path: Omniglot Dataset/images_evaluation


In [5]:
class OmniglotDataset(Dataset):
    """
    Loads pairs of images from the Omniglot background set for training.
    Each item is (img1, img2, label) where label=1 if same class, else 0.
    """
    def __init__(self, root, transform=None, num_pairs=30000):
        """
        root: path to the background images folder
        transform: optional transform (e.g. ToTensor, etc.)
        num_pairs: how many pairs to generate for the dataset
        """
        super().__init__()
        self.root = root
        self.transform = transform
        self.all_images = []  # (image_path, character_id)
        current_label = 0

        alphabets = os.listdir(root)
        for alpha in alphabets:
            alpha_path = os.path.join(root, alpha)
            if not os.path.isdir(alpha_path):
                continue
            chars = os.listdir(alpha_path)
            for char in chars:
                char_path = os.path.join(alpha_path, char)
                if not os.path.isdir(char_path):
                    continue
                images_in_char = os.listdir(char_path)
                for imgname in images_in_char:
                    if imgname.endswith(".png"):
                        full_path = os.path.join(char_path, imgname)
                        self.all_images.append((full_path, current_label))
                current_label += 1

        self.num_chars = current_label  # total unique characters
        # Group images by label for easy same-class sampling
        from collections import defaultdict
        self.images_by_label = defaultdict(list)
        for img_path, lbl in self.all_images:
            self.images_by_label[lbl].append(img_path)

        self.all_labels = np.arange(self.num_chars)
        self.num_pairs = num_pairs

    def __len__(self):
        return self.num_pairs

    def __getitem__(self, idx):
        # half the time same class, half the time different
        same_class = np.random.choice([0,1])

        if same_class == 1:
            # pick random label
            label = np.random.choice(self.all_labels)
            # pick two images from that label
            imgs = random.sample(self.images_by_label[label], 2)
            y = 1
        else:
            # pick two different labels
            lbl1, lbl2 = np.random.choice(self.all_labels, 2, replace=False)
            img1 = random.choice(self.images_by_label[lbl1])
            img2 = random.choice(self.images_by_label[lbl2])
            imgs = [img1, img2]
            y = 0

        img1 = Image.open(imgs[0]).convert('L')
        img2 = Image.open(imgs[1]).convert('L')

        if self.transform:
            img1 = self.transform(img1)
            img2 = self.transform(img2)

        return (img1, img2, torch.tensor(y, dtype=torch.float32))


In [14]:
def load_images_by_character(root):
    """
    Returns a dict: { (alphabet, character): [list of image paths] }
    """
    from collections import defaultdict
    data_dict = defaultdict(list)
    alphabets = os.listdir(root)
    for alpha in alphabets:
        alpha_path = os.path.join(root, alpha)
        if not os.path.isdir(alpha_path):
            continue
        chars = os.listdir(alpha_path)
        for char in chars:
            char_path = os.path.join(alpha_path, char)
            if not os.path.isdir(char_path):
                continue
            images_in_char = os.listdir(char_path)
            for imgname in images_in_char:
                if imgname.endswith('.png'):
                    full_path = os.path.join(char_path, imgname)
                    data_dict[(alpha, char)].append(full_path)
    return data_dict

class OneShotEvaluation:
    def __init__(self, eval_root, transform=None, n_way=20):
        """
        Prepare data for n-way one-shot tasks from the evaluation set.
        """
        self.eval_root = eval_root
        self.transform = transform
        self.n_way = n_way
        self.data_dict = load_images_by_character(eval_root)
        # We can convert the dict keys into a list for easier random sampling
        self.all_keys = list(self.data_dict.keys())

    def get_one_shot_batch(self, batch_size=1):
        """
        Generate a batch of n-way one-shot tasks.
        Returns a list of tuples: (test_image, support_images, correct_index)
        """
        tasks = []
        for _ in range(batch_size):
            sampled_keys = random.sample(self.all_keys, self.n_way)
            correct_class = 0
            images_for_char0 = self.data_dict[sampled_keys[0]]
            # pick test image:
            test_img_path = random.choice(images_for_char0)

            # pick a support image for each of the n_way classes
            support_image_paths = []
            for k in sampled_keys:
                char_imgs = self.data_dict[k]
                sup_img_path = random.choice(char_imgs)
                support_image_paths.append(sup_img_path)

            # load them into memory and transform
            test_img = Image.open(test_img_path).convert('L')
            if self.transform:
                test_img = self.transform(test_img)

            support_imgs = []
            for sp in support_image_paths:
                sp_img = Image.open(sp).convert('L')
                if self.transform:
                    sp_img = self.transform(sp_img)
                support_imgs.append(sp_img)

            tasks.append((test_img, support_imgs, correct_class))

        return tasks

In [9]:
class SiameseNetwork(nn.Module):
    def __init__(self):
        super(SiameseNetwork, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=5, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),

            nn.Conv2d(32, 64, kernel_size=5, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
        )

        # Dummy input to compute flatten size
        dummy = torch.zeros(1, 1, 105, 105)
        dummy_out = self.features(dummy)
        flatten_dim = dummy_out.view(1, -1).size(1)

        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(flatten_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 128)
        )

    def forward_once(self, x):
        x = self.features(x)
        x = self.fc(x)
        return x

    def forward(self, x1, x2):
        out1 = self.forward_once(x1)
        out2 = self.forward_once(x2)
        return out1, out2



Using a contrasitive margin based loss like in a lot of tutorials in Siamese nn instead of Cross Entropy with L2 like in the paper

So not implementing L2 with weight decay or momentum scheduling

In [7]:
class ContrastiveLoss(nn.Module):
    def __init__(self, margin=2.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, out1, out2, label):
        euclidean_distance = F.pairwise_distance(out1, out2)
        loss_same = label * torch.pow(euclidean_distance, 2)
        loss_diff = (1 - label) * torch.pow(torch.clamp(self.margin - euclidean_distance, min=0.0), 2)
        loss = torch.mean(loss_same + loss_diff) / 2
        return loss

In [10]:
# paper describes more data augmentation, ie, Affine Distortion

transform = transforms.Compose([
    transforms.Resize((105, 105)),
    transforms.ToTensor()
])

train_dataset = OmniglotDataset(
    root=background_path,
    transform=transform,
    num_pairs=20000  # fewer pairs for demonstration
)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

model = SiameseNetwork().to(device)
criterion = ContrastiveLoss(margin=2.0)
optimizer = optim.Adam(model.parameters(), lr=1e-4) # SGD used in paper

num_epochs = 5
# for demonstration, upto 200, and early stopped if no decrease in validation error after 20 epochs in paper

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for i, (img1, img2, label) in enumerate(train_loader):
        img1, img2, label = img1.to(device), img2.to(device), label.to(device)
        optimizer.zero_grad()
        out1, out2 = model(img1, img2)
        loss = criterion(out1, out2, label)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

        if (i+1) % 100 == 0:
            print(f"Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(train_loader)}], Loss: {running_loss/100:.4f}")
            running_loss = 0.0

Epoch [1/5], Step [100/625], Loss: 0.3694
Epoch [1/5], Step [200/625], Loss: 0.2994
Epoch [1/5], Step [300/625], Loss: 0.2882
Epoch [1/5], Step [400/625], Loss: 0.2838
Epoch [1/5], Step [500/625], Loss: 0.2590
Epoch [1/5], Step [600/625], Loss: 0.2454
Epoch [2/5], Step [100/625], Loss: 0.2365
Epoch [2/5], Step [200/625], Loss: 0.2297
Epoch [2/5], Step [300/625], Loss: 0.2249
Epoch [2/5], Step [400/625], Loss: 0.2210
Epoch [2/5], Step [500/625], Loss: 0.2199
Epoch [2/5], Step [600/625], Loss: 0.2113
Epoch [3/5], Step [100/625], Loss: 0.2236
Epoch [3/5], Step [200/625], Loss: 0.2180
Epoch [3/5], Step [300/625], Loss: 0.2087
Epoch [3/5], Step [400/625], Loss: 0.2001
Epoch [3/5], Step [500/625], Loss: 0.1952
Epoch [3/5], Step [600/625], Loss: 0.1982
Epoch [4/5], Step [100/625], Loss: 0.1979
Epoch [4/5], Step [200/625], Loss: 0.1996
Epoch [4/5], Step [300/625], Loss: 0.1979
Epoch [4/5], Step [400/625], Loss: 0.1967
Epoch [4/5], Step [500/625], Loss: 0.1937
Epoch [4/5], Step [600/625], Loss:

In [12]:
def evaluate_oneshot(model, eval_loader, n_way=20, k_tasks=100):
    model.eval()
    correct = 0
    for _ in range(k_tasks):
        tasks = eval_loader.get_one_shot_batch(batch_size=1)
        test_img, support_imgs, correct_idx = tasks[0]

        test_img = test_img.unsqueeze(0).to(device)
        distances = []
        with torch.no_grad():
            feat_test = model.forward_once(test_img)
            for j in range(n_way):
                sup_img = support_imgs[j].unsqueeze(0).to(device)
                feat_sup = model.forward_once(sup_img)
                dist = F.pairwise_distance(feat_test, feat_sup)
                distances.append(dist.item())

        pred_idx = np.argmin(distances)
        if pred_idx == correct_idx:
            correct += 1

    acc = correct / k_tasks * 100.0
    return acc

In [15]:
oneshot_eval_loader = OneShotEvaluation(evaluation_path, transform=transform, n_way=20)

accuracy = evaluate_oneshot(model, oneshot_eval_loader, n_way=20, k_tasks=50)
print(f"One-shot 20-way accuracy: {accuracy:.2f}%")

One-shot 20-way accuracy: 80.00%


In [16]:
!git config --global user.name "Dd1235"
!git config --global user.email "deepya1235@gmail.com"


In [20]:
# !cd /content/LearnAI/mini_projects/Siamese_Network_Omniglot && git add SiameseNetwork.ipynb
# !cd /content/LearnAI/mini_projects/Siamese_Network_Omniglot && git commit -m "Add SiameseNetwork.ipynb notebook from Colab"

# from next time open using github
# and use !git to add and commit

fatal: pathspec 'SiameseNetwork.ipynb' did not match any files
On branch main
Your branch is up to date with 'origin/main'.

nothing to commit, working tree clean


In [18]:
!pwd

/content/LearnAI/mini_projects/Siamese_Network_Omniglot


In [19]:
!ls

'Omniglot Dataset'   oneshot1.pdf   README.md
