## Importing packages and data

In [410]:
# Importing libraries
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
from   matplotlib import colors
import seaborn as sns

import json
import os
from pathlib import Path

from subprocess import Popen, PIPE, STDOUT
from glob import glob

import langchain # Main LangChain import
from langchain_openai import ChatOpenAI # To work with OpenAI
# from langchain_anthropic import ChatAnthropic # To work with Anthropic (optional)
# from langchain_google_genai import ChatGoogleGenerativeAI # To work with Gemini (optional)
from langchain_core.output_parsers import JsonOutputParser # To help with structured output
from langchain_core.prompts import PromptTemplate # To help create our prompt
from langchain_core.pydantic_v1 import BaseModel, Field # To help with defining what output structure we want
from pydantic import BaseModel

from typing import List, Tuple

# Import Swarm
from swarm import Swarm, Agent
from swarm.repl import run_demo_loop




In [411]:
# Function to load JSON files
def load_json(filepath):
    with open(filepath, 'r') as file:
        return json.load(file)

# Reading files
base_path = 'arc-agi-genesis/data/challenges/'
training_challenges =  load_json(base_path + 'arc-agi_training_challenges.json')
training_solutions =   load_json(base_path + 'arc-agi_training_solutions.json')

evaluation_challenges = load_json(base_path + 'arc-agi_evaluation_challenges.json')
evaluation_solutions = load_json(base_path + 'arc-agi_evaluation_solutions.json')

test_challenges =  load_json(base_path + 'arc-agi_test_challenges.json')

task_sets = {
    'training': {
        'challenges': training_challenges,
        'solutions': training_solutions,
    },
    'evaluation': {
        'challenges': evaluation_challenges,
        'solutions': evaluation_solutions,
    }
}

# Updated function to load tasks from a pre-loaded task set
def load_tasks_from_file(task_set):
    """
    Loads the tasks from the pre-loaded JSON data and returns the challenges and solutions tasks.
    """
    challenges = task_set['challenges']
    solutions = task_set['solutions']

    return challenges, solutions


In [412]:
print(f'Number of training challenges = {len(training_challenges)}')
print(f'Number of solutions of training challenges = {len(training_solutions)}')

Number of training challenges = 400
Number of solutions of training challenges = 400


In [413]:
# Loading tasks from the 'training' task set
challenges, solutions = load_tasks_from_file(task_set=task_sets['training'])
print(challenges['0520fde7'])  # Accessing a specific challenge


{'test': [{'input': [[1, 0, 1, 5, 1, 0, 1], [0, 1, 0, 5, 1, 0, 1], [1, 0, 1, 5, 0, 1, 0]]}], 'train': [{'input': [[1, 0, 0, 5, 0, 1, 0], [0, 1, 0, 5, 1, 1, 1], [1, 0, 0, 5, 0, 0, 0]], 'output': [[0, 0, 0], [0, 2, 0], [0, 0, 0]]}, {'input': [[1, 1, 0, 5, 0, 1, 0], [0, 0, 1, 5, 1, 1, 1], [1, 1, 0, 5, 0, 1, 0]], 'output': [[0, 2, 0], [0, 0, 2], [0, 2, 0]]}, {'input': [[0, 0, 1, 5, 0, 0, 0], [1, 1, 0, 5, 1, 0, 1], [0, 1, 1, 5, 1, 0, 1]], 'output': [[0, 0, 0], [2, 0, 0], [0, 0, 2]]}]}


#### initializing LLM client to use

In [414]:
from dotenv import load_dotenv
import os


# Load environment variables from .env file
load_dotenv('api.env')

# Get the OpenAI API key from environment variables
openai_api_key = os.getenv('OPENAI_API_KEY')

# Initialize the ChatOpenAI model with the API key
llm = ChatOpenAI(model='gpt-4o-mini', openai_api_key=openai_api_key, max_tokens=3000)

# Initialize the Swarm client
client = Swarm()

## And incase you want to try Anthropic
# llm = ChatAnthropic(model='claude-3-5-sonnet-20240620', api_key=UserSecretsClient().get_secret("ANTHROPIC_API_KEY"), max_tokens=3000)
# llm = ChatGoogleGenerativeAI(model="gemini-1.5-pro", google_api_key=UserSecretsClient().get_secret("GOOGLE_API_KEY"), max_tokens=3000)

## Trying to make MVP product which is just regular openai model trying to predict 

In [415]:
# converting train and test pairs to a string format ideal for LLMs
def json_task_to_string(challenge_tasks: dict, task_id: str, test_input_index: int) -> str:
    """
    challenge_tasks: dict a list of tasks
    task_id: str the id of the task we want to convert to a string
    
    Convert your json task into a string so you can pass it to your LLM.
    This is a crucial step where you can use your creativity to edit how tasks are represented.
    """
    json_task = challenge_tasks[task_id]

    final_output = ""

    train_tasks = json_task['train']
    test_task = json_task['test']

    final_output = "Training Examples\n"

    for i, task in enumerate(train_tasks):
        final_output += f"Example {i + 1}: Input\n["
        for row in task['input']:
            final_output += f"\n{str(row)},"

        final_output += "]\n\n"
        final_output += f"Example {i + 1}: Output\n["

        for row in task['output']:
            final_output += f"\n{str(row)},"

        final_output += "]\n\n"

    final_output += "Test\n["
    for row in test_task[test_input_index]['input']:
        final_output += f"\n{str(row)}"

    final_output += "]\n\nYour Response:"

    return final_output

In [416]:
# an example of how the function works
task_string = json_task_to_string(challenges, '0520fde7', 0)
print (task_string)

Training Examples
Example 1: Input
[
[1, 0, 0, 5, 0, 1, 0],
[0, 1, 0, 5, 1, 1, 1],
[1, 0, 0, 5, 0, 0, 0],]

Example 1: Output
[
[0, 0, 0],
[0, 2, 0],
[0, 0, 0],]

Example 2: Input
[
[1, 1, 0, 5, 0, 1, 0],
[0, 0, 1, 5, 1, 1, 1],
[1, 1, 0, 5, 0, 1, 0],]

Example 2: Output
[
[0, 2, 0],
[0, 0, 2],
[0, 2, 0],]

Example 3: Input
[
[0, 0, 1, 5, 0, 0, 0],
[1, 1, 0, 5, 1, 0, 1],
[0, 1, 1, 5, 1, 0, 1],]

Example 3: Output
[
[0, 0, 0],
[2, 0, 0],
[0, 0, 2],]

Test
[
[1, 0, 1, 5, 1, 0, 1]
[0, 1, 0, 5, 1, 0, 1]
[1, 0, 1, 5, 0, 1, 0]]

Your Response:


In [417]:
# Using a json output parser to parse the output, since LLMs aren't perfect at generating valid json
# Defining a prediction as a list of lists
class ARCPrediction(BaseModel):
    prediction: List[List] = Field(..., description="A prediction for a task")

In [418]:
# Define Agents

def provide_pattern_description(pattern_description: str, context_variables: dict):
    """
    Function to pass the identified pattern description to the Code Generation Agent.
    """
    context_variables['pattern_description'] = pattern_description  # Store the identified pattern
    return code_generation_agent

def provide_python_code(python_code: str, context_variables: dict):
    """
    Function to pass the generated Python code to the Code Revision Agent.
    """
    context_variables['python_code'] = python_code  # Store the generated Python code
    return code_revision_agent

def review_python_code(revised_code: str = None, context_variables: dict = None):
    """
    Function to pass the revised Python code to the Execution Agent.
    """
    if revised_code:
        context_variables['python_code'] = revised_code  # Update the context with revised code
    return execution_agent

def execute_python_code(context_variables: dict = None):
    """
    Function to execute the Python code and produce the output grid.
    """
    if context_variables is None:
        context_variables = {}

    python_code = context_variables.get('python_code')  # Retrieve the generated Python code
    input_grid = context_variables.get('input_grid')  # Retrieve the input grid

    if python_code is None:
        print("Error: No Python code provided in context_variables.")
        return

    # Ensure the Python code defines a `transform` function before executing
    if 'def transform' not in python_code:
        print("Error: The provided Python code does not define a `transform` function.")
        raise ValueError("The provided Python code does not define a `transform` function.")

    try:
        exec_globals = {}
        exec(python_code, exec_globals)  # Execute the provided Python code
        if 'transform' in exec_globals:
            output_grid = exec_globals['transform'](input_grid)
            context_variables['output_grid'] = output_grid
            return output_grid  # Return the output grid
        else:
            print("Error: The code does not define a 'transform' function.")
            raise ValueError("The code does not define a 'transform' function.")
    except Exception as e:
        print(f"Error during execution: {e}")
        raise



# Pattern Recognition Agent
pattern_recognition_agent = Agent(
    name="Pattern Recognition Agent",
    instructions=(
        "You are an expert in recognizing patterns in data. "
        "Given training examples of input and output grids, analyze the inputs and outputs and identify the underlying pattern or transformation. "
        "Describe the pattern clearly and concisely."
    ),
    functions=[provide_pattern_description]
)

# Code Generation Agent
code_generation_agent = Agent(
    name="Code Generation Agent",
    instructions=(
        "You are a coding assistant. "
        "Given a description of a pattern or transformation, write Python code that implements the transformation. "
        "The code should define a function 'transform(input_grid)' that returns the output grid as per the pattern."
    ),
    functions=[provide_python_code]
)

# Code Revision Agent
code_revision_agent = Agent(
    name="Code Revision Agent",
    instructions=(
        "You are a code reviewer. "
        "Given Python code, check it for correctness and suggest improvements if necessary. "
        "If the code is correct, confirm it. If you make revisions, provide the revised code."
    ),
    functions=[review_python_code]
)

# Execution Agent
execution_agent = Agent(
    name="Execution Agent",
    instructions=(
        "You are an execution agent. "
        "Given Python code and an input grid, execute the code to produce the output grid. "
        "The code defines a function 'transform(input_grid)' that you can call. "
        "Provide the output grid as a JSON response."
    ),
    functions=[execute_python_code]
)


### Creating function to get task prediction, make prompt, make API calls to model and parse output with retries

In [419]:
def get_task_prediction(challenge_tasks, task_id, test_input_index) -> List[List]:
    """
    Given a task, predict the test output using Swarm agents.
    """
    # Get the string representation of the task
    task_string = json_task_to_string(challenge_tasks, task_id, test_input_index)
    
    # Extract the test input grid
    test_input_grid = challenge_tasks[task_id]['test'][test_input_index]['input']

    # Define context variables
    context_variables = {
        'task_string': task_string,
        'input_grid': test_input_grid
    }

    # Start the conversation with the Pattern Recognition Agent
    response = client.run(
        agent=pattern_recognition_agent,
        messages=[{"role": "user", "content": task_string}],
        context_variables=context_variables,
        execute_tools=True,
        max_turns=10,
        debug=False
    )

    # Retrieve the output grid from context variables
    output_grid = response.context_variables.get('output_grid', None)

    if output_grid is None:
        print("No output grid generated.")
        raise ValueError("No output grid generated.")

    prediction = output_grid

    # Validate the prediction
    if not all(isinstance(sublist, list) and all(isinstance(item, int) for item in sublist) for sublist in prediction):
        print("Warning: Output must be a list of lists of integers.")
        print(f"Errored Output: {prediction}")
        raise ValueError("Output must be a list of lists of integers.")
    
    # Output the shape of the prediction
    num_rows = len(prediction)
    num_cols = len(prediction[0]) if num_rows > 0 else 0
    print(f"    Prediction Grid Size: {num_rows}x{num_cols}\n")

    return prediction


In [420]:
def run_model(challenges, NUM_ATTEMPTS=2, RETRY_ATTEMPTS=3, NUM_TASKS=None):
    """
    challenges: dict a list of challenges. This should come directly from your _challenges file
    NUM_ATTEMPTS: int the number of times to attempt a prediction. The official competition has 2 attempts.
    RETRY_ATTEMPTS: int the number of times to retry a prediction if it fails
    NUM_TASKS: int, If set, this represents the the number of tasks you'd like to test. If None then the all challeneges will be tested

    Loop through your challenges and produce a submission.json file you can submit for a score.
    """

    # A dict to hold your submissions that you'll return after all predictions are made
    submission = {}

    # Run through each task in your challenge set
    for i, task_id in enumerate(challenges):
        task_attempts = []  # List to store all attempts for the current task

        # Go through each test pair to get a prediction. 96% of challenges have 1 pair.
        for t, pair in enumerate(challenges[task_id]['test']):
            print(f"Starting task #{i + 1} ({task_id}), pair #{t+1}")

            # Dictionary to store attempts for the current test pair
            pair_attempts = {}  

            # Run through each prediction attempt
            for attempt in range(1, NUM_ATTEMPTS + 1):
                attempt_key = f"attempt_{attempt}"
                pair_attempts[attempt_key] = [] # Init your attempt

                # Try to get a prediction, with retries in case of failure
                for retry in range(RETRY_ATTEMPTS):
                    try:
                        print(f"    Predicting attempt #{attempt}, retry #{retry + 1}")
                        prediction = get_task_prediction(challenge_tasks=challenges,
                                                         task_id=task_id,
                                                         test_input_index=t)
                        
                        # If you get a valid prediction (list of lists of ints) with no error, then log the attempt
                        pair_attempts[attempt_key] = prediction
                        break  # Break the retry loop if prediction is successful
                    except Exception as e:
                        print(f"Retrying: {e}")
                        if retry == RETRY_ATTEMPTS - 1:
                            pair_attempts[attempt_key] = []  # Assign None if all retries fail

            # After you get your attempts, append them to the task attempts
            task_attempts.append(pair_attempts)

        # Append the task attempts to the submission with the task_id as the key
        submission[task_id] = task_attempts

        # If you want to stop after N tasks, uncomment the below
        if NUM_TASKS is not None and i + 1 == NUM_TASKS:
            break

    return submission

### Creating submission files and comparing it with solutions file

In [421]:
# create submission file
def create_submission_file(submission, file_name='submission.json'):
    """
    Save a submission file to the specified file name
    """
    with open(file_name, "w") as file:
        json.dump(submission, file)

    print (f"Submission saved to {file_name}")

In [422]:
# create function to compare submission with solutions
def score_submission(submission_file_name, solutions) -> Tuple[float, int]:
    """
    submission_file_name: str, the file name of your submission file
    solutions: dict, the ground truth solutions you'd like to test against
    
    Read a submission from file, score it, then return the score
    """
    print(f"Scoring {submission_file_name}\n")

    # Open your submission file
    with open(submission_file_name, "r") as file:
        submission = json.load(file)

    total_score = 0
    total_tasks = 0

    # Loop through each task in your submission to grade it
    for task_id, task_submission in submission.items():
        total_tasks += 1
        task_score = 0
        num_pairs = len(task_submission)

        # Ensure the task exists in the solutions
        if task_id not in solutions:
            print(f"Warning: Task {task_id} not found in solutions. Skipping.")
            continue

        # Ensure the number of pairs matches
        if num_pairs > len(solutions[task_id]):
            print(f"Warning: More pairs in submission than in solutions for task {task_id}. Adjusting to match.")
            num_pairs = len(solutions[task_id])

        # Go through each task. Most will only have 1 pair
        for pair_index, pair_attempts in enumerate(task_submission[:num_pairs]):
            print(f"Scoring Task {task_id} pair #{pair_index + 1}")
            pair_correct = False

            # Look at both of your attempts (pair_attempts is a list, so iterate directly)
            for attempt in pair_attempts:
                
                # Check if the attempt matches the solution
                if attempt == solutions[task_id][pair_index]:
                    print(f"Task Id {task_id} pair {pair_index + 1} attempt matches solution")
                    pair_correct = True
                    break  # If it is correct, log it and break the loop

            if pair_correct:
                task_score += 1

        task_score /= num_pairs
        total_score += task_score

    return {
        'total_score': total_score,
        'total_tasks_scored': total_tasks
    }


#### The main function to bring everything together

In [423]:
def main(task_set='training', NUM_TASKS=None, NUM_ATTEMPTS=2, RETRY_ATTEMPTS=3, submission_file_name='submission.json'):
    print(f"Starting task set: {task_set}, with {NUM_TASKS if NUM_TASKS else 'all'} tasks.")

    # Load datasets
    print("Loading tasks and solutions...")
    challenges, solutions = load_tasks_from_file(task_set=task_sets[task_set])
    
    # Print the type and sample of the challenges object for debugging
    print(f"Type of challenges: {type(challenges)}")
    sample_key = next(iter(challenges))
    print(f"Sample of challenges: {sample_key} => {challenges[sample_key]}")

    # Initialize Swarm agents (if not already initialized globally)
    global client, pattern_recognition_agent, code_generation_agent, code_revision_agent, execution_agent
    print("Initializing agents...")

    # Prepare the submission dictionary
    submission = {}

    # Run the model
    print("Running the model on the challenges...")

    # Iterate over the challenge keys and limit the number of tasks using NUM_TASKS
    for idx, task_id in enumerate(list(challenges.keys())[:NUM_TASKS]):
        print(f"Starting task #{idx + 1}, challenge ID {task_id}")

        challenge = challenges[task_id]

        # Go through each test pair to get a prediction
        for t, pair in enumerate(challenge['test']):
            print(f"Starting test pair #{t+1}")

            task_attempts = []  # List to store all attempts for the current test pair

            for attempt in range(1, NUM_ATTEMPTS + 1):
                print(f"  Attempt {attempt} for task #{idx + 1}, test pair #{t+1}")

                # Initialize the task string and input grid
                test_input_grid = challenge['test'][t]['input']
                context_variables = {'input_grid': test_input_grid}

                success = False
                for retry in range(RETRY_ATTEMPTS):
                    try:
                        # Step 1: Pattern Recognition Agent
                        print("Pattern Recognition Agent analyzing patterns...")
                        response = client.run(
                            agent=pattern_recognition_agent,
                            messages=[{"role": "user", "content": str(challenge)}],
                            context_variables=context_variables,
                            execute_tools=True
                        )
                        context_variables = response.context_variables
                        pattern_description = response.messages[-1]["content"]
                        print(f"Pattern recognized: {pattern_description}")

                        # Step 2: Code Generation Agent
                        print("Code Generation Agent generating Python code...")
                        response = client.run(
                            agent=code_generation_agent,
                            messages=[{"role": "user", "content": pattern_description}],
                            context_variables=context_variables,
                            execute_tools=True
                        )
                        context_variables = response.context_variables
                        python_code = response.messages[-1]["content"]
                        print(f"Generated Python code:\n{python_code}")

                        # Step 3: Code Revision Agent
                        print("Code Revision Agent reviewing generated code...")
                        response = client.run(
                            agent=code_revision_agent,
                            messages=[{"role": "user", "content": python_code}],
                            context_variables=context_variables,
                            execute_tools=True
                        )
                        context_variables = response.context_variables
                        revised_code = response.messages[-1]["content"]
                        print(f"Revised Python code:\n{revised_code}")

                        # Step 4: Execution Agent
                        print("Execution Agent executing the Python code...")
                        response = client.run(
                            agent=execution_agent,
                            messages=[{"role": "user", "content": revised_code}],
                            context_variables=context_variables,
                            execute_tools=True
                        )
                        context_variables = response.context_variables
                        output_grid = context_variables.get('output_grid')

                        if output_grid:
                            print(f"Execution output grid (Generated by Agents):\n{output_grid}")
                            success = True
                            task_attempts.append(output_grid)
                            break  # Break the retry loop if successful
                        else:
                            print("No output grid generated.")
                    except Exception as e:
                        print(f"Error during execution: {e}")
                        if retry == RETRY_ATTEMPTS - 1:
                            print(f"All retries failed for attempt {attempt} of task #{idx + 1}")
                            task_attempts.append([])  # Append empty grid if all retries fail

                if success:
                    print(f"Successfully generated output for task #{idx + 1}, test pair #{t+1}, attempt #{attempt}")
                else:
                    print(f"Failed to generate output for task #{idx + 1}, test pair #{t+1}, attempt #{attempt}")

            # Append the attempts for this pair to the submission
            submission[task_id] = task_attempts

        # If you want to stop after N tasks, uncomment the below
        if NUM_TASKS is not None and idx + 1 == NUM_TASKS:
            break

    # Create the submission file
    print(f"Creating submission file: {submission_file_name}...")
    create_submission_file(submission, file_name=submission_file_name)
    print(f"Submission file '{submission_file_name}' created.")

    # Score the submission
    print("Scoring the submission...")
    score_result = score_submission(solutions=solutions, submission_file_name=submission_file_name)
    
    total_score = score_result['total_score']
    total_tasks_scored = score_result['total_tasks_scored']
    score_percentage = round(total_score / total_tasks_scored * 100, 2)

    print(f"Final score: {total_score} of {total_tasks_scored} ({score_percentage}%)")

    print("Process completed.")


# RUNNING THE MODEL

In [424]:
main(task_set='evaluation', NUM_TASKS=2)

Starting task set: evaluation, with 2 tasks.
Loading tasks and solutions...
Type of challenges: <class 'dict'>
Sample of challenges: 00576224 => {'test': [{'input': [[3, 2], [7, 8]]}], 'train': [{'input': [[8, 6], [6, 4]], 'output': [[8, 6, 8, 6, 8, 6], [6, 4, 6, 4, 6, 4], [6, 8, 6, 8, 6, 8], [4, 6, 4, 6, 4, 6], [8, 6, 8, 6, 8, 6], [6, 4, 6, 4, 6, 4]]}, {'input': [[7, 9], [4, 3]], 'output': [[7, 9, 7, 9, 7, 9], [4, 3, 4, 3, 4, 3], [9, 7, 9, 7, 9, 7], [3, 4, 3, 4, 3, 4], [7, 9, 7, 9, 7, 9], [4, 3, 4, 3, 4, 3]]}]}
Initializing agents...
Running the model on the challenges...
Starting task #1, challenge ID 00576224
Starting test pair #1
  Attempt 1 for task #1, test pair #1
Pattern Recognition Agent analyzing patterns...
Pattern recognized: The output grid is:

```json
[
    [3, 2, 3, 2, 3, 2],
    [7, 8, 7, 8, 7, 8],
    [2, 3, 2, 3, 2, 3],
    [8, 7, 8, 7, 8, 7],
    [3, 2, 3, 2, 3, 2],
    [7, 8, 7, 8, 7, 8]
]
```
Code Generation Agent generating Python code...
Generated Python code:
T