In [None]:
# ! conda install -n base ipykernel --update-deps --force-reinstall
# ! pip install matplotlib
# ! pip install transformers

In [None]:
from transformers import BertTokenizer, BertModel
from torch.utils.data import Dataset, DataLoader
from torch.optim import SGD
from torch import nn
import torch.nn.functional as F
import torch
import json
from tqdm.auto import tqdm
import matplotlib.pyplot as plt
import numpy as np

模型

In [None]:
class BiGRUSim(nn.Module):
    """Bidirectional GRU encoder followed by a linear projection.

    Args:
        input_size: Feature size of every time step of the input sequence.
        hidden_size: Hidden size of each GRU direction.
        num_layers: Number of stacked GRU layers.
        output_size: Size of the vector produced by the final linear layer.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(BiGRUSim, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.bi_gru = nn.GRU(input_size, hidden_size, num_layers, batch_first=True, bidirectional=True)
        # Both directions are concatenated in the GRU output, hence hidden_size * 2.
        self.fc = nn.Linear(hidden_size * 2, output_size)

    def forward(self, label):
        """Encode a batch of sequences into one vector per sequence.

        Args:
            label: Tensor of shape (batch, seq_len, input_size); the GRU is
                built with ``batch_first=True``.

        Returns:
            Tensor of shape (batch, output_size), projected from the GRU
            output at the last time step.
        """
        # Allocate the initial state on the same device and dtype as the input;
        # a plain torch.zeros(...) lives on CPU/float32 and breaks on CUDA or
        # non-default dtypes.
        h0 = torch.zeros(
            self.num_layers * 2,
            label.size(0),
            self.hidden_size,
            device=label.device,
            dtype=label.dtype,
        )
        out, _ = self.bi_gru(label, h0)
        return self.fc(out[:, -1, :])

In [None]:
class BertSim(nn.Module):
    """Scores candidate label embeddings against the [MASK] token embedding.

    Args:
        backbone: Encoder called as ``backbone(**input)``; its result must
            expose ``last_hidden_state``.
        tokenizer: Tokenizer providing ``mask_token_id``.
    """

    def __init__(self, backbone, tokenizer) -> None:
        super(BertSim, self).__init__()
        self.Backbone = backbone
        self.tokenizer = tokenizer

    def forward(self, input, labels):
        """Return the cosine similarity between each mask embedding and each label.

        Args:
            input: Mapping of encoder inputs; must contain ``'input_ids'`` of
                shape (batch, seq_len) with exactly one mask token per row.
            labels: Tensor of shape (batch, num_labels, hidden) holding the
                candidate label embeddings.

        Returns:
            Tensor of shape (batch, num_labels) with cosine similarities.
            All-zero rows of ``labels`` yield a similarity of 0.

        Raises:
            ValueError: If any sequence does not contain exactly one mask token.
        """
        hidden_states = self.Backbone(**input).last_hidden_state

        is_mask = input['input_ids'] == self.tokenizer.mask_token_id
        masks_per_row = is_mask.sum(dim=1)
        # Without this check a missing or duplicated mask token would silently
        # broadcast against `labels` and pair samples with the wrong embedding.
        if not bool((masks_per_row == 1).all()):
            raise ValueError(
                "BertSim.forward expects exactly one mask token per input sequence, "
                f"got counts {masks_per_row.tolist()}"
            )

        # Boolean indexing keeps row order, giving one (hidden,) vector per sample.
        mask_embed = hidden_states[is_mask]
        mask_embed = mask_embed.unsqueeze(1).expand(-1, labels.size(1), -1)

        return F.cosine_similarity(labels, mask_embed, dim=2)

数据集

In [None]:
# Build the Dataset object.
class ChoiceDataset(Dataset):
    """Dataset of prompt texts, target option indices and padded label embeddings.

    Args:
        dataset: Re-iterable collection of dicts with the keys ``'input'``
            (text), ``'index'`` and ``'labels'`` (label names that are keys
            of the embedding file).
        form_name: Sub-directory of ``../../dataset`` that contains
            ``sim_embeded_labels.json``.
    """

    # Inputs longer than this keep only their trailing part.
    MAX_INPUT_CHARS = 510
    # Embedding width used when the embedding file holds no entries.
    DEFAULT_EMBED_DIM = 768

    def __init__(self, dataset, form_name):
        with open(f"../../dataset/{form_name}/sim_embeded_labels.json", 'r', encoding='utf-8') as file:
            embeded_labels = json.load(file)

        # Read the embedding width from the file instead of assuming 768.
        embed_dim = self.DEFAULT_EMBED_DIM
        for vector in embeded_labels.values():
            embed_dim = torch.tensor(vector).shape[-1]
            break

        self.input = []
        self.index = []
        self.labels = []

        # Every sample is padded with zero rows up to the largest label count.
        labels_max_len = max((len(data['labels']) for data in dataset), default=0)

        for data in dataset:
            # Truncate a local copy so the caller's records are left untouched.
            text = data['input']
            if len(text) > self.MAX_INPUT_CHARS:
                text = text[-self.MAX_INPUT_CHARS:]
            self.input.append(text)
            self.index.append(data['index'])

            labels = torch.zeros((labels_max_len, embed_dim))
            for i, label in enumerate(data['labels']):
                labels[i] = torch.tensor(embeded_labels[label])
            self.labels.append(labels)

    def __len__(self):
        assert len(self.index) == len(self.input)
        return len(self.index)

    def __getitem__(self, index):
        """Return ``(input_text, target_index, padded_label_embeddings)``."""
        return self.input[index], self.index[index], self.labels[index]

训练过程

In [None]:
def _run_pass(model, tokenizer, dataloader, criterion, device, optimizer=None):
    """Run one pass over `dataloader`; trains when `optimizer` is given, otherwise evaluates.

    Returns:
        (loss_sum, correct): the loss summed over samples as a float and the
        number of correctly predicted samples as an int.
    """
    training = optimizer is not None
    if training:
        model.train()
    else:
        model.eval()

    loss_sum = 0.0
    correct = 0
    for texts, target, label_embeds in tqdm(dataloader):
        encoded = tokenizer(
            texts,
            max_length=512,
            truncation=True,
            padding='max_length',
            return_tensors='pt').to(device)
        target = target.to(device)
        label_embeds = label_embeds.to(device)

        if training:
            pred = model(encoded, label_embeds)
            batch_loss = criterion(pred, target)
            optimizer.zero_grad()
            batch_loss.backward()
            optimizer.step()
        else:
            with torch.no_grad():
                pred = model(encoded, label_embeds)
                batch_loss = criterion(pred, target)

        # The criterion averages over the batch, so weight by the batch size to
        # obtain a per-sample sum. `.item()` also drops the autograd graph
        # instead of keeping it alive in the running total.
        loss_sum += batch_loss.item() * target.size(0)
        correct += (pred.argmax(dim=1) == target).sum().item()

    return loss_sum, correct


def train(model, tokenizer, form_name, train_data, eval_data, test_data, epochs, batch_size, optimizer, schedualer, model_dict_path=""):
    """Train `model`, evaluate it after every epoch and test it once at the end.

    Args:
        model: Module called as ``model(encoded_inputs, label_embeddings)`` that
            returns one score per candidate label.
        tokenizer: Callable tokenizer applied to each batch of input texts.
        form_name: Dataset sub-directory name forwarded to ``ChoiceDataset``.
        train_data, eval_data, test_data: Raw splits wrapped in ``ChoiceDataset``.
        epochs: Number of training epochs.
        batch_size: Batch size used by all three data loaders.
        optimizer: Optimizer already bound to the model parameters.
        schedualer: Learning-rate scheduler, stepped once per epoch.
        model_dict_path: Optional state-dict checkpoint to load before training.

    Returns:
        Tuple ``(train_loss, train_acc, eval_loss, eval_acc, test_loss,
        test_acc, state_dict)``. The first four are CPU tensors with one
        per-sample average per epoch, ``test_loss`` is a 0-dim tensor,
        ``test_acc`` a float and ``state_dict`` is ``model.state_dict()``.
    """
    train_dataset, eval_dataset, test_dataset = ChoiceDataset(train_data, form_name), ChoiceDataset(eval_data, form_name), ChoiceDataset(test_data, form_name)
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    # Metrics do not depend on sample order, so evaluation data is not shuffled.
    eval_dataloader = DataLoader(eval_dataset, batch_size=batch_size, shuffle=False)
    test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    criterion = nn.CrossEntropyLoss()

    train_loss_list = []
    eval_loss_list = []
    train_acc_list = []
    eval_acc_list = []

    if (model_dict_path != ""):
        model.load_state_dict(torch.load(model_dict_path, map_location=device))

    model.to(device)
    criterion.to(device)

    for epoch in range(epochs):

        total_loss_train, total_acc_train = _run_pass(
            model, tokenizer, train_dataloader, criterion, device, optimizer=optimizer)

        print(
            f'Epochs: {epoch + 1}: \
            | Train Loss: {total_loss_train / len(train_dataset): .3f} \
            | Train Accuracy: {total_acc_train / len(train_dataset): .3f} '
        )
        train_loss_list.append(total_loss_train)
        train_acc_list.append(total_acc_train)

        total_loss_eval, total_acc_eval = _run_pass(
            model, tokenizer, eval_dataloader, criterion, device)

        print(
            f'Epochs: {epoch + 1}: \
            | eval Loss: {total_loss_eval / len(eval_dataset): .3f} \
            | eval Accuracy: {total_acc_eval / len(eval_dataset): .3f} '
        )
        eval_loss_list.append(total_loss_eval)
        eval_acc_list.append(total_acc_eval)

        schedualer.step()

    total_loss_test, total_acc_test = _run_pass(
        model, tokenizer, test_dataloader, criterion, device)
    print(
            f'| Test Loss: {total_loss_test / len(test_dataset): .3f} \
            | Test Accuracy: {total_acc_test / len(test_dataset): .3f} '
        )

    # Convert the per-epoch sums into per-sample averages.
    train_loss_list = torch.tensor(train_loss_list, device='cpu')
    train_loss_list = train_loss_list / len(train_data)
    train_acc_list = torch.tensor(train_acc_list, device='cpu')
    train_acc_list = train_acc_list / len(train_data)

    eval_loss_list = torch.tensor(eval_loss_list, device='cpu')
    eval_loss_list = eval_loss_list / len(eval_data)
    eval_acc_list = torch.tensor(eval_acc_list, device='cpu')
    eval_acc_list = eval_acc_list / len(eval_data)

    # The test loss stays a 0-dim tensor so callers can keep using `.item()`.
    test_loss = torch.tensor(total_loss_test / len(test_dataset))

    return train_loss_list, train_acc_list, eval_loss_list, eval_acc_list, test_loss, total_acc_test / len(test_dataset), model.state_dict()


In [None]:
def gen_embeded_labels(backbone, tokenizer, form_name, label_list):
    """Embed every label with `backbone` and store the result as JSON.

    Each label is tokenized on its own and the hidden state at token position 0
    of ``last_hidden_state`` is taken as its embedding. The mapping
    ``label -> nested list`` is written to
    ``../../dataset/{form_name}/sim_embeded_labels.json``.

    Args:
        backbone: Encoder module called as ``backbone(**encoded)``; it is moved
            to CUDA when available.
        tokenizer: Callable tokenizer returning an object with ``.to(device)``.
        form_name: Dataset sub-directory that receives the JSON file.
        label_list: Iterable of label strings.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    backbone.to(device)

    # Embed in eval mode without autograd so the vectors are deterministic and
    # no graph is built; the previous train/eval mode is restored afterwards.
    was_training = backbone.training
    backbone.eval()
    embeded_labels = {}
    try:
        with torch.no_grad():
            for label in label_list:
                encoded = tokenizer(
                        label,
                        max_length=512,
                        truncation=True,
                        padding='max_length',
                        return_tensors='pt').to(device)
                first_token_embed = backbone(**encoded).last_hidden_state[:, 0, :]
                embeded_labels[label] = first_token_embed.tolist()
    finally:
        backbone.train(was_training)

    serialized = json.dumps(embeded_labels, ensure_ascii=False)

    with open(f"../../dataset/{form_name}/sim_embeded_labels.json", "w", encoding='utf-8') as f:
        f.write(serialized)

加载参数

In [None]:
# Release cached CUDA memory left over from earlier cells before loading the model.
torch.cuda.empty_cache()

# Run configuration: checkpoint names and optimisation hyper-parameters.
backbone_name = 'bert-base-multilingual-cased'
tokenizer_name = 'bert-base-multilingual-cased'
epochs = 40
batch_size = 24
learning_rate = 1e-2

backbone = BertModel.from_pretrained(backbone_name)
tokenizer = BertTokenizer.from_pretrained(tokenizer_name)

# Pre-compute the label embeddings with the pretrained backbone and write them
# to ../../dataset/{form_name}/sim_embeded_labels.json for the datasets to read.
form_name = "factb"
with open(f"../../dataset/{form_name}/label_list.json", "rb") as f:
    label_list = json.load(f)
gen_embeded_labels(backbone, tokenizer, form_name, list(label_list))   

model = BertSim(backbone, tokenizer)

# SGD with a step schedule: the learning rate is multiplied by `gamma` at each
# epoch listed in `milestones`.
milestones = [25,35]
gamma = 0.5
optimizer = SGD(model.parameters(), lr=learning_rate)
schedualer = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=milestones, gamma=gamma)

In [None]:
# Load the prompt dataset for the selected form; the JSON file is expected to
# provide the "train", "eval" and "test" splits read below.
with open(f'../../dataset/{form_name}/dataset_prompt.json', 'rb') as file:
    dataset = json.load(file)

train_data = dataset["train"]
eval_data = dataset["eval"]
test_data = dataset["test"]

开始训练

In [None]:
# Run the full training loop from the configured hyper-parameters. An empty
# `model_dict_path` means no checkpoint is loaded before training starts.
train_loss_list, train_acc_list, eval_loss_list, eval_acc_list, test_loss, test_acc, model_state = train(
    model = model,
    tokenizer = tokenizer,
    form_name=form_name,
    train_data = train_data,
    eval_data = eval_data,
    test_data = test_data,
    epochs = epochs,
    batch_size = batch_size,
    optimizer = optimizer,
    schedualer = schedualer,
    model_dict_path = "")

存储模型、训练数据，绘制Loss、ACC变化曲线

In [None]:
# Shared file stem that encodes the run configuration.
run_tag = f"sim_{form_name}_{backbone_name}_e_{epochs}_lr_{learning_rate}_sche_{milestones}_{gamma}_bs_{batch_size}"

# Persist the trained weights.
torch.save(model_state, f"../../model_dict/{run_tag}.pt")

fig, axes = plt.subplots(nrows=2, ncols=2, figsize=(20,20))

# Step axis derived from the scheduler milestones; it is stored with the
# training history and is not used by the plots below.
scale = 1
train_steps = np.ones(1)
for i in range(len(milestones)):
    train_steps = np.concatenate((train_steps, np.arange(train_steps[-1] + scale, train_steps[-1] + scale*milestones[i], scale)), axis=0)
    scale = scale * gamma ** (i+1)
train_steps = np.concatenate((train_steps, np.arange(train_steps[-1] + scale, train_steps[-1] + scale*(epochs-milestones[-1]), scale)), axis=0)

epoch_axis = np.arange(1, epochs+1, 1)
panels = [
    (axes[0,0], train_loss_list, 'Loss-Epoch Curve(Train)', 'Loss'),
    (axes[0,1], train_acc_list, 'Acc-Epoch Curve(Train)', 'Accuracy'),
    (axes[1,0], eval_loss_list, 'Loss-Epoch Curve(Eval)', 'Loss'),
    (axes[1,1], eval_acc_list, 'Acc-Epoch Curve(Eval)', 'Accuracy'),
]
for ax, series, title, ylabel in panels:
    ax.plot(epoch_axis, series)
    ax.set_title(title)
    ax.set_xlabel('Epoch')
    ax.set_ylabel(ylabel)

# Save before showing: once plt.show() has displayed the figure, a later
# plt.savefig() can write an empty canvas.
fig.savefig(f'../../loss_acc_his/{run_tag}.png')
plt.show()

In [None]:
# Gather the per-epoch curves and the final test metrics into one
# JSON-serialisable record.
train_result = dict(
    train_loss=train_loss_list.tolist(),
    train_acc=train_acc_list.tolist(),
    eval_loss=eval_loss_list.tolist(),
    eval_acc=eval_acc_list.tolist(),
    test_loss=test_loss.item(),
    test_acc=test_acc,
    train_steps=train_steps.tolist(),
)
train_result_str = json.dumps(train_result)

# Store the history next to the saved curves, under a name that encodes the run.
history_json_path = f'../../loss_acc_his/sim_{form_name}_{backbone_name}_e_{epochs}_lr_{learning_rate}_sche_{milestones}_{gamma}_bs_{batch_size}.json'
with open(history_json_path, 'w') as file:
    file.write(train_result_str)

加载、测试模型

In [None]:
class TestDataset(Dataset):
    """Evaluation dataset that also exposes the label names of every sample.

    Args:
        dataset: Re-iterable collection of dicts with the keys ``'input'``
            (text), ``'index'`` and ``'labels'`` (label names that are keys
            of the embedding file).
        form_name: Sub-directory of ``../../dataset`` that contains
            ``sim_embeded_labels.json``.
    """

    # Inputs longer than this keep only their trailing part.
    MAX_INPUT_CHARS = 510
    # Embedding width used when the embedding file holds no entries.
    DEFAULT_EMBED_DIM = 768

    def __init__(self, dataset, form_name):
        with open(f"../../dataset/{form_name}/sim_embeded_labels.json", 'r', encoding='utf-8') as file:
            embeded_labels = json.load(file)

        # Read the embedding width from the file instead of assuming 768.
        embed_dim = self.DEFAULT_EMBED_DIM
        for vector in embeded_labels.values():
            embed_dim = torch.tensor(vector).shape[-1]
            break

        self.input = []
        self.index = []
        self.label_list = []
        self.embeded_label_list = []

        # Every sample is padded with zero rows up to the largest label count.
        embeded_label_max_len = max((len(data['labels']) for data in dataset), default=0)

        for data in dataset:
            # Truncate a local copy so the caller's records are left untouched.
            text = data['input']
            if len(text) > self.MAX_INPUT_CHARS:
                text = text[-self.MAX_INPUT_CHARS:]
            self.input.append(text)

            self.index.append(data['index'])

            embeded_label_list = torch.zeros((embeded_label_max_len, embed_dim))
            for i, label in enumerate(data['labels']):
                embeded_label_list[i] = torch.tensor(embeded_labels[label])
            self.embeded_label_list.append(embeded_label_list)

            self.label_list.append(data['labels'])

    def __len__(self):
        assert len(self.index) == len(self.input)
        return len(self.index)

    def __getitem__(self, index):
        """Return ``(input_text, target_index, padded_label_embeddings, label_names)``."""
        return self.input[index], self.index[index], self.embeded_label_list[index], self.label_list[index]

In [None]:
from sklearn.metrics import accuracy_score, confusion_matrix, precision_score, recall_score, f1_score, classification_report
import numpy as np

def basic_analysis(y_true, y_pred):
    """Print standard classification metrics and store them as a JSON report.

    The report is written to ``../../test_report/{form_name}/prompt/sim.json``,
    where ``form_name`` is read from the notebook's global scope.

    Args:
        y_true: Sequence of ground-truth class ids (must be JSON-serialisable).
        y_pred: Sequence of predicted class ids (must be JSON-serialisable).
    """
    import os  # local import: only needed to create the report directory

    # Accuracy
    accuracy = float(accuracy_score(y_true, y_pred))
    print(f'Accuracy: {accuracy}')

    # Confusion matrix
    conf_matrix = confusion_matrix(y_true, y_pred)
    print(f'Confusion Matrix:\n {conf_matrix}')

    # Precision
    precision_macro = float(precision_score(y_true, y_pred, average='macro'))
    precision_micro = float(precision_score(y_true, y_pred, average='micro'))
    print(f'Macro-average Precision: {precision_macro}')
    print(f'Micro-average Precision: {precision_micro}')

    # Recall
    recall_macro = float(recall_score(y_true, y_pred, average='macro'))
    recall_micro = float(recall_score(y_true, y_pred, average='micro'))
    print(f'Macro-average Recall: {recall_macro}')
    print(f'Micro-average Recall: {recall_micro}')

    # F1 score
    f1_macro = float(f1_score(y_true, y_pred, average='macro'))
    f1_micro = float(f1_score(y_true, y_pred, average='micro'))
    print(f'Macro-average F1-Score: {f1_macro}')
    print(f'Micro-average F1-Score: {f1_micro}')

    # Full classification report (covers all metrics above); computed once and reused.
    report = classification_report(y_true, y_pred)
    print(f'Classification Report:\n {report}')

    # Scalars are converted with float() rather than .tolist(), so the report
    # works whether the metric functions return NumPy scalars or Python floats.
    result = {
        "y_true": y_true,
        "y_pred": y_pred,
        "Accuracy": accuracy,
        "Confusion Matrix": f"{conf_matrix}",
        "Macro-average Precision": precision_macro,
        "Micro-average Precision": precision_micro,
        "Macro-average Recall": recall_macro,
        "Micro-average Recall": recall_micro,
        "Macro-average F1-Score": f1_macro,
        "Micro-average F1-Score": f1_micro,
        "Classification Report": f"{report}"
    }

    result = json.dumps(result)
    stored_path = f"../../test_report/{form_name}/prompt/sim.json"
    # Create the target directory so a missing folder does not discard the report.
    os.makedirs(os.path.dirname(stored_path), exist_ok=True)
    with open(stored_path, "w") as f:
        f.write(result)
    print(f"Test report is stored in {stored_path}")

In [None]:
def test_model(backbone, tokenizer, form_name, test_data, batch_size, label_list, model_dict_path=""):
    """Load a trained ``BertSim`` checkpoint, evaluate it and write a metric report.

    Args:
        backbone: Encoder passed to ``BertSim``.
        tokenizer: Tokenizer passed to ``BertSim`` and used to encode the inputs.
        form_name: Dataset sub-directory name forwarded to ``TestDataset``.
        test_data: Raw test split wrapped in ``TestDataset``.
        batch_size: Evaluation batch size.
        label_list: Iterable of all label names; the position of a name is its
            class id in the report.
        model_dict_path: Path of the state-dict checkpoint to evaluate.

    Returns:
        ``""`` when ``model_dict_path`` is empty (nothing is evaluated),
        otherwise ``None``.
    """
    if model_dict_path == "":
        return ""

    # Accept any iterable of names (list, dict keys, ...) for the id lookup.
    label_names = list(label_list)

    test_dataset = TestDataset(test_data, form_name)
    # Metrics do not depend on sample order, so the test data is not shuffled.
    test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    criterion = nn.CrossEntropyLoss()
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    model = BertSim(backbone, tokenizer)
    model.load_state_dict(torch.load(model_dict_path, map_location=device))

    model.to(device)
    criterion.to(device)

    total_loss_test = 0
    total_acc_test = 0

    real_index_list = []
    pred_index_list = []

    model.eval()
    for test_input, test_index, test_labels, labels in tqdm(test_dataloader):
        test_input = tokenizer(
                test_input,
                max_length=512,
                truncation=True,
                padding='max_length',
                return_tensors='pt').to(device)
        test_index = test_index.to(device)
        test_labels = test_labels.to(device)
        with torch.no_grad():
            pred = model(test_input, test_labels)
            batch_loss = criterion(pred, test_index)

        # The criterion averages over the batch; weight by the batch size so
        # the final division by the dataset size yields a per-sample loss.
        total_loss_test += batch_loss.item() * test_index.size(0)

        pred_index = pred.argmax(dim=1)
        total_acc_test += (pred_index == test_index).sum().item()

        # `labels` is indexed as labels[option][sample]; translate the chosen
        # option of every sample into its global class id.
        real_index_list.extend(
            label_names.index(labels[option][sample])
            for sample, option in enumerate(test_index.tolist()))
        pred_index_list.extend(
            label_names.index(labels[option][sample])
            for sample, option in enumerate(pred_index.tolist()))

    print(
            f'| Test Loss: {total_loss_test / len(test_dataset): .3f} \
            | Test Accuracy: {total_acc_test / len(test_dataset): .3f} '
        )
    print(real_index_list)
    print(pred_index_list)
    basic_analysis(real_index_list, pred_index_list)
    

In [None]:
# Define the checkpoint names before they are used to build `model_name`, so
# the file stem does not depend on a value left over from an earlier cell.
backbone_name = 'bert-base-multilingual-cased'
tokenizer_name = 'bert-base-multilingual-cased'
model_name = f"sim_{form_name}_{backbone_name}_e_{epochs}_lr_{learning_rate}_sche_{milestones}_{gamma}_bs_{batch_size}"

# Start from a fresh pretrained backbone; the trained weights are loaded from
# the checkpoint inside test_model.
backbone = BertModel.from_pretrained(backbone_name)
tokenizer = BertTokenizer.from_pretrained(tokenizer_name)

# Pass the label names as a list (as done when the label embeddings were
# generated) so positional lookups work whatever container the JSON file held.
test_model(backbone, tokenizer, form_name, test_data, 1, list(label_list), f"../../model_dict/{model_name}.pt")