<a href="https://colab.research.google.com/github/ZhePang/Painting_Identification/blob/main/Painting_Identification_TripletLoss.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Load preprocessed dataset from Google Drive

In [1]:
from google.colab import drive
# Mount Google Drive at /content/drive; the CSV and zip archive read below live under "My Drive".
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import pandas as pd

# Location of the metadata CSV on the mounted Google Drive
csv_file_path = '/content/drive/My Drive/painting_identification_data.csv'

# Read the metadata table; later cells rely on its 'Title', 'Label' and 'Filepath' columns
df = pd.read_csv(filepath_or_buffer=csv_file_path)

In [3]:
import zipfile
import os

# Archive holding the image folders (stored on Google Drive)
zip_file_path = '/content/drive/My Drive/painting_identification_data.zip.zip'

# Local runtime directory that receives the extracted images
extract_to_path = '/content/data'

# Make sure the target directory exists; a pre-existing one is fine
os.makedirs(extract_to_path, exist_ok=True)

# Extract every archive member into the target directory
with zipfile.ZipFile(zip_file_path, mode='r') as archive:
    archive.extractall(path=extract_to_path)

# Prepare dataset and dataloader for finetuning


*   sample P classes and K images from each class to form a batch of P*K images




In [4]:
import torch
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import os
import numpy as np
from torchvision import transforms

In [5]:
# Seed every random number generator the notebook draws from.
# PKSampler picks classes and images with Python's built-in `random` module,
# so seeding only NumPy and torch would leave the sampled batches
# non-reproducible across runs.
import random

SEED = 0
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

<torch._C.Generator at 0x78a14299b270>

In [6]:
# Preview the first rows to confirm the expected columns (Filename, Title, Label, Filepath).
df.head()

Unnamed: 0,Filename,Title,Label,Filepath
0,Cornelia_Street_John_French_Sloan_1920_origina...,Cornelia_Street_John_French_Sloan_1920,556,/content/data/Cornelia_Street_John_French_Sloa...
1,Cornelia_Street_John_French_Sloan_1920_augment...,Cornelia_Street_John_French_Sloan_1920,556,/content/data/Cornelia_Street_John_French_Sloa...
2,Cornelia_Street_John_French_Sloan_1920_augment...,Cornelia_Street_John_French_Sloan_1920,556,/content/data/Cornelia_Street_John_French_Sloa...
3,Cornelia_Street_John_French_Sloan_1920_augment...,Cornelia_Street_John_French_Sloan_1920,556,/content/data/Cornelia_Street_John_French_Sloa...
4,Cornelia_Street_John_French_Sloan_1920_augment...,Cornelia_Street_John_French_Sloan_1920,556,/content/data/Cornelia_Street_John_French_Sloa...


In [7]:
class TripletDataset(Dataset):
    """Dataset yielding an image, its label and the original painting of its title.

    Item ``idx`` is the tuple ``(image, label, original_image)`` where
    ``image`` is loaded from the row's 'Filepath' and ``original_image`` from
    ``<base_path>/<Title>/<Title>_original.jpg``. Both images go through the
    same ``transform``.
    """

    def __init__(self, dataframe, base_path, transform=None):
        """
        Args:
            dataframe (DataFrame): Table with 'Filepath', 'Label' and 'Title' columns.
            base_path (str): Directory that contains one sub-directory per title,
                each holding ``<Title>_original.jpg``.
            transform (callable, optional): Applied to every loaded RGB PIL image.
                Defaults to a 224x224 resize, tensor conversion and per-channel
                normalisation.
        """
        self.dataframe = dataframe
        self.base_path = base_path
        # NOTE(review): these mean/std values may differ from the preprocessing
        # the pretrained backbone was trained with — confirm against the model's
        # data config.
        self.transform = transform or transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])

    def __len__(self):
        """Number of rows (images) in the dataframe."""
        return len(self.dataframe)

    def _load_image(self, path):
        """Open ``path``, convert it to RGB and apply the transform."""
        # The context manager closes the file handle right away; convert()
        # returns an independent, fully loaded copy that outlives the handle.
        with Image.open(path) as handle:
            rgb_image = handle.convert('RGB')
        return self.transform(rgb_image)

    def __getitem__(self, idx):
        """Return ``(image, label, original_image)`` for positional row ``idx``."""
        row = self.dataframe.iloc[idx]
        title = row['Title']
        image = self._load_image(row['Filepath'])
        # Build the path from base_path instead of a hard-coded directory so the
        # dataset also works when the images are extracted somewhere else.
        original_path = os.path.join(self.base_path, title, f"{title}_original.jpg")
        original_image = self._load_image(original_path)
        return image, row['Label'], original_image


In [9]:
from torch.utils.data import Sampler
import random

class PKSampler(Sampler):
    """Batch sampler yielding ``n`` batches of ``P`` classes x ``K`` images each.

    Every batch is a flat list of ``P * K`` positional row indices: ``P``
    distinct labels are drawn without replacement, then ``K`` distinct rows
    are drawn for each of those labels. Draws use Python's ``random`` module.
    """

    def __init__(self, dataframe, P, K, n):
        """
        Args:
            dataframe (DataFrame): Table with a 'Label' column; row order must
                match the dataset the indices are used with.
            P (int): Number of distinct labels per batch.
            K (int): Number of rows drawn per label.
            n (int): Number of batches produced per iteration.

        Raises:
            ValueError: If fewer than ``P`` labels have at least ``K`` rows.
        """
        self.num_samples = len(dataframe)
        self.P = P
        self.K = K
        self.n_batches = n
        # GroupBy.indices maps each label to *positional* row numbers, which is
        # what a dataset reading rows with .iloc needs. Index labels would only
        # coincide with positions for a default RangeIndex.
        # Labels with fewer than K rows can never fill their share of a batch,
        # so they are left out instead of failing in the middle of an epoch.
        self.labels_to_indices = {
            label: [int(position) for position in positions]
            for label, positions in dataframe.groupby('Label').indices.items()
            if len(positions) >= K
        }
        self.labels = list(self.labels_to_indices.keys())
        if len(self.labels) < P:
            raise ValueError(
                f"PKSampler needs at least P={P} labels with K={K} or more rows, "
                f"but only {len(self.labels)} qualify"
            )

    def __iter__(self):
        """Yield one list of ``P * K`` row positions per batch."""
        for _ in range(self.n_batches):
            batch = []
            for label in random.sample(self.labels, self.P):
                batch.extend(random.sample(self.labels_to_indices[label], self.K))
            yield batch

    def __len__(self):
        """Number of batches produced per iteration."""
        return self.n_batches

In [10]:
# Root directory of the extracted images
data_dir = "/content/data"

# Batch layout: P classes x K images per class
P = 18
K = 4
# Number of P*K batches the sampler produces per pass over the dataloader
N_BATCHES = 500

dataset = TripletDataset(df, data_dir)
pk_sampler = PKSampler(df, P, K, N_BATCHES)
dataloader = DataLoader(dataset, batch_sampler=pk_sampler)

# Create TripletLoss function and load pretrained model

In [11]:
import torch.nn as nn

# Margin by which the anchor-negative distance has to exceed the
# anchor-positive distance before a triplet stops contributing to the loss
margin = 0.2
# p=2.0 is the library default (Euclidean distance), spelled out for readers
triplet_loss = nn.TripletMarginLoss(margin=margin, p=2.0)

In [None]:
!pip install timm

In [13]:
import timm

# Pretrained BiT ResNetV2-50x1 backbone. num_classes=0 asks timm for a model
# without a classification layer, so the forward pass returns feature vectors
# that are used as embeddings.
model_rn = timm.create_model('resnetv2_50x1_bit.goog_in21k', pretrained=True, num_classes=0)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


model.safetensors:   0%|          | 0.00/273M [00:00<?, ?B/s]

In [14]:
# Use the GPU when torch can see one, otherwise stay on the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model_rn.to(device)

ResNetV2(
  (stem): Sequential(
    (conv): StdConv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (pad): ConstantPad2d(padding=(1, 1, 1, 1), value=0.0)
    (pool): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (stages): Sequential(
    (0): ResNetStage(
      (blocks): Sequential(
        (0): PreActBottleneck(
          (downsample): DownsampleConv(
            (conv): StdConv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
            (norm): Identity()
          )
          (norm1): GroupNormAct(
            32, 64, eps=1e-05, affine=True
            (drop): Identity()
            (act): ReLU(inplace=True)
          )
          (conv1): StdConv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (norm2): GroupNormAct(
            32, 64, eps=1e-05, affine=True
            (drop): Identity()
            (act): ReLU(inplace=True)
          )
          (conv2): StdConv2d(64, 64, kernel_size=(3, 3)

# Define Train loop

*   use the original image as the positive sample
*   find the hardest negative sample in the batch (the one closest to the original image that does not belong to the same class)



In [15]:
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
from tqdm.notebook import tqdm

In [16]:
def train_one_epoch(model, dataloader, loss_fn, optimizer, device):
    """Train ``model`` for one pass over ``dataloader`` and return the mean loss.

    For every image in a batch a triplet is formed:

    * anchor   - embedding of the image itself,
    * positive - embedding of the image's original painting,
    * negative - the batch embedding with a different label that lies closest
      (L2 distance) to that positive, i.e. the hardest in-batch negative.

    Args:
        model: Module mapping a batch of images to a 2-D batch of embeddings.
        dataloader: Sized iterable of ``(images, labels, original_images)``
            batches, where ``labels`` is a 1-D tensor.
        loss_fn: Callable ``loss_fn(anchor, positive, negative)`` returning a
            scalar tensor.
        optimizer: Optimizer stepping the parameters of ``model``.
        device: Device the batch tensors are moved to.

    Returns:
        float: Mean loss over the batches that were trained on, or ``0.0``
        when no batch could be trained on.
    """
    model.train()
    total_loss = 0.0
    batches_trained = 0

    for images, labels, original_images in tqdm(dataloader, total=len(dataloader), desc="Training"):
        images = images.to(device)
        labels = labels.to(device)
        original_images = original_images.to(device)

        # same_label[i, j] is True when samples i and j share a label.
        same_label = labels.unsqueeze(0) == labels.unsqueeze(1)
        if bool(same_label.all()):
            # Only one label in the batch: no negative exists for any anchor,
            # so no triplet can be formed and the batch is skipped.
            continue

        # Clear the gradients
        optimizer.zero_grad()

        # Forward pass: embed the images and their originals
        embeddings = model(images)
        original_embeddings = model(original_images)

        # Hard-negative mining only selects indices, so it needs no gradients.
        with torch.no_grad():
            # dists[i, j] = ||original_embeddings[i] - embeddings[j]||_2.
            # The exact (non-matmul) mode avoids rounding noise when ranking
            # near-identical candidates.
            dists = torch.cdist(
                original_embeddings,
                embeddings,
                p=2.0,
                compute_mode="donot_use_mm_for_euclid_dist",
            )
            # Same-label samples must never be chosen as negatives.
            dists.masked_fill_(same_label, float("inf"))
            hardest = dists.argmin(dim=1)

        # Index outside no_grad so gradients flow into the chosen negatives.
        negative_embeddings = embeddings[hardest]

        # Calculate triplet loss
        loss = loss_fn(embeddings, original_embeddings, negative_embeddings)
        total_loss += loss.item()
        batches_trained += 1

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

    return total_loss / batches_trained if batches_trained else 0.0


def train_model(model, dataloader, loss_fn, optimizer, scheduler, device, num_epochs):
    """Train ``model`` for ``num_epochs`` epochs, stepping ``scheduler`` after each.

    Args:
        model: Module to train; moved to ``device`` first.
        dataloader: Batches passed through to ``train_one_epoch``.
        loss_fn: Triplet loss passed through to ``train_one_epoch``.
        optimizer: Optimizer passed through to ``train_one_epoch``.
        scheduler: Object whose ``step()`` is called once per epoch.
        device: Device used for the model and the batches.
        num_epochs (int): Number of epochs to run.

    Returns:
        list[float]: Average loss of every epoch, in order.
    """
    model.to(device)
    loss_history = []

    # Count epochs from 1 so the header line and the loss line show the same number.
    for epoch in range(1, num_epochs + 1):
        print(f"Epoch {epoch}/{num_epochs}")
        avg_loss = train_one_epoch(model, dataloader, loss_fn, optimizer, device)
        print("Epoch {}, Average Loss: {:.4f}".format(epoch, avg_loss))
        loss_history.append(avg_loss)

        # Step the learning rate scheduler
        scheduler.step()

    return loss_history

# Training

In [17]:
# Adam over every parameter of the backbone
optimizer = optim.Adam(params=model_rn.parameters(), lr=1e-3)

# StepLR multiplies the learning rate by gamma every `step_size` scheduler steps.
# NOTE(review): the training call in this notebook runs 10 epochs with one
# scheduler step per epoch, so the first decay only lands after the last
# epoch — confirm this is intended.
scheduler = StepLR(optimizer=optimizer, step_size=10, gamma=0.1)

In [18]:
# Release cached GPU memory that PyTorch's allocator is holding but not using, before training starts.
torch.cuda.empty_cache()

In [19]:
# Fine-tune the backbone; the trailing ';' keeps the cell from echoing a return value.
train_model(
    model=model_rn,
    dataloader=dataloader,
    loss_fn=triplet_loss,
    optimizer=optimizer,
    scheduler=scheduler,
    device=device,
    num_epochs=10,
);

Epoch 1/10


Training:   0%|          | 0/500 [00:00<?, ?it/s]

Training:   0%|          | 0/500 [00:00<?, ?it/s]

Epoch 0, Average Loss: 0.0074
Epoch 2/10


Training:   0%|          | 0/500 [00:00<?, ?it/s]

Training:   0%|          | 0/500 [00:00<?, ?it/s]

Epoch 1, Average Loss: 0.0023
Epoch 3/10


Training:   0%|          | 0/500 [00:00<?, ?it/s]

Training:   0%|          | 0/500 [00:00<?, ?it/s]

Epoch 2, Average Loss: 0.0025
Epoch 4/10


Training:   0%|          | 0/500 [00:00<?, ?it/s]

Training:   0%|          | 0/500 [00:00<?, ?it/s]

Epoch 3, Average Loss: 0.0010
Epoch 5/10


Training:   0%|          | 0/500 [00:00<?, ?it/s]

Training:   0%|          | 0/500 [00:00<?, ?it/s]

Epoch 4, Average Loss: 0.0010
Epoch 6/10


Training:   0%|          | 0/500 [00:00<?, ?it/s]

Training:   0%|          | 0/500 [00:00<?, ?it/s]

Epoch 5, Average Loss: 0.0016
Epoch 7/10


Training:   0%|          | 0/500 [00:00<?, ?it/s]

Training:   0%|          | 0/500 [00:00<?, ?it/s]

Epoch 6, Average Loss: 0.0005
Epoch 8/10


Training:   0%|          | 0/500 [00:00<?, ?it/s]

Training:   0%|          | 0/500 [00:00<?, ?it/s]

Epoch 7, Average Loss: 0.0002
Epoch 9/10


Training:   0%|          | 0/500 [00:00<?, ?it/s]

Training:   0%|          | 0/500 [00:00<?, ?it/s]

Epoch 8, Average Loss: 0.0018
Epoch 10/10


Training:   0%|          | 0/500 [00:00<?, ?it/s]

Training:   0%|          | 0/500 [00:00<?, ?it/s]

Epoch 9, Average Loss: 0.0009


In [20]:
# Persist only the fine-tuned weights (the state_dict) to Google Drive.
# The file-name suffix presumably encodes the run settings (P=18, K=4, n=500, 10 epochs).
weights_path = '/content/drive/My Drive/painting_identification_model_weights_p18k4n500e10.pth'
torch.save(model_rn.state_dict(), weights_path)