# 利用网格/区域表示+自注意力+注意力的模型结构来完成副使图像描述任务

## 参数配置
`configurations.py` 文件中定义的 `Config` 类作为项目的配置中心，其作用是集中管理项目中使用的所有配置参数。这些参数通常包括文件路径、模型参数、数据处理选项、训练设置和图像处理参数等。通过这种方式，可以在不修改代码的情况下调整项目的行为。

以下是`Config`类中定义的配置参数及其作用：

1. **数据路径**：
   - `data_path`：主数据目录路径。
   - `images_path`：存储图像的路径。
   - `train_captions_path`：训练集的文本描述文件路径。
   - `test_captions_path`：测试集的文本描述文件路径。
   - `output_folder`：用于存储词汇表和处理后数据的输出文件夹路径。

2. **模型参数**：
   - `embed_size`：嵌入向量的维度。
   - `vocab_size`：词汇表的大小。
   - `num_layers`：定义循环神经网络中的层数。
   - `num_heads`：自注意力机制中头的数量。
   - `dropout`：在模型中使用的dropout比率。
   - `hidden_size`：隐藏层的维度。
   - `image_code_dim`：图像编码的维度。
   - `word_dim`：词嵌入的维度。
   - `attention_dim`：注意力机制的隐藏层维度。

3. **数据处理参数**：
   - `min_word_count`：词汇表中词的最小出现次数，用于筛选词汇。
   - `max_len`：假设的描述的最大长度。

4. **训练参数**：
   - `batch_size`：每个批次的大小。
   - `learning_rate`：学习率。
   - `num_epochs`：训练的总轮次数。
   - `workers`：加载数据时使用的工作线程数。
   - `encoder_learning_rate`：编码器的学习率。
   - `decoder_learning_rate`：解码器的学习率。
   - `lr_update`：学习率更新频率。

5. **图像预处理参数**：
   - `image_size`：图像缩放后的大小。
   - `crop_size`：从缩放后的图像中裁剪出的大小。

6. **其他配置**：
   - `device`：设置运行计算的设备，如果CUDA可用则使用GPU，否则使用CPU。

## 数据预处理
为图像描述任务准备和预处理数据，确保数据能够被模型以适当的格式接受和处理。它是建立有效的训练和测试环境的基础。

这个`datasets.py`文件实现了以下几个主要功能：

1. `create_dataset`函数：用于处理原始文本描述，创建一个词汇表，并将文本转换为对应的词索引向量。它首先读取训练和测试数据集中的文本描述，然后统计词频以创建词汇表，并移除低频词。之后，它定义了一个内部函数`encode_captions`，这个函数负责将每条文本描述转换为一个固定长度的词索引序列，包括特殊标记<start>, <end>, <pad>, 和<unk>。转换完成后，函数将这些数据保存为JSON文件，以便后续处理。

In [None]:
import torch
from PIL import Image
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
from configurations import Config  # 导入配置类


# 从配置文件获取配置
config = Config()


def create_dataset(max_len=64):
    """
    整理数据集，构建词汇表，并将文本描述转换为词索引向量。
    使用configuration.py文件中定义的配置信息。
    """
    # 使用config中定义的路径
    image_folder = config.images_path
    train_captions_path = config.train_captions_path
    test_captions_path = config.test_captions_path
    output_folder = config.output_folder

    # 读取训练图像描述
    with open(train_captions_path, 'r') as f:
        train_captions_data = json.load(f)

    # 读取测试图像描述
    with open(test_captions_path, 'r') as f:
        test_captions_data = json.load(f)

    # 统计训练集的文本描述的词频
    vocab = Counter()
    for caption in train_captions_data.values():
        vocab.update(caption.lower().split())

    # 移除其中的低频词
    vocab = {word for word, count in vocab.items() if count >= config.min_word_count}

    # 构建词典
    word_to_idx = {word: idx + 4 for idx, word in enumerate(vocab)}
    word_to_idx['<pad>'] = 0
    word_to_idx['<start>'] = 1
    word_to_idx['<end>'] = 2
    word_to_idx['<unk>'] = 3

    # 一个函数来转换描述为词索引向量，并进行填充
    def encode_captions(captions_data, word_to_idx, max_len):
        encoded_captions = {}
        caplens = {}
        for img_id, caption in captions_data.items():
            words = caption.lower().split()
            encoded_caption = [word_to_idx.get(word, word_to_idx['<unk>']) for word in words]
            caplen = len(encoded_caption) + 2  # 加2是因为还要加上<start>和<end>
            encoded_caption = [word_to_idx['<start>']] + encoded_caption + [word_to_idx['<end>']]
            encoded_caption += [word_to_idx['<pad>']] * (max_len - len(encoded_caption))
            encoded_captions[img_id] = encoded_caption[:max_len]
            caplens[img_id] = caplen if caplen <= max_len else max_len
        return encoded_captions, caplens
    # def encode_captions(captions_data, word_to_idx, max_len):
    #     encoded_captions = {}
    #     for img_id, caption in captions_data.items():
    #         words = caption.lower().split()
    #         encoded_caption = [word_to_idx.get(word, word_to_idx['<unk>']) for word in words]
    #         encoded_caption = [word_to_idx['<start>']] + encoded_caption + [word_to_idx['<end>']]
    #         encoded_caption += [word_to_idx['<pad>']] * (max_len - len(encoded_caption))
    #         encoded_captions[img_id] = encoded_caption[:max_len]
    #     return encoded_captions

    # 对训练集描述进行编码
    encoded_captions_train, caplens_train = encode_captions(train_captions_data, word_to_idx, max_len)

    # 对测试集描述进行编码
    encoded_captions_test, caplens_test = encode_captions(test_captions_data, word_to_idx, max_len)

    # 存储词典和编码后的描述
    with open(os.path.join(output_folder, 'vocab.json'), 'w') as f:
        json.dump(word_to_idx, f)

    with open(os.path.join(output_folder, 'encoded_captions_train.json'), 'w') as f:
        json.dump(encoded_captions_train, f)

    with open(os.path.join(output_folder, 'encoded_captions_test.json'), 'w') as f:
        json.dump(encoded_captions_test, f)

    # 存储图像路径
    image_paths_train = {img_id: os.path.join(image_folder, img_id) for img_id in train_captions_data.keys()}
    with open(os.path.join(output_folder, 'image_paths_train.json'), 'w') as f:
        json.dump(image_paths_train, f)

    image_paths_test = {img_id: os.path.join(image_folder, img_id) for img_id in test_captions_data.keys()}
    with open(os.path.join(output_folder, 'image_paths_test.json'), 'w') as f:
        json.dump(image_paths_test, f)

    # 存储caplens
    with open(os.path.join(output_folder, 'caplens_train.json'), 'w') as f:
        json.dump(caplens_train, f)

    with open(os.path.join(output_folder, 'caplens_test.json'), 'w') as f:
        json.dump(caplens_test, f)


# 调用函数，整理数据集
# create_dataset()

2. `ImageTextDataset`类：继承自`torch.utils.data.Dataset`，这个类是一个PyTorch的自定义数据集，用于加载图像和对应的已编码文本描述。它重写了`__getitem__`方法，用于获取索引对应的数据点（图像和文本描述），并将图像通过转换处理成统一的格式；重写了`__len__`方法，返回数据集的大小。

In [None]:
class ImageTextDataset(Dataset):
    """
    PyTorch数据集类，用于加载和处理图像-文本数据。
    """

    def __init__(self, image_paths_file, captions_file, caplens_file, transform=None):
        """
        初始化数据集类。
        参数:
            image_paths_file: 包含图像路径的json文件路径。
            captions_file: 包含编码后文本描述的json文件路径。
            transform: 应用于图像的预处理转换。
        """
        # 载入图像路径和文本描述以及caplens
        with open(image_paths_file, 'r') as f:
            self.image_paths = json.load(f)

        with open(captions_file, 'r') as f:
            self.captions = json.load(f)

        with open(caplens_file, 'r') as f:
            self.caplens = json.load(f)

        # 设置图像预处理方法
        self.transform = transform or transforms.Compose([
            transforms.Resize((256, 256)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

    def __getitem__(self, index):
        """
        获取单个数据点。
        参数:
            index: 数据点的索引。
        返回:
            一个包含图像和对应文本描述的元组。
        """
        # 获取图像路径和文本描述以及caplen
        image_id = list(self.image_paths.keys())[index]
        image_path = self.image_paths[image_id]
        caption = self.captions[image_id]
        caplen = self.caplens[image_id]

        # 加载图像并应用预处理
        image = Image.open(image_path).convert('RGB')
        if self.transform is not None:
            image = self.transform(image)

        # 将文本描述转换为张量
        caption_tensor = torch.tensor(caption, dtype=torch.long)

        return image, caption_tensor, caplen

    def __len__(self):
        """
        数据集中的数据点总数。
        """
        return len(self.image_paths)


# 创建数据集实例
# train_dataset = ImageTextDataset(
#     image_paths_file=os.path.join(config.output_folder, 'image_paths_train.json'),
#     captions_file=os.path.join(config.output_folder, 'encoded_captions_train.json'),
#     caplens_file=os.path.join(config.output_folder, 'caplens_train.json')
# )
#
# # 示例：创建验证集实例
# test_dataset = ImageTextDataset(
#     image_paths_file=os.path.join(config.output_folder, 'image_paths_test.json'),
#     captions_file=os.path.join(config.output_folder, 'encoded_captions_test.json'),
#     caplens_file=os.path.join(config.output_folder, 'caplens_test.json')
# )


3. `create_dataloaders`函数：使用`ImageTextDataset`类来创建PyTorch的`DataLoader`，它提供了一个可迭代的数据加载器，用于在训练和测试时批量加载数据，并可选地对数据进行打乱和多进程加载。

In [None]:
# 创建训练集和测试集的 DataLoader
def create_dataloaders(config):
    """
    创建训练集和测试集的 DataLoader。

    参数:
        batch_size: 每个批次的大小。
        num_workers: 加载数据时使用的进程数。
        shuffle_train: 是否打乱训练数据。

    返回:
        train_loader: 训练数据的 DataLoader。
        test_loader: 测试数据的 DataLoader。
    """
    # 图像预处理转换
    transform = transforms.Compose([
        transforms.Resize((256, 256)),
        transforms.RandomCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    # 加载数据时使用的进程数
    num_workers = 0

    # 创建数据集对象
    train_dataset = ImageTextDataset(
        image_paths_file=os.path.join(config.output_folder, 'image_paths_train.json'),
        captions_file=os.path.join(config.output_folder, 'encoded_captions_train.json'),
        caplens_file=os.path.join(config.output_folder, 'caplens_train.json'),
        transform=transform
    )

    test_dataset = ImageTextDataset(
        image_paths_file=os.path.join(config.output_folder, 'image_paths_test.json'),
        captions_file=os.path.join(config.output_folder, 'encoded_captions_test.json'),
        caplens_file=os.path.join(config.output_folder, 'caplens_test.json'),
        transform=transform
    )

    # 创建 DataLoader 对象
    train_loader = DataLoader(
        dataset=train_dataset,
        batch_size=config.batch_size,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=True
    )

    test_loader = DataLoader(
        dataset=test_dataset,
        batch_size=config.batch_size,
        shuffle=False,  # 通常测试集不需要打乱
        num_workers=num_workers,
        pin_memory=True
    )

    return train_loader, test_loader

此外，在文件的末尾，通过`if __name__ == '__main__':`块，文件提供了一个简单的测试用例，用于验证训练数据加载器是否正确创建，并能够生成批量数据。

In [None]:
config = Config()
# 使用Config类中定义的配置来创建DataLoader
train_loader, test_loader = create_dataloaders(config=config)

# 测试 DataLoader 是否正确创建
if __name__ == '__main__':
    for i, (images, captions, caplens) in enumerate(train_loader):
        print(f"Batch {i + 1}")
        print(f"Images shape: {images.size()}")
        print(f"Captions shape: {captions.size()}")
        if i == 1:  # 仅打印前两个批次的信息
            break

我还定义了一个 `datasets_pretrain_demo.py` 文件来验证数据预处理过程是否正确。它通过以下步骤实现这一目标：

1. **读取词汇表和编码后的描述**：
   - 加载之前生成的词汇表 (`vocab.json`)，编码后的训练集描述 (`encoded_captions_train.json`)，以及训练图像的路径 (`image_paths_train.json`)。

2. **索引到单词的转换**：
   - 创建从词索引到单词的反向映射，用于将编码后的描述转换回文本形式。

3. **选择并展示图像**：
   - 从图像路径列表中选择第一个图像ID，并加载对应的图像。

4. **展示图像**：
   - 使用matplotlib展示图像，并关闭坐标轴。

5. **打印文本描述**：
   - 将编码后的描述（词索引列表）转换回单词形式，并打印出来，以验证编码和图像加载的正确性。

In [None]:
import json
from PIL import Image
import matplotlib.pyplot as plt

vocab_path = '../data/output/vocab.json'
encoded_captions_path = '../data/output/encoded_captions_train.json'
image_paths_path = '../data/output/image_paths_train.json'

# 读取词典、编码后的描述和图像路径
with open(vocab_path, 'r') as f:
    vocab = json.load(f)

with open(encoded_captions_path, 'r') as f:
    encoded_captions = json.load(f)

with open(image_paths_path, 'r') as f:
    image_paths = json.load(f)

# 将索引转换回单词
vocab_idx2word = {idx: word for word, idx in vocab.items()}

# 选择要展示的图片ID，这里以第一个ID为例
first_img_id = list(image_paths.keys())[0]
content_img = Image.open(image_paths[first_img_id])

# 展示图片和对应的描述
plt.imshow(content_img)
plt.axis('off')  # 不显示坐标轴
plt.show()

# 打印对应的文本描述，确保字典中的键是整数，直接使用整数索引
caption = ' '.join([vocab_idx2word[word_idx] for word_idx in encoded_captions[first_img_id]])
# caption = ' '.join([vocab_idx2word[str(word_idx)] for word_idx in encoded_captions[first_img_id]])
print(caption)

## 模型定义
`models.py`文件定义了用于服饰图像描述任务的神经网络模型，包括图像编码器、注意力机制、文本解码器、整体模型框架和损失函数。以下是代码中各个部分的详细作用：

1. **自注意力机制 (`SelfAttention` 类)**：
   - 定义了一个利用`nn.MultiheadAttention`实现的自注意力层。它可以处理图像的特征，使模型能够在图像的不同区域之间建立联系，这在解析复杂图像时非常有用。

In [None]:
import torch
import torch.nn as nn
from pycocoevalcap.cider.cider import Cider
import numpy as np
from configurations import Config
from torchvision.models import resnet101, ResNet101_Weights
from torch.nn.utils.rnn import pack_padded_sequence
import torch.optim as optim

# 引入自注意机制后的图像编码器
class SelfAttention(nn.Module):
    def __init__(self, num_channels, num_heads=8, dropout=0.1):
        super(SelfAttention, self).__init__()
        self.num_heads = num_heads
        self.attention = nn.MultiheadAttention(num_channels, num_heads, dropout)

    def forward(self, x):
        # 保存原始形状
        orig_shape = x.shape
        # 打印输入形状
        print("Input shape:", x.shape)
        # 转换为(sequence_length, batch_size, num_channels)格式
        x = x.flatten(2).permute(2, 0, 1)
        attention_output, _ = self.attention(x, x, x)
        # 还原形状，确保与原始输入形状匹配
        attention_output = attention_output.permute(1, 2, 0)# 打印最终输出形状
        print("Final output shape:", attention_output.shape)
        return attention_output.view(orig_shape)

2. **图像编码器 (`ImageEncoder` 类)**：
   - 使用预训练的ResNet-101模型作为特征提取器，抽取图像的高层特征。这些特征接着被自注意力层进一步处理，以增强图像区域间的相关性。

In [None]:
class ImageEncoder(nn.Module):
    def __init__(self, finetuned=True, num_heads=8, dropout=0.1):
        super(ImageEncoder, self).__init__()
        # 使用ResNet101作为基础模型
        model = resnet101(weights=ResNet101_Weights.DEFAULT)
        self.grid_rep_extractor = nn.Sequential(*(list(model.children())[:-2]))
        # 设置参数是否可训练
        for param in self.grid_rep_extractor.parameters():
            param.requires_grad = finetuned

        # 自注意力层
        self.self_attention = SelfAttention(model.fc.in_features, num_heads, dropout)

    def forward(self, images):
        # 通过ResNet网格表示提取器
        features = self.grid_rep_extractor(images)
        print("Extractor output shape:", features.shape)
        # 应用自注意力
        features = self.self_attention(features)
        # 打印自注意力输出形状
        print("Self-attention output shape:", features.shape)
        return features

3. **解码器的注意力机制 (`AdditiveAttention` 类)**：
   - 实现了一种加法（或称为Bahdanau）注意力机制，用于计算解码过程中的上下文向量。它通过比较解码器的隐藏状态（query）与图像编码（key-value）之间的关系来计算每个位置的注意力权重。

In [None]:
# 解码器的注意力机制
class AdditiveAttention(nn.Module):
    def  __init__(self, query_dim, key_dim, attn_dim):
        """
        参数：
            query_dim: 查询Q的维度
            key_dim: 键K的维度
            attn_dim: 注意力函数隐藏层表示的维度
        """
        super(AdditiveAttention, self).__init__()
        self.attn_w_1_q = nn.Linear(query_dim, attn_dim)
        self.attn_w_1_k = nn.Linear(key_dim, attn_dim)
        self.attn_w_2 = nn.Linear(attn_dim, 1)
        self.tanh = nn.Tanh()
        self.softmax = nn.Softmax(dim=1)

    def forward(self, query, key_value):
        """
        Q K V：Q和K算出相关性得分，作为V的权重，K=V
        参数：
            query: 查询 (batch_size, q_dim)
            key_value: 键和值，(batch_size, n_kv, kv_dim)
        """
        # （2）计算query和key的相关性，实现注意力评分函数
        # -> (batch_size, 1, attn_dim)
        queries = self.attn_w_1_q(query).unsqueeze(1)
        # -> (batch_size, n_kv, attn_dim)
        keys = self.attn_w_1_k(key_value)
        # -> (batch_size, n_kv)
        attn = self.attn_w_2(self.tanh(queries+keys)).squeeze(2)
        # （3）归一化相关性分数
        # -> (batch_size, n_kv)
        attn = self.softmax(attn)
        # （4）计算输出
        # (batch_size x 1 x n_kv)(batch_size x n_kv x kv_dim)
        # -> (batch_size, 1, kv_dim)
        output = torch.bmm(attn.unsqueeze(1), key_value).squeeze(1)
        return output, attn

4. **文本解码器 (`AttentionDecoder` 类)**：
   - 定义了一个注意力机制的解码器，它结合了图像编码和前一个时间步的词嵌入来生成文本描述。解码器使用GRU单元进行序列生成，并且在每个时间步使用注意力权重来关注图像的不同区域。

In [None]:
# 文本解码器
class AttentionDecoder(nn.Module):
    """
           初始化文本解码器。

           参数:
               image_code_dim: 图像编码的维度。
               vocab_size: 词汇表的大小。
               word_dim: 词嵌入的维度。
               attention_dim: 注意力机制的隐藏层维度。
               hidden_size: GRU隐藏层的大小。
               num_layers: GRU层数。
               dropout: Dropout层的概率。
    """
    def __init__(self, image_code_dim, vocab_size, word_dim, attention_dim, hidden_size, num_layers, dropout=0.5):
        super(AttentionDecoder, self).__init__()
        self.embed = nn.Embedding(vocab_size, word_dim)
        self.attention = AdditiveAttention(hidden_size, image_code_dim, attention_dim)
        self.init_state = nn.Linear(image_code_dim, num_layers * hidden_size)
        self.rnn = nn.GRU(word_dim + image_code_dim, hidden_size, num_layers)
        self.dropout = nn.Dropout(p=dropout)
        self.fc = nn.Linear(hidden_size, vocab_size)
        # RNN默认已初始化
        self.init_weights()

    def init_weights(self):
        self.embed.weight.data.uniform_(-0.1, 0.1)
        self.fc.bias.data.fill_(0)
        self.fc.weight.data.uniform_(-0.1, 0.1)

    def init_hidden_state(self, image_code, captions, cap_lens):
        """
        初始化隐藏状态。

        参数：
            image_code：图像编码器输出的图像表示
                        (batch_size, image_code_dim, grid_height, grid_width)
            captions: 文本描述。
            cap_lens: 文本描述的长度。
        """
        # 将图像网格表示转换为序列表示形式
        batch_size, image_code_dim = image_code.size(0), image_code.size(1)
        # -> (batch_size, grid_height, grid_width, image_code_dim)
        image_code = image_code.permute(0, 2, 3, 1)
        # -> (batch_size, grid_height * grid_width, image_code_dim)
        image_code = image_code.view(batch_size, -1, image_code_dim)
        # （1）按照caption的长短排序
        sorted_cap_lens, sorted_cap_indices = torch.sort(cap_lens, 0, True)
        captions = captions[sorted_cap_indices]
        image_code = image_code[sorted_cap_indices]
        # （2）初始化隐状态
        hidden_state = self.init_state(image_code.mean(axis=1))
        hidden_state = hidden_state.view(
            batch_size,
            self.rnn.num_layers,
            self.rnn.hidden_size).permute(1, 0, 2)
        return image_code, captions, sorted_cap_lens, sorted_cap_indices, hidden_state

    def forward_step(self, image_code, curr_cap_embed, hidden_state):
        """
                解码器的前馈步骤。

                参数:
                    image_code: 图像编码。
                    curr_cap_embed: 当前时间步的词嵌入向量。
                    hidden_state: 当前的隐藏状态。
                """
        # （3.2）利用注意力机制获得上下文向量
        # query：hidden_state[-1]，即最后一个隐藏层输出 (batch_size, hidden_size)
        # context: (batch_size, hidden_size)
        context, alpha = self.attention(hidden_state[-1], image_code)
        # （3.3）以上下文向量和当前时刻词表示为输入，获得GRU输出
        x = torch.cat((context, curr_cap_embed), dim=-1).unsqueeze(0)
        # x: (1, real_batch_size, hidden_size+word_dim)
        # out: (1, real_batch_size, hidden_size)
        out, hidden_state = self.rnn(x, hidden_state)
        # （3.4）获取该时刻的预测结果
        # (real_batch_size, vocab_size)
        preds = self.fc(self.dropout(out.squeeze(0)))
        return preds, alpha, hidden_state

    def forward(self, image_code, captions, cap_lens):
        """
        完整的前馈过程。

        参数：
            hidden_state: (num_layers, batch_size, hidden_size)
            image_code:  (batch_size, feature_channel, feature_size)
            captions: (batch_size, )
        """
        # （1）将图文数据按照文本的实际长度从长到短排序
        # （2）获得GRU的初始隐状态
        image_code, captions, sorted_cap_lens, sorted_cap_indices, hidden_state \
            = self.init_hidden_state(image_code, captions, cap_lens)
        batch_size = image_code.size(0)
        # 输入序列长度减1，因为最后一个时刻不需要预测下一个词
        lengths = sorted_cap_lens.cpu().numpy() - 1
        # 初始化变量：模型的预测结果和注意力分数
        predictions = torch.zeros(batch_size, lengths[0], self.fc.out_features).to(captions.device)
        alphas = torch.zeros(batch_size, lengths[0], image_code.shape[1]).to(captions.device)
        # 获取文本嵌入表示 cap_embeds: (batch_size, num_steps, word_dim)
        cap_embeds = self.embed(captions)
        # Teacher-Forcing模式
        for step in range(lengths[0]):
            # （3）解码
            # （3.1）模拟pack_padded_sequence函数的原理，获取该时刻的非<pad>输入
            real_batch_size = np.where(lengths > step)[0].shape[0]
            preds, alpha, hidden_state = self.forward_step(
                image_code[:real_batch_size],
                cap_embeds[:real_batch_size, step, :],
                hidden_state[:, :real_batch_size, :].contiguous())
            # 记录结果
            predictions[:real_batch_size, step, :] = preds
            alphas[:real_batch_size, step, :] = alpha
        return predictions, alphas, captions, lengths, sorted_cap_indices


5. **整体模型 (`ARCTIC` 类)**：
   - 将图像编码器和文本解码器整合在一起，定义了完整的模型流程。在前向传递过程中，模型接受图像和文本描述，利用编码器和解码器生成描述的输出。

In [None]:
# ARCTIC 模型
class ARCTIC(nn.Module):
    def __init__(self, image_code_dim, vocab, word_dim, attention_dim, hidden_size, num_layers):
        super(ARCTIC, self).__init__()
        self.vocab = vocab
        self.encoder = ImageEncoder()
        self.decoder = AttentionDecoder(image_code_dim, len(vocab), word_dim, attention_dim, hidden_size, num_layers)

    def forward(self, images, captions, cap_lens):
        # 打印图像输入形状
        print("Image input shape:", images.shape)
        image_code = self.encoder(images)
        # 打印编码器输出形状
        print("Encoder output shape:", image_code.shape)
        output = self.decoder(image_code, captions, cap_lens)
        # 打印解码器输出形状
        print("Decoder output shape:", output[0].shape)  # Assuming output[0] is the main output
        return output

    def generate_by_beamsearch(self, images, beam_k, max_len):
        vocab_size = len(self.vocab)
        image_codes = self.encoder(images)
        texts = []
        device = images.device
        # 对每个图像样本执行束搜索
        for image_code in image_codes:
            # 将图像表示复制k份
            image_code = image_code.unsqueeze(0).repeat(beam_k, 1, 1, 1)
            # 生成k个候选句子，初始时，仅包含开始符号<start>
            cur_sents = torch.full((beam_k, 1), self.vocab['<start>'], dtype=torch.long).to(device)
            cur_sent_embed = self.decoder.embed(cur_sents)[:, 0, :]
            sent_lens = torch.LongTensor([1] * beam_k).to(device)
            # 获得GRU的初始隐状态
            image_code, cur_sent_embed, _, _, hidden_state = \
                self.decoder.init_hidden_state(image_code, cur_sent_embed, sent_lens)
            # 存储已生成完整的句子（以句子结束符<end>结尾的句子）
            end_sents = []
            # 存储已生成完整的句子的概率
            end_probs = []
            # 存储未完整生成的句子的概率
            probs = torch.zeros(beam_k, 1).to(device)
            k = beam_k
            while True:
                preds, _, hidden_state = self.decoder.forward_step(image_code[:k], cur_sent_embed,
                                                                   hidden_state.contiguous())
                # -> (k, vocab_size)
                preds = nn.functional.log_softmax(preds, dim=1)
                # 对每个候选句子采样概率值最大的前k个单词生成k个新的候选句子，并计算概率
                # -> (k, vocab_size)
                probs = probs.repeat(1, preds.size(1)) + preds
                if cur_sents.size(1) == 1:
                    # 第一步时，所有句子都只包含开始标识符，因此，仅利用其中一个句子计算topk
                    values, indices = probs[0].topk(k, 0, True, True)
                else:
                    # probs: (k, vocab_size) 是二维张量
                    # topk函数直接应用于二维张量会按照指定维度取最大值，这里需要在全局取最大值
                    # 因此，将probs转换为一维张量，再使用topk函数获取最大的k个值
                    values, indices = probs.view(-1).topk(k, 0, True, True)
                # 计算最大的k个值对应的句子索引和词索引
                sent_indices = torch.div(indices, vocab_size, rounding_mode='trunc')
                word_indices = indices % vocab_size
                # 将词拼接在前一轮的句子后，获得此轮的句子
                cur_sents = torch.cat([cur_sents[sent_indices], word_indices.unsqueeze(1)], dim=1)
                # 查找此轮生成句子结束符<end>的句子
                end_indices = [idx for idx, word in enumerate(word_indices) if word == self.vocab['<end>']]
                if len(end_indices) > 0:
                    end_probs.extend(values[end_indices])
                    end_sents.extend(cur_sents[end_indices].tolist())
                    # 如果所有的句子都包含结束符，则停止生成
                    k -= len(end_indices)
                    if k == 0:
                        break
                # 查找还需要继续生成词的句子
                cur_indices = [idx for idx, word in enumerate(word_indices)
                               if word != self.vocab['<end>']]
                if len(cur_indices) > 0:
                    cur_sent_indices = sent_indices[cur_indices]
                    cur_word_indices = word_indices[cur_indices]
                    # 仅保留还需要继续生成的句子、句子概率、隐状态、词嵌入
                    cur_sents = cur_sents[cur_indices]
                    probs = values[cur_indices].view(-1, 1)
                    hidden_state = hidden_state[:, cur_sent_indices, :]
                    cur_sent_embed = self.decoder.embed(
                        cur_word_indices.view(-1, 1))[:, 0, :]
                # 句子太长，停止生成
                if cur_sents.size(1) >= max_len:
                    break
            if len(end_sents) == 0:
                # 如果没有包含结束符的句子，则选取第一个句子作为生成句子
                gen_sent = cur_sents[0].tolist()
            else:
                # 否则选取包含结束符的句子中概率最大的句子
                gen_sent = end_sents[end_probs.index(max(end_probs))]
            texts.append(gen_sent)
        return texts

6. **损失函数 (`PackedCrossEntropyLoss` 类)**：
   - 为序列学习任务定义了交叉熵损失函数，忽略填充的部分。它使用了`pack_padded_sequence`来处理不同长度的序列。

In [None]:
# 损失函数
class PackedCrossEntropyLoss(nn.Module):
    def __init__(self):
        super(PackedCrossEntropyLoss, self).__init__()
        self.loss_fn = nn.CrossEntropyLoss()

    def forward(self, predictions, targets, lengths):
        """
        计算交叉熵损失，排除填充的部分。
        参数：
            predictions：模型的预测结果，形状为 (batch_size, max_length, vocab_size)。
            targets：实际的文本描述，形状为 (batch_size, max_length)。
            lengths：每个描述的实际长度。
        """
        # 使用 pack_padded_sequence 来处理变长序列
        # 这里 predictions 和 targets 都需要进行 pack 操作
        # 由于 pack_padded_sequence 需要长度从长到短的序列，这里假设输入已经是这种格式
        packed_predictions = pack_padded_sequence(predictions, lengths, batch_first=True, enforce_sorted=False)[0]
        packed_targets = pack_padded_sequence(targets, lengths, batch_first=True, enforce_sorted=False)[0]

        # 计算损失，忽略填充的部分
        loss = self.loss_fn(packed_predictions, packed_targets)
        return loss

7. **优化器 (`get_optimizer` 函数)**：
   - 定义了一个函数来为模型的不同部分设置不同的学习率，并创建优化器。

In [None]:
def get_optimizer(model, config):
    """
    获取优化器，为模型的不同部分设置不同的学习速率。
    参数：
        model：训练模型。
        config：包含配置信息的对象，如学习速率等。
    返回：
        配置好地优化器。
    """
    # 为编码器和解码器设置不同的学习速率
    encoder_params = filter(lambda p: p.requires_grad, model.encoder.parameters())
    decoder_params = filter(lambda p: p.requires_grad, model.decoder.parameters())

    # 创建优化器，分别对这两部分参数应用不同的学习速率
    optimizer = optim.Adam([
        {"params": encoder_params, "lr": config.encoder_learning_rate},
        {"params": decoder_params, "lr": config.decoder_learning_rate}
    ])

    return optimizer

8. **学习率调整 (`adjust_learning_rate` 函数)**：
   - 定义了一个函数来调整优化器中的学习率，根据预设的策略在训练过程中逐渐减小学习率。

In [None]:
def adjust_learning_rate(optimizer, epoch, config):
    """
    调整学习速率，每隔一定轮次减少到原来的十分之一。
    参数：
        optimizer：优化器。
        epoch：当前轮次。
        config：包含配置信息的对象。
    """
    for param_group in optimizer.param_groups:
        if param_group['name'] == 'encoder':
            param_group['lr'] = config.encoder_learning_rate * (0.1 ** (epoch // config.lr_update))
        else:
            param_group['lr'] = config.decoder_learning_rate * (0.1 ** (epoch // config.lr_update))

9. **评估函数 (`evaluate_cider` 函数)**：
   - 定义了一个函数来评估生成的描述的质量，使用CIDEr-D评分系统对模型性能进行评估。这个评分系统特别关注描述的差异性。

In [None]:
# CIDEr-D 评估
def filter_useless_words(sent, filterd_words):
    # 去除句子中不参与CIDEr-D计算的符号
    return [w for w in sent if w not in filterd_words]


def evaluate_cider(data_loader, model, config):
    model.eval()
    # 存储候选文本和参考文本
    cands = {}
    refs = {}
    filterd_words = {model.vocab['<start>'], model.vocab['<end>'], model.vocab['<pad>']}
    device = next(model.parameters()).device

    for i, (imgs, caps, caplens, allcaps) in enumerate(data_loader):
        imgs = imgs.to(device)
        # Generate captions
        preds = model.sample(imgs)
        for j in range(imgs.size(0)):
            img_id = str(i * config.batch_size + j)
            cand = ' '.join(filter_useless_words(preds[j], filterd_words))
            cands[img_id] = [cand]
            refs[img_id] = list(map(lambda x: ' '.join(filter_useless_words(x, filterd_words)), allcaps[j].tolist()))

    # 计算CIDEr-D得分
    cider_evaluator = Cider()
    score, scores = cider_evaluator.compute_score(refs, cands)

    model.train()
    return score

## 模型训练
`train.py` 文件中的 `main` 函数实现了模型训练的完整流程，包括数据准备、模型初始化、训练循环、损失计算、优化步骤以及模型评估。下面是详细的步骤分析：

1. **配置加载**：
   - 加载配置参数，这些参数在 `configurations.py` 文件中被定义。

2. **数据加载器创建**：
   - 使用 `create_dataloaders` 函数创建用于训练和测试的数据加载器。

3. **词汇表加载**：
   - 加载词汇表文件，这对于后续将文本编码和解码成数字是必要的。

4. **模型初始化**：
   - 实例化 `ARCTIC` 模型，传入必要的参数，如图像编码维度、词汇表、词嵌入维度等，并将模型转移到配置指定的设备上（如 GPU）。

5. **优化器设置**：
   - 调用 `get_optimizer` 函数为模型设置优化器，以用于训练中的参数更新。

6. **损失函数定义**：
   - 实例化 `PackedCrossEntropyLoss` 类，用于计算模型输出和目标序列之间的损失。

7. **权重保存路径创建**：
   - 创建用于保存训练过程中模型权重的目录。

8. **训练循环**：
   - 对于设定的训练轮次，执行以下操作：
     - 将模型置于训练模式。
     - 遍历训练数据加载器中的数据批次，对于每个批次：
       - 将图像和文本数据移至配置指定的设备。
       - 清空优化器状态。
       - 通过模型传递图像和文本，获取输出和注意力权重。
       - 计算损失，考虑到序列的实际长度。
       - 执行反向传播和优化器步骤以更新权重。
       - 定期打印损失信息。

9. **模型评估**：
   - 在每个训练轮次后，使用测试数据集评估模型性能，并打印 CIDEr 评分。

10. **模型保存**：
    - 如果当前模型性能好于之前的最佳性能，则保存模型权重（注释中提到的代码被注释掉了，但这是典型的做法）。
    - 在训练完成后，保存最终的模型权重。

In [None]:
import json
import torch
import os
from configurations import Config
from models import ARCTIC, get_optimizer, PackedCrossEntropyLoss, evaluate_cider
from datasets import create_dataloaders, ImageTextDataset


def main():
    # 加载配置
    config = Config()

    # 创建数据加载器
    train_loader, test_loader = create_dataloaders(config)

    # 加载词汇表文件
    with open('../data/output/vocab.json', 'r') as f:
        vocab = json.load(f)

    # 模型初始化
    model = ARCTIC(
        image_code_dim=config.image_code_dim,
        vocab=vocab,  # 传递词汇表字典
        word_dim=config.word_dim,
        attention_dim=config.attention_dim,
        hidden_size=config.hidden_size,
        num_layers=config.num_layers
    ).to(config.device)

    # 优化器
    optimizer = get_optimizer(model, config)

    # 损失函数
    loss_fn = PackedCrossEntropyLoss().to(config.device)

    # 创建保存权重的文件夹路径
    weights_dir = os.path.join(config.output_folder, 'weights')
    os.makedirs(weights_dir, exist_ok=True)

    best_val_score = float('-inf')  # 初始化最佳验证得分

    # 开始训练
    for epoch in range(config.num_epochs):
        # 训练模型
        model.train()
        for i, (imgs, caps, caplens) in enumerate(train_loader):
            imgs, caps = imgs.to(config.device), caps.to(config.device)
            caplens = caplens.cpu().to(torch.int64)

            optimizer.zero_grad()
            outputs, alphas, _, _, _ = model(imgs, caps, caplens)

            # 确保目标序列长度与模型输出匹配
            targets = caps[:, 1:]  # 假设targets是captions去除第一个<start>标记后的部分
            print(f"Caplens: {caplens}")
            loss = loss_fn(outputs, targets, caplens)
            loss.backward()
            optimizer.step()

            # 打印/记录损失信息
            if (i + 1) % 100 == 0:
                print(
                    f'Epoch [{epoch + 1}/{config.num_epochs}], Step [{i + 1}/{len(train_loader)}], Loss: {loss.item():.4f}')

        # 在每个epoch结束时使用测试集评估模型
        current_test_score = evaluate_cider(test_loader, model, config)
        print(f"Epoch {epoch}: Test score = {current_test_score}")

        # 如果当前得分比之前的最佳得分要好，则保存模型
        # if current_val_score > best_val_score:
        #     best_val_score = current_val_score
        #     best_model_path = os.path.join(weights_dir, f'best_model_epoch_{epoch}.pth')
        #     torch.save(model.state_dict(), best_model_path)
        #     print(f"Saved new best model to {best_model_path}")

    # 训练完成后的最终评估
    final_test_score = evaluate_cider(test_loader, model, config)
    print(f"Final test score = {final_test_score}")

    # 训练完成后保存模型
    final_model_path = os.path.join(weights_dir, 'final_model.pth')
    torch.save(model.state_dict(), final_model_path)
    print(f"Saved final model to {final_model_path}")


if __name__ == '__main__':
    main()