<a href="https://colab.research.google.com/github/azernik/semeval_2025_task1/blob/main/tuning_open_clip.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### setup

In [None]:
for downloading results from Drive
!pip install -q gdown

import gdown

In [None]:
!pip install open_clip_torch

In [None]:
download taskA file from Adam's Drive (public) and unzip
file_id = "105JdQU_u98w_xSYaNNSj-r4RsyTPXZEF"
url = f"https://drive.google.com/uc?id={file_id}"
gdown.download(url, "taskA.zip", quiet=True)
! unzip -q - taskA.zip

In [4]:
import open_clip
from PIL import Image

from torchvision import transforms
from torchvision.transforms.functional import InterpolationMode

import pandas as pd
import numpy as np
import os
import torch
from ast import literal_eval
import requests
import json
import csv
import re
from itertools import combinations

from scipy.stats import spearmanr

In [5]:
# Where the extracted subtask A training data lives, and the name of its annotation file.
taska_folder = "train"
taska_tsv_filename = "subtask_a_train.tsv"

# Read the tab-separated annotation file into a dataframe.
taska_tsv_path = f"{taska_folder}/{taska_tsv_filename}"
df = pd.read_csv(taska_tsv_path, sep="\t")

In [6]:
# Use the GPU when PyTorch can see one; otherwise fall back to the CPU.
device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
device = torch.device(device_name)

In [7]:
# Preprocess dataframe: derive image paths and index-based orderings for every row.
image_name_cols = [
    'image1_name',
    'image2_name',
    'image3_name',
    'image4_name',
    'image5_name',
]


def _row_image_paths(row):
    """Paths of the row's images in column order; apostrophes in the compound become underscores."""
    compound_dir = row['compound'].replace("'", "_")
    return [os.path.join(taska_folder, compound_dir, row[col]) for col in image_name_cols]


def _row_image_idx_map(row):
    """Map each image file name of the row to the position of its column in image_name_cols."""
    return {row[col]: position for position, col in enumerate(image_name_cols)}


def _row_expected_order_indices(row):
    """Translate the row's expected_order (a literal list of image names) into column positions."""
    name_to_idx = row['image_idx_map']
    return [name_to_idx[fname] for fname in literal_eval(row['expected_order'])]


df['image_paths'] = df.apply(_row_image_paths, axis=1)
df['image_idx_map'] = df.apply(_row_image_idx_map, axis=1)
# Relies on the image_idx_map column created on the previous line.
df['expected_order_indices'] = df.apply(_row_expected_order_indices, axis=1)

In [8]:
# Convenience views over the dataframe used by the training and evaluation code below.
sentences = df['sentence']
compounds = df['compound'].apply(lambda name: name.replace("'", "_"))
# expected_order is stored as text; parse each entry into a Python list.
targets = list(map(literal_eval, df['expected_order']))
s_types = df['sentence_type']
image_paths = df['image_paths']

### evaluation methods

In [9]:
def evaluate_predictions(predictions, df, weights=(0.4, 0.3, 0.2, 0.1, 0.0)):
    """
    Score predicted image orderings against the expected orderings stored in ``df``.

    Args:
        predictions: one ordering per example; ``predictions[i]`` is a sequence of
            image indices and is compared with ``df['expected_order_indices'].iloc[i]``.
        df: dataframe providing the ``expected_order_indices`` column.
        weights: credit awarded for a correct image at each position of the ordering.

    Returns:
        dict with three aggregate metrics plus the per-example scores:
        - "top1_accuracy": fraction of examples whose first predicted image is correct
        - "average_spearman": mean Spearman correlation
        - "average_weighted_accuracy": mean position-weighted accuracy
        - "spearman_scores", "weighted_scores": per-example lists

    Raises:
        ValueError: if ``predictions`` is empty (the averages would be undefined).
    """
    if len(predictions) == 0:
        raise ValueError("evaluate_predictions requires at least one prediction")

    correct_top1 = 0
    spearman_scores, weighted_scores = [], []

    for i, pred_order in enumerate(predictions):
        ground_truth_order = df['expected_order_indices'].iloc[i]

        # Top-1 accuracy
        if pred_order[0] == ground_truth_order[0]:
            correct_top1 += 1

        # Spearman correlation, computed between the two index sequences as given
        score, _ = spearmanr(pred_order, ground_truth_order)
        spearman_scores.append(score)

        # Weighted accuracy: add the weight of every position whose image matches
        weighted_score = sum(weights[j] for j, img in enumerate(pred_order) if img == ground_truth_order[j])
        weighted_scores.append(weighted_score)

    return {
        "top1_accuracy": correct_top1 / len(predictions),
        "average_spearman": sum(spearman_scores) / len(spearman_scores),
        "average_weighted_accuracy": sum(weighted_scores) / len(weighted_scores),
        "spearman_scores": spearman_scores,
        "weighted_scores": weighted_scores
    }

In [10]:
def save_results(experiment_name, base_model, model_name, metrics, results_file="experiment_results.csv"):
    """
    Append one row of aggregate metrics for an experiment to a CSV file.

    Args:
        experiment_name: label written to the "experiment" column.
        base_model: label written to the "base_model" column.
        model_name: label written to the "model" column.
        metrics: dict providing "top1_accuracy", "average_spearman" and
            "average_weighted_accuracy" (other keys are ignored).
        results_file: path of the CSV file; created if it does not exist.
    """
    results_row = {
        "base_model": base_model,
        "model": model_name,
        "experiment": experiment_name,
        "top1_accuracy": metrics["top1_accuracy"],
        "average_spearman": metrics["average_spearman"],
        "average_weighted_accuracy": metrics["average_weighted_accuracy"],
    }

    # A header is needed for a brand-new file and also for one that exists but is empty.
    write_header = not os.path.exists(results_file) or os.path.getsize(results_file) == 0
    with open(results_file, mode="a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(results_row))
        if write_header:
            writer.writeheader()
        writer.writerow(results_row)

    print(f"Results saved to {results_file}")

In [11]:
def save_predictions(df, image_paths, predictions, confidence_scores, metrics, prefix, preds_dir='predictions'):
    """
    Save detailed predictions and confidence scores for each example to a CSV file.

    Args:
        df: dataframe providing the "expected_order_indices" and "compound" columns.
        image_paths: unused; kept so existing callers keep working.
        predictions: one predicted ordering per example.
        confidence_scores: one sequence of confidences per example; the elements may be
            single-element tensors or plain numbers.
        metrics: dict providing per-example "spearman_scores" and "weighted_scores".
        prefix: label used to build the output file name (spaces become underscores,
            any character other than letters, digits, "_" and "-" is dropped).
        preds_dir: directory the file is written to; created if missing.
    """
    # create the output directory if it doesn't exist
    os.makedirs(preds_dir, exist_ok=True)

    # generate output filename
    prefix = prefix.strip().replace(" ", "_")
    prefix = re.sub(r'[^a-zA-Z0-9_-]', '', prefix)
    output_path = f"{preds_dir}/{prefix}_preds.csv"

    spearman_scores = metrics["spearman_scores"]
    weighted_scores = metrics["weighted_scores"]
    with open(output_path, mode="w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["index", "compound", "ground_truth_order", "predicted_order", "top1_score", "spearman_score", "weighted_score", "confidence_scores"])

        for i, (pred_order, conf) in enumerate(zip(predictions, confidence_scores)):
            ground_truth_order = df["expected_order_indices"].iloc[i]
            top1_score = 1 if pred_order[0] == ground_truth_order[0] else 0
            spearman_score = round(spearman_scores[i], 3)
            weighted_score = round(weighted_scores[i], 3)
            # float() accepts both single-element tensors and plain numbers, so
            # confidences stored as ordinary lists of floats no longer crash here.
            formatted_conf_scores = [round(float(c), 3) for c in conf]
            writer.writerow([i, df["compound"].iloc[i], ground_truth_order, pred_order, top1_score, spearman_score, weighted_score, formatted_conf_scores])

    print(f"Predictions saved to {output_path}")

In [12]:
def openclip_image_ranking(model, image_processor, tokenizer, image_paths, sentence, top_k=5):
    """
    Rank candidate images by their similarity to a sentence.

    Args:
        model: object exposing ``encode_image`` and ``encode_text``.
        image_processor: callable turning an opened PIL image into a tensor.
        tokenizer: callable turning a list of strings into a tensor of tokens.
        image_paths: paths of the candidate images.
        sentence: the text the images are compared with.
        top_k: maximum number of ranked images to return (default 5, the
            previous fixed value); capped at the number of images supplied.

    Returns:
        (probs, indices): softmax scores in descending order and the positions
        (within ``image_paths``) of the corresponding images.
    """
    def _load(path):
        # The context manager closes the file handle as soon as the image has
        # been converted, instead of leaving it open until garbage collection.
        with Image.open(path) as img:
            return image_processor(img)

    image_inputs = torch.stack([_load(ipath) for ipath in image_paths]).to(device)
    text_input = tokenizer([sentence]).to(device)

    with torch.no_grad():
        image_features = model.encode_image(image_inputs)
        text_features = model.encode_text(text_input)

    # normalise features
    image_features /= image_features.norm(dim=-1, keepdim=True)
    text_features /= text_features.norm(dim=-1, keepdim=True)

    # dot product & softmax over the candidate images
    similarity = (100.0 * text_features @ image_features.T).softmax(dim=-1)

    # order by similarity; never ask topk for more entries than there are images
    k = min(top_k, similarity.shape[-1])
    probs, indices = similarity[0].topk(k)
    return probs, indices


### training and dataset functions

In [13]:
def openclip_train(model, tokenizer, image_preprocess, dataloader, optimizer, margin=0.1):
    """
    Train ``model`` for one epoch with a pairwise margin ranking loss.

    Each batch from ``dataloader`` is a triple ``(texts, imgs1, imgs2)`` where
    ``imgs1[k]`` is the image path that should be more similar to ``texts[k]``
    than ``imgs2[k]``.

    Args:
        model: object exposing ``encode_text`` / ``encode_image`` that accept ``normalize=True``.
        tokenizer: callable turning a batch of strings into a tensor of tokens.
        image_preprocess: callable turning an opened PIL image into a tensor.
        dataloader: iterable of ``(texts, imgs1, imgs2)`` batches.
        optimizer: optimizer stepped once per batch.
        margin: required similarity gap between the preferred and the other
            image (default 0.1, the previous fixed value).

    Returns:
        The same ``model`` object after the epoch.
    """
    def _encode_images(paths):
        tensors = []
        for ipath in paths:
            # Close each file handle as soon as the image has been converted.
            with Image.open(ipath) as img:
                tensors.append(image_preprocess(img))
        return model.encode_image(torch.stack(tensors).to(device), normalize=True)

    model.train()
    for texts, imgs1, imgs2 in dataloader:
        optimizer.zero_grad()

        # encode text and images - always the preferred image is img1
        text_input = tokenizer(texts).to(device)
        text_features = model.encode_text(text_input, normalize=True)
        image_features1 = _encode_images(imgs1)
        image_features2 = _encode_images(imgs2)

        # per-example dot product between the text feature and each image feature
        B, D = text_features.shape
        text_rows = text_features.view(B, 1, D)
        similarities1 = torch.bmm(text_rows, image_features1.view(B, D, 1)).squeeze(-1)  # expected to be more similar
        similarities2 = torch.bmm(text_rows, image_features2.view(B, D, 1)).squeeze(-1)  # expected to be less similar

        # hinge on (less - more): zero loss once the preferred image wins by at least `margin`
        contrastive_loss = torch.nn.functional.relu(margin + similarities2 - similarities1).sum()

        # update params
        contrastive_loss.backward()
        optimizer.step()

    return model

def openclip_evaluate(model, tokenizer, image_preprocess, test_sentences, test_image_paths, test_targets, verbose=True):
    """
    Rank the images of every test example with ``openclip_image_ranking``.

    The model is switched to eval mode first. ``test_targets`` only takes part in
    the zip (so it can shorten the iteration); its values and ``verbose`` are not used.

    Returns:
        (predictions, confidence): per example, the ranked image indices as a
        list and the matching scores multiplied by 100.
    """
    model.eval()
    ranked = [
        openclip_image_ranking(model, image_preprocess, tokenizer, example_paths, sentence)
        for sentence, example_paths, _ in zip(test_sentences, test_image_paths, test_targets)
    ]
    predictions = [order.tolist() for _, order in ranked]
    confidence = [100 * scores for scores, _ in ranked]
    return predictions, confidence


In [14]:
from torch.utils.data import DataLoader, Dataset

# Custom Dataset
class PairwiseDataset(Dataset):
    """Dataset over a sequence of (text, img1, img2) records, returned unchanged as tuples."""

    def __init__(self, data):
        # Indexable collection of 3-field records.
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # Unpacking enforces exactly three fields per record. Images are passed
        # through as given; tensor loading could be added here if needed.
        sentence, first_image, second_image = self.data[idx]
        return (sentence, first_image, second_image)


In [15]:
def reorder_image_paths(ipaths, tgt):
    """
    Return ``ipaths`` sorted into the order given by ``tgt``.

    Args:
        ipaths: image paths whose file names all appear in ``tgt``.
        tgt: image file names in the desired order.

    Returns:
        A new list with the paths of ``ipaths`` arranged like ``tgt``.

    Raises:
        KeyError: if the file name of a path is not listed in ``tgt``.
    """
    rank_of = {fname: rank for rank, fname in enumerate(tgt)}
    # os.path.basename understands the platform's path separator, so paths built
    # with os.path.join are handled even where the separator is not '/'.
    return sorted(ipaths, key=lambda path: rank_of[os.path.basename(path)])

# `image_paths` keeps the images of each example in their original order.
# `ordered_image_paths` holds the same paths arranged in the target order, which is useful
# for training (but it also means that evaluating performance on the training set with the
# standard function looks terrible).

image_paths_copy = image_paths.copy()
ordered_image_paths = []
for example_idx, example_paths in enumerate(image_paths_copy):
    ordered_image_paths.append(reorder_image_paths(example_paths, targets[example_idx]))

def split_train_and_test_data(test_indices, train_indices, input_text):
    """
    Build the train and test subsets for one split.

    Sentences are taken from ``input_text``; targets and image paths come from the
    module-level ``targets``, ``image_paths`` (test) and ``ordered_image_paths`` (train).

    Returns:
        dict with keys 'train' and 'test', each a tuple
        ``(sentences, image_paths, targets)`` of lists.
    """
    def _pick(source, indices):
        return [source[idx] for idx in indices]

    test_split = (
        _pick(input_text, test_indices),
        _pick(image_paths, test_indices),          # original image order
        _pick(targets, test_indices),
    )
    train_split = (
        _pick(input_text, train_indices),
        _pick(ordered_image_paths, train_indices),  # target image order
        _pick(targets, train_indices),
    )
    return {'train': train_split, 'test': test_split}

def make_pairwise(sentences, image_paths):
    """
    Expand each (sentence, images) example into all of its image pairs.

    For every example, each pair of images is emitted as ``(sentence, earlier, later)``
    where ``earlier`` precedes ``later`` in that example's image list
    (10 pairs for 5 images).
    """
    return [
        (sentence, earlier, later)
        for sentence, example_images in zip(sentences, image_paths)
        for earlier, later in combinations(example_images, 2)
    ]


### get data and run training loop

In [17]:
import gdown

# Fetch the CSV of prompt responses from Google Drive (download progress is shown).
prompt_responses_file_id = "1T9pMSMj6JQP0DCLy-6H7dfUtVRWy39uq"
prompt_responses_url = f"https://drive.google.com/uc?id={prompt_responses_file_id}"
gdown.download(prompt_responses_url, 'gpt_prompt_responses.csv', quiet=False)

Downloading...
From: https://drive.google.com/uc?id=1T9pMSMj6JQP0DCLy-6H7dfUtVRWy39uq
To: /home/shurui/projects/semeval_2025_task1/gpt_prompt_responses.csv
100%|██████████| 30.7k/30.7k [00:00<00:00, 3.98MB/s]


'gpt_prompt_responses.csv'

In [18]:
# Load the prompt responses that the previous cell downloaded.
prompt_responses_path = 'gpt_prompt_responses.csv'
df_text_inputs = pd.read_csv(prompt_responses_path)

Trains a fresh model for two epochs on each of 10 train/test splits of the data, so that every sample receives a held-out prediction.
The predictions from all splits are then collected, evaluated, and saved separately for each epoch.

In [None]:
# Cross-validation setup: shuffle all example indices and cut them into `num_groups` test groups.
# NOTE(review): no random seed is set before torch.randperm, so the groups differ between runs.
num_groups = 10
order_for_testing = torch.randperm(len(sentences))
testing_groups = torch.chunk(order_for_testing, num_groups)

num_epochs = 2

# One store per epoch, indexed by the original example position. Every entry starts as a
# placeholder (ordering 0..4 with confidence 0.2 each) and is replaced in the loop below
# once the example has been predicted as part of a test group.
all_predictions = {i:
    {'preds': [[0,1,2,3,4]]*len(sentences), 'conf': [[0.2,0.2,0.2,0.2,0.2]]*len(sentences)}
    for i in range(num_epochs)}

# Labels used for the result rows and output file names.
experiment_name = 'baseline_sentences'
base_model = 'openclip'
model_name = 'ViT-B-32_finetune'

for i in range(len(testing_groups)):
    # Group i is held out for testing; all other groups together form the training set.
    test_indices = testing_groups[i].tolist()
    train_indices = torch.concat(testing_groups[:i] + testing_groups[i+1:]).tolist()
    split_data = split_train_and_test_data(test_indices, train_indices, sentences)

    # Only the training sentences and image paths (first two items) are needed to build the pairs.
    pairwise_train_data = make_pairwise(*split_data['train'][:2])

    # Initialize Dataset and DataLoader for training
    pairwise_Dataset = PairwiseDataset(pairwise_train_data)
    pairwise_dataloader = DataLoader(pairwise_Dataset, batch_size=8, shuffle=True)

    # prep model: a new pretrained model is created for every split
    model_openclip, _, preprocess_openclip = open_clip.create_model_and_transforms('ViT-B-32', pretrained='laion2b_s34b_b79k')
    model_openclip.to(device)
    # model_openclip.eval()  # model in train mode by default, impacts some models with BatchNorm or stochastic depth active
    tokenizer = open_clip.get_tokenizer('ViT-B-32')
    model_openclip.train()

    # don't train the image part of the model
    for param in model_openclip.visual.parameters():
        param.requires_grad = False

    # # only train the image part of the model
    # for name, param in model_openclip.named_parameters():
    #     if not name.startswith('visual'):
    #         param.requires_grad = False

    # Optimise only the parameters that are still trainable after the freeze above.
    optimizer = torch.optim.Adam(filter(lambda p: p.requires_grad, model_openclip.parameters()), lr=1e-6)

    # NOTE: no evaluation happens before training; `model` simply refers to the freshly loaded model.
    model = model_openclip
    for epoch in range(num_epochs):
        print(f'training split {i}, epoch {epoch}')
        # Train for one more epoch, then predict the held-out group with the current weights.
        model = openclip_train(model, tokenizer, preprocess_openclip, pairwise_dataloader, optimizer)
        predictions, confidence = openclip_evaluate(model, tokenizer, preprocess_openclip, *split_data['test'], verbose=False)

        # Store each held-out prediction at the example's original position.
        for j, orig_id in enumerate(test_indices):
            all_predictions[epoch]['preds'][orig_id] = predictions[j]
            all_predictions[epoch]['conf'][orig_id] = confidence[j]

# print(all_predictions)
# Evaluate and save the predictions collected over all splits, once per epoch.
for epoch in range(num_epochs):
    preds = all_predictions[epoch]['preds']
    conf = all_predictions[epoch]['conf']
    results = evaluate_predictions(preds, df)

    save_results(experiment_name, base_model, model_name+f'_e{epoch}', results, results_file="experiment_results.csv")

    prefix = experiment_name+'_'+base_model+'_'+model_name+f'_e{epoch}'
    save_predictions(df, None, preds, conf, results, prefix, preds_dir='predictions')
    
# save the model
# NOTE(review): `model` is the model trained on the last split only (a new one is created per split above).
torch.save(model.state_dict(), f"openclip_{model_name}_epoch{num_epochs}.pth")



open_clip_model.safetensors:   0%|          | 0.00/605M [00:00<?, ?B/s]

training split 0, epoch 0
training split 0, epoch 1


KeyboardInterrupt: 