In [1]:
import torch
from torch import nn
import torch.nn.functional as F

from torch.utils.data import DataLoader,Dataset,random_split

from torchvision import datasets
from torchvision.transforms import v2,ToTensor
from torchvision.io import read_image, decode_image


import os
from tqdm import tqdm
import random
import json

In [2]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

'cuda'

For colab

In [3]:
# from google.colab import drive
# drive.mount('/content/drive')

In [4]:
pg = 'indexes_data/style_sample_pairs_groups.json'
itg = 'indexes_data/style_sample_indexes_to_groups.json'
im_dir = 'image_data'

Image transformation 

In [5]:
transformer_pipe = v2.Compose([
    v2.Resize((224,224)),
    v2.ToDtype(torch.float32, scale=True),
    v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

Contrast data and loss

In [6]:
class ContrastiveDataset(Dataset):

    def __init__(self, indexes_to_groups,group_paris,img_dir,transform=None, rand_state=12):

        self.rand_state = rand_state
        self.transform = transform
        self.img_dir = img_dir

        with open(indexes_to_groups) as f1:
            self.idd = json.loads(f1.read())

        with open(group_paris) as f2:
            self.gpd = json.loads(f2.read())

        self.train_list = []

        ## ADD shuffle

        for index,val in self.idd.items():

            v = self.gpd[str(val)]
            ind = int(index)

            random.seed(2)

            for p in v[0][:10]:
                self.train_list.append((ind,p,1))
            for n in random.sample(v[1],10):
                self.train_list.append((ind,n,0))



    def __len__(self):
        return len(self.train_list)


    def __getitem__(self, index):

        (anchor,other,score) = self.train_list[index]

        anchor_image = read_image(os.path.join(self.img_dir, f'{anchor}.jpg'))
        other_image = read_image(os.path.join(self.img_dir, f'{other}.jpg'))


        if self.transform:
            anchor_image = self.transform(anchor_image)
            other_image = self.transform(other_image)


        return (anchor_image,other_image,score)



class ContrastiveLoss(torch.nn.Module):
    def __init__(self, margin=1.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        # Calculate the Euclidean distance between the two embeddings
        distance = F.pairwise_distance(output1, output2)

        # Calculate Contrastive Loss
        loss = 0.5 * (label * distance.pow(2) + (1 - label) * F.relu(self.margin - distance).pow(2))
        return loss.mean()


In [7]:
CDS = ContrastiveDataset(itg,pg,im_dir,transform=transformer_pipe,rand_state=1000)

Model with weights.

In [9]:
from torchvision.models import ShuffleNet_V2_X1_0_Weights,shufflenet_v2_x1_0

weights = ShuffleNet_V2_X1_0_Weights.DEFAULT
model = shufflenet_v2_x1_0(weights=weights)
model

ShuffleNetV2(
  (conv1): Sequential(
    (0): Conv2d(3, 24, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
    (1): BatchNorm2d(24, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
  )
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (stage2): Sequential(
    (0): InvertedResidual(
      (branch1): Sequential(
        (0): Conv2d(24, 24, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), groups=24, bias=False)
        (1): BatchNorm2d(24, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): Conv2d(24, 58, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (3): BatchNorm2d(58, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (4): ReLU(inplace=True)
      )
      (branch2): Sequential(
        (0): Conv2d(24, 58, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (1): BatchNorm2d(58, eps=1e-05, momentum=0.1, affine=True, track_running_

Last embedding layers 

In [10]:
model.fc = nn.Sequential(
    nn.Linear(1024,512),
    nn.ReLU(),# Additional linear layer with 256 output features
    nn.Linear(512,256),  # Additional linear layer with 256 output features
)

Freeze layers

In [11]:
for param in model.parameters():
    if isinstance(param, nn.Conv2d):
        param.requires_grad = False

In [12]:
model.to(device)

ShuffleNetV2(
  (conv1): Sequential(
    (0): Conv2d(3, 24, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
    (1): BatchNorm2d(24, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
  )
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (stage2): Sequential(
    (0): InvertedResidual(
      (branch1): Sequential(
        (0): Conv2d(24, 24, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), groups=24, bias=False)
        (1): BatchNorm2d(24, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): Conv2d(24, 58, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (3): BatchNorm2d(58, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (4): ReLU(inplace=True)
      )
      (branch2): Sequential(
        (0): Conv2d(24, 58, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (1): BatchNorm2d(58, eps=1e-05, momentum=0.1, affine=True, track_running_

Data splits

In [None]:

# Split dataset into training and validation
train_size = int(0.9 * len(CDS))  # 80% training, 20% validation
val_size = len(CDS) - train_size
train_dataset, val_dataset = random_split(CDS, [train_size, val_size])

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)

In [14]:
'''

train_dataset = CDS
train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True)

contrastive_loss = ContrastiveLoss(margin=1.0)

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    for batch_idx, (input1, input2, label) in tqdm(enumerate(train_loader),total=CDS.__len__()//256):

        input1, input2, label = input1.to(device), input2.to(device), label.to(device)

        # Embedings
        output1 = model(input1)
        output2 = model(input2)

        #Loss
        loss = contrastive_loss(output1, output2, label)

        # Backprop
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Accumulate the loss
        running_loss += loss.item()

    avg_loss = running_loss / len(train_loader)

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}')
'''




# Loss function and optimizer
contrastive_loss = ContrastiveLoss(margin=1.0)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

num_epochs = 10
for epoch in range(num_epochs):
    # Training phase
    model.train()
    running_loss = 0.0

    for (input1, input2, label) in tqdm(train_loader, total=len(train_loader)):
        input1, input2, label = input1.to(device), input2.to(device), label.to(device)

        # Forward pass
        output1 = model(input1)
        output2 = model(input2)

        # Compute loss
        loss = contrastive_loss(output1, output2, label)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Accumulate training loss
        running_loss += loss.item()

    avg_train_loss = running_loss / len(train_loader)

    # Validation phase
    model.eval()
    val_loss = 0.0

    with torch.no_grad():
        for input1, input2, label in val_loader:
            input1, input2, label = input1.to(device), input2.to(device), label.to(device)

            # Forward pass
            output1 = model(input1)
            output2 = model(input2)

            # Compute loss
            loss = contrastive_loss(output1, output2, label)
            val_loss += loss.item()

    avg_val_loss = val_loss / len(val_loader)

    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}')


  0%|          | 0/300 [00:00<?, ?it/s]

100%|██████████| 300/300 [37:28<00:00,  7.50s/it]


Epoch [1/10], Train Loss: 0.0110, Val Loss: 0.0840


 25%|██▍       | 74/300 [09:05<27:46,  7.38s/it]


KeyboardInterrupt: 

In [None]:
torch.save(model, 'model')

Test some images outside of the train data.

First 2 are positive, last is negative.

In [None]:
img = decode_image(im_dir+"/1.jpg")

# Step 3: Apply inference preprocessing transforms
batch = transformer_pipe(img).unsqueeze(0)


batch = batch.to(device)

# Step 4: Use the model and print the predicted category
prediction_a = model(batch).squeeze(0).softmax(0)

In [None]:
img = decode_image(im_dir+"/0.jpg")

# Step 3: Apply inference preprocessing transforms
batch = transformer_pipe(img).unsqueeze(0)


batch = batch.to(device)

# Step 4: Use the model and print the predicted category
prediction_b = model(batch).squeeze(0).softmax(0)

In [None]:
img = decode_image(im_dir+"/5.jpg")

# Step 3: Apply inference preprocessing transforms
batch = transformer_pipe(img).unsqueeze(0)


batch = batch.to(device)

# Step 4: Use the model and print the predicted category
prediction_c = model(batch).squeeze(0).softmax(0)

In [None]:
F.pairwise_distance(prediction_a,prediction_b)


tensor(0.0009, device='cuda:0', grad_fn=<NormBackward1>)

In [None]:
F.pairwise_distance(prediction_a,prediction_c)


tensor(0.0006, device='cuda:0', grad_fn=<NormBackward1>)