In [None]:
import torch
import re
from tqdm import tqdm
from peft import PeftModel
from datasets import load_dataset, Dataset, DatasetDict
from transformers import AutoModelForCausalLM, AutoTokenizer

In [None]:
# Evaluation configuration

# Hugging Face Hub identifiers of the benchmark and of the base model.
DATASET_NAME = 'nlile/hendrycks-MATH-benchmark'
MODEL_NAME = 'Qwen/Qwen2.5-Math-1.5B-Instruct'

# Use the GPU when torch can see one, otherwise fall back to the CPU.
DEVICE = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Number of problems per generation batch and the cap on generated tokens.
BATCH_SIZE = 16
MAX_NEW_TOKENS = 1024

# When LORA is True the adapter stored at LORA_PATH is loaded on top of the
# base model.
LORA = False
LORA_PATH = 'lora_adapter'

In [None]:
# Load the dataset
# Only the test split is evaluated, so request it directly instead of building
# the whole DatasetDict and stripping columns from splits that are then thrown
# away. `data` is a single Dataset from the first assignment onwards.
data = load_dataset(DATASET_NAME, split='test')
# Drop the metadata columns that the rest of the notebook never reads.
data = data.remove_columns(['subject', 'level', 'unique_id'])
print(data)

In [None]:
# Load the tokenizer and the model
# The tokenizer pads on the left, so every prompt in a padded batch ends at
# the last input position.
tokenizer = AutoTokenizer.from_pretrained(
    MODEL_NAME,
    padding_side='left',
)
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    torch_dtype='auto',
    device_map='auto',
)
# Optionally wrap the base model with the LoRA adapter stored at LORA_PATH.
model = PeftModel.from_pretrained(model, LORA_PATH) if LORA else model
model.eval()

In [None]:
# System prompt shared by every problem
SYSTEM_PROMPT = (
    'You are an AI assistant skilled in mathematical reasoning. '
    'Please solve the problem using concise step-by-step reasoning process.'
    ' Put your final answer within \\boxed{}.'
)


# Prompt builder
def build_prompt(problem):
    """Return the full text prompt for a single problem.

    The prompt consists of a system section holding SYSTEM_PROMPT, a user
    section holding `problem`, and an opening assistant tag, each followed by
    a newline.

    NOTE(review): the role tags are written as plain text and the tokenizer's
    chat template is not used here; confirm this layout matches the chat
    format MODEL_NAME expects.

    Args:
        problem: Problem statement to insert into the user section.

    Returns:
        The assembled prompt string, ending with the assistant tag and a
        newline.
    """
    # The trailing empty element makes the joined text end with a newline.
    sections = ('<|system|>', SYSTEM_PROMPT, '<|user|>', f'{problem}', '<|assistant|>', '')
    return '\n'.join(sections)

# Batch preparation
def preprocess_batch(batch, max_length=1024):
    """Turn one batch of problems into padded, tokenized prompts.

    Args:
        batch: Mapping whose 'problem' entry is an iterable of problem texts.
        max_length: Token limit passed to the tokenizer; truncation is
            enabled, so longer prompts are cut to this length.

    Returns:
        The output of the module-level `tokenizer` for the built prompts,
        requested as PyTorch tensors with padding enabled.
    """
    prompt_texts = list(map(build_prompt, batch['problem']))
    return tokenizer(
        prompt_texts,
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=max_length,
    )

In [None]:
# Answer extraction
def extract_answer(answer):
    r"""Return the content of the last complete, non-empty ``\boxed{...}``.

    Braces are matched by counting, so arbitrarily nested LaTeX such as
    ``\boxed{\frac{1}{\sqrt{2}}}`` is extracted in full. The last box is
    preferred because a step-by-step solution may box intermediate results
    before the final answer. A box whose braces never close (for example a
    generation cut off by the token limit) or whose content is blank is
    skipped in favour of an earlier one.

    Args:
        answer: Text to search. Non-string values are treated as containing
            no answer.

    Returns:
        The stripped content of the selected box, or '' when no usable box
        is found.
    """
    if not isinstance(answer, str):
        return ''

    marker = '\\boxed{'
    search_end = len(answer)
    while True:
        # Walk the occurrences of the marker from the last one backwards.
        start = answer.rfind(marker, 0, search_end)
        if start == -1:
            return ''

        content_start = start + len(marker)
        depth = 1  # the opening brace of the marker itself
        for pos in range(content_start, len(answer)):
            char = answer[pos]
            if char == '{':
                depth += 1
            elif char == '}':
                depth -= 1
                if depth == 0:
                    content = answer[content_start:pos].strip()
                    if content:
                        return content
                    # Blank box: fall through and try an earlier one.
                    break

        # Unbalanced or blank box: continue with the text before it.
        search_end = start

# pass@k metric
def pass_at_k(preds, gold, k):
    r"""Fraction of problems solved by at least one of the first k samples.

    A sample counts as correct when the answer extracted from it equals the
    reference answer exactly (string comparison).

    The reference is taken from the gold string's ``\boxed{...}`` when it has
    one; otherwise the stripped gold string itself is the reference. Without
    that fallback a gold string with no box would reduce to '' and every
    sample *lacking* a boxed answer would be scored as correct.
    NOTE(review): confirm whether the gold strings passed in are bare answers
    or boxed solutions.

    Args:
        preds: One list of generated texts per problem.
        gold: Reference strings, aligned with `preds`.
        k: Number of leading samples per problem to consider.

    Returns:
        Number of solved problems divided by len(gold); 0.0 when `gold` is
        empty.
    """
    if not gold:
        return 0.0

    correct = 0
    for samples, reference in zip(preds, gold):
        target = extract_answer(reference) or reference.strip()
        if not target:
            # No reference answer to match; never credit an empty extraction.
            continue
        correct += any(extract_answer(sample) == target for sample in samples[:k])
    return correct / len(gold)

# Self-consistency metric
def self_consistency(preds):
    """Average share of samples that agree with the most frequent answer.

    For every problem the extracted answers are tallied and the size of the
    largest group is divided by the number of samples. Samples from which no
    answer could be extracted stay in the denominator but cannot form the
    winning group, so a problem where most samples failed to answer does not
    score as "consistent". A problem with no extracted answer at all scores 0.

    Args:
        preds: One list of generated texts per problem.

    Returns:
        Mean per-problem agreement in [0, 1]; 0.0 when `preds` is empty.
    """
    if not preds:
        return 0.0

    scores = []
    for samples in preds:
        answers = [extract_answer(sample) for sample in samples]
        counts = {}
        for answer in answers:
            if answer:  # '' means "no answer found" and must not win the vote
                counts[answer] = counts.get(answer, 0) + 1
        if not counts:
            scores.append(0.0)
            continue
        scores.append(max(counts.values()) / len(answers))
    return sum(scores) / len(scores)

In [None]:
# Model evaluation loop
def evaluate(model, tokenizer, test_dataset, batch_size, k=5):
    """Sample k completions per problem and compute the evaluation metrics.

    Each batch is tokenized with `preprocess_batch`, k independent sampled
    generations are drawn for it, and the generated texts are scored against
    the dataset's 'answer' column.

    Args:
        model: Model exposing `generate`.
        tokenizer: Tokenizer used to decode generations and to supply the
            pad/eos token ids.
        test_dataset: Dataset supporting len() and slice indexing that yields
            a mapping with 'problem' and 'answer' entries.
        batch_size: Number of problems per generation batch.
        k: Number of samples drawn per problem; must be at least 1.

    Returns:
        Dict with the keys 'pass@1', 'pass@5' and 'self_consistency'.
        'pass@5' looks at no more than the k samples that were drawn, so it
        is only a true pass@5 when k >= 5.

    Raises:
        ValueError: If k is smaller than 1.
    """
    if k < 1:
        raise ValueError(f'k must be at least 1, got {k}')

    all_preds = []
    all_golds = []
    for i in tqdm(range(0, len(test_dataset), batch_size), desc='Processing dataset'):
        batch = test_dataset[i:i + batch_size]
        inputs = preprocess_batch(batch)
        input_ids = inputs['input_ids'].to(DEVICE)
        attention_mask = inputs['attention_mask'].to(DEVICE)
        # All prompts in the padded batch share this length.
        prompt_length = input_ids.shape[1]

        batch_preds = []
        for _ in range(k):
            with torch.no_grad():
                outputs = model.generate(
                    input_ids=input_ids,
                    attention_mask=attention_mask,
                    max_new_tokens=MAX_NEW_TOKENS,
                    do_sample=True,
                    top_p=0.95,
                    temperature=0.7,
                    pad_token_id=tokenizer.pad_token_id,
                    eos_token_id=tokenizer.eos_token_id
                )
            # Keep only the newly generated tokens. Cutting the prompt off by
            # position is independent of the prompt text, so the system
            # prompt can never leak into the prediction (which would happen
            # with text-based splitting whenever the marker is missing).
            completions = outputs[:, prompt_length:]
            decoded = tokenizer.batch_decode(completions, skip_special_tokens=True)
            batch_preds.append([text.strip() for text in decoded])

        # Transpose from [sample][problem] to [problem][sample].
        batch_preds = list(map(list, zip(*batch_preds)))
        all_preds.extend(batch_preds)
        all_golds.extend(batch['answer'])

    # Collect and return the metrics
    result = {}
    result['pass@1'] = pass_at_k(all_preds, all_golds, 1)
    result['pass@5'] = pass_at_k(all_preds, all_golds, 5)
    result['self_consistency'] = self_consistency(all_preds)

    return result

In [None]:
# Report the results
metrics = evaluate(model, tokenizer, data, BATCH_SIZE)
# The f-strings use double quotes on the outside: reusing the enclosing quote
# character inside a replacement field is a SyntaxError before Python 3.12.
print(f"Pass@1: {metrics['pass@1']}")
print(f"Pass@5: {metrics['pass@5']}")
print(f"Self consistency: {metrics['self_consistency']}")