# 文本相似度实例

本实例主要解决文本相似度匹配中“从多个候选文本里找出最相似文本”的问题。基于交互策略的方案可以直接输出相似度，能初步解决这一问题，但在大规模数据下耗时太长；因此这里改用基于向量匹配的方案，既能直接输出相似度，又能快速匹配。其思路就是Sentence-BERT的Regression Objective Function（详见https://wmathor.com/index.php/archives/1496/），只是损失函数从MSE换成了CosineEmbeddingLoss；两种损失各有优劣，但它们都建立在余弦相似度（两向量夹角的余弦）之上。

当然，这种方式也有缺陷：效果通常并不理想，很难直接取得最优结果。此外，还可以用sentence-transformers库、text2vec模型和uniem框架等进行文本向量化，它们采用的都是双塔的向量匹配模式。

In [1]:
import os
# Set CUDA_VISIBLE_DEVICES to "0" so that only the first GPU is visible to CUDA.
# NOTE(review): this presumably has to run before any CUDA-using library
# initializes the device — confirm if the cell order is ever changed.
os.environ['CUDA_VISIBLE_DEVICES'] = "0"

## Step1 导入相关包

In [2]:
import evaluate
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments
from datasets import load_dataset

  from .autonotebook import tqdm as notebook_tqdm


## Step2 加载数据集

In [3]:
# Load the sentence-pair data from a local JSON file as a single "train" split.
dataset = load_dataset("json", data_files="./train_pair_1w.json", split="train")
# Bare expression so the notebook displays the dataset summary.
dataset

Dataset({
    features: ['sentence1', 'sentence2', 'label'],
    num_rows: 10000
})

In [4]:
# Preview the first two rows (columns: sentence1, sentence2, label).
dataset[:2]

{'sentence1': ['找一部小时候的动画片',
  '我不可能是一个有鉴赏能力的行家，小姐我把我的时间都花在书写上；象这样豪华的舞会，我还是头一次见到。'],
 'sentence2': ['求一部小时候的动画片。谢了', '蜡烛没熄就好了，夜黑得瘆人，情绪压抑。'],
 'label': ['1', '0']}

## Step3 划分数据集

In [5]:
# Hold out 20% of the rows as the "test" split; the rest stays in "train".
# NOTE(review): no seed is passed, so the split presumably differs between
# runs — confirm whether reproducibility matters here.
datasets = dataset.train_test_split(test_size=0.2)
datasets

DatasetDict({
    train: Dataset({
        features: ['sentence1', 'sentence2', 'label'],
        num_rows: 8000
    })
    test: Dataset({
        features: ['sentence1', 'sentence2', 'label'],
        num_rows: 2000
    })
})

## Step4 数据集预处理

In [6]:
import torch

# Load the tokenizer from the local chinese-macbert-base checkpoint directory.
tokenizer = AutoTokenizer.from_pretrained("/data/PLM/chinese-macbert-base")

# Preprocessing is similar to the multiple-choice setup: the two sentences of a
# pair form one sample, so each encoded field has shape [batch_size, 2, seq_length].
def process_function(examples):
    """Tokenize a batch of sentence pairs for the dual-encoder model.

    Args:
        examples: A batch with "sentence1", "sentence2" and "label" columns.

    Returns:
        The tokenizer's fields regrouped so that every sample holds the two
        encodings of its pair, plus a "labels" list of 1 / -1 targets.
    """
    rows = list(zip(examples["sentence1"], examples["sentence2"], examples["label"]))

    # Interleave the sentences: [a1, b1, a2, b2, ...].
    flat_sentences = [sentence for first, second, _ in rows for sentence in (first, second)]
    # Labels arrive as 0/1; CosineEmbeddingLoss expects -1/1 targets.
    pair_labels = [1 if int(raw_label) == 1 else -1 for _, _, raw_label in rows]

    # Produces input_ids, attention_mask and token_type_ids, each padded to 128 tokens.
    encoded = tokenizer(flat_sentences, max_length=128, truncation=True, padding="max_length")

    # Regroup consecutive encodings two by two so each sample is one sentence pair.
    paired = {}
    for field, values in encoded.items():
        paired[field] = [values[start: start + 2] for start in range(0, len(values), 2)]
    paired["labels"] = pair_labels
    return paired

# Run process_function batch-wise over both splits and drop the original
# text/label columns so that only the encoded fields and "labels" remain.
tokenized_datasets = datasets.map(process_function, batched=True, remove_columns=datasets["train"].column_names)
tokenized_datasets

Map:   0%|          | 0/8000 [00:00<?, ? examples/s]

Map: 100%|██████████| 8000/8000 [00:01<00:00, 4321.30 examples/s]
Map: 100%|██████████| 2000/2000 [00:00<00:00, 4082.69 examples/s]


DatasetDict({
    train: Dataset({
        features: ['input_ids', 'token_type_ids', 'attention_mask', 'labels'],
        num_rows: 8000
    })
    test: Dataset({
        features: ['input_ids', 'token_type_ids', 'attention_mask', 'labels'],
        num_rows: 2000
    })
})

In [7]:
# Inspect one processed training sample: every field holds two encodings (one per sentence).
print(tokenized_datasets["train"][0])

{'input_ids': [[101, 6443, 3300, 1921, 1921, 6999, 6651, 6774, 1221, 102, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [101, 6443, 3300, 1921, 1921, 6999, 6651, 6774, 1221, 1557, 8024, 6432, 678, 8024, 102, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]], 'token_type_ids': [[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,

## Step5 创建模型

In [8]:
from transformers import BertForSequenceClassification, BertPreTrainedModel, BertModel
from typing import Optional
from transformers.configuration_utils import PretrainedConfig
from torch.nn import CosineSimilarity, CosineEmbeddingLoss

# Custom model class, written by following the structure of BertForSequenceClassification.
class DualModel(BertPreTrainedModel):
    """Bi-encoder (two-tower) BERT model for sentence-pair similarity.

    Both sentences of a pair are encoded by the same ``BertModel``. The cosine
    similarity of the two pooled vectors is returned as the prediction, and
    ``CosineEmbeddingLoss`` is used as the training loss when labels are given.
    """

    def __init__(self, config: PretrainedConfig, *inputs, **kwargs):
        """Build the shared encoder; only ``config`` is strictly required."""
        super().__init__(config, *inputs, **kwargs)
        # No classifier head is needed for this task: the similarity is
        # computed directly from the pooled encoder outputs.
        self.bert = BertModel(config)
        self.post_init()  # initialize weights and allow gradient checkpointing

    @staticmethod
    def _split_pair(tensor):
        """Split a paired tensor along dim 1 into (sentence A, sentence B); None stays None."""
        if tensor is None:
            return None, None
        return tensor[:, 0], tensor[:, 1]

    def _encode(self, input_ids, attention_mask, token_type_ids, **bert_kwargs):
        """Run the shared encoder on one side of the pair and return its pooled output."""
        outputs = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            **bert_kwargs,
        )
        # For BertModel, element 0 is the per-token hidden states and element 1
        # is the pooled [CLS] representation; only the latter is used here.
        return outputs[1]    # [batch_size, hidden_size]

    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ):
        """Encode both sentences of each pair and score them by cosine similarity.

        Args:
            input_ids: Paired token ids of shape [batch_size, 2, seq_length].
            attention_mask: Optional paired mask with the same layout as ``input_ids``.
            token_type_ids: Optional paired segment ids with the same layout.
            position_ids, head_mask, inputs_embeds, output_attentions,
            output_hidden_states, return_dict: Forwarded unchanged to both
                encoder calls.
            labels: Optional targets (1 for similar pairs, -1 for dissimilar pairs).

        Returns:
            ``(loss, cos)`` when ``labels`` is given, otherwise ``(cos,)``, where
            ``cos`` holds one cosine similarity per pair.

        Raises:
            ValueError: If ``input_ids`` is None.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        if input_ids is None:
            raise ValueError(
                "DualModel.forward requires paired input_ids of shape [batch_size, 2, seq_length]"
            )

        # Step 1: separate the inputs of sentence A and sentence B. The masks are
        # optional, so None is passed through instead of being indexed.
        senA_input_ids, senB_input_ids = self._split_pair(input_ids)
        senA_attention_mask, senB_attention_mask = self._split_pair(attention_mask)
        senA_token_type_ids, senB_token_type_ids = self._split_pair(token_type_ids)

        # Arguments that are identical for both encoder calls.
        shared_kwargs = dict(
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        # Step 2: encode each sentence with the shared encoder.
        senA_pooled_output = self._encode(
            senA_input_ids, senA_attention_mask, senA_token_type_ids, **shared_kwargs
        )
        senB_pooled_output = self._encode(
            senB_input_ids, senB_attention_mask, senB_token_type_ids, **shared_kwargs
        )

        # Step 3: the cosine similarity of the two vectors serves as the logits.
        cos = CosineSimilarity()(senA_pooled_output, senB_pooled_output)    # [batch_size]

        # Step 4: compute the loss only when labels are provided.
        loss = None
        if labels is not None:
            # Negative pairs whose similarity is already below the margin count as
            # easy samples and contribute no loss; positive pairs are always
            # pushed towards a similarity of 1.
            loss_fct = CosineEmbeddingLoss(margin=0.3)
            loss = loss_fct(senA_pooled_output, senB_pooled_output, labels)

        # Trainer accepts a plain tuple whose first element is the loss.
        output = (cos,)
        return ((loss,) + output) if loss is not None else output
    
# Instantiate DualModel from the local chinese-macbert-base checkpoint directory.
model = DualModel.from_pretrained("/data/PLM/chinese-macbert-base")

## Step6 创建评估函数

In [9]:
# Load the accuracy and F1 metric scripts from local copies of the evaluate repository.
acc_metric = evaluate.load("/data/daiyw/Compare/evaluate/metrics/accuracy")
f1_metric = evaluate.load("/data/daiyw/Compare/evaluate/metrics/f1")

In [10]:
def eval_metric(eval_predict, threshold=0.7):
    """Compute accuracy and F1 from cosine-similarity predictions.

    Args:
        eval_predict: A ``(predictions, labels)`` pair, where predictions are
            the cosine similarities produced by the model and labels are the
            -1/1 targets created during preprocessing.
        threshold: Similarity above which a pair is predicted as similar.
            The default of 0.7 is used because 0.5 is rather low for
            text-similarity matching.

    Returns:
        A dict containing the accuracy result merged with the F1 result.
    """
    predictions, labels = eval_predict
    # Binarize the similarities with the decision threshold.
    predictions = [int(score > threshold) for score in predictions]
    # Labels were mapped to -1/1 for the loss; map them back to 0/1 for the metrics.
    labels = [int(label > 0) for label in labels]
    metrics = acc_metric.compute(predictions=predictions, references=labels)
    metrics.update(f1_metric.compute(predictions=predictions, references=labels))
    return metrics

## Step7 创建TrainingArguments

In [11]:
# Training configuration: evaluate and checkpoint once per epoch, then restore
# the checkpoint with the best F1 when training finishes.
train_args = TrainingArguments(output_dir="./dual_model",      # output directory
                               per_device_train_batch_size=32,  # per-device batch size for training
                               per_device_eval_batch_size=32,  # per-device batch size for evaluation
                               logging_steps=10,                # logging frequency (in steps)
                               evaluation_strategy="epoch",     # evaluation strategy
                               save_strategy="epoch",           # checkpoint saving strategy
                               save_total_limit=3,              # maximum number of checkpoints kept
                               learning_rate=2e-5,              # learning rate
                               weight_decay=0.01,               # weight decay
                               metric_for_best_model="f1",      # metric used to select the best model ("loss" if left unset)
                               load_best_model_at_end=True)     # load the best model once training completes

## Step8 创建Trainer

In [12]:
# Wire the model, arguments, tokenized splits and metric function into a Trainer.
trainer = Trainer(model=model, 
                  args=train_args, 
                  train_dataset=tokenized_datasets["train"], 
                  eval_dataset=tokenized_datasets["test"], 
                  # data_collator=DataCollatorWithPadding(tokenizer=tokenizer), # not needed: inputs were already padded during preprocessing
                  compute_metrics=eval_metric)

Detected kernel version 4.15.0, which is below the recommended minimum of 5.5.0; this can cause the process to hang. It is recommended to upgrade the kernel to the minimum version or higher.


## Step9 模型训练

In [13]:
# Run training; the per-epoch loss, accuracy and F1 are shown in the table below.
trainer.train()

Epoch,Training Loss,Validation Loss,Accuracy,F1
1,0.2109,0.193677,0.766,0.721097
2,0.164,0.180661,0.791,0.743558
3,0.1282,0.180686,0.7815,0.727386


TrainOutput(global_step=750, training_loss=0.1832841467857361, metrics={'train_runtime': 367.7751, 'train_samples_per_second': 65.257, 'train_steps_per_second': 2.039, 'total_flos': 3157275967488000.0, 'train_loss': 0.1832841467857361, 'epoch': 3.0})

## Step10 模型评估

In [14]:
# Evaluate on the held-out split. Results may be modest; that is the price of the faster vector-matching approach.
trainer.evaluate(tokenized_datasets["test"])

{'eval_loss': 0.180661141872406,
 'eval_accuracy': 0.791,
 'eval_f1': 0.7435582822085889,
 'eval_runtime': 9.6354,
 'eval_samples_per_second': 207.569,
 'eval_steps_per_second': 6.538,
 'epoch': 3.0}

## Step11 模型预测

In [15]:
class SentenceSimilarityPipeline:
    """Inference helper that scores two sentences by cosine similarity.

    Each sentence is encoded with the encoder stored on ``model.bert``, and the
    cosine similarity of the two pooled vectors is returned.
    """

    def __init__(self, model, tokenizer) -> None:
        """Store the encoder, tokenizer and device.

        Args:
            model: A trained model exposing its encoder as ``model.bert`` and
                its device as ``model.device``.
            tokenizer: The tokenizer matching the encoder.
        """
        self.model = model.bert  # only the encoder is needed at inference time
        # Put the encoder in eval mode so dropout cannot perturb the scores.
        self.model.eval()
        self.tokenizer = tokenizer
        self.device = model.device

    def preprocess(self, senA, senB):
        """Tokenize both sentences together as one batch of two sequences."""
        # The tokenizer accepts a list of strings, exactly as in the training preprocessing.
        return self.tokenizer([senA, senB], max_length=128, truncation=True, return_tensors="pt", padding=True)

    def predict(self, inputs):
        """Encode the two sentences and return their pooled vectors.

        Returns:
            The second element of the encoder output (the pooled vectors), one
            row per sentence, computed without tracking gradients.
        """
        inputs = {key: value.to(self.device) for key, value in inputs.items()}
        # Inference only: skip autograd bookkeeping to save memory and time.
        with torch.no_grad():
            return self.model(**inputs)[1]

    def postprocess(self, logits):
        """Return the cosine similarity between row 0 and row 1 as a Python float."""
        return CosineSimilarity(dim=0)(logits[0], logits[1]).item()

    def __call__(self, senA, senB, return_vector=False):
        """Score one sentence pair.

        Args:
            senA: First sentence.
            senB: Second sentence.
            return_vector: When True, also return the two sentence vectors.

        Returns:
            The similarity, or ``(similarity, vectors)`` if ``return_vector`` is True.
        """
        inputs = self.preprocess(senA, senB)
        vectors = self.predict(inputs)  # vector representations of senA and senB
        result = self.postprocess(vectors)  # similarity between the two
        if return_vector:
            return result, vectors
        return result

In [16]:
# Build the inference pipeline from the trained model and its tokenizer.
pipe = SentenceSimilarityPipeline(model, tokenizer)

In [17]:
# Note: the evaluation function used during training only counts a pair as
# similar when the score exceeds 0.7; apply the same threshold when reading this result.
pipe("我喜欢北京", "明天不行", return_vector=True)

(0.3124469518661499,
 tensor([[-0.9416, -0.9573,  0.7507,  ...,  0.8803,  0.2465, -0.5562],
         [-0.9998, -0.9978, -0.9793,  ...,  0.9967,  0.9938,  0.2739]],
        device='cuda:0', grad_fn=<TanhBackward0>))