## 三层神经网络选择器
来源于论文PET-select中的选择器部分 https://arxiv.org/abs/2409.16416

主要是通过微调后的codeBERT生成的问题向量进行输入，通过三层全连接网络，最后通过一个softmax函数输出各个类别的概率，进而来判断应该使用哪种PET

In [1]:
import random
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.utils import compute_class_weight
import tqdm
import numpy as np
import torch
from torch.utils.data import DataLoader, Dataset
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

### 设定随机种子

In [None]:
def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


set_seed(2)

### 获取嵌入向量的输入

In [None]:
from sentence_transformers import SentenceTransformer


def get_embedding(questions, model_path):
    """
    获取问题的嵌入表示。

    Args:
        questions (list of str): 需要获取嵌入表示的问题列表。
        model_path (str): SentenceTransformer模型的路径。

    Returns:
        list of numpy.ndarray: 每个问题的嵌入表示列表，每个嵌入表示是一个numpy数组。

    """
    model = SentenceTransformer(model_path)
    embeddings = []
    print("Generating embeddings...")
    for question in tqdm.tqdm(questions):
        embedding = model.encode(question)
        embeddings.append(embedding)
    return embeddings

### 检查GPU/CPU

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

### 读取数据和嵌入向量生成模型

In [None]:
dataset = "code_complex"
hard = "_hard"
data_type = "_all"
file_path = f"result/{dataset}_dataset/{dataset}_classification{hard}_dataset_train{data_type}.jsonl"
test_file_path = f"result/{dataset}_dataset/{dataset}_classification{hard}_dataset_test{data_type}.jsonl"
model_path = f"result/{dataset}_contrastive{hard}{data_type}_model"
data = pd.read_json(file_path, lines=True)
test_data = pd.read_json(test_file_path, lines=True)

### 处理数据

In [None]:
questions = data["text"].tolist()
labels = data["label"].tolist()
ranks = data["rank"].tolist()

test_questions = test_data["text"].tolist()
test_labels = test_data["label"].tolist()
test_ranks = test_data["rank"].tolist()

embeddings = get_embedding(questions, model_path)
test_embeddings = get_embedding(test_questions, model_path)

### 超参数设定

In [None]:
input_size = len(embeddings[0])
print(input_size)
num_classes = 9
print(num_classes)
num_epochs = 10
batch_size = 32
learning_rate = 0.001

### 划分训练集、验证集和测试集

In [None]:
(
    train_embeddings,
    eval_embeddings,
    train_labels,
    eval_labels,
    train_ranks,
    eval_ranks,
) = train_test_split(embeddings, labels, ranks, test_size=0.2, random_state=42)
test_embeddings, test_labels, test_ranks = test_embeddings, test_labels, test_ranks

### 获取自定义数据集格式

In [None]:
class CustomDataset(Dataset):
    def __init__(self, embeddings, labels, ranks):
        self.embeddings = torch.tensor(embeddings, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.long)
        self.ranks = ranks

    def __len__(self):
        return len(self.embeddings)

    def __getitem__(self, idx):
        return self.embeddings[idx], self.labels[idx], self.ranks[idx]


def custom_collate_fn(batch):
    """
    DataLoader默认的数据合并方式可能不适用于所有情况，特别是当数据项具有复杂结构或需要特定处理时

    对batch数据进行自定义处理，返回处理后的数据

    Args:
        batch (list): 包含多个tuple的list，每个tuple包含三个元素，分别是embeddings, labels, relevance_scores

    Returns:
        tuple: 包含处理后的embeddings, labels, relevance_scores。
            embeddings (torch.Tensor): 将batch中所有样本的embeddings进行stack处理后的tensor
            labels (torch.Tensor): 将batch中所有样本的labels转换为long类型的tensor
            relevance_scores (list): batch中所有样本的relevance_scores的list
    """
    embeddings = torch.stack([item[0] for item in batch])
    labels = torch.tensor([item[1] for item in batch], dtype=torch.long)
    relevance_scores = [item[2] for item in batch]
    return embeddings, labels, relevance_scores


# 获得自定义数据集
train_dataset = CustomDataset(train_embeddings, train_labels, train_ranks)
eval_dataset = CustomDataset(eval_embeddings, eval_labels, eval_ranks)
test_dataset = CustomDataset(test_embeddings, test_labels, test_ranks)

# 获得自定义数据加载器
train_dataloader = DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True, collate_fn=custom_collate_fn
)
eval_dataloader = DataLoader(
    eval_dataset, batch_size=batch_size, shuffle=False, collate_fn=custom_collate_fn
)
test_dataloader = DataLoader(
    test_dataset, batch_size=batch_size, shuffle=False, collate_fn=custom_collate_fn
)

# 计算类别权重，处理类别不平衡问题，防止模型过拟合
class_weights = compute_class_weight(
    "balanced", classes=np.unique(train_labels), y=train_labels
)
class_weights = torch.tensor(class_weights, dtype=torch.float32).to(device)

### 三层全连接神经网络

In [None]:
class ClassificationModel(nn.Module):
    def __init__(self, input_size, num_classes):
        # 调用父类的初始化函数
        super(ClassificationModel, self).__init__()
        # 定义第一个全连接层，输入大小为input_size，输出大小为128
        self.fc1 = nn.Linear(input_size, 128)
        # 定义第二个全连接层，输入大小为128，输出大小为64
        self.fc2 = nn.Linear(128, 64)
        # 定义第三个全连接层，输入大小为64，输出大小为num_classes（即分类的类别数）
        self.fc3 = nn.Linear(64, num_classes)
        # 定义ReLU激活函数
        self.relu = nn.ReLU()

    # 前向传播函数，定义了数据在模型中如何流动
    def forward(self, x):
        # 数据通过第一个全连接层后进行ReLU激活
        x = self.relu(self.fc1(x))
        # 数据通过第二个全连接层后进行ReLU激活
        x = self.relu(self.fc2(x))
        # 数据通过第三个全连接层，不进行激活
        x = self.fc3(x)
        # 对输出进行softmax操作，使得输出的每一行代表一个概率分布
        x = F.softmax(x, dim=1)
        return x


# create model, criterion, and optimizer 模型、损失函数和优化器（梯度下降的优化）
model = ClassificationModel(input_size, num_classes).to(device)
criterion = nn.CrossEntropyLoss(weight=class_weights)
# model.parameters(): 这个方法返回模型中所有可训练参数的迭代器。这些参数是优化器在每次迭代中需要更新的
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

### 计算 nDCG 和 MRR

#### 1. `dcg`（Discounted Cumulative Gain）
##### **功能**：

- **DCG** 是一种评估排序结果质量的指标。它通过考虑相关性得分以及排名位置的影响，衡量结果集的累积增益。
  
##### **工作原理**：
1. **输入**：
   - `relevances`：一个列表，表示不同项目的相关性得分。
   - `k`：我们只评估前`k`个项目。

2. **处理**：
   - 将相关性得分转换为浮点型数组，并截取前`k`个元素。
   - 使用折扣公式： 
     $$
     DCG = \sum_{i=1}^{k} \frac{\text{relevance}_i}{\log_2(i + 1)}
     $$
   - 每个相关性得分除以其位置的对数，位置越靠后，贡献越小（对数是从2开始以避免除以0）。
   
3. **返回**：  
   - 如果相关性数组不为空，返回折扣后的累加值；否则返回`0.0`。

---

#### 2. `ndcg`（Normalized Discounted Cumulative Gain）
##### **功能**：
- **nDCG** 是DCG的归一化版本，比较当前排序结果与理想排序结果的差距。

##### **工作原理**：
1. **输入**：
   - `relevances`：原始相关性得分列表。
   - `k`：评估前`k`个项目。

2. **步骤**：
   - 计算当前排序的DCG（调用`dcg`函数）。
   - 计算理想排序（相关性从大到小排列）的DCG，即`IDCG`。
   - 归一化：如果`IDCG`为0，则返回0，否则返回`DCG / IDCG`。

3. **意义**：  
   - 归一化后的nDCG范围在0到1之间，值越接近1，表示当前排序与理想排序越接近。

---

#### 3. `calculate_mrr`（Mean Reciprocal Rank）

##### **功能**：
- **MRR**（平均倒数排名）是用来评估排序模型性能的指标之一，衡量预测中正确答案在所有可能答案中的排名，并取倒数排名的平均值。

##### **工作原理**：
1. **输入**：
   - `outputs`：这是模型的预测得分，形状为 `[batch_size, num_classes]`，表示模型对每个样本的多个类别得分。
   - `labels`：真实标签，形状为 `[batch_size]`，表示每个样本的正确类别。

2. **处理**：
   - **遍历每个样本**：
     - 对每个样本，获取其所有类别的预测分数 `scores`。
     - 提取当前样本的真实类别 `target`。
     - 使用 `torch.sort` 将分数从高到低排序，得到排序后的索引 `sorted_indices`。
     - 通过检查 `sorted_indices` 找到真实类别在排序中的排名 `rank`，并计算其倒数排名 `1/rank`。
   - **累加倒数排名**：
     - 将每个样本的倒数排名累加至 `mrr`。
  
3. **返回**：
   - 取累积的倒数排名的平均值，作为 MRR 的输出。即：将所有样本的倒数排名加总后除以样本数。

---

#### 4. `calculate_ndcg`（Normalized Discounted Cumulative Gain）

##### **功能**：
- **NDCG**（归一化折损累积增益）是一个用于评估模型排序性能的指标，比较了模型排序与理想排序的差异，并归一化到 [0, 1] 区间，1 表示排序完全正确。
  
##### **工作原理**：
1. **输入**：
   - `outputs`：模型输出的预测得分，形状为 `[batch_size, num_items]`，表示每个样本中不同物品的评分。
   - `ranks`：真实标签，字典形式，每个 `item` 的ID对应一个标签（通常为 0 或 1，表示是否相关）。
   - `k`：一个可选参数，表示只考虑前 `k` 个排序最高的项目，默认值为 6。

2. **处理**：
   - **遍历每个样本**：
     - 对于每个样本，获取其真实的标签 `rank` 字典，表示各个 `item` 是否相关。
     - 获取当前样本的模型输出得分 `scores`。
     - 使用 `torch.sort` 对得分进行降序排序，得到排序后的分数和对应的 `item` 索引 `sorted_indices`。
     - 根据排序后的索引，使用真实标签 `rank` 字典生成一个相关性列表 `relevances`，其中每个元素表示排序后的 `item` 在真实标签中对应的相关性。
   - **计算 DCG**：
     - 调用 `ndcg` 函数，基于 `relevances` 和前 `k` 个项目，计算该样本的 NDCG 值。
     - 将每个样本的 NDCG 值累加到总的 NDCG 值 `ndcg_value` 中。
  
3. **返回**：
   - 计算并返回所有样本的平均 NDCG 值，即将累积的 NDCG 值除以批次大小 `outputs.size(0)`。

##### **意义**：
- NDCG 的值越高，说明模型的排序越接近真实相关性排序，反映了模型在信息检索或排序任务中的性能。
- 通过只考虑前 `k` 个项目，函数可以专注于模型最重要的排名结果。

In [1]:
def dcg(relevances, k):
    relevances = np.asarray(relevances, dtype=float)[:k]
    if relevances.size:
        return np.sum(relevances / np.log2(np.arange(2, relevances.size + 2)))
    return 0.0


def ndcg(relevances, k):
    dcg_value = dcg(relevances, k)
    idcg_value = dcg(sorted(relevances, reverse=True), k)
    if idcg_value == 0:
        return 0.0
    return dcg_value / idcg_value


# calcuate MRR
def calculate_mrr(outputs, labels):
    mrr = 0.0
    for i in range(outputs.size(0)):
        scores = outputs[i]
        target = labels[i].item()
        sorted_scores, sorted_indices = torch.sort(scores, descending=True)
        rank = (sorted_indices == target).nonzero(as_tuple=True)[0].item() + 1
        mrr += 1.0 / rank
    return mrr / outputs.size(0)


# calculate ndcg
def calculate_ndcg(outputs, ranks, k=6):
    ndcg_value = 0.0
    for i in range(outputs.size(0)):
        rank = ranks[i]
        scores = outputs[i]
        sorted_scores, sorted_indices = torch.sort(scores, descending=True)
        relevances = np.array(
            [rank[str(idx.item())] for idx in sorted_indices]
        )
        ndcg_value += ndcg(relevances, k) 
    return ndcg_value / outputs.size(0) 


#### 详细解释
对于每个提示词的标记是这样的
``` json
{
    "Zeroshot": 0,
    "Zeroshot_CoT": 1,
    "Fewshot": 2,
    "Fewshot_CoT": 3,
    "SelfDebugl": 4,
    "Reflection": 5,
    "SelfPlan": 6,
    "ProgressiveHint": 7,
    "Persona": 8
}
```
然后、对于数据集中的每个样本，是长这样的。`label` 代表最适合的PET，`rank` 代表每个PET的得分，越合适得分越高，如果为 0 代表测试不通过
``` json
{
    "question": "……",
    "label": 0,
    "rank": {
        "0": 9,
        "2": 8,
        "3": 7,
        "4": 6,
        "6": 5,
        "7": 4,
        "8": 3,
        "1": 0,
        "5": 0
    }
}
```

模拟模型的输出

假设模型的输出分数与真实排序极不一致，完全颠倒：
``` python
outputs = torch.tensor([[0.1, 0.9, 0.15, 0.2, 0.25, 0.95, 0.3, 0.35, 0.4]])
```
这表示：
- 模型认为 `item` 5 和 `item` 1 的分数最高，分别为 0.95 和 0.9，而这两个 `item` 的真实相关性为 0。
- 同时，模型认为真实相关性最高的 `item` 0 的得分最低，为 0.1。
####  calculate_mrr
**对得分排序**：
- `sorted_scores, sorted_indices = torch.sort(scores, descending=True)`：对模型预测的所有类别得分进行降序排序，并获取相应的类别索引（排序后的类别）。
- `sorted_scores = tensor([0.95, 0.9, 0.4, 0.35, 0.3, 0.25, 0.2, 0.15, 0.1])`
- `sorted_indices = tensor([5, 1, 8, 7, 6, 4, 3, 2, 0])`

**找到真实标签的排名**：
- `rank = (sorted_indices == target).nonzero(as_tuple=True)[0].item() + 1`：找到真实标签 `target` 在排序后的索引列表中的位置，然后加 1（因为排名是从 1 开始计算的）。
- `rank = 9`：真实标签 `target` 的排名为 9。

**累加倒数排名**：
- `mrr += 1.0 / rank`：将当前样本的倒数排名（即 $1 / \text{rank}$）累加到 `mrr` 变量中。
- `mrr = 1.0 / 9`：当前样本的 MRR 为 $1 / 9$。
**返回平均 MRR**：
- 在遍历完所有样本后，计算累加的 `mrr` 除以样本总数 `outputs.size(0)`，得到所有样本的 **平均倒数排名**  
####  calculate_ndcg
**对得分排序**：
- `sorted_scores, sorted_indices = torch.sort(scores, descending=True)`：对模型预测的所有类别得分进行降序排序，并获取相应的类别索引（排序后的类别）。
- `sorted_scores = tensor([0.95, 0.9, 0.4, 0.35, 0.3, 0.25, 0.2, 0.15, 0.1])`
- `sorted_indices = tensor([5, 1, 8, 7, 6, 4, 3, 2, 0])`

**相关性提取**：
- `relevances = np.array([rank[str(idx.item())] for idx in sorted_indices])`：根据排序后的索引获取对应的相关性标签
- `relevances = array([0 0 3 4 5 6 7 8 9])`

**计算DCG**：
- DCG公式：
    $$ \text{DCG} = \sum_{i=1}^{n} \frac{relevance_i}{\log_2(i+1)} $$
- 计算具体数值（假设考虑全部9个元素）：$$ \frac{0}{\log_2(2)} + \frac{0}{\log_2(3)} + \frac{3}{\log_2(4)} + \ldots $$

**计算IDCG**：
- 理想情况下相关性得分降序排列：`[9, 8, 7, 6, 5, 4, 3, 0, 0]`
- 计算IDCG值

**计算nDCG**：
- nDCG = DCG / IDCG

### 训练模型

In [None]:
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    for embeddings_batch, labels_batch, ranks_batch in train_dataloader:
        # print(ranks_batch)
        embeddings_batch, labels_batch = embeddings_batch.to(device), labels_batch.to(
            device
        )

        outputs = model(embeddings_batch)
        loss = criterion(outputs, labels_batch)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        _, predicted = torch.max(outputs.data, 1)
        total += labels_batch.size(0)
        correct += (predicted == labels_batch).sum().item()

    epoch_loss = running_loss / len(train_dataloader)
    epoch_acc = 100 * correct / total

    # evaluate model
    model.eval()
    val_running_loss = 0.0
    val_correct = 0
    val_total = 0
    val_mrr = 0.0
    val_ndcg = 0.0
    val_ndcg_total = 0
    with torch.no_grad():
        for val_embeddings_batch, val_labels_batch, val_ranks_batch in eval_dataloader:
            val_embeddings_batch, val_labels_batch = val_embeddings_batch.to(
                device
            ), val_labels_batch.to(device)

            val_outputs = model(val_embeddings_batch)
            val_loss = criterion(val_outputs, val_labels_batch)
            val_running_loss += val_loss.item()

            _, val_predicted = torch.max(val_outputs.data, 1)
            val_total += val_labels_batch.size(0)
            val_ndcg_total += val_outputs.size(0)
            val_correct += (val_predicted == val_labels_batch).sum().item()

            val_mrr += calculate_mrr(
                val_outputs, val_labels_batch
            ) * val_labels_batch.size(0)
            val_ndcg += calculate_ndcg(
                val_outputs, val_ranks_batch, num_classes
            ) * val_outputs.size(0)

    val_epoch_loss = val_running_loss / len(eval_dataloader)
    val_epoch_acc = 100 * val_correct / val_total
    val_epoch_mrr = val_mrr / val_total
    val_epoch_ndcg = val_ndcg / val_ndcg_total

    print(
        f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.2f}%, Val Loss: {val_epoch_loss:.4f}, Val Accuracy: {val_epoch_acc:.2f}%, Val MRR: {val_epoch_mrr:.4f}, Val nDCG: {val_epoch_ndcg:.4f}"
    )

### 测试模型 

#### 测试函数

In [None]:
def evaluate_nDCG(model, dataloader, device, k=6):
    model.eval()
    ndcg_total = 0.0
    total = 0
    with torch.no_grad():
        for embeddings_batch, labels_batch, rank_batch in dataloader:
            embeddings_batch, labels_batch = embeddings_batch.to(
                device
            ), labels_batch.to(device)

            outputs = model(embeddings_batch)

            for i in range(outputs.size(0)):
                rank = rank_batch[i]
                scores = outputs[i]
                sorted_scores, sorted_indices = torch.sort(scores, descending=True)
                relevances = np.array([rank[str(idx.item())] for idx in sorted_indices])
                ndcg_total += ndcg(relevances, k)
                total += 1

    ndcg_avg = ndcg_total / total
    print(f"nDCG@{k}: {ndcg_avg:.4f}")


def evaluate_mrr(model, dataloader, device):
    model.eval()
    mrr = 0.0
    total = 0
    with torch.no_grad():
        for embeddings_batch, labels_batch, _ in dataloader:
            embeddings_batch, labels_batch = embeddings_batch.to(
                device
            ), labels_batch.to(device)

            outputs = model(embeddings_batch)
            for i in range(outputs.size(0)):
                scores = outputs[i]
                target = labels_batch[i].item()
                sorted_scores, sorted_indices = torch.sort(scores, descending=True)
                print(target, sorted_indices)
                rank = (sorted_indices == target).nonzero(as_tuple=True)[0].item() + 1
                mrr += 1.0 / rank
                total += 1

    mrr = mrr / total
    print(f"MRR: {mrr:.4f}")


def evaluate_model(model, dataloader, device):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for embeddings_batch, labels_batch, _ in dataloader:
            embeddings_batch, labels_batch = embeddings_batch.to(
                device
            ), labels_batch.to(device)

            outputs = model(embeddings_batch)
            _, predicted = torch.max(outputs.data, 1)

            total += labels_batch.size(0)
            correct += (predicted == labels_batch).sum().item()

    accuracy = correct / total
    print(f"Accuracy: {accuracy * 100:.2f}%")

#### 测试并保存模型

In [None]:
evaluate_nDCG(model, test_dataloader, device, num_classes)
evaluate_mrr(model, test_dataloader, device)
evaluate_model(model, test_dataloader, device)

output_model_path = f"result/classification_model/{dataset}_classification{hard}{data_type}_model_parameters.pth"
torch.save(model.state_dict(), output_model_path)

#### 调用模型

In [None]:
model_path = f"result/{dataset}_contrastive{hard}{data_type}_model"
classification_model_path = f"result/classification_model/{dataset}_classification{hard}{data_type}_model_parameters.pth"
classification_model = ClassificationModel(input_size, num_classes)
classification_model.load_state_dict(torch.load(classification_model_path))

embeddings = get_embedding(questions, model_path)
model.eval()
model.to(device)
if isinstance(embeddings, list):
    embeddings = torch.tensor(embeddings).to(device)
with torch.no_grad():
    outputs = model(embeddings)