In [None]:
import os
# Set in the first cell, before any later cell imports torch.
# NOTE(review): presumably opts in to PyTorch's experimental AOTriton attention
# kernels on ROCm GPUs — confirm against the PyTorch build in use.
os.environ.update({"TORCH_ROCM_AOTRITON_ENABLE_EXPERIMENTAL": "1"})

In [None]:
from transformers import AutoTokenizer, TrainingArguments, Trainer
from datasets import load_dataset
import evaluate

In [None]:
# Load the sentence-pair JSON file and hold out 20% of the rows for evaluation.
# The seed pins the split so that a kernel restart reproduces the same train/test rows.
# NOTE(review): a later cell loads an already fine-tuned checkpoint; confirm that the
# checkpoint was trained on this same split, otherwise held-out rows may have been seen.
datasets = load_dataset(
    "json", data_files="./train_pair_1w.json", split="train"
).train_test_split(test_size=0.2, seed=42)

In [None]:
# Display the raw (untokenized) training split.
datasets["train"]

In [None]:
# Tokenizer of the pretrained Chinese MacBERT checkpoint used throughout the notebook.
PRETRAINED_TOKENIZER_NAME = "hfl/chinese-macbert-base"
tokenizer = AutoTokenizer.from_pretrained(PRETRAINED_TOKENIZER_NAME)

In [None]:
import numpy as np
def process_function(examples, tokenizer=tokenizer, max_length=256):
    """Tokenize a batch of sentence pairs into ``(n_pairs, 2, max_length)`` arrays.

    Args:
        examples: Batch dict with parallel ``"sentence1"``, ``"sentence2"`` and
            ``"label"`` columns.
        tokenizer: Tokenizer callable; defaults to the notebook-level ``tokenizer``
            (bound when this function is defined).
        max_length: Fixed length every sentence is truncated/padded to. It drives
            both the tokenizer call and the reshape, so the two cannot drift apart.

    Returns:
        Dict with every tokenizer output reshaped to ``(n_pairs, 2, max_length)``
        (axis 1 is sentence1 / sentence2) plus ``"labels"``: ``1`` where the source
        label equals 1, ``-1`` otherwise — the target encoding consumed by the
        ``CosineEmbeddingLoss`` in ``DualModel.forward``.
    """
    # Interleave as [a0, b0, a1, b1, ...] so a single tokenizer call covers both
    # sides and the reshape below regroups consecutive rows into pairs.
    sentences = [
        sentence
        for pair in zip(examples["sentence1"], examples["sentence2"])
        for sentence in pair
    ]
    labels = [1 if int(x) == 1 else -1 for x in examples["label"]]
    encoded = tokenizer(sentences, truncation=True, max_length=max_length, padding="max_length")
    inputs = {k: np.array(v).reshape(-1, 2, max_length) for k, v in encoded.items()}
    inputs["labels"] = labels
    return inputs

In [None]:
# Tokenize both splits in batches; the raw columns are dropped so that only the
# fields returned by process_function remain.
raw_column_names = datasets["train"].column_names
tokenized_datasets = datasets.map(
    process_function,
    batched=True,
    remove_columns=raw_column_names,
)
tokenized_datasets

In [None]:
# Display the tokenized training split.
tokenized_datasets["train"]

In [None]:
# Display the raw training split again — presumably for side-by-side comparison
# with the tokenized split shown in the previous cell.
datasets["train"]

In [None]:
# Decode the token ids at pair index 1 (the second sentence) of the first tokenized
# training example back to text, to eyeball the tokenization.
tokenizer.decode(tokenized_datasets["train"][0]["input_ids"][1])

In [None]:
from transformers import BertModel, BertPreTrainedModel, PretrainedConfig
from transformers.utils.generic import TransformersKwargs
from transformers.modeling_outputs import SequenceClassifierOutput
from typing import Unpack
import torch
from torch.nn import CosineSimilarity, CosineEmbeddingLoss

class DualModel(BertPreTrainedModel):
    """Shared-weight (siamese) BERT encoder trained with a cosine embedding loss.

    Both sentences of a pair go through the same ``BertModel``; the cosine
    similarity of the two pooled outputs is returned as ``logits``.
    """

    def __init__(self, config: PretrainedConfig, *inputs, **kwargs):
        super().__init__(config, *inputs, **kwargs)
        self.bert = BertModel(config)
        self.post_init()

    @staticmethod
    def _split_pair(tensor: torch.Tensor | None) -> tuple[torch.Tensor | None, torch.Tensor | None]:
        """Split a ``(B, 2, ...)`` tensor along axis 1; ``None`` yields ``(None, None)``."""
        if tensor is None:
            return None, None
        return tensor[:, 0], tensor[:, 1]

    def forward(
            self,
            # (B,2,S)
            input_ids: torch.Tensor | None = None,
            attention_mask: torch.Tensor | None = None,
            token_type_ids: torch.Tensor | None = None,
            position_ids: torch.Tensor | None = None,
            inputs_embeds: torch.Tensor | None = None,
            labels: torch.Tensor | None = None,
            **kwargs: Unpack[TransformersKwargs],
    ) -> tuple[torch.Tensor] | SequenceClassifierOutput:
        """Encode both sentences of each pair and score them by cosine similarity.

        Args:
            input_ids: ``(B, 2, S)`` token ids; axis 1 selects sentence A / sentence B.
            attention_mask: Optional ``(B, 2, S)`` mask, split the same way.
            token_type_ids: Optional ``(B, 2, S)`` segment ids, split the same way.
            position_ids: Optional position ids, passed unchanged to both encoder calls.
            inputs_embeds: Optional pre-computed embeddings with the pair on axis 1,
                split the same way as ``input_ids``.
            labels: Optional ``(B,)`` targets for ``CosineEmbeddingLoss``
                (``1`` for similar pairs, ``-1`` for dissimilar pairs).
            **kwargs: Forwarded to both ``BertModel`` calls.

        Returns:
            ``SequenceClassifierOutput`` whose ``logits`` are the ``(B,)`` cosine
            similarities and whose ``loss`` is ``None`` when ``labels`` is omitted.
        """
        # Optional tensors are split with a None-safe helper: indexing None would
        # raise TypeError even though the signature declares them optional.
        pair_ids = self._split_pair(input_ids)
        pair_masks = self._split_pair(attention_mask)
        pair_token_types = self._split_pair(token_type_ids)
        pair_embeds = self._split_pair(inputs_embeds)

        # (B,H) pooled representation for each side, produced by the shared encoder.
        pooled_outputs = []
        for side in (0, 1):
            encoder_outputs = self.bert(
                pair_ids[side],
                attention_mask=pair_masks[side],
                token_type_ids=pair_token_types[side],
                position_ids=position_ids,
                inputs_embeds=pair_embeds[side],
                return_dict=True,
                **kwargs,
            )
            pooled_outputs.append(encoder_outputs.pooler_output)
        senA_pooled_output, senB_pooled_output = pooled_outputs

        # (B,) cosine similarity along the hidden dimension.
        cos = CosineSimilarity()(senA_pooled_output, senB_pooled_output)

        loss = None
        if labels is not None:
            loss = CosineEmbeddingLoss(margin=0.3)(senA_pooled_output, senB_pooled_output, labels)

        return SequenceClassifierOutput(
            loss=loss,
            logits=cos
        )


In [None]:
# Resume from a previously fine-tuned checkpoint. To start from the pretrained base
# weights instead, load "hfl/chinese-macbert-base" here:
# model = DualModel.from_pretrained("hfl/chinese-macbert-base", num_labels=1)
# Fall back to CPU when no GPU is visible instead of failing on a hard-coded "cuda".
model = DualModel.from_pretrained("./dual_model/checkpoint-750", num_labels=1).to(
    "cuda" if torch.cuda.is_available() else "cpu"
)

In [None]:
# Metric objects consumed by compute_metric.
accuracy, f1 = (evaluate.load(metric_name) for metric_name in ("accuracy", "f1"))

In [None]:
def compute_metric(pred, threshold=0.7):
    """Turn cosine-similarity scores into binary predictions and score them.

    Args:
        pred: ``(predictions, labels)`` pair as passed by ``Trainer``; predictions
            are the model's cosine similarities, labels are positive for similar
            pairs and non-positive otherwise.
        threshold: A pair is predicted "similar" when its cosine similarity is
            strictly greater than this value.

    Returns:
        Dict combining the results of the ``accuracy`` and ``f1`` metrics.
    """
    scores, targets = pred
    # Flatten first so one-element rows behave like scalars, then binarize in bulk.
    predictions = (np.asarray(scores).reshape(-1) > threshold).astype(int).tolist()
    references = (np.asarray(targets).reshape(-1) > 0).astype(int).tolist()
    metrics = accuracy.compute(predictions=predictions, references=references)
    metrics.update(f1.compute(predictions=predictions, references=references))
    return metrics

In [None]:
from transformers import TrainerCallback, TrainerState, TrainerControl

class MyTrainerCallback(TrainerCallback):
    """Print the newest ``log_history`` entry on every logging step."""

    def on_step_end(self, args: TrainingArguments, state: TrainerState, control: TrainerControl, **kwargs):
        """Print the last logged record when ``global_step`` is a multiple of ``logging_steps``."""
        is_logging_step = state.global_step % args.logging_steps == 0
        history = state.log_history
        if not is_logging_step or not history:
            return
        print(history[-1])

In [None]:
# Training configuration, grouped by concern.
args = TrainingArguments(
    # Where checkpoints go and how long to train.
    output_dir="./dual_model2",
    num_train_epochs=3,
    # Per-device batch sizes.
    per_device_train_batch_size=32,
    per_device_eval_batch_size=64,
    # Optimizer settings.
    learning_rate=2e-5,
    weight_decay=0.01,
    # Evaluate and checkpoint every 100 steps; the best checkpoint by f1 is
    # reloaded when training finishes.
    eval_strategy="steps",
    eval_steps=100,
    save_strategy="steps",
    save_steps=100,
    metric_for_best_model="f1",
    load_best_model_at_end=True,
    # Logging cadence and verbosity.
    logging_steps=10,
    logging_first_step=True,
    log_level="info",
    log_level_replica="info",
)

In [None]:
# Wire the model, both tokenized splits and the metric function into the Trainer.
train_split = tokenized_datasets["train"]
eval_split = tokenized_datasets["test"]
trainer = Trainer(
    args=args,
    model=model,
    compute_metrics=compute_metric,
    train_dataset=train_split,
    eval_dataset=eval_split,
    # callbacks=[MyTrainerCallback()]
)

In [None]:
# Run fine-tuning with the configuration held in `args`.
trainer.train()

In [None]:
# Evaluate on only the first 3 test rows — a quick smoke test of the eval path.
trainer.evaluate(tokenized_datasets["test"].select(range(3)))

In [None]:
# Evaluate on a single test row — presumably checks that a batch of one pair
# still flows through the model and compute_metric.
trainer.evaluate(tokenized_datasets["test"].select(range(1)))

In [None]:
class SentenceSimilarityPipeline:
    """Score how similar two sentences are using the encoder of a trained model.

    The two sentences are tokenized as one batch of two rows, encoded by
    ``model.bert``, and compared by the cosine similarity of their pooled outputs.
    """

    def __init__(self, model, tokenizer) -> None:
        """Keep the model's ``bert`` encoder, the tokenizer and the model's device."""
        # Only the shared encoder is needed for inference.
        self.model = model.bert
        self.tokenizer = tokenizer
        self.device = model.device

    def preprocess(self, sentenceA, sentenceB):
        """Tokenize both sentences as a (2, S) batch and move it to the model device."""
        inputs = self.tokenizer(
            [sentenceA, sentenceB],
            max_length=256,
            truncation=True,
            return_tensors="pt",
            padding=True,
        )
        return inputs.to(self.device)

    def predict(self, inputs):
        """Encode the batch and return the pooled outputs, shape (2, H).

        Runs in eval mode and without gradient tracking so that dropout cannot
        perturb the score and no autograd graph is kept; the encoder's previous
        train/eval mode is restored afterwards.
        """
        inputs = {k: v.to(self.device) for k, v in inputs.items()}
        was_training = self.model.training
        self.model.eval()
        try:
            with torch.no_grad():
                outputs = self.model(**inputs)
        finally:
            # Leave the shared encoder in whatever mode the caller had it in.
            self.model.train(was_training)
        return outputs.pooler_output

    def postprocess(self, logits):
        """Return the cosine similarity of rows 0 and 1 of ``logits`` as a Python float."""
        # Each row is an (H,) vector; comparing along dim 0 yields a scalar tensor.
        return CosineSimilarity(dim=0)(logits[0], logits[1]).cpu().item()

    def __call__(self, sentenceA: str, sentenceB: str, return_vector=False):
        """Return the similarity of the two sentences.

        Args:
            sentenceA: First sentence.
            sentenceB: Second sentence.
            return_vector: When true, also return the (2, H) pooled outputs.

        Returns:
            The cosine similarity, or ``(cosine, pooled_outputs)`` when
            ``return_vector`` is true.
        """
        inputs = self.preprocess(sentenceA, sentenceB)
        logits = self.predict(inputs)
        cos = self.postprocess(logits)
        if return_vector:
            return cos, logits
        return cos


In [None]:
# Build the inference pipeline around the model's `bert` encoder and the tokenizer.
pipe = SentenceSimilarityPipeline(model, tokenizer)

In [None]:
# Score one example pair; with return_vector=True the call returns
# (cosine similarity, pooled vectors of both sentences).
res = pipe("黑笔", "黑色签字笔", return_vector=True)
res