In [2]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from torch.optim.lr_scheduler import ReduceLROnPlateau, StepLR

This experiement takes the highest similarity image embedding as the positive example,the lowest similarityimage embedding as the negative example and the text embedding as the archor of triplet loss. The similarity is computed with the cosine similarity.

In [99]:


# 定义 Siamese Network 模型
# define Siamese Network model

class SiameseNetwork(nn.Module):
    def __init__(self, embedding_dim):
        super(SiameseNetwork, self).__init__()
        self.fc1 = nn.Linear(embedding_dim, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, 64)
        self.output_layer = nn.Linear(64, 32)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        return self.output_layer(x)

# 定义三元组损失函数
# define triplet loss
class TripletLoss(nn.Module):
    def __init__(self, margin=1.0):
        super(TripletLoss, self).__init__()
        self.margin = margin

    def forward(self, anchor, positive, negative):
        positive_distance = F.pairwise_distance(anchor, positive, keepdim=True)
        negative_distance = F.pairwise_distance(anchor, negative, keepdim=True)
        loss = F.relu(positive_distance - negative_distance + self.margin)
        return loss.mean()

def get_triplets(text_embeddings, image_embeddings_batch):
    batch_size = text_embeddings.size(0)
    anchors = text_embeddings
    positives = []
    negatives = []

    for i in range(batch_size):
        similarities = F.cosine_similarity(text_embeddings[i].unsqueeze(0), image_embeddings_batch[i], dim=1)
        positive_index = similarities.argmax().item()

        # Handle negative indices correctly
        negative_indices = (similarities < similarities[positive_index]).nonzero(as_tuple=False).squeeze()
        if negative_indices.nelement() > 0:
            negative_index = negative_indices[torch.randint(0, len(negative_indices), (1,))].item()
        else:
            negative_index = torch.randint(0, image_embeddings_batch.size(0), (1,)).item()

        positives.append(image_embeddings_batch[i][positive_index])
        negatives.append(image_embeddings_batch[i][negative_index])

    positives = torch.stack(positives)
    negatives = torch.stack(negatives)
    return anchors, positives, negatives




# 加载嵌入数据
# load embedding data

embedding_path = '/content/gdrive/MyDrive/clip_train_idiom_embeddings.pt'
data = torch.load(embedding_path, map_location='cpu')
text_embeddings = [item['text_embedding'].squeeze(0) for item in data]
image_embeddings = [img['image_embedding'].squeeze(0) for item in data for img in item['images']]

# 转换为张量
# transfer to tensor
text_embeddings_tensor = torch.stack(text_embeddings)
image_embeddings_tensor = torch.stack(image_embeddings)

# 打印嵌入张量形状
# print the shape of embedding tensor
print(f"Number of text embeddings: {len(text_embeddings)}")
print(f"Number of image embeddings: {len(image_embeddings)}")
print(f"Image embeddings tensor shape: {image_embeddings_tensor.shape}")

# 定义数据集
# define the dataset
class EmbeddingDataset(Dataset):
    def __init__(self, text_embeddings, image_embeddings):
        self.text_embeddings = text_embeddings
        self.image_embeddings = image_embeddings

    def __len__(self):
        return len(self.text_embeddings)

    def __getitem__(self, index):
        return self.text_embeddings[index], self.image_embeddings

# 数据加载器
# load dataloader
dataset = EmbeddingDataset(text_embeddings_tensor, image_embeddings_tensor)
dataloader = DataLoader(dataset, batch_size=8, shuffle=True)

# 初始化模型和损失函数
# initialize model and loss function
embedding_dim = text_embeddings_tensor.size(1)
model = SiameseNetwork(embedding_dim=embedding_dim)
criterion = TripletLoss(margin=1.0)
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

# 训练循环
# training loop
num_epochs = 50
scheduler = ReduceLROnPlateau(optimizer, mode='min', patience=5, verbose=True)
for epoch in range(num_epochs):
    total_loss = 0
    for text_embeddings, image_embeddings_batch in dataloader:
        anchors, positives, negatives = get_triplets(text_embeddings, image_embeddings_batch)

        anchor_outputs = model(anchors)
        positive_outputs = model(positives)
        negative_outputs = model(negatives)

        loss = criterion(anchor_outputs, positive_outputs, negative_outputs)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        scheduler.step(total_loss / len(dataloader))
        # 使用学习率调度器
        lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=30, gamma=0.1)


    print(f"Epoch {epoch + 1}, Loss: {total_loss / len(dataloader):.4f}")


  data = torch.load(embedding_path, map_location='cpu')


Number of text embeddings: 38
Number of image embeddings: 190
Image embeddings tensor shape: torch.Size([190, 512])
Epoch 1, Loss: 0.9954
Epoch 2, Loss: 0.9835
Epoch 3, Loss: 0.9699
Epoch 4, Loss: 0.9642
Epoch 5, Loss: 0.9543
Epoch 6, Loss: 0.9382
Epoch 7, Loss: 0.9102
Epoch 8, Loss: 0.9200
Epoch 9, Loss: 0.9173
Epoch 10, Loss: 0.9105
Epoch 11, Loss: 0.8921
Epoch 12, Loss: 0.9011
Epoch 13, Loss: 0.8875
Epoch 14, Loss: 0.9071
Epoch 15, Loss: 0.8902
Epoch 16, Loss: 0.8974
Epoch 17, Loss: 0.9194
Epoch 18, Loss: 0.8884
Epoch 19, Loss: 0.9058
Epoch 20, Loss: 0.9036
Epoch 21, Loss: 0.9062
Epoch 22, Loss: 0.9119
Epoch 23, Loss: 0.9142
Epoch 24, Loss: 0.9112
Epoch 25, Loss: 0.9034
Epoch 26, Loss: 0.9156
Epoch 27, Loss: 0.9017
Epoch 28, Loss: 0.9097
Epoch 29, Loss: 0.9062
Epoch 30, Loss: 0.8918
Epoch 31, Loss: 0.8977
Epoch 32, Loss: 0.9037
Epoch 33, Loss: 0.9003
Epoch 34, Loss: 0.9112
Epoch 35, Loss: 0.9092
Epoch 36, Loss: 0.8978
Epoch 37, Loss: 0.9075
Epoch 38, Loss: 0.9057
Epoch 39, Loss: 0.9

通过计算相似度把排在第一的image的embedding作为positive sample，后面4个图片的embedding作为negative samples，然后当排在第二的image embedding作为positive sample的时候后面3个图片的embedding作为negative samples并且从其他不相关的compound的image embedding随机选一个出来补成4个negative samples，然后当排在第三的image embedding作为positive sample的时候后面2个图片的embedding作为negative samples并且从其他不相关的compound的image embedding随机选两个出来补成4个negative samples，以此类推

In this example, the get_triplets function constructs text-image triplets by calculating cosine similarity between text and image embeddings. It selects the most similar image as a positive sample and chooses lower-similarity images as negatives. If negative samples are insufficient, random negatives are drawn from remaining embeddings to ensure each triplet contains four negatives.

In [100]:
# 定义 Siamese Network 模型
# define Siamese Network model
class SiameseNetwork(nn.Module):
    def __init__(self, embedding_dim):
        super(SiameseNetwork, self).__init__()
        self.fc1 = nn.Linear(embedding_dim, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, 64)
        self.output_layer = nn.Linear(64, 32)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        return self.output_layer(x)

In [101]:
class WeightedTripletLoss(nn.Module):
    def __init__(self, margin=1.0, positive_weight=1.0, negative_weight=1.5):
        super(WeightedTripletLoss, self).__init__()
        self.margin = margin
        self.positive_weight = positive_weight
        self.negative_weight = negative_weight

    def forward(self, anchor, positive, negatives):
        # caculate the distance between the posivtives and anchor

        positive_distance = F.pairwise_distance(anchor, positive)

        # caculate and average the distance between the negatives and anchor
        negative_distance = F.pairwise_distance(anchor.unsqueeze(1), negatives).mean(dim=1)

        # caculate the weighted loss
        loss = F.relu(self.positive_weight * positive_distance - self.negative_weight * negative_distance + self.margin)
        return loss.mean()


In [102]:
def get_triplets(text_embedding, image_embeddings, all_image_embeddings):
    anchors, positives, negatives = [], [], []

    # 计算文本与所有图像的相似度
    # caculate the similarity between the text and all the image

    similarities = F.cosine_similarity(text_embedding.unsqueeze(0), image_embeddings, dim=-1)
    sorted_indices = similarities.argsort(descending=True)

    for i in range(len(image_embeddings)):
        # 正样本：当前图像嵌入
        # positives: the embedding of the present image
        positive_index = sorted_indices[i].item()
        positive = image_embeddings[positive_index]

        # 负样本：选择比当前正样本相似度更低的图像
        # negatives: choose the images which have the similarity lower than the present image
        negative_candidates = sorted_indices[i + 1:].tolist()

        # 如果负样本不足 4 个，从其他图像中随机补充
        # if the numbers of negatives are lower than 4 then we choose images from other compounds as a negative sample to make up the empty space
        num_negatives_needed = 4 - len(negative_candidates)
        if num_negatives_needed > 0:
            available_negatives = [
                idx for idx in range(all_image_embeddings.size(0))
                if idx not in sorted_indices[:i + 1]
            ]
            supplement_negatives = torch.tensor(available_negatives)[
                torch.randperm(len(available_negatives))[:num_negatives_needed]
            ].tolist()
            negative_candidates.extend(supplement_negatives)

        # 保证每个三元组有 4 个负样本
        # ensure every triplet group has 4 negatives
        negative = torch.stack([all_image_embeddings[idx] for idx in negative_candidates[:4]])

        # 添加到三元组列表
        # add to triplet list
        anchors.append(text_embedding)
        positives.append(positive)
        negatives.append(negative)

    # 转换为张量
    # transfer to tensor
    anchors = torch.stack(anchors)
    positives = torch.stack(positives)
    negatives = torch.stack(negatives)

    return anchors, positives, negatives


# 加载嵌入数据
# load emabedding data
embedding_path = '/content/gdrive/MyDrive/clip_train_idiom_embeddings.pt'
data = torch.load(embedding_path, map_location='cpu')

# 去掉多余的维度
# remove unnecessary dim
text_embeddings = [item['text_embedding'].squeeze(0) for item in data]
image_embeddings = [[img['image_embedding'].squeeze(0) for img in item['images']] for item in data]

# 展平嵌套的 image_embeddings
#Flatten the nested image_embeddings
all_image_embeddings = torch.stack([img for group in image_embeddings for img in group])

# 将每个图像组转换为 2D Tensor
#change all the image group to 2D Tensor
image_embeddings = [torch.stack(group) for group in image_embeddings]

class EmbeddingDataset(Dataset):
    def __init__(self, text_embeddings, image_embeddings):
        self.text_embeddings = text_embeddings
        self.image_embeddings = image_embeddings

    def __len__(self):
        return len(self.text_embeddings)

    def __getitem__(self, index):
        return self.text_embeddings[index], self.image_embeddings[index]

# 数据加载器
# data loader
dataloader = DataLoader(EmbeddingDataset(text_embeddings, image_embeddings), batch_size=1, shuffle=True)



  data = torch.load(embedding_path, map_location='cpu')


In [103]:
#初始化模型和损失函数
#initializr the model and loss function
embedding_dim = text_embeddings[0].shape[0]
model = SiameseNetwork(embedding_dim=embedding_dim)
criterion = WeightedTripletLoss(margin=1.0, positive_weight=1.0, negative_weight=1.5)

# 设置学习率调度器
# set learning rate optimizer
#optimizer = torch.optim.Adam(model.parameters(), lr=1e-6)
optimizer = torch.optim.Adam(model.parameters(), lr=5e-6)

scheduler = ReduceLROnPlateau(optimizer, mode='min', patience=5, factor=0.5, verbose=True)

#scheduler = ReduceLROnPlateau(optimizer, mode='min', patience=5, verbose=True)
step_lr_scheduler = StepLR(optimizer, step_size=30, gamma=0.1)

In [104]:

# 训练循环
# training loop
num_epochs = 50
for epoch in range(num_epochs):
    total_loss = 0
    for text_embedding, image_embeddings_batch in dataloader:
        # 移除 batch 维度
        # remove batch dim
        text_embedding = text_embedding.squeeze(0)
        image_embeddings_batch = image_embeddings_batch.squeeze(0)

        # 构建三元组
        # set triplet group
        anchors, positives, negatives = get_triplets(text_embedding, image_embeddings_batch, all_image_embeddings)

        # 模型输出
        # output of model
        anchor_outputs = model(anchors)
        positive_outputs = model(positives)
        negative_outputs = torch.stack([model(neg) for neg in negatives])

        # 计算损失
        # caculate the loss
        loss = criterion(anchor_outputs, positive_outputs, negative_outputs)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # 调整学习率
    # ajust the learning rate
    scheduler.step(total_loss / len(dataloader))
    step_lr_scheduler.step()

    print(f"Epoch {epoch + 1}, Loss: {total_loss / len(dataloader):.4f}")





Epoch 1, Loss: 0.9563
Epoch 2, Loss: 0.9429
Epoch 3, Loss: 0.9285
Epoch 4, Loss: 0.9143
Epoch 5, Loss: 0.8970
Epoch 6, Loss: 0.8800
Epoch 7, Loss: 0.8593
Epoch 8, Loss: 0.8370
Epoch 9, Loss: 0.8117
Epoch 10, Loss: 0.7893
Epoch 11, Loss: 0.7604
Epoch 12, Loss: 0.7316
Epoch 13, Loss: 0.6985
Epoch 14, Loss: 0.6669
Epoch 15, Loss: 0.6301
Epoch 16, Loss: 0.5881
Epoch 17, Loss: 0.5457
Epoch 18, Loss: 0.4987
Epoch 19, Loss: 0.4492
Epoch 20, Loss: 0.4015
Epoch 21, Loss: 0.3527
Epoch 22, Loss: 0.3172
Epoch 23, Loss: 0.2989
Epoch 24, Loss: 0.2633
Epoch 25, Loss: 0.2326
Epoch 26, Loss: 0.2169
Epoch 27, Loss: 0.1900
Epoch 28, Loss: 0.1789
Epoch 29, Loss: 0.1531
Epoch 30, Loss: 0.1499
Epoch 31, Loss: 0.1411
Epoch 32, Loss: 0.1360
Epoch 33, Loss: 0.1366
Epoch 34, Loss: 0.1348
Epoch 35, Loss: 0.1241
Epoch 36, Loss: 0.1297
Epoch 37, Loss: 0.1260
Epoch 38, Loss: 0.1206
Epoch 39, Loss: 0.1232
Epoch 40, Loss: 0.1261
Epoch 41, Loss: 0.1229
Epoch 42, Loss: 0.1172
Epoch 43, Loss: 0.1200
Epoch 44, Loss: 0.11

add early stopping to improve traning process

In [90]:
# ✅ Early Stopping with patience=10
class EarlyStopping:
    def __init__(self, patience=10, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.best_loss = None
        self.counter = 0

    def step(self, current_loss):
        if self.best_loss is None or current_loss < self.best_loss - self.min_delta:
            self.best_loss = current_loss
            self.counter = 0
        else:
            self.counter += 1

        return self.counter >= self.patience

#early_stopping = EarlyStopping(patience=10, min_delta=0.0001)


In [96]:
# ✅ Early Stopping with patience=8 which works better
class EarlyStopping:
    def __init__(self, patience=8, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.best_loss = None
        self.counter = 0

    def step(self, current_loss):
        if self.best_loss is None or current_loss < self.best_loss - self.min_delta:
            self.best_loss = current_loss
            self.counter = 0
        else:
            self.counter += 1

        return self.counter >= self.patience
early_stopping = EarlyStopping(patience=8, min_delta=0.0001)




In [97]:
# 训练循环
# taining loop
num_epochs = 50
for epoch in range(num_epochs):
    total_loss = 0
    for text_embedding, image_embeddings_batch in dataloader:
        # 移除 batch 维度
        # remove batch dim
        text_embedding = text_embedding.squeeze(0)
        image_embeddings_batch = image_embeddings_batch.squeeze(0)

        # 构建三元组
        # set up triplet group
        anchors, positives, negatives = get_triplets(text_embedding, image_embeddings_batch, all_image_embeddings)

        # 模型输出
        # output of model
        anchor_outputs = model(anchors)
        positive_outputs = model(positives)
        negative_outputs = torch.stack([model(neg) for neg in negatives])

        # 计算损失
        # caculate the loss
        loss = criterion(anchor_outputs, positive_outputs, negative_outputs)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # 调整学习率，传入损失值
    # adjust learning rate
    scheduler.step(total_loss / len(dataloader))

    # Early stopping 检查
    # check early stopping
    if early_stopping.step(total_loss / len(dataloader)):
        print(f"Early stopping at epoch {epoch + 1}")
        break

    print(f"Epoch {epoch + 1}, Loss: {total_loss / len(dataloader):.4f}")


Epoch 1, Loss: 0.0258
Epoch 2, Loss: 0.0238
Epoch 3, Loss: 0.0265
Epoch 4, Loss: 0.0226
Epoch 5, Loss: 0.0247
Epoch 6, Loss: 0.0215
Epoch 7, Loss: 0.0229
Epoch 8, Loss: 0.0204
Epoch 9, Loss: 0.0277
Epoch 10, Loss: 0.0246
Epoch 11, Loss: 0.0210
Epoch 12, Loss: 0.0241
Epoch 13, Loss: 0.0206
Epoch 14, Loss: 0.0244
Epoch 15, Loss: 0.0219
Epoch 16, Loss: 0.0183
Epoch 17, Loss: 0.0242
Epoch 18, Loss: 0.0204
Epoch 19, Loss: 0.0194
Epoch 20, Loss: 0.0193
Epoch 21, Loss: 0.0215
Epoch 22, Loss: 0.0243
Epoch 23, Loss: 0.0222
Early stopping at epoch 24


In [105]:
# 保存模型的权重
# save model weights

torch.save(model.state_dict(), '/content/gdrive/MyDrive/siamese_model_weights.pth')
print("Model saves to 'siamese_model_weights.pth'")


Model saves to 'siamese_model_weights.pth'


In [117]:
import torch

# 加载模型权重
# load model weights

model_weights = torch.load('/content/gdrive/MyDrive/siamese_model_weights.pth')

# 查看 state_dict 的 keys
# check keys of state_dict
print(model_weights.keys())

# 检查某些层的权重形状
# check shape

for key, value in model_weights.items():
    print(f"{key}: {value.shape}")


odict_keys(['fc1.weight', 'fc1.bias', 'fc2.weight', 'fc2.bias', 'fc3.weight', 'fc3.bias', 'output_layer.weight', 'output_layer.bias'])
fc1.weight: torch.Size([256, 512])
fc1.bias: torch.Size([256])
fc2.weight: torch.Size([128, 256])
fc2.bias: torch.Size([128])
fc3.weight: torch.Size([64, 128])
fc3.bias: torch.Size([64])
output_layer.weight: torch.Size([32, 64])
output_layer.bias: torch.Size([32])


  model_weights = torch.load('/content/gdrive/MyDrive/siamese_model_weights.pth')


In [124]:

import csv


# 定义孪生网络模型
# defien model
class SiameseNetwork(nn.Module):
    def __init__(self, embedding_dim=512):
        super(SiameseNetwork, self).__init__()
        self.fc1 = nn.Linear(embedding_dim, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, 64)
        self.output_layer = nn.Linear(64, 32)  # 最后一层输出 32 维度的向量

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        x = self.output_layer(x)
        return x

# 加载权重
# load weights
model = SiameseNetwork(embedding_dim=512)
model.load_state_dict(torch.load('/content/gdrive/MyDrive/siamese_model_weights.pth'))
model.eval()

# 加载 CLIP 嵌入文件
#load CLIP embedding file

clip_embeddings = torch.load('/content/gdrive/MyDrive/clip_dev_embeddings.pt')


# 计算文本和图像的相似度，并生成排序
#caculate similarity and generate ranking
results = []
for sample in clip_embeddings:
    compound_name = sample['compound_name']
    text_embedding = sample['text_embedding']
    image_embeddings = sample['images']

    text_feature = model(text_embedding)

    # 计算每个图片的相似度
    #caculate similarity of each image
    image_scores = []
    for image in image_embeddings:
        image_id = image['image_id']
        image_feature = model(image['image_embedding'])
        similarity = F.cosine_similarity(text_feature, image_feature).item()
        image_scores.append((image_id, similarity))

    # 按相似度排序
    # rank according to similarity
    image_scores.sort(key=lambda x: x[1], reverse=True)

    # 仅保留排序后的图片 ID
    #only save the iameg ID
    ranked_images = [image_id for image_id, _ in image_scores]

    #save result
    results.append([compound_name] + ranked_images)

# 保存为 TSV 文件
# save to TSV
with open('/content/submission.tsv', 'w', newline='') as tsvfile:
    writer = csv.writer(tsvfile, delimiter='\t')
    writer.writerow(['compound', 'expected_order'])
    writer.writerows(results)

print("TSV file saved：/content/submission.tsv")

TSV 文件已生成：/content/submission.tsv


  model.load_state_dict(torch.load('/content/gdrive/MyDrive/siamese_model_weights.pth'))
  clip_embeddings = torch.load('/content/gdrive/MyDrive/clip_dev_embeddings.pt')
