In [None]:
%env CUDA_VISIBLE_DEVICES=1
%env TOKENIZERS_PARALLELISM=false

In [None]:
BASE_PATH = "/home/stepan/kaggle-arc-agi"
MODEL_ID = f"{BASE_PATH}/models/gemma-2-2b-it/checkpoint-500"
MAX_NEW_TOKENS = 2048
MAX_SEQ_LENGTH = 8192 - MAX_NEW_TOKENS

In [None]:
import sys

sys.path.append(BASE_PATH)
sys.path.append(f"{BASE_PATH}/scripts")

In [None]:
import torch  # type: ignore
import numpy as np  # type: ignore

from datasets import DatasetDict, Dataset  # type: ignore

from unsloth import FastLanguageModel  # type: ignore

from tqdm.auto import tqdm  # type: ignore

from logger import get_logger  # type: ignore
import train_utils  # type: ignore
import data_utils  # type: ignore

In [None]:
log = get_logger(f"{BASE_PATH}/logs/gemma-2-2b", "arc-agi")

In [None]:
def get_model_tokenizer(dtype=None, load_in_4bit=True):
    model, tokenizer = FastLanguageModel.from_pretrained(
        model_name=MODEL_ID,
        max_seq_length=MAX_SEQ_LENGTH,
        dtype=dtype,
        load_in_4bit=load_in_4bit,
        device_map={"": 0},
        attn_implementation="flash_attention_2",
        # token = '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', # use one if using gated models like meta-llama/Llama-2-7b-hf
    )

    return model, tokenizer

In [None]:
def eval(f):
    def wrapper(model, tokenizer, *args, **kwargs):
        FastLanguageModel.for_inference(model)
        return f(model, tokenizer, *args, **kwargs)

    return wrapper

In [None]:
model, tokenizer = get_model_tokenizer()

In [None]:
dataset = data_utils.prepare_dataset(tokenizer, fit_dataset=True, base_path=BASE_PATH)
dataset

In [None]:
def generate_with_temp(model, inputs, temperature):
    outputs = model.generate(**inputs, max_new_tokens=MAX_NEW_TOKENS, do_sample=True, temperature=temperature, top_k=50, use_cache=True)
    return outputs


def evaluate_batch(model, tokenizer, batch):
    inputs = {
        "input_ids": batch["input_ids"],
        "attention_mask": batch["attention_mask"],
    }

    with torch.no_grad():
        outputs1 = generate_with_temp(model, inputs, 0.3)
        outputs2 = generate_with_temp(model, inputs, 0.7)

    input_ids_length = inputs["input_ids"].shape[1]  # sequence length without new tokens
    new_tokens1 = outputs1[:, input_ids_length:]
    new_tokens2 = outputs2[:, input_ids_length:]

    generated_texts1 = tokenizer.batch_decode(new_tokens1, skip_special_tokens=True)
    generated_texts2 = tokenizer.batch_decode(new_tokens2, skip_special_tokens=True)

    return generated_texts1, generated_texts2

In [None]:
@eval
def evaluate(model, tokenizer, dataset, batch_size):
    eval_dataloader = torch.utils.data.DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=False,
        collate_fn=train_utils.collate(mode="test", tokenizer=tokenizer),
    )

    challenge_ids = []
    preds = []
    labels = []
    for i, batch in tqdm(enumerate(eval_dataloader), total=len(eval_dataloader)):
        generated_texts1, generated_texts2 = evaluate_batch(model, tokenizer, batch)

        # Ensure solutions is always a list
        ids = batch["id"]
        challenges = batch["challenge"]
        solutions = batch["solution"]

        # I don't like how complicated this is, but I don't see an easier way to do it right now
        for gen_text1, gen_text2, label, challenge_id, challenge in zip(generated_texts1, generated_texts2, solutions, ids, challenges):
            parsed_output1 = train_utils.parse_output(gen_text1)
            parsed_output2 = train_utils.parse_output(gen_text2)

            if parsed_output1 is None and parsed_output2 is None:
                print(f"Failed to parse both outputs: {gen_text1} and {gen_text2}")
                preds.append(None)
            else:
                # Choose the best prediction based on partial match score
                score1 = train_utils.calculate_partial_match(parsed_output1, train_utils.tensor_to_int(label)) if parsed_output1 is not None else 0
                score2 = train_utils.calculate_partial_match(parsed_output2, train_utils.tensor_to_int(label)) if parsed_output2 is not None else 0
                best_pred = parsed_output1 if score1 >= score2 else parsed_output2
                preds.append(best_pred)

            labels.append(train_utils.tensor_to_int(label))
            challenge_ids.append((challenge_id, challenge["order"]))

    return {
        "ids": challenge_ids,
        "preds": preds,
        "labels": labels,
    }

In [None]:
results = evaluate(model, tokenizer, dataset["test"], batch_size=1)
# Calculate metrics
accuracy, avg_partial_match = train_utils.calculate_metrics(results["preds"], results["labels"])

log.info(f"Exact match accuracy: {accuracy:.4f}")
log.info(f"Average partial match score: {avg_partial_match:.4f}")