# 基于向量匹配的文本相似度
06-文本相似度中采用的是基于交互策略的文本相似度匹配。如果候选文本数量非常大，逐个与查询文本进行交互计算会消耗大量资源。  
所以这里引入向量匹配：将每个句子独立编码为向量，再用余弦相似度衡量两个句子的相似程度。

## Step1 导入相关包

In [1]:
from transformers import AutoTokenizer, Trainer, TrainingArguments, AutoModelForSequenceClassification, DataCollatorWithPadding
from datasets import load_dataset
import evaluate

## Step2 加载数据集

In [2]:
# Load the first 10,000 records of the SimCLUE dev JSON file (sliced from the 'train' split).
ds = load_dataset('json', data_files='../dataset/SimCLUE-dev.json', split='train[0:10000]')
ds

Dataset({
    features: ['sentence1', 'sentence2', 'label'],
    num_rows: 10000
})

## Step3 划分数据集

In [3]:
# Hold out 20% of the rows as the 'test' split; the remaining 80% become 'train'.
# NOTE(review): no seed is passed, so the split presumably differs between runs — confirm if reproducibility matters.
ds = ds.train_test_split(test_size=0.2)
ds

DatasetDict({
    train: Dataset({
        features: ['sentence1', 'sentence2', 'label'],
        num_rows: 8000
    })
    test: Dataset({
        features: ['sentence1', 'sentence2', 'label'],
        num_rows: 2000
    })
})

In [4]:
# Inspect one raw training example: two sentences plus a label stored as a string.
ds['train'][0]

{'sentence1': '男朋友生日送什么有创意的礼物比较好？',
 'sentence2': '男朋友过生送什么礼物好呢？要有创意的',
 'label': '1'}

## Step4 数据预处理

In [5]:
# Load the tokenizer of the same pretrained checkpoint that the model is later initialised from.
tokenizer = AutoTokenizer.from_pretrained("hfl/chinese-macbert-base")
tokenizer

BertTokenizerFast(name_or_path='hfl/chinese-macbert-base', vocab_size=21128, model_max_length=1000000000000000019884624838656, is_fast=True, padding_side='right', truncation_side='right', special_tokens={'unk_token': '[UNK]', 'sep_token': '[SEP]', 'pad_token': '[PAD]', 'cls_token': '[CLS]', 'mask_token': '[MASK]'}, clean_up_tokenization_spaces=False, added_tokens_decoder={
	0: AddedToken("[PAD]", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	100: AddedToken("[UNK]", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	101: AddedToken("[CLS]", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	102: AddedToken("[SEP]", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	103: AddedToken("[MASK]", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
}
)

In [6]:
def propress_function(examples):
    """Tokenize a batch of sentence pairs for the dual-encoder model.

    Each sentence is tokenized on its own (truncated/padded to 128 tokens).
    The encodings are then regrouped two by two, so every feature of one
    example holds a ``[sentence1, sentence2]`` pair.  Labels are mapped to
    ``1`` for a positive pair and ``-1`` for a negative pair.

    Args:
        examples: Batched columns with the keys ``sentence1``, ``sentence2``
            and ``label``.

    Returns:
        A dict with the tokenizer's fields (paired per example) plus ``labels``.
    """
    flat_sentences = []
    targets = []
    for first, second, raw_label in zip(examples['sentence1'], examples['sentence2'], examples['label']):
        # Interleave the sentences so that positions (2i, 2i + 1) belong to example i.
        flat_sentences.extend((first, second))
        targets.append(-1 if int(raw_label) != 1 else 1)

    # Produces the tokenizer's fields, one row per sentence.
    encoded = tokenizer(flat_sentences, truncation=True, padding='max_length', max_length=128)

    # Regroup consecutive rows into pairs: one [sentence1, sentence2] entry per example.
    paired = {}
    for field, rows in encoded.items():
        paired[field] = [rows[start:start + 2] for start in range(0, len(rows), 2)]
    paired['labels'] = targets
    return paired

In [7]:
# Tokenize both splits in batches and drop the original columns so only the new features remain.
tokenized_ds = ds.map(propress_function, remove_columns=ds['train'].column_names, batched=True)
tokenized_ds['train'][0]

Map:   0%|          | 0/8000 [00:00<?, ? examples/s]

Map:   0%|          | 0/2000 [00:00<?, ? examples/s]

{'input_ids': [[101,
   4511,
   3301,
   1351,
   4495,
   3189,
   6843,
   784,
   720,
   3300,
   1158,
   2692,
   4638,
   4851,
   4289,
   3683,
   6772,
   1962,
   8043,
   102,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0,
   0],
  [101,
   4511,
   3301,
   1351,
   6814,
   4495,
   6843,
   784,
   720,
   4851,
   4289,
   1962,
   1450,
   8043,
   6206,
   3300,
   1158,
   2692,
   

In [8]:
# Show the tokenized splits and their feature columns.
tokenized_ds

DatasetDict({
    train: Dataset({
        features: ['input_ids', 'token_type_ids', 'attention_mask', 'labels'],
        num_rows: 8000
    })
    test: Dataset({
        features: ['input_ids', 'token_type_ids', 'attention_mask', 'labels'],
        num_rows: 2000
    })
})

## Step5 创建模型

In [9]:
# model = AutoModelForSequenceClassification.from_pretrained('hfl/chinese-macbert-base', num_labels=1)
from transformers import  BertPreTrainedModel, BertModel
from transformers.configuration_utils import PretrainedConfig
from typing import Optional
import torch
from torch.nn import CosineSimilarity, CosineEmbeddingLoss

class DualModel(BertPreTrainedModel):
    """Dual-encoder (siamese) BERT that scores a sentence pair by cosine similarity.

    Both sentences of a pair are encoded by the same ``BertModel``.  Their
    pooled outputs are compared with cosine similarity and, when labels are
    supplied, trained with ``CosineEmbeddingLoss`` (margin 0.3).
    """

    def __init__(self, config: PretrainedConfig, *inputs, **kwargs):
        super().__init__(config, *inputs, **kwargs)
        self.bert = BertModel(config)
        self.post_init()

    @staticmethod
    def _split_pair(tensor: Optional[torch.Tensor], name: str):
        """Split a ``[batch, 2, seq_len]`` tensor into two ``[batch, seq_len]`` tensors.

        ``None`` is passed through as ``(None, None)`` so that optional inputs
        may be omitted by the caller.

        Raises:
            ValueError: If ``tensor`` is not of shape ``[batch, 2, seq_len]``.
        """
        if tensor is None:
            return None, None
        if tensor.dim() != 3 or tensor.size(1) != 2:
            raise ValueError(
                f"{name} must have shape [batch, 2, seq_len], got {tuple(tensor.shape)}"
            )
        first, second = tensor.chunk(2, dim=1)  # chunk keeps the size-1 pair dimension
        return first.squeeze(1), second.squeeze(1)

    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ):
        """Encode both sentences of each pair and compare them.

        Args:
            input_ids: Token ids of shape ``[batch, 2, seq_len]``; index 0/1 on
                dim 1 selects sentence A/B.  Required.
            attention_mask: Optional mask with the same layout as ``input_ids``.
            token_type_ids: Optional segment ids with the same layout as
                ``input_ids``.
            labels: Optional targets for ``CosineEmbeddingLoss``
                (``1`` for similar pairs, ``-1`` for dissimilar pairs).
            position_ids, head_mask, inputs_embeds, output_attentions,
            output_hidden_states, return_dict: Forwarded unchanged to both
                encoder calls.

        Returns:
            ``(loss, cos)`` when ``labels`` is given, otherwise ``(cos,)``,
            where ``cos`` holds one cosine similarity per pair.

        Raises:
            ValueError: If ``input_ids`` is missing or a paired input does not
                have shape ``[batch, 2, seq_len]``.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict
        if input_ids is None:
            raise ValueError("input_ids is required and must have shape [batch, 2, seq_len]")

        # Step 1: separate the inputs of sentence A and sentence B.
        senA_input_ids, senB_input_ids = self._split_pair(input_ids, "input_ids")
        senA_attention_mask, senB_attention_mask = self._split_pair(attention_mask, "attention_mask")
        senA_token_type_ids, senB_token_type_ids = self._split_pair(token_type_ids, "token_type_ids")

        # Arguments that are identical for both encoder passes.
        shared_kwargs = dict(
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        # Step 2: encode each sentence with the shared encoder and take the pooled output.
        senA_outputs = self.bert(
            senA_input_ids,
            attention_mask=senA_attention_mask,
            token_type_ids=senA_token_type_ids,
            **shared_kwargs,
        )
        senA_pooled_output = senA_outputs[1]  # [batch, hidden]

        senB_outputs = self.bert(
            senB_input_ids,
            attention_mask=senB_attention_mask,
            token_type_ids=senB_token_type_ids,
            **shared_kwargs,
        )
        senB_pooled_output = senB_outputs[1]  # [batch, hidden]

        # Step 3: similarity score of every pair.
        cos = CosineSimilarity()(senA_pooled_output, senB_pooled_output)

        # Step 4: loss, only when targets are available.
        loss = None
        if labels is not None:
            loss_fct = CosineEmbeddingLoss(0.3)
            loss = loss_fct(senA_pooled_output, senB_pooled_output, labels)

        output = (cos,)
        return ((loss,) + output) if loss is not None else output

In [10]:
# Initialise DualModel from the pretrained 'hfl/chinese-macbert-base' checkpoint.
model = DualModel.from_pretrained('hfl/chinese-macbert-base')

## Step6 创建评估函数

In [11]:
# Load the accuracy and F1 metrics; both are used by eval_metric.
# NOTE(review): 'acc_metirc' is a misspelling of 'acc_metric', but eval_metric refers to it by this name.
acc_metirc = evaluate.load("accuracy")
f1_metric = evaluate.load("f1")

In [12]:
def eval_metric(pred):
    """Compute accuracy and F1 from cosine-similarity scores.

    Args:
        pred: A ``(predictions, labels)`` pair.  A prediction counts as
            positive when it is greater than 0.7; a label counts as positive
            when it is greater than 0.

    Returns:
        The accuracy result dict, extended with the entries of the F1 result.
    """
    scores, gold = pred
    # Binarise both sides to 0/1 before scoring.
    binary_preds = [1 if score > 0.7 else 0 for score in scores]
    binary_gold = [1 if tag > 0 else 0 for tag in gold]

    metrics = acc_metirc.compute(predictions=binary_preds, references=binary_gold)
    f1_result = f1_metric.compute(predictions=binary_preds, references=binary_gold)
    metrics.update(f1_result)
    return metrics

## Step7 创建训练参数

In [13]:
# Training configuration: evaluate and save once per epoch, then reload the checkpoint with the best F1.
args = TrainingArguments(output_dir="./dual_model",      # output directory
                        per_device_train_batch_size=32,  # per-device batch size for training
                        per_device_eval_batch_size=32,   # per-device batch size for evaluation
                        logging_steps=10,                # log every 10 steps
                        eval_strategy="epoch",           # evaluate at the end of each epoch
                        save_strategy="epoch",           # save a checkpoint at the end of each epoch
                        save_total_limit=3,              # keep at most 3 checkpoints
                        learning_rate=2e-5,              # learning rate
                        weight_decay=0.01,               # weight decay
                        metric_for_best_model="f1",      # metric used to pick the best checkpoint
                        load_best_model_at_end=True)     # load the best checkpoint when training finishes

## Step8 创建训练器

In [14]:
# Wire the model, training arguments, tokenized splits and metric function into a Trainer.
# The padding collator is left disabled — presumably because preprocessing already pads
# every sentence to max_length=128 (confirm before re-enabling).
trainer = Trainer(
    model=model,
    args = args,
    train_dataset=tokenized_ds['train'],
    eval_dataset=tokenized_ds['test'],
    # data_collator=DataCollatorWithPadding(tokenizer=tokenizer),
    compute_metrics=eval_metric
)

In [15]:
# Run training; args requests evaluation and a checkpoint at the end of every epoch.
trainer.train()

Epoch,Training Loss,Validation Loss,Accuracy,F1
1,0.2927,0.307023,0.589,0.635315
2,0.2737,0.270153,0.654,0.656064
3,0.1994,0.26565,0.663,0.654713


TrainOutput(global_step=750, training_loss=0.2658083823521932, metrics={'train_runtime': 523.8149, 'train_samples_per_second': 45.818, 'train_steps_per_second': 1.432, 'total_flos': 3157275967488000.0, 'train_loss': 0.2658083823521932, 'epoch': 3.0})

## Step9 模型评估

In [16]:
# Evaluate on the held-out test split; the result includes loss, accuracy and F1.
trainer.evaluate(tokenized_ds['test'])

{'eval_loss': 0.27015286684036255,
 'eval_accuracy': 0.654,
 'eval_f1': 0.6560636182902585,
 'eval_runtime': 13.315,
 'eval_samples_per_second': 150.206,
 'eval_steps_per_second': 4.732,
 'epoch': 3.0}

## Step10 模型预测

In [17]:
class SentenceSimilarityPipeline:
    """Score the similarity of two sentences with the encoder of a dual model.

    Only ``model.bert`` is used: both sentences are encoded in one batch of
    two and the cosine similarity of their pooled vectors is returned.
    """

    def __init__(self, model, tokenizer):
        """Keep the encoder (``model.bert``), the tokenizer and the model's device."""
        self.model = model.bert
        self.tokenizer = tokenizer
        self.device = model.device

    def preprocess(self, senA, senB):
        """Tokenize both sentences together (truncated to 128 tokens, padded to the longer one)."""
        return self.tokenizer([senA, senB], max_length=128, truncation=True, return_tensors="pt", padding=True)

    def predict(self, inputs):
        """Run the encoder and return the second element of its output (the pooled vectors).

        Inference is done in eval mode and without gradient tracking, so
        dropout cannot perturb the score and no autograd graph is built.
        """
        inputs = {k: v.to(self.device) for k, v in inputs.items()}
        # Re-assert eval mode on every call in case the model was trained in between.
        self.model.eval()
        with torch.no_grad():
            return self.model(**inputs)[1]

    def postprocess(self, logits):
        """Return the cosine similarity between row 0 and row 1 of ``logits`` as a float."""
        # Indexing with None keeps a leading batch dimension of 1 for each vector.
        cos = CosineSimilarity()(logits[None, 0, :], logits[None, 1, :]).squeeze().cpu().item()
        return cos

    def __call__(self, senA, senB, return_vector=False):
        """Return the similarity of ``senA`` and ``senB``.

        Args:
            senA: First sentence.
            senB: Second sentence.
            return_vector: When true, also return the two sentence vectors.

        Returns:
            The cosine similarity, or ``(similarity, vectors)`` when
            ``return_vector`` is true.
        """
        inputs = self.preprocess(senA, senB)
        logits = self.predict(inputs)
        result = self.postprocess(logits)
        if return_vector:
            return result, logits
        return result

In [18]:
# Build the inference pipeline from the trained model and its tokenizer.
pipe = SentenceSimilarityPipeline(model, tokenizer)

In [21]:
# Score one sentence pair; the result is the cosine similarity of the two sentence vectors.
pipe("我喜欢苹果", "我爱吃香蕉")

0.5281082391738892