In [2]:
#导入相关的包
from transformers import AutoTokenizer, AutoModel, TrainingArguments, Trainer, AutoModelForSequenceClassification
from datasets import load_dataset

In [3]:
#加载数据集
dataset = load_dataset("json",data_files="./train_pair_1w.json",split="train")
dataset

Dataset({
    features: ['sentence1', 'sentence2', 'label'],
    num_rows: 10000
})

In [4]:
#划分数据集
datasets = dataset.train_test_split(test_size=0.2)


In [6]:
#数据集预处理
import torch
tokenizer = AutoTokenizer.from_pretrained("D:\Hugging Face Hub\chinese-macbert-base")
def process_function(examples):
    sentence = []
    labels = []
    for sen1,sen2,label in zip(examples["sentence1"],examples["sentence2"],examples["label"]):
        sentence.append(sen1)
        sentence.append(sen2)
        labels.append( 1 if int(label)==1 else -1)
    tokenizer_examples = tokenizer(sentence, truncation=True, max_length=128,padding ="max_length")
    tokenizer_examples = {k:[v[i:i+2] for i in range(0,len(v),2)] for k,v in tokenizer_examples.items()}
    tokenizer_examples["label"] = labels
    return tokenizer_examples
tokenizer_datasets = datasets.map(process_function, batched=True,remove_columns=datasets["train"].column_names)

Map:   0%|          | 0/8000 [00:00<?, ? examples/s]

Map:   0%|          | 0/2000 [00:00<?, ? examples/s]

In [7]:
#创建模型
#model = AutoModelForSequenceClassification.from_pretrained("D:\Hugging Face Hub\chinese-macbert-base",num_labels=1)
from transformers import BertForSequenceClassification,BertPreTrainedModel,BertModel
from typing import Optional
from torch.nn import CosineSimilarity,CosineEmbeddingLoss
from transformers.configuration_utils import PretrainedConfig
class Dualmodel(BertPreTrainedModel):
    def __init__(self, config: PretrainedConfig, *inputs, **kwargs):
        super().__init__(config, *inputs, **kwargs)
        self.bert = BertModel(config)
        self.post_init()
    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ):
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict
        #获得句子A和句子B的输入
        senA_input_ids,senB_input_ids = input_ids[:,0],input_ids[:,1]
        senA_attention_mask,senB_attention_mask = attention_mask[:,0],attention_mask[:,1]
        senA_token_type_ids,senB_token_type_ids = token_type_ids[:,0],token_type_ids[:,1]

        #获取两个句子的向量表示
        senA_outputs = self.bert(
            senA_input_ids,
            attention_mask=senA_attention_mask,
            token_type_ids=senA_token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        pooledA_output = senA_outputs[1]
        senB_outputs = self.bert(
                        senB_input_ids,
                        attention_mask=senB_attention_mask,
                        token_type_ids=senB_token_type_ids,
                        position_ids=position_ids,
                        head_mask=head_mask,
                        inputs_embeds=inputs_embeds,
                        output_attentions=output_attentions,
                        output_hidden_states=output_hidden_states,
                        return_dict=return_dict,
        )
        pooledB_output = senB_outputs[1]
        #计算相似度
        cos = CosineSimilarity()(pooledA_output,pooledB_output)
        #计算loss
        loss = None
        if labels is not None:
            loss_fct = CosineEmbeddingLoss(0.3)
            loss = loss_fct(pooledA_output,pooledB_output,labels)
        output = (cos,)
        return ((loss,) + output) if loss is not None else output
model = Dualmodel.from_pretrained("D:\Hugging Face Hub\chinese-macbert-base")

In [8]:
#创建评估函数
import evaluate
acc_metric = evaluate.load("accuracy")
f1_metric = evaluate.load("f1")
def eval_metric(eval_predict):
    predictions,labels = eval_predict
    predictions =[int(i>0.7) for i in predictions]
    labels =[int(i>0) for i in labels]
    #predictions = predictions.argmax(dim=-1)
    acc = acc_metric.compute(predictions=predictions, labels=labels)
    f1 = f1_metric.compute(predictions=predictions, labels=labels)
    acc.update(f1)
    return acc


In [9]:
#创建TrainingArguments
train_args = TrainingArguments(
    output_dir="./dual_model",
    per_gpu_eval_batch_size=2,
    per_device_train_batch_size=4,
    learning_rate=2e-5,
    weight_decay=0.01,
    save_strategy="epoch",

)

In [10]:
#创建Trainer
from transformers import DataCollatorWithPadding
trainer = Trainer(
    model=model,
    args=train_args,
    train_dataset=tokenizer_datasets["train"],
    eval_dataset=tokenizer_datasets["test"],
    #data_collator=DataCollatorWithPadding(tokenizer=tokenizer),
    compute_metrics=eval_metric,
)


In [11]:
#模型的训练
trainer.train()

Step,Training Loss
500,0.2527
1000,0.2283
1500,0.218
2000,0.2021
2500,0.1547
3000,0.1622
3500,0.1494
4000,0.1553
4500,0.0995
5000,0.1059


TrainOutput(global_step=6000, training_loss=0.16036114438374838, metrics={'train_runtime': 1225.3264, 'train_samples_per_second': 19.587, 'train_steps_per_second': 4.897, 'total_flos': 3157275967488000.0, 'train_loss': 0.16036114438374838, 'epoch': 3.0})

In [12]:
#模型预测
class SentenceSimilarityPipeline:

    def __init__(self, model, tokenizer) -> None:
        self.model = model.bert
        self.tokenizer = tokenizer
        self.device = model.device

    def preprocess(self, senA, senB):
        return self.tokenizer([senA, senB], max_length=128, truncation=True, return_tensors="pt", padding=True)

    def predict(self, inputs):
        inputs = {k: v.to(self.device) for k, v in inputs.items()}
        return self.model(**inputs)[1]  # [2, 768]

    def postprocess(self, logits):
        cos = CosineSimilarity()(logits[None, 0, :], logits[None,1, :]).squeeze().cpu().item()
        return cos

    def __call__(self, senA, senB, return_vector=False):
        inputs = self.preprocess(senA, senB)
        logits = self.predict(inputs)
        result = self.postprocess(logits)
        if return_vector:
            return result, logits
        else:
            return result

In [13]:
pipe = SentenceSimilarityPipeline(model,tokenizer)


In [14]:
pipe("我喜欢北京","北京是个好地方，我想下次再来")

0.6053197383880615