*Step.1 導入相關套件*

In [None]:
!pip install datasets
!pip install evaluate

In [None]:
import torch
from datasets import load_dataset
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    PretrainedConfig,
    Trainer,
    TrainingArguments,
)

*Step.2 載入數據*

In [None]:
# Read the sentence-pair data from a local JSON file as a single "train" split.
dataset = load_dataset("json" , data_files="./train_pair_1w.json" , split="train")
dataset

Generating train split: 0 examples [00:00, ? examples/s]

Dataset({
    features: ['sentence1', 'sentence2', 'label'],
    num_rows: 10000
})

In [None]:
# Hold out 20% of the pairs for evaluation. A fixed seed keeps the split
# (and therefore every metric reported later) reproducible across runs.
datasets = dataset.train_test_split(test_size=0.2, seed=42)
datasets

DatasetDict({
    train: Dataset({
        features: ['sentence1', 'sentence2', 'label'],
        num_rows: 8000
    })
    test: Dataset({
        features: ['sentence1', 'sentence2', 'label'],
        num_rows: 2000
    })
})

*Step.3 數據前處理*

In [None]:
import torch

# Tokenizer for the "hfl/chinese-macbert-base" checkpoint; process_function reads it as a global.
tokenizer = AutoTokenizer.from_pretrained("hfl/chinese-macbert-base")

def process_function(examples):
    """Tokenize a batch of sentence pairs for the dual-encoder model.

    Every example contributes two separately tokenized sentences. The flat
    tokenizer output is regrouped so that each field holds one
    ``[sentence1, sentence2]`` pair per example, and the label is mapped to
    ``1`` when it equals 1 and to ``-1`` otherwise.
    """
    triples = list(zip(examples["sentence1"], examples["sentence2"], examples["label"]))

    # Interleave the sentences: s1_a, s2_a, s1_b, s2_b, ...
    flat_sentences = [text for first, second, _ in triples for text in (first, second)]
    pair_labels = [1 if int(raw_label) == 1 else -1 for _, _, raw_label in triples]

    # Produces input_ids, attention_mask and token_type_ids, all padded to 128 tokens.
    encoded = tokenizer(flat_sentences, max_length=128, truncation=True, padding="max_length")

    # Fold consecutive rows (2k, 2k + 1) back into one pair per example.
    paired = {}
    for field, rows in encoded.items():
        paired[field] = [rows[start:start + 2] for start in range(0, len(rows), 2)]

    paired["labels"] = pair_labels
    return paired

# Apply process_function to both splits in batches and drop the original
# columns, so only the fields returned by process_function remain.
tokenized_datasets = datasets.map(process_function, batched=True, remove_columns=datasets["train"].column_names)
tokenized_datasets

Map:   0%|          | 0/8000 [00:00<?, ? examples/s]

Map:   0%|          | 0/2000 [00:00<?, ? examples/s]

DatasetDict({
    train: Dataset({
        features: ['input_ids', 'token_type_ids', 'attention_mask', 'labels'],
        num_rows: 8000
    })
    test: Dataset({
        features: ['input_ids', 'token_type_ids', 'attention_mask', 'labels'],
        num_rows: 2000
    })
})

In [None]:
# Inspect one processed example: each field holds a pair of sequences (sentence1, sentence2).
print(tokenized_datasets["train"][0])

{'input_ids': [[101, 671, 702, 782, 1762, 800, 812, 6716, 1400, 4692, 4708, 671, 6775, 5273, 4635, 4685, 7313, 4638, 3867, 7344, 6756, 511, 102, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [101, 3300, 782, 1762, 4692, 2418, 2593, 6756, 511, 102, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]], 'token_type_ids': [[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0

*Step.4 建立模型*

In [None]:
from transformers import BertForSequenceClassification , BertPreTrainedModel , BertModel
from typing import Optional
from torch.nn import CosineSimilarity , CosineEmbeddingLoss


class DualModel(BertPreTrainedModel):
    """Dual-encoder sentence-similarity model sharing a single BERT encoder.

    Both sentences of a pair are encoded independently by the same
    ``BertModel``; their pooled vectors are compared with cosine similarity
    and, when labels are supplied, trained with ``CosineEmbeddingLoss``.
    """

    def __init__(self, config: PretrainedConfig, *inputs, **kwargs):
        super().__init__(config, *inputs, **kwargs)
        self.bert = BertModel(config)
        self.post_init()

    @staticmethod
    def _split_pair(tensor):
        """Split a ``[batch, 2, ...]`` tensor into its two per-sentence slices; ``None`` passes through."""
        if tensor is None:
            return None, None
        return tensor[:, 0], tensor[:, 1]

    def _encode(self, input_ids, attention_mask, token_type_ids, **shared_kwargs):
        """Run the shared encoder on one sentence of the pair and return its pooled output."""
        outputs = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            **shared_kwargs,
        )
        return outputs[1]    # [batch, hidden]

    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ):
        """Encode both sentences of each pair and score their similarity.

        Args:
            input_ids, attention_mask, token_type_ids: Tensors whose second
                dimension indexes the two sentences of a pair (index 0 is
                sentence A, index 1 is sentence B). ``attention_mask`` and
                ``token_type_ids`` may be ``None``.
            position_ids, head_mask, inputs_embeds, output_attentions,
            output_hidden_states, return_dict: Forwarded unchanged to both
                encoder calls.
            labels: Optional targets for ``CosineEmbeddingLoss``
                (1 for similar pairs, -1 for dissimilar pairs).

        Returns:
            ``(loss, cos)`` when ``labels`` is given, otherwise ``(cos,)``,
            where ``cos`` has shape ``[batch]``.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        # Step 1: separate the inputs of sentence A and sentence B.
        senA_input_ids, senB_input_ids = self._split_pair(input_ids)
        senA_attention_mask, senB_attention_mask = self._split_pair(attention_mask)
        # Fixed: the original bound misspelt names here and then read undefined ones.
        senA_token_type_ids, senB_token_type_ids = self._split_pair(token_type_ids)

        # Step 2: encode each sentence with the shared encoder.
        # Fixed: sentence A used to be called with `ead_mask=` instead of `head_mask=`.
        shared_kwargs = dict(
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        senA_pooled_output = self._encode(
            senA_input_ids, senA_attention_mask, senA_token_type_ids, **shared_kwargs
        )    # [batch, hidden]
        senB_pooled_output = self._encode(
            senB_input_ids, senB_attention_mask, senB_token_type_ids, **shared_kwargs
        )    # [batch, hidden]

        # Step 3: cosine similarity between the two pooled vectors.
        cos = CosineSimilarity()(senA_pooled_output, senB_pooled_output)    # [batch, ]

        # Step 4: loss, only when labels are available.
        loss = None
        if labels is not None:
            loss_fct = CosineEmbeddingLoss(0.3)    # margin = 0.3
            loss = loss_fct(senA_pooled_output, senB_pooled_output, labels)

        output = (cos,)
        return ((loss,) + output) if loss is not None else output

# Build DualModel from the "hfl/chinese-macbert-base" pretrained checkpoint.
model = DualModel.from_pretrained("hfl/chinese-macbert-base")


*Step.5 評估函數*

In [None]:
import evaluate

# Accuracy and F1 metric objects, used as globals by the evaluation function.
acc_metric = evaluate.load("accuracy")
f1_metric = evaluate.load("f1")

Downloading builder script:   0%|          | 0.00/4.20k [00:00<?, ?B/s]

Downloading builder script:   0%|          | 0.00/6.77k [00:00<?, ?B/s]

In [None]:
def eval_metric(eval_predict, threshold: float = 0.7):
    """Compute accuracy and F1 from similarity scores.

    Args:
        eval_predict: A ``(predictions, labels)`` pair. Each prediction is a
            score compared against ``threshold``; each label counts as
            positive when it is greater than 0.
        threshold: Score above which a pair is predicted as positive.

    Returns:
        The dict returned by ``acc_metric.compute`` updated with the dict
        returned by ``f1_metric.compute``.
    """
    predictions, labels = eval_predict
    predictions = [int(p > threshold) for p in predictions]
    labels = [int(l > 0) for l in labels]
    acc = acc_metric.compute(predictions=predictions, references=labels)
    # Fixed: the original referenced the undefined name `f1_metirc`.
    f1 = f1_metric.compute(predictions=predictions, references=labels)
    acc.update(f1)
    return acc

*Step.6 訓練參數*

In [None]:
# Training configuration: checkpoints and logs go to ./dual_model.
train_args = TrainingArguments(output_dir="./dual_model",
                per_device_train_batch_size = 32,
                per_device_eval_batch_size = 32,
                logging_steps =10,
                # Evaluate and save on the same schedule (every epoch).
                eval_strategy = "epoch",
                save_strategy = "epoch",
                save_total_limit = 3,
                learning_rate = 2e-5,
                weight_decay = 0.01,
                # "f1" is one of the keys produced by the compute_metrics function.
                metric_for_best_model = "f1",
                load_best_model_at_end = True)
train_args

*Step.7 訓練器*

In [None]:
from transformers import DataCollatorWithPadding
# Wire the model, training arguments, tokenized splits and metric function together.
trainer = Trainer(model=model,
                  args=train_args,
                  # `processing_class` replaces the deprecated `tokenizer` argument,
                  # which emits a FutureWarning and is slated for removal.
                  processing_class = tokenizer,
                  train_dataset = tokenized_datasets["train"],
                  eval_dataset = tokenized_datasets["test"],
                  data_collator = DataCollatorWithPadding(tokenizer=tokenizer),
                  compute_metrics = eval_metric)

  trainer = Trainer(model=model,


*Step.8 模型訓練*

In [None]:
# Run training with the configured Trainer.
trainer.train()

*Step.9 模型評估*

In [None]:
# Evaluate on the held-out test split.
trainer.evaluate(tokenized_datasets["test"])

*Step.10 模型預測*

In [None]:
class SentenceSimilarityPipeline:
    """Score the similarity of two sentences with the encoder of a dual model.

    Args:
        model: Object exposing the encoder as ``model.bert`` and its device
            as ``model.device``.
        tokenizer: Callable that turns a list of two sentences into a mapping
            of tensors.
    """

    def __init__(self, model, tokenizer) -> None:
        self.model = model.bert
        # Inference only: make sure dropout is off so scores are deterministic,
        # even when the pipeline is built right after training.
        self.model.eval()
        self.tokenizer = tokenizer
        self.device = model.device

    def preprocess(self, senA, senB):
        """Tokenize both sentences as one batch of two rows."""
        return self.tokenizer([senA, senB], max_length=128, truncation=True, return_tensors="pt", padding=True)

    def predict(self, inputs):
        """Return the encoder's second output (index 1) for the two sentences."""
        inputs = {k: v.to(self.device) for k, v in inputs.items()}
        # No gradients are needed at inference time; skipping the autograd
        # graph saves memory and keeps returned vectors detached.
        with torch.no_grad():
            return self.model(**inputs)[1]  # [2, 768]

    def postprocess(self, logits):
        """Cosine similarity between row 0 and row 1 of ``logits``, as a Python float."""
        return CosineSimilarity(dim=0)(logits[0], logits[1]).item()

    def __call__(self, senA, senB, return_vector=False):
        """Return the similarity score, plus the sentence vectors when ``return_vector`` is true."""
        inputs = self.preprocess(senA, senB)
        logits = self.predict(inputs)
        result = self.postprocess(logits)
        if return_vector:
            return result, logits
        return result



In [None]:
# Build the similarity pipeline from the model and the tokenizer.
pipe = SentenceSimilarityPipeline(model, tokenizer)

In [None]:
# Score one sentence pair; return_vector=True also returns the two sentence vectors.
pipe("我喜歡北京", "明天不行", return_vector=True)