# 文本相似度实例（单模型单塔模型）模型分别编码两句话再进行分类-交互式

## Step1 导入相关包

In [1]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments
from datasets import load_dataset
import torch

## Step2 加载数据集

In [2]:

dataset = load_dataset('json', data_files='./data/train_pair_1w.json', split="train") # 如果是加载固定的json文件则用load_dataset
# dataset = DatasetDict.load_from_disk('./data') # 加载的是huggingface的数据集
dataset

Dataset({
    features: ['sentence1', 'sentence2', 'label'],
    num_rows: 10000
})

## Step3 划分数据集

In [3]:
datasets = dataset.train_test_split(test_size=0.2)
datasets

DatasetDict({
    train: Dataset({
        features: ['sentence1', 'sentence2', 'label'],
        num_rows: 8000
    })
    test: Dataset({
        features: ['sentence1', 'sentence2', 'label'],
        num_rows: 2000
    })
})

In [4]:
datasets['train'][0]

{'sentence1': '我们也是沿着不变的轴心旋转，像地球一样，远看是星球，近看是泥土，而且有白日和黑夜交替着出现吗？',
 'sentence2': '人也跟地球一样有南极和北极吗？',
 'label': '1'}

## Step4 数据集预处理

In [5]:
import torch

tokenizer = AutoTokenizer.from_pretrained("D:/pretrained_model/models--hfl--chinese-macbert-base")

def process_function(examples):
    sentences = []
    labels = []
    for sen1, sen2, label in zip(examples["sentence1"], examples["sentence2"], examples["label"]):
        sentences.append(sen1)
        sentences.append(sen2)
        labels.append(1 if int(label) == 1 else 0)
    
    tokenizer_examples = tokenizer(sentences, max_length=250, truncation=True, padding="max_length")
    tokenizer_examples = {k: [v[i : i + 2] for i in range(0, len(v), 2)] for k, v in tokenizer_examples.items()}
    tokenizer_examples['labels'] = labels
    return tokenizer_examples

tokenized_datasets = datasets.map(process_function, batched=True, remove_columns=datasets['train'].column_names)
tokenized_datasets

Map:   0%|          | 0/8000 [00:00<?, ? examples/s]

Map:   0%|          | 0/2000 [00:00<?, ? examples/s]

DatasetDict({
    train: Dataset({
        features: ['input_ids', 'token_type_ids', 'attention_mask', 'labels'],
        num_rows: 8000
    })
    test: Dataset({
        features: ['input_ids', 'token_type_ids', 'attention_mask', 'labels'],
        num_rows: 2000
    })
})

## Step5 创建模型

In [6]:

from transformers import BertForSequenceClassification, BertPreTrainedModel, BertModel
from typing import Optional
from transformers.configuration_utils import PretrainedConfig
from torch.nn import CosineSimilarity, CosineEmbeddingLoss

class DualModel(BertPreTrainedModel):

    def __init__(self, config, *inputs, **kwargs):
        super().__init__(config, *inputs, **kwargs)
        self.bert = BertModel(config)
        self.post_init()
    
    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ):
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        senA_input_ids, senB_input_ids = input_ids[:, 0], input_ids[:, 1]
        senA_attention_mask, senB_attention_mask = attention_mask[:, 0], attention_mask[:, 1]
        senA_token_type_ids, senB_token_type_ids = token_type_ids[:, 0], token_type_ids[:, 1]

        senA_outputs = self.bert(
            senA_input_ids,
            attention_mask=senA_attention_mask,
            token_type_ids=senA_token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict
        )

        senA_pooled_output = senA_outputs[1]

        senB_outputs = self.bert(
            senB_input_ids,
            attention_mask=senB_attention_mask,
            token_type_ids=senB_token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict
        )

        senB_pooled_output = senB_outputs[1]

        cos = CosineSimilarity()(senA_pooled_output, senB_pooled_output)

        loss = None
        if labels is not None:
            loss_fct = CosineEmbeddingLoss(0.3)
            loss = loss_fct(senA_pooled_output, senB_pooled_output, labels)
        output = (cos,)
        return ((loss,) + output) if loss is not None else output

model = DualModel.from_pretrained('D:/pretrained_model/models--hfl--chinese-macbert-base')


  return self.fget.__get__(instance, owner)()


## Step6 创建评估函数

In [7]:
import evaluate

acc_metric = evaluate.load("./metric_accuracy.py")
f1_metirc = evaluate.load("./metric_f1.py")

In [8]:
def eval_metric(eval_predict):
    predictions, labels = eval_predict
    predictions = [int(p > 0.5) for p in predictions]
    labels = [int(l) for l in labels]
    # predictions = predictions.argmax(axis=-1)
    acc = acc_metric.compute(predictions=predictions, references=labels)
    f1 = f1_metirc.compute(predictions=predictions, references=labels)
    acc.update(f1)
    return acc

## Step7 创建TrainingArguments

In [9]:
train_args = TrainingArguments(output_dir="./dual_model",      # 输出文件夹
                               per_device_train_batch_size=4,  # 训练时的batch_size
                               per_device_eval_batch_size=32,   # 验证时的batch_size
                               logging_steps=10,                # log 打印的频率
                               evaluation_strategy="epoch",           # 评估策略
                               save_strategy="epoch",           # 保存策略
                               save_total_limit=3,              # 最大保存数
                               learning_rate=2e-6,              # 学习率
                               weight_decay=0.005,               # weight_decay
                               metric_for_best_model="f1",      # 设定评估指标
                               load_best_model_at_end=True,
                               num_train_epochs=1
                            #    max_steps=1000
                               )     # 训练完成后加载最优模型

## Step8 创建Trainer

In [10]:
trainer = Trainer(model=model, 
                  args=train_args, 
                  tokenizer=tokenizer,
                  train_dataset=tokenized_datasets["train"], 
                  eval_dataset=tokenized_datasets["test"], 
                  compute_metrics=eval_metric)

dataloader_config = DataLoaderConfiguration(dispatch_batches=None, split_batches=False, even_batches=True, use_seedable_sampler=True)


## Step9 模型训练

In [11]:
trainer.train()

  0%|          | 0/2000 [00:00<?, ?it/s]

{'loss': 0.0172, 'grad_norm': 0.7525479197502136, 'learning_rate': 1.99e-06, 'epoch': 0.01}
{'loss': 0.017, 'grad_norm': 0.34213656187057495, 'learning_rate': 1.98e-06, 'epoch': 0.01}
{'loss': 0.011, 'grad_norm': 0.5554874539375305, 'learning_rate': 1.9699999999999998e-06, 'epoch': 0.01}
{'loss': 0.0095, 'grad_norm': 0.7245813608169556, 'learning_rate': 1.96e-06, 'epoch': 0.02}
{'loss': 0.0119, 'grad_norm': 0.2016068696975708, 'learning_rate': 1.95e-06, 'epoch': 0.03}
{'loss': 0.0107, 'grad_norm': 1.620959997177124, 'learning_rate': 1.94e-06, 'epoch': 0.03}
{'loss': 0.0148, 'grad_norm': 0.5462867617607117, 'learning_rate': 1.9299999999999997e-06, 'epoch': 0.04}
{'loss': 0.005, 'grad_norm': 0.32554593682289124, 'learning_rate': 1.92e-06, 'epoch': 0.04}
{'loss': 0.005, 'grad_norm': 0.3355433940887451, 'learning_rate': 1.91e-06, 'epoch': 0.04}
{'loss': 0.0073, 'grad_norm': 0.2069815844297409, 'learning_rate': 1.8999999999999998e-06, 'epoch': 0.05}
{'loss': 0.0071, 'grad_norm': 0.408836662

  0%|          | 0/63 [00:00<?, ?it/s]

{'eval_loss': 9.252792369807139e-05, 'eval_accuracy': 0.397, 'eval_f1': 0.5683607730851825, 'eval_runtime': 37.8537, 'eval_samples_per_second': 52.835, 'eval_steps_per_second': 1.664, 'epoch': 1.0}
{'train_runtime': 493.0964, 'train_samples_per_second': 16.224, 'train_steps_per_second': 4.056, 'train_loss': 0.0018450733870267867, 'epoch': 1.0}


TrainOutput(global_step=2000, training_loss=0.0018450733870267867, metrics={'train_runtime': 493.0964, 'train_samples_per_second': 16.224, 'train_steps_per_second': 4.056, 'train_loss': 0.0018450733870267867, 'epoch': 1.0})

## Step10 模型评估

## Step11 模型预测

In [None]:
class SentenceSimilarityPipeline:

    def __init__(self, model, tokenizer) -> None:
        self.model = model.bert
        self.tokenizer = tokenizer
        self.device = model.device

    def preprocess(self, senA, senB):
        return self.tokenizer([senA, senB], max_length=128, truncation=True, return_tensors="pt", padding=True)

    def predict(self, inputs):
        inputs = {k: v.to(self.device) for k, v in inputs.items()}
        return self.model(**inputs)[1]  # [2, 768]

    def postprocess(self, logits):
        cos = CosineSimilarity()(logits[None, 0, :], logits[None,1, :]).squeeze().cpu().item()
        return cos

    def __call__(self, senA, senB, return_vector=False):
        inputs = self.preprocess(senA, senB)
        logits = self.predict(inputs)
        result = self.postprocess(logits)
        if return_vector:
            return result, logits
        else:
            return result

In [None]:
pipe = SentenceSimilarityPipeline(model, tokenizer)

In [None]:
pipe("我喜欢北京", "明天不行", return_vector=True)