In [1]:
import os
from typing import Dict, Union, Any, Optional, List, Tuple
from peft import LoraConfig, get_peft_model
import pandas as pd
import torch.nn.functional as F
import torch
from torch import nn
from transformers import CLIPProcessor, CLIPModel, Trainer, TrainingArguments
from transformers import AltCLIPModel, AltCLIPProcessor
import optuna

In [2]:
import torch
# 设置随机种子
torch.manual_seed(44)  # 固定随机种子

<torch._C.Generator at 0x7d590df83210>

In [3]:
train_image_columns = ["image1_name", "image2_name", "image3_name", "image4_name", "image5_name"]
# kagglefolder = '/kaggle/input/semeaval-2025-task1'
dev_or_test = "dev"
taskA_train_file = "subtask_a_train.tsv"
taskA_test_file = f"subtask_a_{dev_or_test}.tsv"



# 加载英语数据
language = "EN"
taskA_train_folder_en = os.path.join(f'data/TaskA/{language}/train')
# train_df_en = pd.read_csv(os.path.join(taskA_train_folder_en, taskA_train_file), sep="\t")
train_df_en = pd.read_csv('data_augmentation/subtask_a_train_ag.tsv', sep="\t")
# 加载葡萄牙语数据
language = "PT"
taskA_train_folder_pt = os.path.join(f'data/TaskA/{language}/train')
# train_df_pt = pd.read_csv(os.path.join(taskA_train_folder_pt, taskA_train_file), sep="\t")

train_df_pt = pd.read_csv('data_augmentation/subtask_a_train_PT_ag.tsv', sep="\t")
# 定义函数：构建图像路径
def build_image_paths(row, compounds, folder):
    compound = compounds[row.name]
    return [os.path.join(folder, compound.replace("'s", "_s"), img) for img in row]


In [4]:
# 构建英语图像路径
train_image_paths_en = train_df_en[train_image_columns].apply(
    lambda row: build_image_paths(row, train_df_en["compound"].tolist(), taskA_train_folder_en), axis=1
).tolist()

# 构建葡萄牙语图像路径
train_image_paths_pt = train_df_pt[train_image_columns].apply(
    lambda row: build_image_paths(row, train_df_pt["compound"].tolist(), taskA_train_folder_pt), axis=1
).tolist()

# 合并图像路径
train_image_paths = train_image_paths_en + train_image_paths_pt

In [5]:
# 合并数据集
train_df = pd.concat([train_df_en, train_df_pt], ignore_index=True)

In [6]:
# 加载数据
train_texts = train_df["sentence"].tolist()  # 文本
train_compounds = train_df["compound"].tolist()  # 名词性复合词
train_image_columns = ["image1_name", "image2_name", "image3_name", "image4_name", "image5_name"]
train_image_names = train_df[train_image_columns].apply(lambda row: row.tolist(), axis=1).tolist()  # 图像名称
train_expected_orders = train_df["expected_order"].apply(lambda x: eval(x)).tolist()  # 期望的排序

In [7]:
# 自定义数据集
from torch.utils.data import Dataset
from PIL import Image

class TaskADataset(Dataset):
    def __init__(self, texts, image_paths, expected_orders=None, image_names=None, clip_processor=None, is_test=False):
        """
        texts: 文本列表
        image_paths: 图像路径列表
        expected_orders: 期望的排序列表（仅用于训练集和验证集）
        image_names: 图像名称列表
        clip_processor: CLIP 的 processor
        is_test: 是否是测试集
        """
        self.texts = texts
        self.image_paths = image_paths
        self.expected_orders = expected_orders
        self.image_names = image_names
        self.clip_processor = clip_processor
        self.is_test = is_test  # 是否是测试集

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        image_paths = self.image_paths[idx]

        # 加载图像
        images = [Image.open(img_path) for img_path in image_paths]

        # 使用 CLIP 的 processor 处理文本和图像
        inputs = self.clip_processor(text=text, images=images, return_tensors="pt", padding="max_length")

        if self.is_test:
            # 如果是测试集，返回输入数据和图像名称
            image_names = self.image_names[idx]
            return inputs, image_names  # 使用英文逗号
        else:
            # 如果是训练集或验证集，返回输入数据、期望的排序和图像名称
            expected_order = self.expected_orders[idx]
            image_names = self.image_names[idx]
            return inputs, expected_order, image_names

In [8]:
clip_processor = AltCLIPProcessor.from_pretrained("BAAI/AltCLIP-m18")

preprocessor_config.json:   0%|          | 0.00/559 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/517 [00:00<?, ?B/s]

sentencepiece.bpe.model:   0%|          | 0.00/5.07M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/17.1M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/280 [00:00<?, ?B/s]

In [9]:
from torch.utils.data import random_split


# 构建完整的训练数据集
full_train_dataset = TaskADataset(train_texts, train_image_paths, train_expected_orders, train_image_names, clip_processor)

# 定义训练集和验证集的比例
train_size = int(0.85 * len(full_train_dataset))  # 80% 训练集
val_size = len(full_train_dataset) - train_size  # 20% 验证集

# 使用 random_split 划分数据集
train_dataset, val_dataset = random_split(full_train_dataset, [train_size, val_size])

# 打印划分后的数据集大小
print(f"训练集大小: {len(train_dataset)}")
print(f"验证集大小: {len(val_dataset)}")

训练集大小: 328
验证集大小: 59


In [10]:
def listmle_loss(scores, expected_order, image_names):
    """
    scores: 模型输出的文本与图像的相似度分数，形状为 (batch_size, num_images)
    expected_order: 期望的图像排序，形状为 (batch_size, num_images)，每个元素是图像名称
    image_names: 图像名称列表，形状为 (batch_size, num_images)
    """
    batch_size, num_images = scores.shape
    loss = 0.0

    for i in range(batch_size):
        order = expected_order[i]
        image_names_i = image_names[i]

        # 确保expected_order中的名称在image_names_i中存在
        try:
            image_name_to_idx = {img_name: idx for idx, img_name in enumerate(image_names_i)}
            sorted_idx = [image_name_to_idx[img_name] for img_name in order]
        except KeyError as e:
            raise ValueError(f"图像名称 {e} 不在当前样本的image_names中")

        sorted_scores = scores[i, sorted_idx]  # 按正确顺序排列的分数

        # 计算每一步的logsumexp（剩余项）
        loss_i = 0.0
        for k in range(num_images - 1):  # 最后一项无需计算
            remaining_scores = sorted_scores[k:]
            logsum = torch.logsumexp(remaining_scores, dim=0)
            loss_i += (logsum - sorted_scores[k])

        loss += loss_i

    return loss / batch_size  # 平均batch损失

In [14]:
# 自定义 DataCollator
class TaskADataCollator:
    def __call__(self, features):
        # 提取输入数据
        inputs = {
            "input_ids": torch.stack([f[0]["input_ids"].squeeze(0) for f in features]),
            "attention_mask": torch.stack([f[0]["attention_mask"].squeeze(0) for f in features]),
            "pixel_values": torch.stack([f[0]["pixel_values"].squeeze(0) for f in features]).view(-1, 3, 224, 224),
        }

        # 提取排序标签
        expected_order = [f[1] for f in features]
        image_names = [f[2] for f in features]

        # 返回 inputs 和额外的标签
        return inputs, expected_order, image_names

In [16]:
from transformers import EarlyStoppingCallback


class TaskATrainer(Trainer):
    def __init__(self, loss_fn=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.loss_fn = loss_fn  # 将损失函数保存为实例变量

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        # 提取输入数据和排序标签
        inputs, expected_order, image_names = inputs

        # 假设 inputs 是从 DataCollator 返回的输入
        input_ids = inputs["input_ids"]  # 形状为 (batch_size, 77)
        attention_mask = inputs["attention_mask"]  # 形状为 (batch_size, 77)
        pixel_values = inputs["pixel_values"]  # 形状为 (batch_size * 5, 3, 224, 224)
        
        # 将图像数据按文本分组，得到 (batch_size, 5, 3, 224, 224)
        batch_size = input_ids.shape[0]
        pixel_values = pixel_values.view(batch_size, 5, 3, 224, 224)
        
        # 初始化 logits_per_text
        logits_per_text = []
        
        # 分批次计算 logits_per_text
        for i in range(batch_size):
            # 提取第 i 个文本和对应的 5 张图像
            text_input = {
                "input_ids": input_ids[i].unsqueeze(0),  # 形状为 (1, 77)
                "attention_mask": attention_mask[i].unsqueeze(0)  # 形状为 (1, 77)
            }
            image_input = {
                "pixel_values": pixel_values[i]  # 形状为 (5, 3, 224, 224)
            }
        
            # 模型前向传播
            outputs = model(**text_input, **image_input)
            logits = outputs['logits_per_text']  # 形状为 (1, 5)
        
            # 添加到 logits_per_text
            logits_per_text.append(logits)
        
        # 合并 logits_per_text，得到 (batch_size, 5)
        logits_per_text = torch.cat(logits_per_text, dim=0)

        # 使用传入的损失函数计算损失
        loss = self.loss_fn(logits_per_text, expected_order, image_names)

        return (loss, outputs) if return_outputs else loss

    def prediction_step(self, model, inputs, prediction_loss_only=False, ignore_keys=None):
        # 确保 inputs 是一个字典
        if isinstance(inputs, (list, tuple)):
            inputs, expected_order, image_names = inputs
        else:
            expected_order = None
            image_names = None
    
        # 添加 return_loss 字段
        inputs["return_loss"] = True
    
        # 提取文本和图像输入
        input_ids = inputs["input_ids"]  # 形状为 (batch_size, 77)
        attention_mask = inputs["attention_mask"]  # 形状为 (batch_size, 77)
        pixel_values = inputs["pixel_values"]  # 形状为 (batch_size * num_images_per_text, 3, 224, 224)
        
        # 将图像数据按文本分组，得到 (batch_size, num_images_per_text, 3, 224, 224)
        batch_size = input_ids.shape[0]
        num_images_per_text = 5
        pixel_values = pixel_values.view(batch_size, num_images_per_text, 3, 224, 224)
    
        # 初始化 logits_per_text
        logits_per_text = []
    
        # 分批次计算 logits_per_text
        for i in range(batch_size):
            # 提取第 i 个文本和对应的 5 张图像
            text_input = {
                "input_ids": input_ids[i].unsqueeze(0),  # 形状为 (1, 77)
                "attention_mask": attention_mask[i].unsqueeze(0)  # 形状为 (1, 77)
            }
            image_input = {
                "pixel_values": pixel_values[i]  # 形状为 (5, 3, 224, 224)
            }
    
            # 模型前向传播
            with torch.no_grad():  # 禁用梯度计算
                outputs = model(**text_input, **image_input)
            logits = outputs['logits_per_text']  # 形状为 (1, 5)
    
            # 添加到 logits_per_text
            logits_per_text.append(logits)
    
        # 合并 logits_per_text，得到 (batch_size, 5)
        logits_per_text = torch.cat(logits_per_text, dim=0)
    
        # 使用传入的损失函数计算 loss
        loss = self.loss_fn(logits_per_text, expected_order, image_names)
    
        # 如果只需要损失值，直接返回
        if prediction_loss_only:
            return (loss, None, None)
    
        # 将 expected_order 和 image_names 转换为张量或嵌套的字典
        # 例如，将 image_names 转换为索引列表
        image_indices = [
            [image_names[i].index(img) for img in expected_order[i]]  # 真实索引
            for i in range(len(image_names))
        ]
        image_indices = torch.tensor(image_indices, dtype=torch.long)  # 转换为张量
    
        # 返回 loss、模型输出和额外信息
        return (loss, logits_per_text, {"image_indices": image_indices})

In [17]:
clip_model = AltCLIPModel.from_pretrained("BAAI/AltCLIP-m18")

config.json:   0%|          | 0.00/4.99k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/4.78G [00:00<?, ?B/s]

Some weights of the model checkpoint at BAAI/AltCLIP-m18 were not used when initializing AltCLIPModel: ['text_model.transformation_pre.bias', 'text_model.transformation_pre.weight']
- This IS expected if you are initializing AltCLIPModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing AltCLIPModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [18]:
clip_model.config

AltCLIPConfig {
  "_attn_implementation_autoset": true,
  "_name_or_path": "BAAI/AltCLIP-m18",
  "architectures": [
    "AltCLIPModel"
  ],
  "direct_kd": false,
  "initializer_factor": 1.0,
  "logit_scale_init_value": 2.6592,
  "model_type": "altclip",
  "num_layers": 3,
  "projection_dim": 1024,
  "text_config": {
    "_attn_implementation_autoset": true,
    "classifier_dropout": null,
    "model_type": "altclip_text_model",
    "pooler_fn": "cls",
    "project_dim": 1024
  },
  "text_model_name": null,
  "torch_dtype": "float32",
  "transformers_version": "4.47.0",
  "vision_config": {
    "dropout": 0.0,
    "hidden_size": 1280,
    "intermediate_size": 5120,
    "model_type": "altclip_vision_model",
    "num_attention_heads": 16,
    "num_hidden_layers": 32,
    "patch_size": 14
  },
  "vision_model_name": null
}

In [19]:
from scipy.stats import spearmanr

def compute_metrics(eval_pred):
    # 解包 eval_pred
    logits_per_text, extra_info = eval_pred.predictions, eval_pred.label_ids

    # 提取 image_indices
    image_indices = extra_info["image_indices"]#真实索引

    # 将 logits_per_text 转换为 torch.Tensor
    logits_per_text = torch.tensor(logits_per_text)

    # 计算 Top Image Accuracy
    top_image_accuracy = 0.0
    for i in range(len(logits_per_text)):
        predicted_order = torch.argsort(logits_per_text[i], descending=True).tolist()
        if predicted_order[0] == image_indices[i][0]:  # 检查预测的 top-1 是否正确
            top_image_accuracy += 1
    top_image_accuracy /= len(logits_per_text)

    # 计算 Spearman 等级相关系数
    spearman_corr = 0.0
    for i in range(len(logits_per_text)):
        predicted_order = torch.argsort(logits_per_text[i], descending=True).tolist()
        true_order = image_indices[i].tolist()
        corr, _ = spearmanr(predicted_order, true_order)
        spearman_corr += corr
    spearman_corr /= len(logits_per_text)

    # 计算综合指标（加权平均）
    combined_metric = 0.7 * top_image_accuracy + 0.3 * spearman_corr
    # 返回指标
    return {
        "top_image_accuracy": top_image_accuracy,
        "spearman_corr": spearman_corr,
        "combined_metric": combined_metric  # 综合指标
    }

In [23]:
# import optuna
# from transformers import TrainingArguments
# from peft import LoraConfig, get_peft_model

# def objective(trial):
#     # 获取超参数
#     learning_rate = trial.suggest_categorical('learning_rate', [1e-3,4e-4,2e-4,8e-4])  # 学习率范围

#     # LoRA 超参数
#     lora_r = trial.suggest_categorical('lora_r', [8, 16, 32])  # LoRA秩
#     alpha_multiplier = trial.suggest_categorical('alpha_multiplier', [1, 2,4,8])  # 选择一个乘数
#     lora_alpha = lora_r * alpha_multiplier  # 计算 lora_alpha
#     lora_dropout = trial.suggest_categorical('lora_dropout', [0.2, 0.3, 0.4])
#     # LoRA dropout概率

#     # 调整梯度累积步数
#     gradient_accumulation_steps = trial.suggest_categorical('gradient_accumulation_steps', [4,8])  # 梯度累积步数

#     print(f"Learning Rate: {learning_rate}")
#     print(f"LoRA Rank (r): {lora_r}")
#     print(f"Alpha Multiplier: {alpha_multiplier}")
#     print(f"LoRA Alpha: {lora_alpha}")
#     print(f"LoRA Dropout: {lora_dropout}")
#     print(f"Gradient Accumulation Steps: {gradient_accumulation_steps}")
    

#     # 创建 LoRA 配置
#     lora_config = LoraConfig(
#         r=lora_r,  # LoRA秩
#         lora_alpha=lora_alpha,  # LoRA缩放因子
#         target_modules=["q_proj", "v_proj"],  # 选择目标模块
#         lora_dropout=lora_dropout,  # LoRA dropout
#         bias="none"  # 不训练偏置项
#     )

#     # 加载基础 CLIP 模型并应用 LoRA 配置
#     clip_model = AltCLIPModel.from_pretrained("BAAI/AltCLIP-m18")
#     clip_model = get_peft_model(clip_model, lora_config)

#     # 设置训练参数，保持与 TrainingArguments 配置一致
#     training_args = TrainingArguments(
#         output_dir="./results",          # 输出目录
#         run_name="my_experiment_1",      # 任务名称
#         learning_rate=learning_rate,    # 学习率
#         per_device_train_batch_size=2,  # 固定每个设备的训练批次大小
#         per_device_eval_batch_size=16,   # 固定每个设备的评估批次大小
#         num_train_epochs=1,            # 固定训练轮数
#         save_strategy="steps",          # 每多少步保存模型
#         save_steps=2,                   # 每 2 步保存一次
#         logging_dir="./logs",           # 日志目录
#         logging_steps=2,                # 每 2 步记录一次日志
#         eval_strategy="steps",          # 每多少步评估模型
#         eval_steps=2,                   # 每 2 步评估一次
#         save_total_limit=2,             # 最多保存 2 个模型检查点
#         load_best_model_at_end=True,    # 训练结束时加载最佳模型
#         metric_for_best_model="top_image_accuracy",  # 监控验证集损失
#         greater_is_better=True,        # 损失越小越好
#         gradient_accumulation_steps=gradient_accumulation_steps,  # 调整梯度累积步数
#         report_to="none",               # 不上传到其他平台
#         save_safetensors=False,         # 禁用 safetensors
#     )
#     # 初始化 DataCollator
#     data_collator = TaskADataCollator()
#     # 初始化 Trainer
#     trainer = TaskATrainer(
#         model=clip_model,  # 使用 LoRA 模型
#         args=training_args,
#         train_dataset=train_dataset,
#         eval_dataset=val_dataset,
#         data_collator=data_collator,
#         compute_metrics=compute_metrics,
#         callbacks=[EarlyStoppingCallback(early_stopping_patience=6)],
#         loss_fn=lambdarank_loss
#     )

#     # 开始训练
#     train_result= trainer.train()
#     print(train_result)
#     train_loss=train_result.metrics['train_loss']

#     # 计算验证集损失
#     eval_result = trainer.evaluate()
#     print(eval_result)
#     val_loss = eval_result["eval_loss"]

#     # 返回损失值，Optuna 会通过最小化损失来选择最佳的超参数
#     return val_loss*0.3+0.7*train_loss


In [24]:
# # 创建一个 Study 对象，设置为最小化目标（损失）
# study = optuna.create_study(direction='minimize')

# # 执行超参数搜索
# study.optimize(objective, n_trials=20)  # 进行 20 次实验

# # 输出最佳超参数
# print(f"Best hyperparameters: {study.best_params}")

In [25]:
# # 获取最佳超参数
# best_params = study.best_params

# # 获取最优的超参数
# learning_rate = best_params['learning_rate']
# lora_r = best_params['lora_r']
# alpha_multiplier=best_params['alpha_multiplier']
# lora_alpha = lora_r*alpha_multiplier
# lora_dropout = best_params['lora_dropout']
# gradient_accumulation_steps = best_params['gradient_accumulation_steps']

# # 创建 LoRA 配置
# lora_config = LoraConfig(
#     r=lora_r,  # LoRA秩
#     lora_alpha=lora_alpha,  # LoRA缩放因子
#     target_modules=["q_proj", "v_proj"],  # 选择目标模块
#     lora_dropout=lora_dropout,  # LoRA dropout
#     bias="none"  # 不训练偏置项
# )

# # 加载基础 CLIP 模型并应用 LoRA 配置
# clip_model = AltCLIPModel.from_pretrained("BAAI/AltCLIP-m18")
# clip_model = get_peft_model(clip_model, lora_config)

# # 设置训练参数
# training_args = TrainingArguments(
#     output_dir="./results",          # 输出目录
#     run_name="my_experiment_1",      # 任务名称
#     learning_rate=learning_rate,    # 学习率
#     per_device_train_batch_size=2,  # 固定每个设备的训练批次大小
#     per_device_eval_batch_size=16,   # 固定每个设备的评估批次大小
#     num_train_epochs=5,             # 固定训练轮数
#     save_strategy="steps",          # 每多少步保存模型
#     save_steps=2,                   # 每 2 步保存一次
#     logging_dir="./logs",           # 日志目录
#     logging_steps=2,                # 每 2 步记录一次日志
#     eval_strategy="steps",          # 每多少步评估模型
#     eval_steps=2,                   # 每 2 步评估一次
#     save_total_limit=2,             # 最多保存 2 个模型检查点
#     load_best_model_at_end=True,    # 训练结束时加载最佳模型
#     metric_for_best_model="top_image_accuracy",  # 监控验证集损失
#     greater_is_better=True,        # 损失越小越好
#     gradient_accumulation_steps=gradient_accumulation_steps,  # 调整梯度累积步数
#     report_to="none",               # 不上传到其他平台
#     save_safetensors=False,         # 禁用 safetensors
# )

# # 初始化 DataCollator
# data_collator = TaskADataCollator()

# # 初始化 Trainer
# trainer = TaskATrainer(
#     model=clip_model,  # 使用 LoRA 模型
#     args=training_args,
#     train_dataset=train_dataset,
#     eval_dataset=val_dataset,
#     data_collator=data_collator,
#     compute_metrics=compute_metrics,
#     callbacks=[EarlyStoppingCallback(early_stopping_patience=6)],
#     loss_fn=lambdarank_loss
# )

# # 开始训练
# trainer.train()

In [26]:
def listmle_loss_combine_focal_loss(
    scores, 
    expected_order, 
    image_names, 
    alpha=0.85,
    gamma=2, 
    alpha_fl=[0.35,0.1,0.15,0.3,0.1]
):
    """
    结合ListMLE和Focal Loss的损失函数，优化Top1、排名第三的样本以及干扰项（top5）的排序
    :param scores: 模型的输出分数，形状为 (batch_size, num_images)
    :param expected_order: 每个样本的正确顺序，形状为 (batch_size, num_images)
    :param image_names: 每个样本的图像名称列表，形状为 (batch_size, num_images)
    :param alpha: 控制ListMLE和Focal Loss的权重比例
    :param gamma: Focal Loss的调节参数，用于降低易分类样本的权重
    :param alpha_fl: Focal Loss中正样本的权重，用于平衡正负样本
    :return: 组合后的损失值
    """
    # 计算原始ListMLE损失（优化NDCG）
    listmle = listmle_loss(scores, expected_order, image_names)
    
    # 计算Focal Loss（优化Top1、排名第三的样本以及干扰项top5）
    batch_size, num_images = scores.shape
    focal_loss = 0.0
    
    for i in range(batch_size):
        if alpha == 0:
            break
        # 获取当前样本的正确顺序
        correct_order = expected_order[i]
        # 获取Top1、排名第三和干扰项（top5）的索引
        top1_image = correct_order[0]
        top2_image = correct_order[1]
        top3_image = correct_order[2]
        top4_image = correct_order[3]
        top5_image = correct_order[4]  # 干扰项（top5）
        top1_idx = image_names[i].index(top1_image)
        top2_idx = image_names[i].index(top2_image)
        top3_idx = image_names[i].index(top3_image)
        top4_idx = image_names[i].index(top4_image)
        top5_idx = image_names[i].index(top5_image)
        
        # 使用sigmoid获取每个位置的独立概率
        probs = torch.sigmoid(scores[i])  # 形状: (num_images,)
        
        # 计算Top1样本的Focal Loss（正样本）
        pt_top1 = probs[top1_idx]
        focal_term_top1 = - alpha_fl[0] * ((1 - pt_top1) ** gamma) * torch.log(pt_top1 + 1e-10)

        pt_top2 = probs[top2_idx]
        focal_term_top2 = - alpha_fl[1] * (pt_top2 ** gamma) * torch.log(1 - pt_top2 + 1e-10)

        pt_top3 = probs[top3_idx]
        focal_term_top3 = - alpha_fl[2] * (pt_top3 ** gamma) * torch.log(1 - pt_top3 + 1e-10)
        
        # 计算排名第四样本的Focal Loss（负样本）
        pt_top4 = probs[top4_idx]
        focal_term_top4 = - alpha_fl[3] * (pt_top4 ** gamma) * torch.log(1 - pt_top4 + 1e-10)
        
        # 计算干扰项（top5）的Focal Loss（确保其概率低于其他图片）
        pt_top5 = probs[top5_idx]
        focal_term_top5 = - alpha_fl[4] * (pt_top5 ** gamma) * torch.log(1 - pt_top5 + 1e-10)
        
        # 总Focal Loss
        sample_focal = focal_term_top1 + focal_term_top2 + focal_term_top3+focal_term_top4 + focal_term_top5
        focal_loss += sample_focal
    
    # 归一化并加权组合损失
    focal_loss = focal_loss / batch_size
    total_loss = (1 - alpha) * listmle + alpha * focal_loss
    
    return total_loss

In [27]:
# 获取最优的超参数
learning_rate = 1e-4
lora_r = 6
alpha_multiplier=8
lora_alpha = lora_r*alpha_multiplier
lora_dropout = 0.5
gradient_accumulation_steps = 8

# 创建 LoRA 配置
lora_config = LoraConfig(
    r=lora_r,  # LoRA秩
    lora_alpha=lora_alpha,  # LoRA缩放因子
    target_modules=["q_proj", "v_proj"],  # 选择目标模块
    lora_dropout=lora_dropout,  # LoRA dropout
    bias="none"  # 不训练偏置项
)

# 加载基础 CLIP 模型并应用 LoRA 配置
clip_model = AltCLIPModel.from_pretrained("BAAI/AltCLIP-m18")
clip_model = get_peft_model(clip_model, lora_config)

# 设置训练参数
training_args = TrainingArguments(
    output_dir="./results",          # 输出目录
    run_name="my_experiment_1",      # 任务名称
    learning_rate=learning_rate,    # 学习率
    per_device_train_batch_size=2,  # 固定每个设备的训练批次大小
    per_device_eval_batch_size=59,  # 固定每个设备的评估批次大小
    num_train_epochs=2,             # 固定训练轮数
    save_strategy="steps",          # 每多少步保存模型
    save_steps=2,                   # 每 2 步保存一次
    logging_dir="./logs",           # 日志目录
    logging_steps=2,                # 每 2 步记录一次日志
    eval_strategy="steps",          # 每多少步评估模型
    eval_steps=2,                   # 每 2 步评估一次
    save_total_limit=2,             # 最多保存 2 个模型检查点
    load_best_model_at_end=True,    # 训练结束时加载最佳模型
    metric_for_best_model="eval_loss",  # 监控验证集损失
    greater_is_better=False,        # 损失越小越好
    gradient_accumulation_steps=gradient_accumulation_steps,  # 调整梯度累积步数
    report_to="none",               # 不上传到其他平台
    save_safetensors=False,         # 禁用 safetensors
    # lr_scheduler_type="cosine" ,  # 带 Warmup 的余弦调度器
    # warmup_steps=20,               # 设置 warm-up 步数
    # max_grad_norm=1.0,              # 设置梯度裁剪的最大值
)


# 初始化 DataCollator
data_collator = TaskADataCollator()

# 初始化 Trainer
trainer = TaskATrainer(
    model=clip_model,  # 使用 LoRA 模型
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=2)],
    loss_fn=listmle_loss_combine_focal_loss
)

# 开始训练
trainer.train()

Some weights of the model checkpoint at BAAI/AltCLIP-m18 were not used when initializing AltCLIPModel: ['text_model.transformation_pre.bias', 'text_model.transformation_pre.weight']
- This IS expected if you are initializing AltCLIPModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing AltCLIPModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Could not estimate the number of tokens of the input, floating-point operations will not be computed


Step,Training Loss,Validation Loss,Top Image Accuracy,Spearman Corr,Combined Metric
2,9.9061,11.197869,0.423729,0.194915,0.355085
4,8.6794,10.9235,0.423729,0.176271,0.349492
6,9.7868,10.56129,0.423729,0.191525,0.354068
8,7.8207,10.115294,0.423729,0.20339,0.357627
10,8.4899,9.710594,0.457627,0.208475,0.382881
12,7.2043,9.231367,0.423729,0.186441,0.352542
14,6.7477,8.891164,0.423729,0.171186,0.347966
16,5.838,8.572636,0.423729,0.177966,0.35
18,5.5768,8.262285,0.440678,0.188136,0.364915
20,5.7598,7.956707,0.457627,0.218644,0.385932


TrainOutput(global_step=40, training_loss=5.939890933036804, metrics={'train_runtime': 1218.4381, 'train_samples_per_second': 0.538, 'train_steps_per_second': 0.033, 'total_flos': 0.0, 'train_loss': 5.939890933036804, 'epoch': 1.9268292682926829})

In [28]:
trainer.save_model("./results/f_model")

In [29]:
clip_model = AltCLIPModel.from_pretrained("./results/f_model")

Some weights of the model checkpoint at BAAI/AltCLIP-m18 were not used when initializing AltCLIPModel: ['text_model.transformation_pre.bias', 'text_model.transformation_pre.weight']
- This IS expected if you are initializing AltCLIPModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing AltCLIPModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [30]:
dev_or_test = "test"
taskA_test_file = f"subtask_a_{dev_or_test}.tsv"
language = "EN"
taskA_test_folder = os.path.join(f'data/TaskA/{language}/{dev_or_test}')
test_df = pd.read_csv(os.path.join(taskA_test_folder, taskA_test_file), sep="\t")
test_df = pd.read_csv(os.path.join(taskA_test_folder,taskA_test_file), sep="\t")


test_texts = test_df["sentence"].tolist()
test_compounds = test_df["compound"].tolist()
test_image_columns = ["image1_name", "image2_name", "image3_name", "image4_name", "image5_name"]
test_image_names = test_df[test_image_columns].apply(lambda row: row.tolist(), axis=1).tolist()  # 图像名称
test_image_paths = test_df[test_image_columns].apply(lambda row: build_image_paths(row, test_compounds, taskA_test_folder), axis=1).tolist()

test_dataset = TaskADataset(
    texts=test_texts,
    image_paths=test_image_paths,
    image_names=test_image_names, 
    clip_processor=clip_processor,
    is_test=True  # 是测试集
)
##%%
test_dataset
##%%
import torch

# 定义推理函数
def predict(model, dataset, device):
    """
    model: 训练好的模型
    dataset: 测试数据集
    device: 设备（CPU 或 GPU）
    """
    model.to(device)  # 将模型移动到指定设备
    model.eval()  # 设置模型为评估模式
    predictions = []

    with torch.no_grad():  # 禁用梯度计算
        for inputs, image_names in dataset:  # 解包 inputs 和 image_names
            # 将输入数据移动到与模型相同的设备
            inputs = {key: value.to(device) for key, value in inputs.items()}

            # 模型前向传播
            outputs = model(**inputs)
            logits_per_text = outputs['logits_per_text']  # 文本与图像的相似度分数，形状为 (1, num_images)

            # 获取排序结果
            sorted_indices = torch.argsort(logits_per_text, dim=1, descending=True).squeeze(0).tolist()
            sorted_image_names = [image_names[idx] for idx in sorted_indices]  # 根据排序结果获取图像名称
            predictions.append(sorted_image_names)

    return predictions

#检查是否有可用的 GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
##%%
# 对测试数据进行推理
predictions = predict(clip_model, test_dataset, device)

# 生成提交结果
results = []
for compound, prediction in zip(test_compounds, predictions):
    results.append({"compound": compound, "expected_order": prediction})

# 将结果保存到文件
results_df = pd.DataFrame(results)
results_df.to_csv(f'submission_{language}_{dev_or_test}.tsv', sep="\t", index=False)

print(f"推理完成，结果已保存到 submission_{language}.tsv")

Using device: cuda
推理完成，结果已保存到 submission_EN.tsv


In [31]:
dev_or_test = "test"
taskA_test_file = f"subtask_a_{dev_or_test}.tsv"
language = "PT"
taskA_test_folder = os.path.join( f'data/TaskA/{language}/{dev_or_test}')
test_df = pd.read_csv(os.path.join(taskA_test_folder, taskA_test_file), sep="\t")
test_df = pd.read_csv(os.path.join(taskA_test_folder,taskA_test_file), sep="\t")


test_texts = test_df["sentence"].tolist()
test_compounds = test_df["compound"].tolist()
test_image_columns = ["image1_name", "image2_name", "image3_name", "image4_name", "image5_name"]
test_image_names = test_df[test_image_columns].apply(lambda row: row.tolist(), axis=1).tolist()  # 图像名称
test_image_paths = test_df[test_image_columns].apply(lambda row: build_image_paths(row, test_compounds, taskA_test_folder), axis=1).tolist()

test_dataset = TaskADataset(
    texts=test_texts,
    image_paths=test_image_paths,
    image_names=test_image_names, 
    clip_processor=clip_processor,
    is_test=True  # 是测试集
)
##%%
test_dataset
##%%
import torch

# 定义推理函数
def predict(model, dataset, device):
    """
    model: 训练好的模型
    dataset: 测试数据集
    device: 设备（CPU 或 GPU）
    """
    model.to(device)  # 将模型移动到指定设备
    model.eval()  # 设置模型为评估模式
    predictions = []

    with torch.no_grad():  # 禁用梯度计算
        for inputs, image_names in dataset:  # 解包 inputs 和 image_names
            # 将输入数据移动到与模型相同的设备
            inputs = {key: value.to(device) for key, value in inputs.items()}

            # 模型前向传播
            outputs = model(**inputs)
            logits_per_text = outputs['logits_per_text']  # 文本与图像的相似度分数，形状为 (1, num_images)

            # 获取排序结果
            sorted_indices = torch.argsort(logits_per_text, dim=1, descending=True).squeeze(0).tolist()
            sorted_image_names = [image_names[idx] for idx in sorted_indices]  # 根据排序结果获取图像名称
            predictions.append(sorted_image_names)

    return predictions

#检查是否有可用的 GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
##%%
# 对测试数据进行推理
predictions = predict(clip_model, test_dataset, device)

# 生成提交结果
results = []
for compound, prediction in zip(test_compounds, predictions):
    results.append({"compound": compound, "expected_order": prediction})

# 将结果保存到文件
results_df = pd.DataFrame(results)
results_df.to_csv(f'submission_{language}_{dev_or_test}.tsv', sep="\t", index=False)

print(f"推理完成，结果已保存到 submission_{language}_{dev_or_test}.tsv")

Using device: cuda
推理完成，结果已保存到 submission_PT_test.tsv


In [32]:
##%%
dev_or_test = "dev"
taskA_test_file = f"subtask_a_{dev_or_test}.tsv"
# 加载英语数据
language = "EN"
taskA_test_folder = os.path.join( f'data/TaskA/{language}/{dev_or_test}')
test_df = pd.read_csv(os.path.join(taskA_test_folder, taskA_test_file), sep="\t")
##%%
# 读取测试数据
test_df = pd.read_csv(os.path.join(taskA_test_folder,taskA_test_file), sep="\t")

# 提取测试数据
test_texts = test_df["sentence"].tolist()
test_compounds = test_df["compound"].tolist()
test_image_columns = ["image1_name", "image2_name", "image3_name", "image4_name", "image5_name"]
test_image_names = test_df[test_image_columns].apply(lambda row: row.tolist(), axis=1).tolist()  # 图像名称
test_image_paths = test_df[test_image_columns].apply(lambda row: build_image_paths(row, test_compounds, taskA_test_folder), axis=1).tolist()
##%%
# 构建测试数据集实例
test_dataset = TaskADataset(
    texts=test_texts,
    image_paths=test_image_paths,
    image_names=test_image_names, 
    clip_processor=clip_processor,
    is_test=True  # 是测试集
)
##%%
test_dataset
##%%
import torch

# 定义推理函数
def predict(model, dataset, device):
    """
    model: 训练好的模型
    dataset: 测试数据集
    device: 设备（CPU 或 GPU）
    """
    model.to(device)  # 将模型移动到指定设备
    model.eval()  # 设置模型为评估模式
    predictions = []

    with torch.no_grad():  # 禁用梯度计算
        for inputs, image_names in dataset:  # 解包 inputs 和 image_names
            # 将输入数据移动到与模型相同的设备
            inputs = {key: value.to(device) for key, value in inputs.items()}

            # 模型前向传播
            outputs = model(**inputs)
            logits_per_text = outputs['logits_per_text']  # 文本与图像的相似度分数，形状为 (1, num_images)

            # 获取排序结果
            sorted_indices = torch.argsort(logits_per_text, dim=1, descending=True).squeeze(0).tolist()
            sorted_image_names = [image_names[idx] for idx in sorted_indices]  # 根据排序结果获取图像名称
            predictions.append(sorted_image_names)

    return predictions

#检查是否有可用的 GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
##%%
# 对测试数据进行推理
predictions = predict(clip_model, test_dataset, device)

# 生成提交结果
results = []
for compound, prediction in zip(test_compounds, predictions):
    results.append({"compound": compound, "expected_order": prediction})

# 将结果保存到文件
results_df = pd.DataFrame(results)
results_df.to_csv(f'submission_{language}_{dev_or_test}.tsv', sep="\t", index=False)

print(f"推理完成，结果已保存到 submission_{language}_{dev_or_test}.tsv")

Using device: cuda
推理完成，结果已保存到 submission_EN_dev.tsv


In [33]:
dev_or_test = "dev"
taskA_test_file = f"subtask_a_{dev_or_test}.tsv"
language = "PT"
taskA_test_folder = os.path.join( f'data/TaskA/{language}/{dev_or_test}')
test_df = pd.read_csv(os.path.join(taskA_test_folder, taskA_test_file), sep="\t")
test_df = pd.read_csv(os.path.join(taskA_test_folder,taskA_test_file), sep="\t")


test_texts = test_df["sentence"].tolist()
test_compounds = test_df["compound"].tolist()
test_image_columns = ["image1_name", "image2_name", "image3_name", "image4_name", "image5_name"]
test_image_names = test_df[test_image_columns].apply(lambda row: row.tolist(), axis=1).tolist()  # 图像名称
test_image_paths = test_df[test_image_columns].apply(lambda row: build_image_paths(row, test_compounds, taskA_test_folder), axis=1).tolist()

test_dataset = TaskADataset(
    texts=test_texts,
    image_paths=test_image_paths,
    image_names=test_image_names, 
    clip_processor=clip_processor,
    is_test=True  # 是测试集
)
##%%
test_dataset
##%%
import torch

# 定义推理函数
def predict(model, dataset, device):
    """
    model: 训练好的模型
    dataset: 测试数据集
    device: 设备（CPU 或 GPU）
    """
    model.to(device)  # 将模型移动到指定设备
    model.eval()  # 设置模型为评估模式
    predictions = []

    with torch.no_grad():  # 禁用梯度计算
        for inputs, image_names in dataset:  # 解包 inputs 和 image_names
            # 将输入数据移动到与模型相同的设备
            inputs = {key: value.to(device) for key, value in inputs.items()}

            # 模型前向传播
            outputs = model(**inputs)
            logits_per_text = outputs['logits_per_text']  # 文本与图像的相似度分数，形状为 (1, num_images)

            # 获取排序结果
            sorted_indices = torch.argsort(logits_per_text, dim=1, descending=True).squeeze(0).tolist()
            sorted_image_names = [image_names[idx] for idx in sorted_indices]  # 根据排序结果获取图像名称
            predictions.append(sorted_image_names)

    return predictions

#检查是否有可用的 GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
##%%
# 对测试数据进行推理
predictions = predict(clip_model, test_dataset, device)

# 生成提交结果
results = []
for compound, prediction in zip(test_compounds, predictions):
    results.append({"compound": compound, "expected_order": prediction})

# 将结果保存到文件
results_df = pd.DataFrame(results)
results_df.to_csv(f'submission_{language}_{dev_or_test}.tsv', sep="\t", index=False)

print(f"推理完成，结果已保存到 submission_{language}_{dev_or_test}.tsv")

Using device: cuda
推理完成，结果已保存到 submission_PT_dev.tsv


In [34]:
##%%
dev_or_test = "xeval"
taskA_test_file = f"subtask_a_xe.tsv"
# 加载英语数据
language = "EN"
taskA_test_folder = os.path.join(f'data/TaskA/{language}/{dev_or_test}')
test_df = pd.read_csv(os.path.join(taskA_test_folder,taskA_test_file), sep="\t")
test_df = pd.read_csv(os.path.join(taskA_test_folder,taskA_test_file), sep="\t")

# 提取测试数据
test_texts = test_df["sentence"].tolist()
test_compounds = test_df["compound"].tolist()
test_image_columns = ["image1_name", "image2_name", "image3_name", "image4_name", "image5_name"]
test_image_names = test_df[test_image_columns].apply(lambda row: row.tolist(), axis=1).tolist()  # 图像名称
test_image_paths = test_df[test_image_columns].apply(lambda row: build_image_paths(row, test_compounds, taskA_test_folder), axis=1).tolist()
##%%
# 构建测试数据集实例
test_dataset = TaskADataset(
    texts=test_texts,
    image_paths=test_image_paths,
    image_names=test_image_names, 
    clip_processor=clip_processor,
    is_test=True  # 是测试集
)
##%%
test_dataset
##%%
import torch

# 定义推理函数
def predict(model, dataset, device):
    """
    model: 训练好的模型
    dataset: 测试数据集
    device: 设备（CPU 或 GPU）
    """
    model.to(device)  # 将模型移动到指定设备
    model.eval()  # 设置模型为评估模式
    predictions = []

    with torch.no_grad():  # 禁用梯度计算
        for inputs, image_names in dataset:  # 解包 inputs 和 image_names
            # 将输入数据移动到与模型相同的设备
            inputs = {key: value.to(device) for key, value in inputs.items()}

            # 模型前向传播
            outputs = model(**inputs)
            logits_per_text = outputs['logits_per_text']  # 文本与图像的相似度分数，形状为 (1, num_images)

            # 获取排序结果
            sorted_indices = torch.argsort(logits_per_text, dim=1, descending=True).squeeze(0).tolist()
            sorted_image_names = [image_names[idx] for idx in sorted_indices]  # 根据排序结果获取图像名称
            predictions.append(sorted_image_names)

    return predictions

#检查是否有可用的 GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
##%%
# 对测试数据进行推理
predictions = predict(clip_model, test_dataset, device)

# 生成提交结果
results = []
for compound, prediction in zip(test_compounds, predictions):
    results.append({"compound": compound, "expected_order": prediction})

# 将结果保存到文件
results_df = pd.DataFrame(results)
results_df.to_csv(f'submission_xe.tsv', sep="\t", index=False)

print(f"推理完成，结果已保存到 submission_xe.tsv")

Using device: cuda
推理完成，结果已保存到 submission_xe.tsv


In [35]:
dev_or_test = "xeval"
taskA_test_file = f"subtask_a_xp.tsv"
# 加载英语数据
language = "PT"
taskA_test_folder = os.path.join(f'data/TaskA/{language}/{dev_or_test}')
test_df = pd.read_csv(os.path.join(taskA_test_folder, taskA_test_file), sep="\t")
##%%
# 读取测试数据
test_df = pd.read_csv(os.path.join(taskA_test_folder,taskA_test_file), sep="\t")

# 提取测试数据
test_texts = test_df["sentence"].tolist()
test_compounds = test_df["compound"].tolist()
test_image_columns = ["image1_name", "image2_name", "image3_name", "image4_name", "image5_name"]
test_image_names = test_df[test_image_columns].apply(lambda row: row.tolist(), axis=1).tolist()  # 图像名称
test_image_paths = test_df[test_image_columns].apply(lambda row: build_image_paths(row, test_compounds, taskA_test_folder), axis=1).tolist()
##%%
# 构建测试数据集实例
test_dataset = TaskADataset(
    texts=test_texts,
    image_paths=test_image_paths,
    image_names=test_image_names, 
    clip_processor=clip_processor,
    is_test=True  # 是测试集
)

test_dataset

import torch

# 定义推理函数
def predict(model, dataset, device):
    """
    model: 训练好的模型
    dataset: 测试数据集
    device: 设备（CPU 或 GPU）
    """
    model.to(device)  # 将模型移动到指定设备
    model.eval()  # 设置模型为评估模式
    predictions = []

    with torch.no_grad():  # 禁用梯度计算
        for inputs, image_names in dataset:  # 解包 inputs 和 image_names
            # 将输入数据移动到与模型相同的设备
            inputs = {key: value.to(device) for key, value in inputs.items()}

            # 模型前向传播
            outputs = model(**inputs)
            logits_per_text = outputs['logits_per_text']  # 文本与图像的相似度分数，形状为 (1, num_images)

            # 获取排序结果
            sorted_indices = torch.argsort(logits_per_text, dim=1, descending=True).squeeze(0).tolist()
            sorted_image_names = [image_names[idx] for idx in sorted_indices]  # 根据排序结果获取图像名称
            predictions.append(sorted_image_names)

    return predictions

#检查是否有可用的 GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# 对测试数据进行推理
predictions = predict(clip_model, test_dataset, device)

# 生成提交结果
results = []
for compound, prediction in zip(test_compounds, predictions):
    results.append({"compound": compound, "expected_order": prediction})

# 将结果保存到文件
results_df = pd.DataFrame(results)
results_df.to_csv(f'submission_xp.tsv', sep="\t", index=False)

print(f"推理完成，结果已保存到 submission_xp.tsv")

Using device: cuda
推理完成，结果已保存到 submission_xp.tsv
