In [None]:
!pip install pandas scikit-learn transformers peft bitsandbytes

In [None]:
import pandas as pd
import numpy as np
import os
import random
import torch
from torch.utils.data import Dataset
from sklearn.metrics import log_loss
from transformers import (
    AutoTokenizer,
    AutoConfig,
    AutoModelForCausalLM,
    BitsAndBytesConfig
)
from peft import PeftModel
from tqdm import tqdm


def seed_everything(seed=None):
    """
    Seed every random number generator used here and return the seed applied.

    The seed is exported as PYTHONHASHSEED and handed to `random`, NumPy and
    PyTorch (CPU and all CUDA devices). cuDNN is switched to deterministic
    mode with benchmarking turned off.

    :param seed: int or None. When None, or outside the uint32 range,
        a random seed inside that range is drawn instead.
    :return: int, the seed that was actually applied.
    """
    uint32_info = np.iinfo(np.uint32)
    lowest, highest = uint32_info.min, uint32_info.max

    seed_is_usable = seed is not None and lowest <= seed <= highest
    if not seed_is_usable:
        seed = random.randint(lowest, highest)

    os.environ["PYTHONHASHSEED"] = str(seed)

    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    return seed


# Fix every RNG to the same seed so repeated runs of the notebook are comparable.
seed_everything(42)

In [None]:
class InstructionDataSet(Dataset):
    """Turn each row of a DataFrame into one chat-formatted comparison prompt.

    Every item is the concatenation of a fixed user-turn header, the row's
    ``prompt_response`` text (truncated to ``max_source_length`` tokens), a
    fixed A/B/C options footer and the opener of the model turn. The prompt is
    assembled at token level and then decoded back to text, so the value stored
    under ``"input_ids"`` is a *string*, not a list of ids.

    :param data: DataFrame with at least the columns ``id`` and ``prompt_response``.
    :param tokenizer: callable tokenizer returning ``{"input_ids": [...]}`` and
        providing ``decode``.
    :param max_source_length: ``max_length`` passed to the tokenizer when
        encoding ``prompt_response``.
    :param max_target_length: stored on the instance; not used by this class.
    """

    # Fixed prompt pieces. These are runtime strings and must stay byte-identical.
    _PROMPT_HEAD = "<start_of_turn>user\nHere are two question-answering dialogues. Compare two model performance on answering question, determine which is better.\n\n"
    _PROMPT_OPTIONS = "\n###options\nA. Model A\nB. Model B\nC. Tie\n<end_of_turn>\n"
    _PROMPT_MODEL_TURN = "<start_of_turn>model\n"

    def __init__(self, data, tokenizer, max_source_length, max_target_length):
        super().__init__()
        self.data = data
        self.tokenizer = tokenizer
        self.max_source_length = max_source_length
        self.max_target_length = max_target_length

        # The template pieces do not depend on the row, so encode them once here
        # instead of on every __getitem__ call.
        # [1:] drops the first token of each later piece -- presumably the BOS that
        # add_special_tokens=True prepends; confirm for the tokenizer in use.
        self._head_ids = self._encode(self._PROMPT_HEAD)
        self._tail_ids = (
            self._encode(self._PROMPT_OPTIONS)[1:]
            + self._encode(self._PROMPT_MODEL_TURN)[1:]
        )

    def _encode(self, text):
        """Tokenize ``text`` without padding or truncation and return its id list."""
        return self.tokenizer(text=text, add_special_tokens=True, padding=False)["input_ids"]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        """Return ``{"input_ids": <prompt text>, "id": <row id>}`` for the row at position ``index``."""
        # Positional lookup: a sampler yields 0..len-1, which only coincides with
        # index labels when the frame has a fresh RangeIndex. `.loc` would raise
        # KeyError (or pick the wrong row) on a filtered or split frame.
        row = self.data.iloc[index]

        body_ids = self.tokenizer(
            text=row["prompt_response"],
            add_special_tokens=True,
            truncation=True,
            max_length=self.max_source_length,
            padding=False,
        )["input_ids"][1:]

        input_ids = self._head_ids + body_ids + self._tail_ids
        # NOTE(review): special tokens are kept in the decoded text and collate_fn
        # tokenizes again with add_special_tokens=True, so the final batch may start
        # with two BOS tokens -- confirm this matches how the adapter was trained.
        prompt_text = self.tokenizer.decode(input_ids, skip_special_tokens=False)

        return {
            "input_ids": prompt_text,
            "id": row["id"],
        }

In [None]:
def collate_fn(batch):
    """Tokenize a list of dataset items into one padded batch.

    Uses the module-level ``tokenizer`` and ``MAX_LENGTH``.

    :param batch: list of dicts, each holding the prompt text under
        ``"input_ids"`` and the sample identifier under ``"id"``.
    :return: tuple ``(tokenizer output, list of ids in batch order)``.
    """
    prompts, sample_ids = [], []
    for sample in batch:
        prompts.append(sample["input_ids"])
        sample_ids.append(sample["id"])

    # The limit is MAX_LENGTH plus 50 extra tokens -- presumably headroom for the
    # fixed template text wrapped around the truncated body; confirm.
    encoded = tokenizer(
        prompts,
        max_length=MAX_LENGTH + 50,
        truncation=True,
        padding="longest",
        add_special_tokens=True,
        return_tensors="pt",
    )
    return encoded, sample_ids

In [None]:
# Project-local helper; its implementation is not part of this notebook.
from utils import load_split_data
data_path = "../data/train.csv"
prompt_type = 2
MAX_INPUT = 2300
if_train = True
# NOTE(review): the meaning of each positional argument (including the trailing
# True) is defined by load_split_data -- confirm against utils.py.
df_train, df_valid = load_split_data(data_path, prompt_type, MAX_INPUT, if_train, True)
# The validation split is what gets scored below; `test` is an alias, not a copy.
test = df_valid

In [None]:
def _single_token_id(token_ids):
    """Return the one vocabulary id held in ``token_ids`` (a bare id or a one-element sequence)."""
    ids = list(token_ids) if hasattr(token_ids, "__len__") else [token_ids]
    if len(ids) != 1:
        raise ValueError(f"expected exactly one token id per option, got {token_ids!r}")
    return int(ids[0])


def inference(model, test_dataloader):
    """Score every batch and return per-sample probabilities for options A, B and C.

    For each batch the model generates a single new token; the scores of that
    step at the A/B/C token ids are soft-maxed against each other.

    Uses the module-level ``device``, ``A_TOKEN_IDS``, ``B_TOKEN_IDS`` and
    ``C_TOKEN_IDS``.

    :param model: object whose ``generate(...)`` result exposes ``.scores``.
    :param test_dataloader: iterable of ``(batch_input, ids)`` pairs, where
        ``batch_input`` maps names to tensors and ``ids`` has one entry per sample.
    :return: list with one entry per batch; each entry is a list of
        ``[sample_id, probs]`` where ``probs`` is a NumPy array of length 3
        ordered (A, B, C).
    :raises ValueError: if any option does not map to exactly one token id.
    """
    option_ids = [_single_token_id(ids) for ids in (A_TOKEN_IDS, B_TOKEN_IDS, C_TOKEN_IDS)]

    test_predictions = []
    for batch_input, idx in tqdm(test_dataloader):
        for k in batch_input.keys():
            batch_input[k] = batch_input[k].to(device)

        with torch.no_grad():
            response = model.generate(
                **batch_input,
                max_new_tokens=1,
                return_dict_in_generate=True,
                output_scores=True,
            )

        # scores[0] holds the scores of the first (and only) generated step,
        # indexed here as (sample, vocabulary id).
        score = response.scores[0]
        # Pick the three option columns for the whole batch at once. The previous
        # torch.Tensor([[...]]) construction only worked for one-element tensors,
        # i.e. batch size 1.
        option_scores = score[:, option_ids].float()
        probs = torch.softmax(option_scores, dim=-1).cpu().numpy()

        # Iterate over the ids actually present: the last batch can be shorter
        # than the configured batch size.
        test_predictions.append([[idx[i], probs[i]] for i in range(len(idx))])
    return test_predictions

In [None]:
# Target device: input batches (and the adapter-wrapped model) are moved here.
device = torch.device("cuda:0")

In [None]:
# Identifier of the base checkpoint that the adapter is attached to.
base_model = "google/gemma-2-9b-it"
# Local checkpoint directory; both the tokenizer and the PEFT adapter are loaded from it.
model_path = "./lmsys/checkpoint-2300"
# Token budget for the prompt_response text; the collate step allows MAX_LENGTH + 50 overall.
MAX_LENGTH = 2300

In [None]:
# Read the Hugging Face access token from the environment instead of embedding it
# in the notebook. When HF_TOKEN is unset this is None and the libraries fall back
# to whatever credentials are stored locally.
# SECURITY: a literal token used to be committed in this cell; treat it as leaked
# and revoke it in the Hugging Face account settings.
hf_token = os.environ.get("HF_TOKEN")

# Left-side truncation keeps the end of an over-long prompt.
tokenizer = AutoTokenizer.from_pretrained(model_path, truncation_side="left")
config = AutoConfig.from_pretrained(base_model, trust_remote_code=True, token=hf_token)

# 4-bit NF4 quantization with double quantization; matmuls are computed in fp16.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_use_double_quant=True
)
base_model_0 = AutoModelForCausalLM.from_pretrained(
    base_model,
    config=config,
    quantization_config=bnb_config,
    torch_dtype=torch.float16,
    device_map="auto",
    trust_remote_code=True,
    token=hf_token
)

# Attach the fine-tuned adapter stored in the checkpoint directory and switch to eval mode.
new_model = model_path
model0 = PeftModel.from_pretrained(base_model_0, new_model).to(device)
model0.eval()

In [None]:
# Token ids for the three answer letters. [1:] drops the first id of each encoding --
# presumably the BOS token that add_special_tokens=True prepends; confirm.
A_TOKEN_IDS, B_TOKEN_IDS, C_TOKEN_IDS = (
    tokenizer(letter, add_special_tokens=True, truncation=True, max_length=1024)["input_ids"][1:]
    for letter in ("A", "B", "C")
)

In [None]:
# Display the ids as a sanity check: each option letter should map to exactly one token.
A_TOKEN_IDS, B_TOKEN_IDS, C_TOKEN_IDS

In [None]:
# Number of prompts per forward pass.
batch_size = 1

# Wrap the validation frame so each row becomes one prompt; prompts are
# tokenized and padded per batch by collate_fn.
tokenized_dataset = InstructionDataSet(
    data=test,
    tokenizer=tokenizer,
    max_source_length=MAX_LENGTH,
    max_target_length=1,
)
test_dataloader = torch.utils.data.DataLoader(
    dataset=tokenized_dataset,
    batch_size=batch_size,
    collate_fn=collate_fn,
)

In [None]:
# Run the adapter-wrapped model over every batch of the validation loader.
sub_pred = inference(model=model0, test_dataloader=test_dataloader)

In [None]:
# Flatten the per-batch results into rows of [id, p(A), p(B), p(C)].
# Every sample of every batch is kept; taking only the first entry of each batch
# would silently drop predictions as soon as batch_size > 1.
processed_data = []
for batch_result in sub_pred:
    for sample_id, probs in batch_result:
        # Ids read from a DataFrame are NumPy scalars; convert them to plain Python values.
        if hasattr(sample_id, "item"):
            sample_id = sample_id.item()
        processed_data.append([sample_id] + probs.tolist())

In [None]:
# Column layout of each flattened row: the id followed by the A, B and tie probabilities.
new_columns = ["id", "winner_model_a", "winner_model_b", "winner_tie"]
df = pd.DataFrame(processed_data, columns=new_columns)

In [None]:
# Average the probabilities of rows that share an id, leaving one row per id.
# groupby sorts its keys by default, so the result is ordered by ascending id.
df = df.groupby("id").mean().reset_index()

In [None]:
# Inspect the averaged per-id probabilities.
df

In [None]:
# Score the averaged predictions against the validation labels with multi-class log loss.
str2num = {"A": 0, "B": 1, "C": 2}
test["label_number"] = test.label.map(str2num)
test = test.drop_duplicates(subset=["id"]).reset_index(drop=True)

# Align labels and predictions by id rather than by position: the prediction frame
# comes out of groupby sorted by id, while `test` keeps its original row order.
# Only id and label_number are taken from `test` so the merge cannot collide with
# identically named columns.
scored = test[["id", "label_number"]].merge(df, on="id", how="inner", validate="one_to_one")
assert len(scored) == len(test) == len(df), "predictions and labels do not cover the same ids"

prediction = scored[new_columns[1:]].to_numpy()
# labels=[0, 1, 2] fixes the column order (A, B, tie) even if a class is absent from the split.
log_loss(scored["label_number"], prediction, labels=[0, 1, 2])