In [2]:
import gc
import os

import math
import random
import numpy as np
from PIL import Image
from io import BytesIO

from matplotlib import pyplot as plt
from tqdm import tqdm

import torch
from torchvision import transforms, datasets, models
from torch.nn import BCEWithLogitsLoss
from torch.nn.modules import flatten
from torch.optim.lr_scheduler import LambdaLR
from torch.utils.data import TensorDataset, DataLoader, Dataset, random_split
from torch.cuda.amp import autocast, GradScaler
from torchsummary import summary

from torch import nn, optim

import torch.nn.functional as F
from torchvision import datasets, transforms

torch.manual_seed(42)
np.random.seed(42)

# print(torch.cuda.get_device_name(0))
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

ModuleNotFoundError: No module named 'torch'

In [None]:
class Encoder(nn.Module):
    def __init__(self, input_dim, embedding_dim):
        super(Encoder, self).__init__()
        self.fc = nn.Linear(input_dim, embedding_dim)
        
    def forward(self, x):
        return self.fc(x)

# Define the contrastive loss with cosine similarity
class ContrastiveCosineLoss(nn.Module):
    def __init__(self, margin=0.5):
        super(ContrastiveCosineLoss, self).__init__()
        self.margin = margin
        self.cosine_similarity = nn.CosineSimilarity(dim=1, eps=1e-6)

    def forward(self, output1, output2, label):
        # Cosine similarity
        cos_sim = self.cosine_similarity(output1, output2)
        
        # Calculate the loss
        loss = (1 - label) * 0.5 * cos_sim**2 + \
               label * 0.5 * (torch.relu(self.margin - cos_sim) ** 2)
        
        return loss.mean()
    
class RandomDataset(Dataset):
    def __init__(self, num_samples, input_dim, label_range=(0, 2)):
        self.num_samples = num_samples
        self.input_dim = input_dim
        self.label_range = label_range
        self.data = torch.randn(num_samples, input_dim)
        self.labels = torch.randint(*label_range, (num_samples,))

    def __len__(self):
        return self.num_samples

    def __getitem__(self, idx):
        # For simplicity, returning the same random vector for x1 and x2. 
        # In practice, you'd probably want pairs of samples (e.g., anchor and positive/negative).
        x1 = self.data[idx]
        x2 = self.data[(idx+1) % self.num_samples]  # Just for variety; wraps around at the end.
        label = (self.labels[idx] == self.labels[(idx+1) % self.num_samples]).float()  # Label 1 if same, 0 if different
        return x1, x2, label

In [None]:
def train(encoder_1, encoder_2, criterion, optimizer, train_set, test_set, batch_size, num_epochs=10):
    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False)

    train_losses = []
    test_losses = []

    # Note encoder 1 is frezzing
    encoder_1.eval()
    for param in encoder_1.parameters():
        param.requires_grad = False
    
    for epoch in range(num_epochs):
        # Training Loop
        encoder_2.train()
        train_loss = 0
        for batch_idx, (x1, x2, labels) in enumerate(train_loader):
            optimizer.zero_grad()

            embedding1 = encoder_1(x1)
            embedding2 = encoder_2(x2)

            loss = criterion(embedding1, embedding2, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()

        avg_train_loss = train_loss / len(train_loader)
        train_losses.append(avg_train_loss)

        # Testing Loop
        encoder_2.eval()
        test_loss = 0
        with torch.no_grad():
            for x1, x2, labels in test_loader:
                embedding1 = encoder_1(x1)
                embedding2 = encoder_2(x2)

                loss = criterion(embedding1, embedding2, labels)
                test_loss += loss.item()

        avg_test_loss = test_loss / len(test_loader)
        test_losses.append(avg_test_loss)

        print(f"Epoch [{epoch + 1}/{num_epochs}] Train Loss: {avg_train_loss:.4f}, Test Loss: {avg_test_loss:.4f}")

    return train_losses, test_losses

In [None]:
input_dim = 512
dataset = RandomDataset(num_samples=1000, input_dim=input_dim)

encoder_1 = Encoder(input_dim, 1024)    # frozen pre-trained model
encoder_2 = Encoder(input_dim, 1024)

criterion = ContrastiveCosineLoss()
optimizer = optim.Adam(encoder_2.parameters(), lr=0.001)

train_length = int(0.8 * len(dataset))
test_length = len(dataset) - train_length
train_set, test_set = random_split(dataset, [train_length, test_length])

batch_size = 32
epochs = 1
train(encoder_1, encoder_2, criterion, optimizer, train_set, test_set, batch_size, epochs)