## 双塔模型的文本匹配

In [1]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments
from datasets import load_dataset

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Load the sentence-pair JSON file as a single "train" split; the bare name on the
# last line displays the dataset summary.
dataset = load_dataset("json", data_files="./train_pair_1w.json", split="train")
dataset

Dataset({
    features: ['sentence1', 'sentence2', 'label'],
    num_rows: 10000
})

In [3]:
# Hold out 20% of the pairs for evaluation. The seed is fixed so that the split
# (and therefore every metric reported below) is reproducible across kernel restarts.
datasets = dataset.train_test_split(test_size=0.2, seed=42)
datasets

DatasetDict({
    train: Dataset({
        features: ['sentence1', 'sentence2', 'label'],
        num_rows: 8000
    })
    test: Dataset({
        features: ['sentence1', 'sentence2', 'label'],
        num_rows: 2000
    })
})

In [4]:
# Peek at one raw example before tokenization.
dataset[0]

{'sentence1': '找一部小时候的动画片', 'sentence2': '求一部小时候的动画片。谢了', 'label': '1'}

In [9]:
import torch

# Tokenizer for the same "hfl/chinese-macbert-base" checkpoint that the model is loaded from.
tokenizer = AutoTokenizer.from_pretrained("hfl/chinese-macbert-base")

def process_func(examples):
    """Tokenize a batch of sentence pairs for the dual-tower model.

    Args:
        examples: Batch dict with parallel lists under "sentence1",
            "sentence2" and "label" (labels must be convertible with int()).

    Returns:
        Dict holding every field produced by the tokenizer, where each entry
        is a two-element list ``[sentence1_encoding, sentence2_encoding]``,
        plus "labels" with 1 for a matching pair and -1 otherwise.
    """
    triples = list(zip(examples["sentence1"], examples["sentence2"], examples["label"]))

    # Flatten to [s1_a, s2_a, s1_b, s2_b, ...] so one tokenizer call handles everything.
    flat_sentences = [sentence for first, second, _ in triples for sentence in (first, second)]
    # Targets are +1 / -1 rather than 1 / 0.
    pair_labels = [1 if int(raw_label) == 1 else -1 for _, _, raw_label in triples]

    encoded = tokenizer(flat_sentences, truncation=True, max_length=128, padding="max_length")

    # Regroup the flat encodings back into consecutive (sentence1, sentence2) pairs.
    features = {}
    for field, rows in encoded.items():
        features[field] = [rows[start:start + 2] for start in range(0, len(rows), 2)]
    features["labels"] = pair_labels
    return features

# Tokenize both splits in batches. The original text/label columns are removed so
# that only the fields returned by process_func remain.
tokenized_dataset = datasets.map(process_func,batched= True,remove_columns= datasets["train"].column_names)
tokenized_dataset

Map: 100%|██████████| 8000/8000 [00:01<00:00, 6802.64 examples/s]
Map: 100%|██████████| 2000/2000 [00:00<00:00, 7555.94 examples/s]


DatasetDict({
    train: Dataset({
        features: ['input_ids', 'token_type_ids', 'attention_mask', 'labels'],
        num_rows: 8000
    })
    test: Dataset({
        features: ['input_ids', 'token_type_ids', 'attention_mask', 'labels'],
        num_rows: 2000
    })
})

In [10]:
from transformers import BertForSequenceClassification,BertPreTrainedModel,BertModel
from typing import Optional
from torch.nn import CosineSimilarity,CosineEmbeddingLoss

class DualModel(BertPreTrainedModel):
    """Dual-tower sentence matcher: both sentences go through one shared BERT encoder.

    Each input tensor carries the two sentences of a pair along dim 1
    (index 0 = sentence A, index 1 = sentence B), as produced by
    ``process_func`` in this notebook. The pooled embeddings of A and B are
    compared with cosine similarity and, when labels are given, trained with
    ``CosineEmbeddingLoss``.
    """

    def __init__(self, config, *inputs, **kwargs):
        super().__init__(config, *inputs, **kwargs)
        self.bert = BertModel(config=config)
        self.post_init()

    @staticmethod
    def _split_sentence_pair(pair_tensor):
        """Return the (sentence A, sentence B) slices along dim 1; None passes through as (None, None)."""
        if pair_tensor is None:
            return None, None
        return pair_tensor[:, 0], pair_tensor[:, 1]

    def _embed_sentence(self, input_ids, attention_mask, token_type_ids, **encoder_kwargs):
        """Run one sentence batch through the shared encoder and return element 1 of its output (the pooled vector)."""
        encoder_outputs = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            **encoder_kwargs,
        )
        return encoder_outputs[1]

    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ):
        """Encode both sentences of each pair and score their similarity.

        Args:
            input_ids: Token ids with the sentence pair on dim 1. Required.
            attention_mask: Optional mask laid out like ``input_ids``. When
                omitted, None is forwarded and the encoder applies its default.
            token_type_ids: Optional segment ids laid out like ``input_ids``.
                When omitted, None is forwarded and the encoder applies its default.
            position_ids, head_mask, inputs_embeds, output_attentions,
            output_hidden_states, return_dict: Forwarded unchanged to both
                encoder passes.
            labels: Optional targets for ``CosineEmbeddingLoss``
                (+1 for a matching pair, -1 otherwise).

        Returns:
            ``(loss, cos)`` when ``labels`` is given, otherwise ``(cos,)``.
            ``cos`` holds one cosine similarity per pair, shape ``[batch]``.

        Raises:
            ValueError: If ``input_ids`` is None.
        """
        if input_ids is None:
            raise ValueError("DualModel.forward requires `input_ids` containing both sentences of each pair.")

        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        # Split the paired inputs. The mask and segment ids are optional, so they
        # must not be indexed unconditionally.
        senA_input_ids, senB_input_ids = self._split_sentence_pair(input_ids)
        senA_attention_mask, senB_attention_mask = self._split_sentence_pair(attention_mask)
        senA_token_type_ids, senB_token_type_ids = self._split_sentence_pair(token_type_ids)

        # Arguments that are identical for both encoder passes.
        shared_encoder_kwargs = dict(
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        # Sentence embeddings, each [batch, hidden_size].
        senA_pooled_output = self._embed_sentence(
            senA_input_ids, senA_attention_mask, senA_token_type_ids, **shared_encoder_kwargs
        )
        senB_pooled_output = self._embed_sentence(
            senB_input_ids, senB_attention_mask, senB_token_type_ids, **shared_encoder_kwargs
        )

        # Similarity score used at evaluation / prediction time: shape [batch].
        cos = CosineSimilarity()(senA_pooled_output, senB_pooled_output)

        # Training loss (only when labels are supplied); the margin is 0.3.
        loss = None
        if labels is not None:
            loss_fct = CosineEmbeddingLoss(margin=0.3)
            loss = loss_fct(senA_pooled_output, senB_pooled_output, labels)

        output = (cos,)
        return ((loss,) + output) if loss is not None else output
    
    
   
        

In [11]:
# Build the dual-tower model from the pretrained "hfl/chinese-macbert-base" checkpoint.
model = DualModel.from_pretrained("hfl/chinese-macbert-base")

In [12]:
import evaluate

# Metrics used by eval_metric.
# NOTE(review): `f1_metirc` is a misspelling of `f1_metric`; the name is left as is
# because eval_metric refers to it by this exact spelling.
acc_metric = evaluate.load("accuracy")
f1_metirc = evaluate.load("f1")

In [21]:
def eval_metric(pred, threshold=0.7):
    """Compute accuracy and binary F1 from similarity scores.

    Args:
        pred: A ``(predictions, labels)`` pair, as handed to ``compute_metrics``
            by the Trainer. ``predictions`` are per-pair similarity scores and
            ``labels`` are the training targets (positive value = matching pair).
        threshold: A pair is predicted as matching when its score is strictly
            greater than this value. Defaults to 0.7, the value that was
            previously hard-coded.

    Returns:
        Dict combining the results of the accuracy and F1 metrics.
    """
    scores, targets = pred
    predictions = [int(score > threshold) for score in scores]
    # Map the training targets onto {1, 0}: anything > 0 counts as a match.
    labels = [int(target > 0) for target in targets]
    acc = acc_metric.compute(predictions=predictions, references=labels)
    f1 = f1_metirc.compute(predictions=predictions, references=labels, average='binary')
    acc.update(f1)
    return acc


In [22]:
train_args = TrainingArguments(output_dir="./dual_model",      # output directory
                               per_device_train_batch_size=32,  # batch size for training
                               per_device_eval_batch_size=32,   # batch size for evaluation
                               logging_steps=10,                # logging frequency (in steps)
                               eval_strategy="epoch",           # evaluation strategy
                               save_strategy="epoch",           # checkpoint saving strategy
                               save_total_limit=3,              # maximum number of checkpoints to keep
                               learning_rate=2e-5,              # learning rate
                               weight_decay=0.01,               # weight decay
                               metric_for_best_model="f1",      # metric used to select the best model
                               load_best_model_at_end=True)     # load the best model when training finishes
train_args

TrainingArguments(
_n_gpu=0,
accelerator_config={'split_batches': False, 'dispatch_batches': None, 'even_batches': True, 'use_seedable_sampler': True, 'non_blocking': False, 'gradient_accumulation_kwargs': None, 'use_configured_state': False},
adafactor=False,
adam_beta1=0.9,
adam_beta2=0.999,
adam_epsilon=1e-08,
auto_find_batch_size=False,
average_tokens_across_devices=False,
batch_eval_metrics=False,
bf16=False,
bf16_full_eval=False,
data_seed=None,
dataloader_drop_last=False,
dataloader_num_workers=0,
dataloader_persistent_workers=False,
dataloader_pin_memory=True,
dataloader_prefetch_factor=None,
ddp_backend=None,
ddp_broadcast_buffers=None,
ddp_bucket_cap_mb=None,
ddp_find_unused_parameters=None,
ddp_timeout=1800,
debug=[],
deepspeed=None,
disable_tqdm=False,
do_eval=True,
do_predict=False,
do_train=False,
eval_accumulation_steps=None,
eval_delay=0,
eval_do_concat_batches=True,
eval_on_start=False,
eval_steps=None,
eval_strategy=epoch,
eval_use_gather_object=False,
fp16=False,
fp1

In [23]:

# Wire the model, training arguments, data splits and metric function into the Trainer.
# The tokenizer is passed as `processing_class`: the `tokenizer` keyword is deprecated
# in recent transformers releases and emits a FutureWarning.
trainer = Trainer(model=model,
                  args=train_args,
                  processing_class=tokenizer,
                  train_dataset=tokenized_dataset["train"],
                  eval_dataset=tokenized_dataset["test"],
                  compute_metrics=eval_metric)

  trainer = Trainer(model=model,


In [29]:
# Fine-tune the model; evaluation and checkpointing run once per epoch (see train_args).
trainer.train()



Epoch,Training Loss,Validation Loss,Accuracy,F1
1,0.1521,0.177105,0.805,0.766746
2,0.135,0.174674,0.8065,0.756451




KeyboardInterrupt: 

In [30]:
# Evaluate the current model on the eval_dataset given to the Trainer.
trainer.evaluate()

KeyboardInterrupt: 

## 预测

In [31]:
class sentenceSimilarityPipeline:
    """Score the similarity of two sentences with the encoder of a dual-tower model.

    Calling the pipeline tokenizes both sentences, encodes them, and returns
    the cosine similarity of their embeddings as a Python float.
    """

    def __init__(self, tokenizer, model):
        """
        Args:
            tokenizer: Callable that turns a list of sentences into a mapping
                of named tensors.
            model: Object exposing the encoder as ``model.bert`` and its
                device as ``model.device``.
        """
        self.model = model.bert
        self.tokenizer = tokenizer
        self.device = model.device

    def pre_process(self, senA, senB):
        """Tokenize the two sentences together as one batch of size 2."""
        return self.tokenizer([senA, senB], truncation=True, max_length=128, return_tensors="pt", padding=True)

    def predict(self, inputs):
        """Encode the tokenized batch and return element 1 of the encoder output.

        The encoder is put in eval mode and run under ``torch.no_grad()`` so
        that dropout cannot perturb the result and no autograd graph is built.
        Inputs are first moved to the model's device.
        """
        self.model.eval()
        on_device = {
            name: value.to(self.device) if hasattr(value, "to") else value
            for name, value in inputs.items()
        }
        with torch.no_grad():
            return self.model(**on_device)[1]  # one embedding row per sentence

    def post_process(self, logits):
        """Return the cosine similarity between row 0 and row 1 of ``logits`` as a float."""
        # Index with None to keep a leading batch dim of 1 for CosineSimilarity.
        cos = CosineSimilarity()(logits[None, 0, :], logits[None, 1, :]).squeeze().cpu().item()
        return cos

    def __call__(self, senA, senB):
        """Run tokenization, encoding and scoring for one sentence pair."""
        inputs = self.pre_process(senA, senB)
        logits = self.predict(inputs)
        result = self.post_process(logits)
        return result

In [32]:
# Build the inference pipeline from the tokenizer and the (fine-tuned) dual-tower model.
pipe = sentenceSimilarityPipeline(tokenizer=tokenizer,model=model)


In [33]:
# Example: cosine similarity between two sentences.
pipe("我喜欢北京","北京是我喜欢的地方")

0.7146509885787964