# 训练一个向量编码器

In [1]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification,TrainingArguments,Trainer,DataCollatorWithPadding
from datasets import load_dataset
import os
# Expose only GPU 0 to this process.
os.environ.update(CUDA_VISIBLE_DEVICES="0")

## 1. 加载数据

In [2]:
# Load the "dda" configuration of the sentence-pair dataset from the Hub.
datas = load_dataset(path="shibing624/sts-sohu2021", name="dda")
# Optional local copy: datas.save_to_disk("./data/sts-sohu2021/dda")

In [3]:
# Data preview: the split summary next to the first training example.
first_train_example = datas["train"][0]
(datas, first_train_example)

(DatasetDict({
     train: Dataset({
         features: ['sentence1', 'sentence2', 'label'],
         num_rows: 10512
     })
     test: Dataset({
         features: ['sentence1', 'sentence2', 'label'],
         num_rows: 1000
     })
 }),
 {'sentence1': '高晓松瘦身成功减肥30斤，被路人认成吴亦凡！',
  'sentence2': '景甜亮因穿着成“焦点”，看到最后一张，网友：美的有点过分了！',
  'label': 0})

In [4]:
# Tokenizer setup for the data-processing function defined below.
import torch

TOKENIZER_CHECKPOINT = "hfl/chinese-macbert-base"
tokenizer = AutoTokenizer.from_pretrained(TOKENIZER_CHECKPOINT)

def process_fun(examples):
    """Tokenize a batch of sentence pairs for the paired (two-input) encoder.

    Args:
        examples: Batch dict with parallel lists under "sentence1",
            "sentence2" and "label".

    Returns:
        Dict in which every tokenizer field holds, per example, a two-element
        list (sentence1 encoding, sentence2 encoding), plus a "labels" list
        containing 1 for pairs labelled 1 and -1 for every other pair.
    """
    flat_sentences = []
    targets = []
    rows = zip(examples["sentence1"], examples["sentence2"], examples["label"])
    for first, second, raw_label in rows:
        # Interleave the pair so the flat list reads [s1_0, s2_0, s1_1, s2_1, ...].
        flat_sentences.extend((first, second))
        targets.append(-1 if int(raw_label) != 1 else 1)

    # Produces input_ids, attention_mask and token_type_ids, each padded to 128 tokens.
    encoded = tokenizer(flat_sentences, max_length=128, truncation=True, padding="max_length")

    # Regroup the flat encodings two by two so each example carries both sentences.
    paired = {}
    for field, values in encoded.items():
        paired[field] = [values[start: start + 2] for start in range(0, len(values), 2)]
    paired["labels"] = targets
    return paired

In [5]:
# Tokenize every split in batches and drop the raw columns so only model inputs remain.
raw_columns = datas["train"].column_names
data_tokenizer = datas.map(process_fun, batched=True, remove_columns=raw_columns)

Map:   0%|          | 0/10512 [00:00<?, ? examples/s]

Map:   0%|          | 0/1000 [00:00<?, ? examples/s]

## 2. 加载模型


In [6]:
from transformers import BertForSequenceClassification,BertModel
# 导入余弦函数
from torch.nn import CosineSimilarity,CosineEmbeddingLoss

from typing import Optional
import torch
class SentenceEncoderModel(BertForSequenceClassification):
    """Siamese BERT sentence encoder trained with a cosine embedding loss.

    Both sentences of a pair are encoded by the same ``self.bert`` encoder and
    their pooled outputs are compared with cosine similarity.

    NOTE(review): the classification head inherited from
    ``BertForSequenceClassification`` is never used in ``forward``; it is kept
    only because the parent class is kept.
    """

    def __init__(self, config):
        # The parent constructor already builds ``self.bert``, stores ``config`` and
        # ``num_labels`` and runs ``post_init()``. Building a second ``BertModel``
        # here only allocated and initialised the encoder twice.
        super().__init__(config)

    def _encode_sentence(self, index, input_ids, attention_mask, token_type_ids, **encoder_kwargs):
        """Return the pooled output ``[batch, hidden]`` for sentence ``index`` (0 or 1) of each pair."""
        outputs = self.bert(
            input_ids[:, index],
            # Optional inputs may be omitted; the encoder then applies its own defaults.
            attention_mask=None if attention_mask is None else attention_mask[:, index],
            token_type_ids=None if token_type_ids is None else token_type_ids[:, index],
            **encoder_kwargs,
        )
        return outputs[1]

    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ):
        """Encode both sentences of every pair and score their similarity.

        Args:
            input_ids: Token ids indexed as ``[batch, 2, seq_len]``; index 0 on the
                second axis is sentence A, index 1 is sentence B.
            attention_mask: Optional mask with the same layout as ``input_ids``.
            token_type_ids: Optional segment ids with the same layout as ``input_ids``.
            position_ids, head_mask, inputs_embeds, output_attentions,
            output_hidden_states, return_dict: Forwarded unchanged to both
                encoder calls.
            labels: Optional targets for ``CosineEmbeddingLoss`` (1 for similar
                pairs, -1 for dissimilar pairs).

        Returns:
            ``(loss, cos)`` when ``labels`` is given, otherwise ``(cos,)``, where
            ``cos`` holds one cosine similarity per pair (shape ``[batch]``).

        Raises:
            ValueError: If ``input_ids`` is None.
        """
        if input_ids is None:
            raise ValueError("input_ids with layout [batch, 2, seq_len] is required")

        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        encoder_kwargs = dict(
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        # Vector representation of sentence A and sentence B, each [batch, hidden].
        senA_pooled_output = self._encode_sentence(
            0, input_ids, attention_mask, token_type_ids, **encoder_kwargs
        )
        senB_pooled_output = self._encode_sentence(
            1, input_ids, attention_mask, token_type_ids, **encoder_kwargs
        )

        # Similarity of each pair, shape [batch].
        cos = CosineSimilarity(dim=1)(senA_pooled_output, senB_pooled_output)

        # Loss is only computed when targets are supplied.
        loss = None
        if labels is not None:
            loss_fct = CosineEmbeddingLoss(margin=0.3)
            loss = loss_fct(senA_pooled_output, senB_pooled_output, labels)

        output = (cos,)
        return ((loss,) + output) if loss is not None else output

In [7]:
# Initialise the sentence encoder from the pretrained MacBERT checkpoint.
model = SentenceEncoderModel.from_pretrained(
    "hfl/chinese-macbert-base",
    num_labels=2,
)

Some weights of SentenceEncoderModel were not initialized from the model checkpoint at /data1/model/chinese-macbert-base and are newly initialized: ['classifier.weight', 'classifier.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


## 3. 创建评估函数

In [8]:
import evaluate

# Load the accuracy and F1 metrics, in that order.
# The spelling `f1_metirc` is kept because eval_metric refers to that exact name.
acc_metric, f1_metirc = (evaluate.load(metric_name) for metric_name in ("accuracy", "f1"))

def eval_metric(eval_predict, threshold=0.7):
    """Compute accuracy and F1 from predicted cosine similarities.

    Args:
        eval_predict: ``(predictions, labels)`` pair. Predictions are the
            per-pair similarity scores returned by the model; labels are the
            training targets, where values greater than 0 mark similar pairs.
        threshold: Similarity above which a pair is predicted as similar.
            Defaults to 0.7, the value that used to be hard-coded here.

    Returns:
        Dict merging the accuracy result with the F1 result.
    """
    predictions, labels = eval_predict
    # The score is a cosine similarity, not a probability: pairs scoring above
    # the threshold are predicted as similar (class 1).
    predicted_classes = [int(score > threshold) for score in predictions]
    # Map the training targets onto the 1 / 0 classes used by the metrics.
    reference_classes = [int(label > 0) for label in labels]
    acc = acc_metric.compute(predictions=predicted_classes, references=reference_classes)
    f1 = f1_metirc.compute(predictions=predicted_classes, references=reference_classes)
    acc.update(f1)
    return acc

## 4. 设置训练参数

In [9]:
# Training hyper-parameters, collected in one place before building TrainingArguments.
# NOTE(review): newer transformers releases rename `evaluation_strategy` to
# `eval_strategy` — confirm against the installed version before upgrading.
training_options = dict(
    output_dir="./encoder_model",       # output folder
    per_device_train_batch_size=32,     # batch size during training
    per_device_eval_batch_size=32,      # batch size during evaluation
    logging_steps=10,                   # logging frequency
    evaluation_strategy="epoch",        # evaluation strategy
    save_strategy="epoch",              # checkpoint saving strategy
    save_total_limit=3,                 # maximum number of checkpoints kept
    learning_rate=2e-5,                 # learning rate
    weight_decay=0.01,                  # weight decay
    metric_for_best_model="f1",         # metric used to select the best model
    load_best_model_at_end=True,        # load the best model once training finishes
)
train_args = TrainingArguments(**training_options)

## 5. 定义训练器

In [10]:
# Wire the model, data splits, collator and metric function into the Trainer.
pair_collator = DataCollatorWithPadding(tokenizer=tokenizer)
trainer = Trainer(
    model=model,
    args=train_args,
    train_dataset=data_tokenizer["train"],
    eval_dataset=data_tokenizer["test"],
    data_collator=pair_collator,
    compute_metrics=eval_metric,
)

## 6. 训练

In [11]:
# Run fine-tuning; the returned training summary is displayed as the cell result.
train_output = trainer.train()
train_output

You're using a BertTokenizerFast tokenizer. Please note that with a fast tokenizer, using the `__call__` method is faster than using a method to encode the text followed by a call to the `pad` method to get a padded encoding.


Epoch,Training Loss,Validation Loss,Accuracy,F1
1,0.2109,0.186134,0.799,0.578616
2,0.1832,0.16741,0.821,0.630928
3,0.1459,0.1623,0.824,0.640816


TrainOutput(global_step=987, training_loss=0.1817621789200811, metrics={'train_runtime': 769.2546, 'train_samples_per_second': 40.996, 'train_steps_per_second': 1.283, 'total_flos': 4148735120916480.0, 'train_loss': 0.1817621789200811, 'epoch': 3.0})

## 7. 评估

In [12]:
# Evaluate on the test split and display the resulting metrics dict.
test_split = data_tokenizer["test"]
eval_result = trainer.evaluate(test_split)
eval_result

{'eval_loss': 0.16230031847953796,
 'eval_accuracy': 0.824,
 'eval_f1': 0.6408163265306122,
 'eval_runtime': 7.8649,
 'eval_samples_per_second': 127.148,
 'eval_steps_per_second': 4.069,
 'epoch': 3.0}

## 8. 推理

In [39]:
# The model is custom, so inference is written by hand: encode the input
# sentences with the model, then compute their similarity manually.
text1="我喜欢北京"
text2="今天天气怎么样"
inputs  = tokenizer([text1, text2], max_length=128, truncation=True, return_tensors="pt", padding=True)
# Follow the model's own device instead of assuming 'cuda:0', so this cell
# also works on CPU or when the model lives on a different GPU.
inputs = {k: v.to(model.device) for k, v in inputs.items()}
inputs

{'input_ids': tensor([[ 101, 2769, 1599, 3614, 1266,  776,  102,    0,    0],
         [ 101,  791, 1921, 1921, 3698, 2582,  720, 3416,  102]],
        device='cuda:0'),
 'token_type_ids': tensor([[0, 0, 0, 0, 0, 0, 0, 0, 0],
         [0, 0, 0, 0, 0, 0, 0, 0, 0]], device='cuda:0'),
 'attention_mask': tensor([[1, 1, 1, 1, 1, 1, 1, 0, 0],
         [1, 1, 1, 1, 1, 1, 1, 1, 1]], device='cuda:0')}

In [44]:
# Encode both sentences with the trained encoder, then measure how similar they are.
model.eval()  # make sure dropout is disabled so the embeddings are deterministic
with torch.no_grad():  # inference only: skip building the autograd graph
    output = model.bert(**inputs)
logits = output[1]  # pooled sentence embeddings, one row per sentence (not classification logits)
cos = CosineSimilarity(dim=0)(logits[0], logits[1]).item()
cos

0.2078535109758377

In [51]:
# Report the pair as similar only when the cosine similarity exceeds 0.7.
if cos > 0.7:
    verdict = '相似'
else:
    verdict = '不相似'
print(verdict)

不相似