In [30]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os
import random
import torch

# 设置随机种子

In [None]:
def seed_everything(seed=1029):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    
seed_everything(12)

# **1. 准备数据**
## **构建数据集**
与之前一样，我们首先编写继承自 `Dataset` 类的自定义数据集用于组织样本和标签。

In [31]:
from torch.utils.data import Dataset
from datasets import load_from_disk

class ChnSentiCorp(Dataset):
    def __init__(self, split):
        self.dataset = load_from_disk('./data/ChnSentiCorp')[split]
    
    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        return self.dataset[idx]

train_data = ChnSentiCorp('train')
valid_data = ChnSentiCorp('validation')
test_data = ChnSentiCorp('test')

In [32]:
train_data[20]

{'text': '非常不错，服务很好，位于市中心区，交通方便，不过价格也高！', 'label': 1}

下面我们输出数据集的尺寸，并且打印出一个训练样本：

In [14]:
print(f'train set size: {len(train_data)}')
print(f'valid set size: {len(valid_data)}')
print(f'test set size: {len(test_data)}')
print(next(iter(train_data)))

train set size: 9600
valid set size: 0
test set size: 1200
{'text': '选择珠江花园的原因就是方便，有电动扶梯直接到达海边，周围餐馆、食廊、商场、超市、摊位一应俱全。酒店装修一般，但还算整洁。 泳池在大堂的屋顶，因此很小，不过女儿倒是喜欢。 包的早餐是西式的，还算丰富。 服务吗，一般', 'label': 1}


下面我们首先编写模板和 verbalizer 对应的函数：

In [17]:
from transformers import AutoTokenizer

checkpoint = "bert-base-chinese"
tokenizer = AutoTokenizer.from_pretrained(checkpoint)

def get_prompt(x):
#prompt = f'总体上来说很[MASK]。{x}'
    prompt = f'[unused1][unused2][unused3][unused4][MASK][unused5][unused6][unused7][unused8]。{x}'
    return {
        'prompt': prompt, 
        'mask_offset': prompt.find('[MASK]')
    }

def get_verbalizer(tokenizer):
    return {
        'pos': {'token': '好', 'id': tokenizer.convert_tokens_to_ids("好")}, 
        'neg': {'token': '差', 'id': tokenizer.convert_tokens_to_ids("差")}
    }

例如，第一个样本转换后的模板为：

In [28]:
from transformers import AutoTokenizer

checkpoint = "bert-base-chinese"
tokenizer = AutoTokenizer.from_pretrained(checkpoint)

comment = '这个宾馆比较陈旧了，特价的房间也很一般。总体来说一般。'

print('verbalizer:', get_verbalizer(tokenizer))

prompt_data = get_prompt(comment)
prompt, mask_offset = prompt_data['prompt'], prompt_data['mask_offset']

encoding = tokenizer(prompt, truncation=True)
tokens = encoding.tokens()
mask_idx = encoding.char_to_token(mask_offset)

print('prompt:', prompt)
print('prompt tokens:', tokens)
print('mask idx:', mask_idx)

verbalizer: {'pos': {'token': '好', 'id': 1962}, 'neg': {'token': '差', 'id': 2345}}
prompt: [unused1][unused2][unused3][unused4][MASK][unused5][unused6][unused7][unused8]。这个宾馆比较陈旧了，特价的房间也很一般。总体来说一般。
prompt tokens: ['[CLS]', '[', 'u', '##nus', '##ed', '##1', ']', '[', 'u', '##nus', '##ed', '##2', ']', '[', 'u', '##nus', '##ed', '##3', ']', '[', 'u', '##nus', '##ed', '##4', ']', '[MASK]', '[', 'u', '##nus', '##ed', '##5', ']', '[', 'u', '##nus', '##ed', '##6', ']', '[', 'u', '##nus', '##ed', '##7', ']', '[', 'u', '##nus', '##ed', '##8', ']', '。', '这', '个', '宾', '馆', '比', '较', '陈', '旧', '了', '，', '特', '价', '的', '房', '间', '也', '很', '一', '般', '。', '总', '体', '来', '说', '一', '般', '。', '[SEP]']
mask idx: 25


这里我们可以为“积极”和“消极”构建专门的虚拟 token “[POS]”和“[NEG]”，并且设置对应的类别描述为“好的、优秀的、正面的评价、积极的态度”和“差的、糟糕的、负面的评价、消极的态度”。下面我们扩展一下上面的 `verbalizer` mmm函数，添加一个 `vtype` 参数来区分两种 `verbalizer` 类型：m

In [29]:
def get_verbalizer(tokenizer, vtype):
    assert vtype in ['base', 'virtual']
    return {
        'pos': {'token': '好', 'id': tokenizer.convert_tokens_to_ids("好")}, 
        'neg': {'token': '差', 'id': tokenizer.convert_tokens_to_ids("差")}
    } if vtype == 'base' else {
        'pos': {
            'token': '[POS]', 'id': tokenizer.convert_tokens_to_ids("[POS]"), 
            'description': '好的、优秀的、正面的评价、积极的态度'
        }, 
        'neg': {
            'token': '[NEG]', 'id': tokenizer.convert_tokens_to_ids("[NEG]"), 
            'description': '差的、糟糕的、负面的评价、消极的态度'
        }
    }

vtype = 'virtual'
# add label words
if vtype == 'virtual':
    tokenizer.add_special_tokens({'additional_special_tokens': ['[POS]', '[NEG]']})
print('verbalizer:', get_verbalizer(tokenizer, vtype=vtype))

verbalizer: {'pos': {'token': '[POS]', 'id': 21128, 'description': '好的、优秀的、正面的评价、积极的态度'}, 'neg': {'token': '[NEG]', 'id': 21129, 'description': '差的、糟糕的、负面的评价、消极的态度'}}


Prompting 方法实际输入的是转换后的模板，而不是原始文本，因此我们首先使用模板函数 `get_prompt()` 来更新数据集：

In [73]:
def f(data):
    data['comment'] = data['text']
    prompt_data = get_prompt(data['text'])
    data['prompt'] = prompt_data['prompt']
    data['mask_offset'] = prompt_data['mask_offset']
    return data
    
class ChnSentiCorp(Dataset):
    def __init__(self, split):
        self.data = load_from_disk('./data/ChnSentiCorp')[split]
        self.data = self.data.map(f)
        self.data = self.data.remove_columns(['text'])
        
    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]
    
train_data = ChnSentiCorp('train')
valid_data = ChnSentiCorp('validation')
test_data = ChnSentiCorp('test')

In [74]:
train_data.data

Dataset({
    features: ['label', 'comment', 'prompt', 'mask_offset'],
    num_rows: 9600
})

In [75]:
train_data.data[10]

{'label': 0,
 'comment': '我看过朋友的还可以，但是我订的书迟迟未到已有半个月，都没有收到打电话也没有用，以后你们订书一定要考虑好！当当实在是太慢了',
 'prompt': '[unused1][unused2][unused3][unused4][MASK][unused5][unused6][unused7][unused8]。我看过朋友的还可以，但是我订的书迟迟未到已有半个月，都没有收到打电话也没有用，以后你们订书一定要考虑好！当当实在是太慢了',
 'mask_offset': 36}

同样地，我们通过 `print(next(iter(train_data)))` 打印出一个训练样本：

In [79]:
print(next(iter(train_data)))

{'label': 1, 'comment': '选择珠江花园的原因就是方便，有电动扶梯直接到达海边，周围餐馆、食廊、商场、超市、摊位一应俱全。酒店装修一般，但还算整洁。 泳池在大堂的屋顶，因此很小，不过女儿倒是喜欢。 包的早餐是西式的，还算丰富。 服务吗，一般', 'prompt': '[unused1][unused2][unused3][unused4][MASK][unused5][unused6][unused7][unused8]。选择珠江花园的原因就是方便，有电动扶梯直接到达海边，周围餐馆、食廊、商场、超市、摊位一应俱全。酒店装修一般，但还算整洁。 泳池在大堂的屋顶，因此很小，不过女儿倒是喜欢。 包的早餐是西式的，还算丰富。 服务吗，一般', 'mask_offset': 36}


## **数据预处理**

与之前一样，接下来我们就通过 `DataLoader` 库来按批(batch)加载数据，将文本转换为模型可以接受的 token IDs。

In [80]:
from torch.utils.data import DataLoader
from transformers import AutoTokenizer

vtype = 'base'
# vtype = 'virtual'
max_length = 512
batch_size = 8
checkpoint = "bert-base-chinese"
tokenizer = AutoTokenizer.from_pretrained(checkpoint)
if vtype == 'virtual':
    tokenizer.add_special_tokens({'additional_special_tokens': ['[POS]', '[NEG]']})

verbalizer = get_verbalizer(tokenizer, vtype=vtype)
pos_id, neg_id = verbalizer['pos']['id'], verbalizer['neg']['id']

def collote_fn(batch_samples):
    batch_sentences, batch_mask_idxs, batch_labels  = [], [], []
    for sample in batch_samples:
        batch_sentences.append(sample['prompt'])
        encoding = tokenizer(sample['prompt'], truncation=True)
        mask_idx = encoding.char_to_token(sample['mask_offset'])
        assert mask_idx is not None
        batch_mask_idxs.append(mask_idx)
        batch_labels.append(int(sample['label']))
    batch_inputs = tokenizer(
        batch_sentences, 
        max_length=max_length, 
        padding=True, 
        truncation=True, 
        return_tensors="pt"
    )
    label_word_id = [neg_id, pos_id]
    return {
        'batch_inputs': batch_inputs, 
        'batch_mask_idxs': batch_mask_idxs, 
        'label_word_id': label_word_id, 
        'labels': batch_labels
    }

train_dataloader = DataLoader(train_data, batch_size=batch_size, shuffle=True, collate_fn=collote_fn)
valid_dataloader = DataLoader(valid_data, batch_size=batch_size, shuffle=False, collate_fn=collote_fn)
test_dataloader = DataLoader(test_data, batch_size=batch_size, shuffle=False, collate_fn=collote_fn)

batch_data = next(iter(train_dataloader))
print('batch_X shape:', {k: v.shape for k, v in batch_data['batch_inputs'].items()})
print(batch_data['batch_inputs'])
print(batch_data['batch_mask_idxs'])
print(batch_data['label_word_id'])
print(batch_data['labels'])

batch_X shape: {'input_ids': torch.Size([8, 326]), 'token_type_ids': torch.Size([8, 326]), 'attention_mask': torch.Size([8, 326])}
{'input_ids': tensor([[101, 138, 163,  ...,   0,   0,   0],
        [101, 138, 163,  ...,   0,   0,   0],
        [101, 138, 163,  ...,   0,   0,   0],
        ...,
        [101, 138, 163,  ...,   0,   0,   0],
        [101, 138, 163,  ...,   0,   0,   0],
        [101, 138, 163,  ...,   0,   0,   0]]), 'token_type_ids': tensor([[0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0],
        ...,
        [0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0]]), 'attention_mask': tensor([[1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        ...,
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0]])}
[25, 25, 25, 25, 25, 25, 25, 25]
[2345, 1962]
[1, 0, 0, 0, 0, 1, 1, 0]


# **2. 训练模型**

## **构建模型**

对于 MLM 任务，可以直接使用 Transformers 库封装好的 `AutoModelForMaskedLM` 类。由于 BERT 已经在 MLM 任务上进行了预训练，因此借助模板我们甚至可以在不微调的情况下 (Zero-shot) 直接使用模型来预测情感极性。例如对我们的第一个样本：

In [81]:
from transformers import AutoModelForMaskedLM

checkpoint = "bert-base-chinese"
model = AutoModelForMaskedLM.from_pretrained(checkpoint)

text = "总体上来说很[MASK]。这个宾馆比较陈旧了，特价的房间也很一般。总体来说一般。"
inputs = tokenizer(text, return_tensors="pt")
token_logits = model(**inputs).logits

Some weights of the model checkpoint at bert-base-chinese were not used when initializing BertForMaskedLM: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight', 'cls.seq_relationship.bias', 'cls.seq_relationship.weight']
- This IS expected if you are initializing BertForMaskedLM from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForMaskedLM from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [100]:
# Find the location of [MASK] and extract its logits
mask_token_index = torch.where(inputs["input_ids"] == tokenizer.mask_token_id)[1]
mask_token_logits = token_logits[0, mask_token_index, :]
# Pick the [MASK] candidates with the highest logits
top_5_tokens = torch.topk(mask_token_logits, 5, dim=1).indices[0].tolist()

for token in top_5_tokens:
    print(f"'>>> {text.replace(tokenizer.mask_token, tokenizer.decode([token]))}'")

'>>> 总体上来说很好。这个宾馆比较陈旧了，特价的房间也很一般。总体来说一般。'
'>>> 总体上来说很棒。这个宾馆比较陈旧了，特价的房间也很一般。总体来说一般。'
'>>> 总体上来说很差。这个宾馆比较陈旧了，特价的房间也很一般。总体来说一般。'
'>>> 总体上来说很般。这个宾馆比较陈旧了，特价的房间也很一般。总体来说一般。'
'>>> 总体上来说很赞。这个宾馆比较陈旧了，特价的房间也很一般。总体来说一般。'


当然，这种方式不够灵活，因此像之前章节中一样，本文采用继承 Transformers 库预训练模型的方式来手工构建模型：

In [101]:
from torch import nn
from transformers.activations import ACT2FN
from transformers import AutoConfig
from transformers import BertPreTrainedModel, BertModel

def batched_index_select(input, dim, index):
    for i in range(1, len(input.shape)):
        if i != dim:
            index = index.unsqueeze(i)
    expanse = list(input.shape)
    expanse[0] = -1
    expanse[dim] = -1
    index = index.expand(expanse)
    return torch.gather(input, dim, index)

class BertPredictionHeadTransform(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        if isinstance(config.hidden_act, str):
            self.transform_act_fn = ACT2FN[config.hidden_act]
        else:
            self.transform_act_fn = config.hidden_act
        self.LayerNorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        hidden_states = self.dense(hidden_states)
        hidden_states = self.transform_act_fn(hidden_states)
        hidden_states = self.LayerNorm(hidden_states)
        return hidden_states

class BertLMPredictionHead(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.transform = BertPredictionHeadTransform(config)
        self.decoder = nn.Linear(config.hidden_size, config.vocab_size, bias=False)
        self.bias = nn.Parameter(torch.zeros(config.vocab_size))
        self.decoder.bias = self.bias

    def forward(self, hidden_states):
        hidden_states = self.transform(hidden_states)
        hidden_states = self.decoder(hidden_states)
        return hidden_states

class BertOnlyMLMHead(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.predictions = BertLMPredictionHead(config)

    def forward(self, sequence_output: torch.Tensor) -> torch.Tensor:
        prediction_scores = self.predictions(sequence_output)
        return prediction_scores

class BertForPrompt(BertPreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.bert = BertModel(config, add_pooling_layer=False)
        self.cls = BertOnlyMLMHead(config)
        # Initialize weights and apply final processing
        self.post_init()
    
    def get_output_embeddings(self):
        return self.cls.predictions.decoder

    def set_output_embeddings(self, new_embeddings):
        self.cls.predictions.decoder = new_embeddings
    
    def forward(self, batch_inputs, batch_mask_idxs, label_word_id, labels=None):
        bert_output = self.bert(**batch_inputs)
        sequence_output = bert_output.last_hidden_state
        batch_mask_reps = batched_index_select(sequence_output, 1, batch_mask_idxs.unsqueeze(-1)).squeeze(1)
        pred_scores = self.cls(batch_mask_reps)[:, label_word_id]

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(pred_scores, labels)
        return loss, pred_scores

device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f'Using {device} device')
config = AutoConfig.from_pretrained(checkpoint)
model = BertForPrompt.from_pretrained(checkpoint, config=config).to(device)
if vtype == 'virtual':
    model.resize_token_embeddings(len(tokenizer))
    print(f"initialize embeddings of {verbalizer['pos']['token']} and {verbalizer['neg']['token']}")
    with torch.no_grad():
        pos_tokenized = tokenizer(verbalizer['pos']['description'])
        pos_tokenized_ids = tokenizer.convert_tokens_to_ids(pos_tokenized)
        neg_tokenized = tokenizer(verbalizer['neg']['description'])
        neg_tokenized_ids = tokenizer.convert_tokens_to_ids(neg_tokenized)
        new_embedding = model.bert.embeddings.word_embeddings.weight[pos_tokenized_ids].mean(axis=0)
        model.bert.embeddings.word_embeddings.weight[pos_id, :] = new_embedding.clone().detach().requires_grad_(True)
        new_embedding = model.bert.embeddings.word_embeddings.weight[neg_tokenized_ids].mean(axis=0)
        model.bert.embeddings.word_embeddings.weight[neg_id, :] = new_embedding.clone().detach().requires_grad_(True)

print(model)

Using cpu device
BertForPrompt(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(21128, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-12

为了测试模型的操作是否符合预期，我们尝试将一个 batch 的数据送入模型：

In [103]:
def to_device(batch_data):
    new_batch_data = {}
    for k, v in batch_data.items():
        if k == 'batch_inputs':
            new_batch_data[k] = {
                k_: v_.to(device) for k_, v_ in v.items()
            }
        elif k == 'label_word_id':
            new_batch_data[k] = v
        else:
            new_batch_data[k] = torch.tensor(v).to(device)
    return new_batch_data

batch_data = next(iter(train_dataloader))
batch_data = to_device(batch_data)
_, outputs = model(**batch_data)
print(outputs.shape)

torch.Size([8, 2])


## **优化模型参数**

我们将每一轮 Epoch 分为“训练循环”和“验证/测试循环”，在训练循环中计算损失、优化模型参数，在验证/测试循环中评估模型性能。下面我们首先实现训练循环。

因为对标签词的预测实际上就是对类别的预测，损失是通过在类别预测和答案标签之间计算交叉熵：

In [104]:
from tqdm.auto import tqdm

def train_loop(dataloader, model, optimizer, lr_scheduler, epoch, total_loss):
    progress_bar = tqdm(range(len(dataloader)))
    progress_bar.set_description(f'loss: {0:>7f}')
    finish_step_num = epoch * len(dataloader)
    
    model.train()
    for step, batch_data in enumerate(dataloader, start=1):
        batch_data = to_device(batch_data)
        outputs = model(**batch_data)
        loss = outputs[0]

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        lr_scheduler.step()

        total_loss += loss.item()
        progress_bar.set_description(f'loss: {total_loss/(finish_step_num + step):>7f}')
        progress_bar.update(1)
    return total_loss

借助机器学习包 sklearn 提供的 `classification_report` 函数来输出这些指标，例如：

In [105]:
from sklearn.metrics import classification_report

y_true = [1, 1, 0, 1, 2, 1, 0, 2, 1, 1, 0, 1, 0]
y_pred = [1, 0, 0, 1, 2, 0, 1, 1, 1, 0, 0, 1, 0]

print(classification_report(y_true, y_pred, output_dict=False))

              precision    recall  f1-score   support

           0       0.50      0.75      0.60         4
           1       0.67      0.57      0.62         7
           2       1.00      0.50      0.67         2

    accuracy                           0.62        13
   macro avg       0.72      0.61      0.63        13
weighted avg       0.67      0.62      0.62        13



因此在验证/测试循环中，我们只需要汇总模型对所有样本的预测结果和答案标签，然后送入到 `classification_report` 中计算各项分类指标：

In [106]:
from sklearn.metrics import classification_report

def test_loop(dataloader, model):
    true_labels, predictions = [], []
    model.eval()
    with torch.no_grad():
        for batch_data in tqdm(dataloader):
            true_labels += batch_data['labels']
            batch_data = to_device(batch_data)
            outputs = model(**batch_data)
            pred = outputs[1]
            predictions += pred.argmax(dim=-1).cpu().numpy().tolist()
    metrics_text = classification_report(true_labels, predictions, target_names=['NEG', 'POS'], digits=4)
    metrics_dict = classification_report(true_labels, predictions, output_dict=True)
    print(metrics_text)
    return metrics_dict

在开始训练之前，我们先评估一下没有微调的 BERT 模型在测试集上的性能。

In [107]:
# test_data = ChnSentiCorp('/kaggle/input/chnsenticorp-alllabeled/test.txt')
# test_dataloader = DataLoader(test_data, batch_size=4, shuffle=False, collate_fn=collote_fn)

# test_loop(test_dataloader, model)

## **训练&保存模型**

我们会根据模型在验证集上的性能来调整超参数以及选出最好的模型权重，然后将选出的模型应用于测试集以评估最终的性能。这里我们继续使用 AdamW 优化器，并且通过 `get_scheduler()` 函数定义学习率调度器：

In [None]:
from transformers import AdamW, get_scheduler

learning_rate = 1e-5
epoch_num = 5

optimizer = AdamW(model.parameters(), lr=learning_rate)
lr_scheduler = get_scheduler(
    "linear",
    optimizer=optimizer,
    num_warmup_steps=0,
    num_training_steps=epoch_num*len(train_dataloader),
)

total_loss = 0.
best_f1_score = 0.
for epoch in range(epoch_num):
    print(f"Epoch {epoch+1}/{epoch_num}\n" + 30 * "-")
    total_loss = train_loop(train_dataloader, model, optimizer, lr_scheduler, epoch, total_loss)
    valid_scores = test_loop(valid_dataloader, model)
    macro_f1, micro_f1 = valid_scores['macro avg']['f1-score'], valid_scores['weighted avg']['f1-score']
    f1_score = (macro_f1 + micro_f1) / 2
    if f1_score > best_f1_score:
        best_f1_score = f1_score
        print('saving new weights...\n')
        torch.save(
            model.state_dict(), 
            f'epoch_{epoch+1}_valid_macrof1_{(macro_f1*100):0.3f}_microf1_{(micro_f1*100):0.3f}_model_weights.bin'
        )
print("Done!")

Epoch 1/5
------------------------------




  0%|          | 0/1200 [00:00<?, ?it/s]

查看保存的权重信息

In [None]:
import os
best_weight_path = ''
file_list = []

# 遍历目录下的所有文件
for dirname, _, filenames in os.walk('/kaggle/working'):
    for filename in filenames:
        # 构建文件的完整路径
        file_path = os.path.join(dirname, filename)
        
        # 获取文件的最后修改时间
        file_mtime = os.path.getmtime(file_path)
        
        # 将文件路径和最后修改时间添加到列表
        file_list.append((file_path, file_mtime))

# 按最后修改时间对文件列表进行排序
sorted_files = sorted(file_list, key=lambda x: x[1])

# 输出排序后的文件列表
for file_path, file_mtime in sorted_files:
    print(f"{file_path} - Last Modified Time: {file_mtime}")
    
best_weight_path = sorted_files[-1][0]

print('best_weight_path :', best_weight_path)

In [None]:
m

# **3. 测试模型**

训练完成后，我们加载在验证集上性能最优的模型权重，汇报其在测试集上的性能。

In [None]:
import json

model.load_state_dict(torch.load(best_weight_path))

model.eval()
with torch.no_grad():
    print('evaluating on test set...')
    true_labels, predictions, probs = [], [], []
    for batch_data in tqdm(test_dataloader):
        true_labels += batch_data['labels']
        batch_data = to_device(batch_data)
        outputs = model(**batch_data)
        pred = outputs[1]
        predictions += pred.argmax(dim=-1).cpu().numpy().tolist()
        probs += torch.nn.functional.softmax(pred, dim=-1)
    save_resluts = []
    for s_idx in tqdm(range(len(test_data))):
        save_resluts.append({
            "comment": test_data[s_idx]['comment'], 
            "label": true_labels[s_idx], 
            "pred": predictions[s_idx], 
            "prob": {'neg': probs[s_idx][0].item(), 'pos': probs[s_idx][1].item()}
        })
        
    metrics_text = classification_report(true_labels, predictions, target_names=['NEG', 'POS'], digits=4)
    metrics_dict = classification_report(true_labels, predictions, output_dict=True)
    print(metrics_text)
#     metrics = classification_report(true_labels, predictions, output_dict=True)
#     pos_p, pos_r, pos_f1 = metrics['1']['precision'], metrics['1']['recall'], metrics['1']['f1-score']
#     neg_p, neg_r, neg_f1 = metrics['0']['precision'], metrics['0']['recall'], metrics['0']['f1-score']
#     macro_f1, micro_f1 = metrics['macro avg']['f1-score'], metrics['weighted avg']['f1-score']
#     print(f"pos: {pos_p*100:>0.2f} / {pos_r*100:>0.2f} / {pos_f1*100:>0.2f}, neg: {neg_p*100:>0.2f} / {neg_r*100:>0.2f} / {neg_f1*100:>0.2f}")
#     print(f"Macro-F1: {macro_f1*100:>0.2f} Micro-F1: {micro_f1*100:>0.2f}\n")
    print('saving predicted results...')
    with open('test_data_pred.json', 'wt', encoding='utf-8') as f:
        for example_result in save_resluts:
            f.write(json.dumps(example_result, ensure_ascii=False) + '\n')

我们打开保存预测结果的 `test_data_pred.json`，其中每一行对应一个样本，`comment` 对应评论，`label` 对应标注标签，`pred` 对应预测出的标签，`prediction` 对应具体预测出的概率值。

In [None]:
# 打开文件并加载JSON数据
with open('test_data_pred.json', 'rt', encoding='utf-8') as file:
    # 一次性读取所有行，并解析为JSON对象列表
    json_data_list = [json.loads(line.strip()) for line in file]

# 打印读取的JSON数据前五条
for index, example_result in enumerate(json_data_list[:5], start=1):
    json_str = json.dumps(example_result, ensure_ascii=False, indent=4)
    print(json_str)

# **4. 封装预测函数**

我们训练模型的目的是为了能够给其他人提供服务。尤其对于不熟悉深度学习的普通开发者而言，需要的只是一个能够完成特定任务的接口。因此在大多数情况下，我们都应该将模型的预测过程封装为一个端到端 (End-to-End) 的函数：输入文本，输出结果：

In [None]:
def predict(model, tokenizer, comment, verbalizer):
    prompt_data = get_prompt(comment)
    prompt = prompt_data['prompt']
    encoding = tokenizer(prompt, truncation=True)
    mask_idx = encoding.char_to_token(prompt_data['mask_offset'])
    assert mask_idx is not None
    inputs = tokenizer(
        prompt, 
        max_length=max_length, 
        padding=True, 
        truncation=True, 
        return_tensors="pt"
    )
    inputs = {
        'batch_inputs': inputs, 
        'batch_mask_idxs': [mask_idx], 
        'label_word_id': [verbalizer['neg']['id'], verbalizer['pos']['id']] 
    }
    inputs = to_device(inputs)
    model.eval()
    with torch.no_grad():
        outputs = model(**inputs)
        logits = outputs[1]
        prob = torch.nn.functional.softmax(logits, dim=-1)
    pred = logits.argmax(dim=-1)[0].item()
    prob = prob[0][pred].item()
    return pred, prob

下面我们尝试输出模型对测试集前 5 条数据的预测结果：

In [None]:
model.load_state_dict(torch.load(best_weight_path))

for i in range(5):
    data = test_data[i]
    pred, prob = predict(model, tokenizer, data['comment'], verbalizer)
    print(f"{data['comment']}\nlabel: {data['label']}\tpred: {pred}\tprob: {prob}")