In [1]:
import pandas as pd
import json
import os
import ast
import re
import numpy as np
from datasets import Dataset, concatenate_datasets

# For LLM
from peft import LoraConfig, PeftModel
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
    set_seed,
    pipeline
)
from trl import setup_chat_format

import torch
from time import time

# Set seed
set_seed(42)

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def split_dictionary(data):
    """
    Give every test pair of a multi-test task its own single-test entry.

    Tasks with zero or one test pair are passed through untouched (the very
    same object is stored under the original key). A task with several test
    pairs is replaced by one entry per pair, keyed "<key>_<index>", each
    holding that single test pair plus the task's (shared) training list.

    Args:
    data (dict): Mapping of task key -> task dict with 'test' and 'train' fields.

    Returns:
    tuple: A tuple containing:
        - result (dict): The tasks, with multi-test tasks expanded into single-test entries.
        - split_files (list): The newly created "<key>_<index>" keys, in creation order.
    """
    result, split_files = {}, []

    for task_id, task in data.items():
        tests = task.get("test", [])

        # Guard clause: nothing to split, keep the original task as-is.
        if len(tests) <= 1:
            result[task_id] = task
            continue

        shared_train = task.get("train", [])
        for position, single_test in enumerate(tests):
            piece_key = f"{task_id}_{position}"
            result[piece_key] = {"test": [single_test], "train": shared_train}
            split_files.append(piece_key)

    return result, split_files


In [None]:
# Compare results to solution
# test_run=True  -> evaluation split (ground-truth solutions are available)
# test_run=False -> test split (no solutions; placeholders are stored instead)
test_run = True

# Load JSON data from the files
if test_run:
    with open('data/arc-agi_evaluation_challenges.json') as f:
        challenges = json.load(f)

    with open('data/arc-agi_evaluation_solutions.json') as f:
        solutions = json.load(f)
else:
    with open('data/arc-agi_test_challenges.json') as f:
        challenges = json.load(f)

# Split tasks with multiple test inputs into one entry per test input
challenges, split_files = split_dictionary(challenges)
split_names = set(split_files)

# split_files holds one "<task>_<idx>" key per created piece, so the number of
# files that were split is the number of distinct task ids. (Dividing the
# piece count by two is only right when every task has exactly two test inputs.)
split_file_count = len({name.rpartition('_')[0] for name in split_files})

print(f"Number of files split: {split_file_count}")
print("File names:")
for name in split_files:
    print(name)

# Prepare one record per (task, test input) for the DataFrame
data = []

for file_name, grids in challenges.items():
    train_grids = grids.get('train', [])
    test_inputs = grids.get('test', [])

    if test_run:
        # Only keys created by split_dictionary carry a "_<idx>" suffix; look the
        # key up instead of guessing from the presence of an underscore.
        if file_name in split_names:
            base_name, _, suffix = file_name.rpartition('_')
            test_nr = int(suffix)
        else:
            base_name, test_nr = file_name, 0

        test_outputs = solutions.get(base_name, [])
        # Wrap the matching solution grid in the same dict shape as the inputs
        test_outputs_transformed = [{'output': test_outputs[test_nr]}]
        # Pair the (single) test input with its expected output
        combined_tests = [{'input': test_inputs[0]['input'], 'output': test_outputs_transformed[0]['output']}]
        record_test_output = test_outputs_transformed
        record_test = combined_tests
    else:
        # No ground truth available: store a placeholder output and the raw test inputs
        record_test_output = [[0, 0]]
        record_test = test_inputs

    data.append({
            'file_name': file_name,
            'train': train_grids,
            'test_input': test_inputs,
            'test_output': record_test_output,
            'test': record_test
    })

df = pd.DataFrame(data)

Number of files split: 19
File names:
12997ef3_0
12997ef3_1
1d398264_0
1d398264_1
31d5ba1a_0
31d5ba1a_1
3b4c2228_0
3b4c2228_1
4852f2fa_0
4852f2fa_1
4c177718_0
4c177718_1
5d2a5c43_0
5d2a5c43_1
6ea4a07e_0
6ea4a07e_1
8b28cd80_0
8b28cd80_1
9110e3c5_0
9110e3c5_1
9b4c17c4_0
9b4c17c4_1
b1fc8b8e_0
b1fc8b8e_1
bbb1b8b6_0
bbb1b8b6_1
c074846d_0
c074846d_1
d5c634a2_0
d5c634a2_1
da2b0fe3_0
da2b0fe3_1
e21a174a_0
e21a174a_1
e345f17b_0
e345f17b_1
f3e62deb_0
f3e62deb_1


In [4]:
# Jinja chat template in the Llama 3 header format: every message is wrapped in
# <|start_header_id|>role<|end_header_id|> ... <|eot_id|>, bos_token is prepended to the
# first message, and an assistant header is appended when add_generation_prompt is set.
LLAMA_3_CHAT_TEMPLATE = """{% set loop_messages = messages %}{% for message in loop_messages %}{% set content = '<|start_header_id|>' + message['role'] + '<|end_header_id|>\n\n'+ message['content'] | trim + '<|eot_id|>' %}{% if loop.index0 == 0 %}{% set content = bos_token + content %}{% endif %}{{ content }}{% endfor %}{% if add_generation_prompt %}{{ '<|start_header_id|>assistant<|end_header_id|>\n\n' }}{% endif %}"""

# Set the data type for computations to float16, bfloat16 not supported on T4/P100
compute_dtype = getattr(torch, "float16")

# Configure the BitsAndBytes settings for 4-bit quantization to reduce memory usage
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,  # Enable 4-bit quantization
    bnb_4bit_use_double_quant=True,  # Nested quantization: also quantize the quantization constants to save more memory
    bnb_4bit_quant_type="nf4",  # Specify the quantization type
    bnb_4bit_compute_dtype=compute_dtype,  # Set the computation data type
)

# Local directory the model and tokenizer are loaded from
model_id = "models/llama3.2_3B/"

# Alternative (disabled): BitsAndBytes settings for 8-bit quantization
# bnb_config = BitsAndBytesConfig(
#     load_in_8bit=True  # Enable 8-bit quantization
# )

# Record the start time to measure the loading duration
time_start = time()
print("Loading model")

# Load the pre-trained model with specified configurations
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    trust_remote_code=True, # Allow the model to use custom code from the repository
    quantization_config=bnb_config, # Apply the quantization configuration defined above (4-bit as written)
    attn_implementation='sdpa', # Use scaled-dot product attention for better performance
    torch_dtype=compute_dtype, # Set the data type for the model
    use_cache=True, # Caching is enabled here (the flag is True)
    device_map= {"":0},  # Map the whole model ("" = root module) to device 0
)

# Load the tokenizer associated with the model
tokenizer = AutoTokenizer.from_pretrained(model_id)
tokenizer.chat_template = LLAMA_3_CHAT_TEMPLATE # Apply the chat message template

# Record the end time and print the duration for preparing the model and tokenizer
time_end = time()
print(f"Prepare model, tokenizer: {round(time_end-time_start, 3)} sec.")

Loading model


Loading checkpoint shards: 100%|██████████| 3/3 [00:07<00:00,  2.63s/it]


Prepare model, tokenizer: 8.66 sec.


In [5]:
# System prompt: sent as the "system" message of every conversation built by preprocess().
system_prompt = '''You are a puzzle solving wizard. You are given a puzzle from the abstraction and reasoning corpus developed by Francois Chollet.'''

# User message template, filled in with str.format():
#   {training_data}   -> the task's training input/output pairs
#   {input_test_data} -> the test input grid
# The doubled braces ({{ and }}) are str.format escapes that render as literal { and }.
# NOTE(review): the exact wording is part of the prompt the model sees; if the model was
# fine-tuned on this text, editing it (even punctuation) may change results — confirm before changing.
user_message_template = '''Here are the example input and output pairs from which you should learn the underlying rule to later predict the output for the given test input:
----------------------------------------
{training_data}
----------------------------------------
Now, solve the following puzzle based on its input grid by applying the rules you have learned from the training data.:
----------------------------------------
[{{'input': {input_test_data}, 'output': [[]]}}]
----------------------------------------
What is the output grid? Only provide the output grid in the form as in the example input and output pairs. Do not provide any additional information:'''

def preprocess(task, test_run, train_mode=False):
    """
    Preprocess a single ARC task to create the prompt (and solution) for the model.

    The system and user messages are built from the module-level ``system_prompt``
    and ``user_message_template`` and rendered to a single string with the
    tokenizer's chat template.

    Parameters:
    task (dict): The ARC task data with 'train', 'test' and 'file_name' entries.
        Only the first test example (``task['test'][0]``) is used.
    test_run (bool): If True, the expected output is read from
        ``task['test'][0]['output']`` and returned as 'solution'. If False, no
        ground truth is read and no 'solution' key is returned.
    train_mode (bool): If True, the expected output is appended as the assistant
        message (complete training example). If False, the prompt ends with an
        open assistant header so generation starts at the assistant's answer.

    Returns:
    dict: {'text', 'solution', 'file_name'} when ``test_run`` is True,
        otherwise {'text', 'file_name'}.
    """
    # System message
    system_message = {"role": "system", "content": system_prompt}

    # Extract training data and input grid from the task
    training_data = task['train']
    input_test_data = task['test'][0]['input']
    if test_run:
        output_test_data = task['test'][0]['output']
    else:
        # Placeholder: there is no ground truth outside of test runs
        output_test_data = [[0 ,0]]

    # Format the user message with training data and input test data
    user_message_content = user_message_template.format(training_data=training_data, input_test_data=input_test_data)
    user_message = {
        "role": "user",
        "content": user_message_content
    }

    # Include the assistant message with the expected output if in training mode
    if train_mode:
        assistant_message = {
            "role": "assistant",
            "content": str(output_test_data)
        }

        # Combine system, user, and assistant messages
        messages = [system_message, user_message, assistant_message]
    else:
        messages = [system_message, user_message]

    # Render with the chat template. For inference prompts the assistant header
    # must be appended (add_generation_prompt=True); otherwise the prompt stops
    # after the user turn and the model has to produce the header itself. Training
    # examples already end with a complete assistant turn, so it stays off there.
    messages = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=not train_mode,
    )
    if test_run:
        return {"text": messages, "solution": output_test_data, "file_name": task['file_name']}
    else:
        return {"text": messages, "file_name": task['file_name']}

# Convert the loaded data to a Huggingface Dataset object
dataset = Dataset.from_pandas(df)

# Apply the preprocess function to each task in the dataset; the original columns
# are dropped so only the keys returned by preprocess() remain
dataset = dataset.map(lambda x: preprocess(x, test_run), batched=False, remove_columns=dataset.column_names)

# Keep at most the first 419 tasks. The bound is clamped to the dataset size so
# that smaller inputs (e.g. a different challenge file) do not raise an
# out-of-range error in select().
dataset = dataset.select(range(min(419, len(dataset))))
print(dataset)

Map: 100%|██████████| 419/419 [00:00<00:00, 1955.72 examples/s]

Dataset({
    features: ['file_name', 'text', 'solution'],
    num_rows: 419
})





In [6]:
# Restrict the run to a hand-picked subset of 23 task indices.
# NOTE: `dataset` is reassigned from itself, so this cell is not idempotent — running it a
# second time would index into the 23-row subset, where indices such as 169 are out of range.
# Re-run the cell that builds `dataset` first.
dataset = dataset.select([11, 169, 52, 163, 79, 238, 336, 93, 399, 138, 316, 118, 0, 257, 388, 394, 201, 385, 43, 189, 3, 105, 103])
print(dataset)

Dataset({
    features: ['file_name', 'text', 'solution'],
    num_rows: 23
})


In [7]:
# Upper bound on the tokenized length of a task's prompt text; tasks whose
# prompt exceeds it are dropped by the filter further down in this cell.
max_tokens = 8000  # Adjust this value as needed


# Function to calculate the number of tokens
def count_tokens(text):
    """
    Return how many tokens the module-level tokenizer produces for ``text``.

    The text is encoded with ``tokenizer.encode`` and the length of the
    resulting id sequence is returned; this is the figure compared against
    the prompt-length limit when filtering tasks.

    Parameters:
    text (str): The input text to be tokenized.

    Returns:
    int: The number of token ids produced for the input text.
    """
    token_ids = tokenizer.encode(text)
    return len(token_ids)

# Keep only the tasks whose rendered prompt ('text') is at most max_tokens tokens long
filtered_dataset = dataset.filter(lambda x: count_tokens(x['text']) <= max_tokens)

# Report how many tasks were dropped by the length limit and how many remain
print(f'{len(dataset)-len(filtered_dataset)} tasks contain too many tokens if we set max_tokens to {max_tokens}')
print(f'The dataset contains {len(filtered_dataset)} tasks to evaluate the model')

# Sanity check: show which columns the filtered dataset carries
print(filtered_dataset.to_pandas().columns)

Filter: 100%|██████████| 23/23 [00:00<00:00, 357.66 examples/s]

2 tasks contain too many tokens if we set max_tokens to 8000
The dataset contains 21 tasks to evaluate the model
Index(['file_name', 'text', 'solution'], dtype='object')





In [8]:
# Build the text-generation pipeline around the already loaded model and tokenizer
text_gen_pipeline = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    torch_dtype=torch.float16,
    device_map= {"":0}
)
# Use the model config's EOS id as the padding token id.
# NOTE(review): some Llama 3.x configs define eos_token_id as a list of ids — confirm it is a
# single int for this checkpoint.
text_gen_pipeline.tokenizer.pad_token_id = text_gen_pipeline.model.config.eos_token_id

# Token ids passed as eos_token_id to the generation calls: the tokenizer's EOS token
# and the id of the "<|eot_id|>" token that closes each message in the chat template
terminators = [
    text_gen_pipeline.tokenizer.eos_token_id,
    text_gen_pipeline.tokenizer.convert_tokens_to_ids("<|eot_id|>")
]

# Function to generate outputs
def generate_solution(task, max_new_tokens=512, do_sample=True, temperature=0.1, top_p=0.1):
    """
    Produce one completion for an ARC task prompt.

    The prompt stored under ``task['text']`` is sent through the module-level
    text-generation pipeline (stopping on the module-level ``terminators``),
    and the text that follows the prompt in the first returned sequence is
    handed back.

    Parameters:
    task (dict): The ARC task data; only its 'text' entry (the prompt) is read.
    max_new_tokens (int, optional): The maximum number of new tokens to generate. Default is 512.
    do_sample (bool, optional): Forwarded to the pipeline as ``do_sample``. Default is True.
    temperature (float, optional): Forwarded to the pipeline as ``temperature``. Default is 0.1.
    top_p (float, optional): Forwarded to the pipeline as ``top_p``. Default is 0.1.

    Returns:
    dict: ``{'generated_solution': <text generated after the prompt>}``.
    """
    prompt = task['text']

    generation_kwargs = dict(
        max_new_tokens=max_new_tokens,
        eos_token_id=terminators,
        do_sample=do_sample,
        temperature=temperature,
        top_p=top_p,
    )
    first_output = text_gen_pipeline(prompt, **generation_kwargs)[0]

    # The pipeline output starts with the prompt itself; strip it by length.
    completion = first_output["generated_text"][len(prompt):]
    return {'generated_solution': completion}


Device set to use cuda:0


In [9]:
def extract_solution(text):
    """
    Extract the first nested-list grid from a piece of generated text.

    The span from the first '[[' up to and including the first following ']]'
    is parsed with ``ast.literal_eval``.

    Parameters:
    text (str or None): The raw model output.

    Returns:
    list: The parsed list of lists, or the fallback grid ``[[0]]`` when
        ``text`` is None, no '[[' ... ']]' span exists, the span is not a valid
        Python literal, or the parsed value is not a list made only of lists.
    """
    if text is None:
        return [[0]]

    try:
        # Find the part of the text that looks like a nested list
        start = text.index('[[')
        end = text.index(']]', start) + 2

        # Use ast.literal_eval to safely evaluate the string as a Python expression
        array = ast.literal_eval(text[start:end])
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        # str.index raises ValueError when a marker is missing. literal_eval may
        # raise any of these on malformed input (e.g. TypeError for a set literal
        # holding a list), and model output must never crash the evaluation.
        return [[0]]

    # Accept the result only when every top-level element is itself a list
    if all(isinstance(row, list) for row in array):
        return array
    return [[0]]

def is_rectangular(array):
    """
    Tell whether ``array`` is a non-empty list of lists of equal length.

    Returns False for an empty/falsy ``array`` and when any row is not a
    ``list``; otherwise True only if every row is as long as the first one.
    """
    if not array:
        return False
    if any(not isinstance(row, list) for row in array):
        return False

    expected_width = len(array[0])
    for row in array:
        if len(row) != expected_width:
            return False
    return True

def pad_array_with_value(array, target_shape, pad_value):
    """
    Place ``array`` in the top-left corner of a new integer array of
    ``target_shape`` whose remaining cells hold ``pad_value``.

    Parameters:
    array: 2D grid (nested list) to embed.
    target_shape (tuple): Shape of the returned array.
    pad_value (int): Value used for every cell not covered by ``array``.

    Returns:
    numpy.ndarray: Integer array of shape ``target_shape``.
    """
    canvas = np.full(target_shape, pad_value, dtype=int)
    source_dims = np.array(array).shape
    n_rows, n_cols = source_dims[0], source_dims[1]
    canvas[:n_rows, :n_cols] = array
    return canvas

def compare_solutions_with_padding(generated_output, correct_output, pad_value=-1):
    """
    Score a generated grid against the reference grid cell by cell.

    Both grids are padded with ``pad_value`` to a common shape (the larger row
    count and the larger first-row length of the two). A cell counts as
    correct only when the two padded grids agree there and neither side is
    padding.

    Parameters:
    generated_output: Generated 2D grid (nested list).
    correct_output: Reference 2D grid (nested list).
    pad_value (int, optional): Filler for cells outside a grid. Default is -1.

    Returns:
    tuple: ``(is_correct, correct_percentage)`` — whether every cell of the
        common shape is correct, and the share of correct cells in percent.
    """
    n_rows = max(len(generated_output), len(correct_output))
    n_cols = max(len(generated_output[0]), len(correct_output[0]))
    canvas_shape = (n_rows, n_cols)

    gen_grid = pad_array_with_value(generated_output, canvas_shape, pad_value)
    ref_grid = pad_array_with_value(correct_output, canvas_shape, pad_value)

    cells_agree = gen_grid == ref_grid
    gen_is_real = gen_grid != pad_value
    ref_is_real = ref_grid != pad_value
    n_matching = np.sum(cells_agree & gen_is_real & ref_is_real)

    n_total = n_rows * n_cols
    share_correct = (n_matching / n_total) * 100

    return (n_matching == n_total), share_correct


In [10]:
def generate_solutions_passk(task, k=240, max_new_tokens=512, do_sample=True, temperature=0.7, top_p=0.9, start_idx=0):
    """
    Request ``k`` sequences for one task prompt and return them as numbered columns.

    The prompt in ``task['text']`` is sent once through the module-level
    text-generation pipeline with ``num_return_sequences=k``; the prompt
    prefix is stripped from every returned sequence by length.

    Parameters:
    task (dict): The ARC task data; only its 'text' entry (the prompt) is read.
    k (int, optional): Number of sequences requested from the pipeline. Default is 240.
    max_new_tokens (int, optional): Forwarded to the pipeline. Default is 512.
    do_sample (bool, optional): Forwarded to the pipeline. Default is True.
    temperature (float, optional): Forwarded to the pipeline. Default is 0.7.
    top_p (float, optional): Forwarded to the pipeline. Default is 0.9.
    start_idx (int, optional): Offset added to the 1-based position of each
        sequence when naming its key. Default is 0.

    Returns:
    dict: ``{'generated_solution_<start_idx + n>': text}`` for the n-th returned
        sequence (n starting at 1).
    """
    prompt = task['text']
    prompt_length = len(prompt)

    sampled = text_gen_pipeline(
        prompt,
        max_new_tokens=max_new_tokens,
        eos_token_id=terminators,
        do_sample=do_sample,
        temperature=temperature,
        top_p=top_p,
        num_return_sequences=k
    )

    return {
        f'generated_solution_{start_idx + position}': candidate["generated_text"][prompt_length:]
        for position, candidate in enumerate(sampled, start=1)
    }

In [11]:
def evaluate_passk_solutions(task, k=240):
    """
    Evaluate up to ``k`` generated solutions of one task (pass@k).

    Attempts are read from the keys ``generated_solution_1`` ...
    ``generated_solution_<k>``. Keys that are absent, or whose value is None
    (i.e. nothing was generated for that slot), are ignored rather than being
    scored as a fallback ``[[0]]`` grid.

    Parameters:
    task (dict): One dataset row holding 'generated_solution_<i>' entries and,
        in test runs, 'solution' and 'file_name'.
    k (int, optional): Highest attempt index to look at. Default is 240.

    Returns:
    dict: When the module-level ``test_run`` is False: the extracted grids plus
        placeholder metrics ('pass_at_k' False, 'best_accuracy' 0.0,
        'best_solution_idx' -1). Otherwise: 'file_name', 'pass_at_k',
        'best_accuracy', 'best_solution_idx', 'best_solution', 'all_solutions'
        (one entry per scored attempt) and 'k' (number of scored attempts).
    """
    # (attempt index, raw text) for every attempt that actually holds a generation
    attempts = []
    for i in range(1, k + 1):
        solution_key = f'generated_solution_{i}'
        if solution_key in task and task[solution_key] is not None:
            attempts.append((i, task[solution_key]))

    if not test_run:
        extracted_solutions = [extract_solution(generated_text) for _, generated_text in attempts]

        return {
            'extracted_solutions': extracted_solutions,
            'pass_at_k': False,  # Cannot evaluate without ground truth
            'best_accuracy': 0.0,
            'best_solution_idx': -1
        }

    true_solution = task['solution']
    file_name = task['file_name']

    best_accuracy = 0.0
    best_solution_idx = -1
    best_solution = None
    is_any_correct = False

    solution_results = []

    # The reference grid is the same for every attempt, so check it once
    true_is_rectangular = is_rectangular(true_solution)

    # Evaluate each available solution
    for i, generated_text in attempts:
        gen_solution = extract_solution(generated_text)

        if not is_rectangular(gen_solution) or not true_is_rectangular:
            print(f"Skipping {file_name} due to jagged array.")
            continue

        # Compare with ground truth; cast to built-in types so the results stay
        # plain Python when they end up in a DataFrame / CSV
        is_correct, correct_percentage = compare_solutions_with_padding(gen_solution, true_solution)
        is_correct = bool(is_correct)
        correct_percentage = float(correct_percentage)

        solution_results.append({
            'solution_idx': i,
            'is_correct': is_correct,
            'accuracy': correct_percentage,
            'extracted_solution': gen_solution
        })

        # Track the best solution (strict '>' keeps the earliest attempt on ties)
        if correct_percentage > best_accuracy:
            best_accuracy = correct_percentage
            best_solution_idx = i
            best_solution = gen_solution

        # Check if any solution is completely correct
        if is_correct:
            is_any_correct = True

    return {
        'file_name': file_name,
        'pass_at_k': is_any_correct,
        'best_accuracy': best_accuracy,
        'best_solution_idx': best_solution_idx,
        'best_solution': best_solution,
        'all_solutions': solution_results,
        'k': len(solution_results)

    }

In [12]:
def run_passk_evaluation(filtered_dataset, k=240, k_run=8):
    """
    Generate ``k`` solutions per task in batches and evaluate them (pass@k).

    Generation is done ``k_run`` sequences at a time. Every batch adds its
    ``generated_solution_<i>`` columns to the same rows, so each task ends up
    as ONE row holding all ``k`` attempts and is evaluated once over all of
    them. (Stacking the batches as extra rows would evaluate every batch of a
    task separately and report per-batch instead of per-task results.)

    Parameters:
    filtered_dataset: Dataset of tasks; must support ``map`` and iteration.
    k (int, optional): Total number of solutions per task. Default is 240.
    k_run (int, optional): Number of solutions requested per generation call.
        Default is 8.

    Returns:
    tuple: ``(evaluation_results, dataset_with_solutions)`` — one evaluation
        dict per task, and the dataset extended with the generated columns.

    Raises:
    ValueError: If ``k_run`` is smaller than 1.
    """
    if k_run < 1:
        raise ValueError(f"k_run must be at least 1, got {k_run}")

    print(f"Generating {k} solutions per task for pass@{k} evaluation...")

    total_runs = (k + k_run - 1) // k_run  # ceil(k / k_run)
    dataset_with_solutions = filtered_dataset

    for run_idx, start_idx in enumerate(range(0, k, k_run), 1):
        # The last batch is smaller when k is not a multiple of k_run
        batch_k = min(k_run, k - start_idx)
        print(f"Run {run_idx}/{total_runs}: Generating solutions {start_idx + 1} to {start_idx + batch_k}")

        # map() runs immediately, so the loop variables captured by the lambda
        # still hold this iteration's values when it is called
        dataset_with_solutions = dataset_with_solutions.map(
            lambda x: generate_solutions_passk(
                x,
                k=batch_k,
                max_new_tokens=512,
                do_sample=True,
                temperature=0.7,
                top_p=0.9,
                start_idx=start_idx,
            ),
            batched=False
        )

    print("Evaluating solutions...")
    evaluation_results = []

    for i, task in enumerate(dataset_with_solutions):
        result = evaluate_passk_solutions(task, k=k)
        evaluation_results.append(result)

        if test_run and (i + 1) % 10 == 0:
            print(f"Evaluated {i + 1}/{len(dataset_with_solutions)} tasks")

    return evaluation_results, dataset_with_solutions

In [13]:
def analyze_passk_results(evaluation_results, k):
    """
    Analyze and print pass@k evaluation results.

    Prints the number of tasks, the pass@k count and rate, the average best
    accuracy, the mean accuracy per attempt position (1..k) and up to five
    tasks whose best solution was not the first attempt. Nothing is returned.

    Parameters:
    evaluation_results (list): List of evaluation results from run_passk_evaluation.
    k (int): The k value used in pass@k evaluation.
    """
    if not test_run:
        print("Cannot analyze results - not in test mode (no ground truth available)")
        return

    total_tasks = len(evaluation_results)
    if total_tasks == 0:
        # Avoid dividing by zero below when there is nothing to summarize
        print("No evaluation results to analyze")
        return

    pass_at_k_count = sum(1 for result in evaluation_results if result['pass_at_k'])

    # Calculate average best accuracy
    avg_best_accuracy = sum(result['best_accuracy'] for result in evaluation_results) / total_tasks

    # Group accuracies by attempt position in a single pass over all solutions
    accuracies_by_attempt = {}
    for result in evaluation_results:
        for sol_result in result['all_solutions']:
            accuracies_by_attempt.setdefault(sol_result['solution_idx'], []).append(sol_result['accuracy'])

    # Mean accuracy per attempt position, reported in ascending order for 1..k
    attempt_accuracies = {}
    for i in range(1, k+1):
        accuracies = accuracies_by_attempt.get(i)
        if accuracies:
            attempt_accuracies[f'attempt_{i}'] = sum(accuracies) / len(accuracies)

    # Print results
    print(f"\n=== Pass@{k} Evaluation Results ===")
    print(f"Total tasks evaluated: {total_tasks}")
    print(f"Tasks solved with pass@{k}: {pass_at_k_count}")
    print(f"Pass@{k} success rate: {(pass_at_k_count / total_tasks) * 100:.2f}%")
    print(f"Average best accuracy: {avg_best_accuracy:.2f}%")

    print(f"\nAccuracy by attempt position:")
    for attempt, accuracy in attempt_accuracies.items():
        print(f"  {attempt}: {accuracy:.2f}%")

    # Find tasks where pass@k helped (solved, but not by the first attempt)
    helped_tasks = [
        result for result in evaluation_results
        if result['pass_at_k'] and result['best_solution_idx'] > 1
    ]

    if helped_tasks:
        print(f"\nTasks where pass@{k} helped (solution wasn't the first attempt): {len(helped_tasks)}")
        print("Examples:")
        for task in helped_tasks[:5]:  # Show first 5 examples
            print(f"  {task['file_name']}: Best solution was attempt #{task['best_solution_idx']}")
    

In [1]:
# NOTE: this cell needs every cell above to have been run in the current kernel
# (it uses run_passk_evaluation, filtered_dataset, pd and os).
k_value = 240  # k value needs to be changed for possible results
evaluation_results, dataset_with_solutions = run_passk_evaluation(filtered_dataset, k=k_value)

from datetime import datetime

# Get current time
now = datetime.now()

# Format time as a string (e.g., '2025-08-05_14-30-00')
timestamp = now.strftime("%Y-%m-%d_%H-%M-%S")


df_eval = pd.DataFrame(evaluation_results)
print(df_eval)

if not df_eval.empty:
    # 'best_solution' only exists for test runs (results with ground truth)
    if 'best_solution' in df_eval.columns:
        df_eval['best_solution'] = df_eval['best_solution'].apply(repr)
    # Create the target directory first so a long generation run is not lost
    # to a missing folder when writing the CSV
    os.makedirs("output", exist_ok=True)
    df_eval.to_csv(f"output/{timestamp}.csv", index=False)

NameError: name 'run_passk_evaluation' is not defined

In [1]:
# Candidate list of dataset indices to check for repeated entries
l= [11, 169, 52, 163, 79, 238, 336, 93, 399, 138, 316, 118, 0, 257, 388, 394, 201, 385, 43, 189, 3, 105, 103, 188, 376, 357, 276, 383, 354, 275, 233]

def has_duplicates(lst):
    """Return True when at least one value occurs more than once in ``lst``."""
    distinct_values = set(lst)
    return len(distinct_values) < len(lst)
print(has_duplicates(l))

False
