<a href="https://colab.research.google.com/github/JackKeller27/ARC-Prize-2024/blob/main/arc_submission_notebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ARC Prize 2024 Submission

Jack Keller

Georgia Institute of Technology

jkeller44@gatech.edu

# Load needed libraries

Basic libraries like numpy, torch, matplotlib, and tqdm are already installed.

In [None]:
import json
import numpy as np
from tqdm import tqdm

# # Import arckit library
# # Repo: https://github.com/mxbi/arckit?tab=readme-ov-file
# # Paper: https://arxiv.org/pdf/2402.03507
# import arckit

colab = False

# Load the data

Here we load the training challenges and solutions (the public training set), the evaluation challenges and solutions (the public evaluation set), and the test challenges (currently a placeholder file that is a copy of the public evaluation challenges).

In [None]:
# Run this cell only when working in Colab; skip it to keep colab = False for a local run
colab = True

from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Public training set
if colab:
  train_challenges_path = '/content/drive/MyDrive/Colab Notebooks/Knowledge-Based AI (ARC Prize)/ARC Prize Submission/input/arc-prize-2024/arc-agi_training_challenges.json'
  train_solutions_path = '/content/drive/MyDrive/Colab Notebooks/Knowledge-Based AI (ARC Prize)/ARC Prize Submission/input/arc-prize-2024/arc-agi_training_solutions.json'
else:
  train_challenges_path = './input/arc-prize-2024/arc-agi_training_challenges.json'
  train_solutions_path = './input/arc-prize-2024/arc-agi_training_solutions.json'

with open(train_challenges_path) as fp:
    train_challenges = json.load(fp)
with open(train_solutions_path) as fp:
    train_solutions = json.load(fp)

# Public evaluation set
if colab:
  evaluation_challenges_path = '/content/drive/MyDrive/Colab Notebooks/Knowledge-Based AI (ARC Prize)/ARC Prize Submission/input/arc-prize-2024/arc-agi_evaluation_challenges.json'
  evaluation_solutions_path = '/content/drive/MyDrive/Colab Notebooks/Knowledge-Based AI (ARC Prize)/ARC Prize Submission/input/arc-prize-2024/arc-agi_evaluation_solutions.json'
else:
  evaluation_challenges_path = './input/arc-prize-2024/arc-agi_evaluation_challenges.json'
  evaluation_solutions_path = './input/arc-prize-2024/arc-agi_evaluation_solutions.json'

with open(evaluation_challenges_path) as fp:
    evaluation_challenges = json.load(fp)
with open(evaluation_solutions_path) as fp:
    evaluation_solutions = json.load(fp)

# This will be the hidden test challenges (currently has a placeholder to the evaluation set)
if colab:
  test_challenges_path = '/content/drive/MyDrive/Colab Notebooks/Knowledge-Based AI (ARC Prize)/ARC Prize Submission/input/arc-prize-2024/arc-agi_test_challenges.json'
else:
  test_challenges_path = './input/arc-prize-2024/arc-agi_test_challenges.json'

with open(test_challenges_path) as fp:
    test_challenges = json.load(fp)

Here is an example of what a test task looks like:

In [None]:
sample_task = list(test_challenges.keys())[0]
test_challenges[sample_task]

{'test': [{'input': [[3, 2], [7, 8]]}],
 'train': [{'input': [[8, 6], [6, 4]],
   'output': [[8, 6, 8, 6, 8, 6],
    [6, 4, 6, 4, 6, 4],
    [6, 8, 6, 8, 6, 8],
    [4, 6, 4, 6, 4, 6],
    [8, 6, 8, 6, 8, 6],
    [6, 4, 6, 4, 6, 4]]},
  {'input': [[7, 9], [4, 3]],
   'output': [[7, 9, 7, 9, 7, 9],
    [4, 3, 4, 3, 4, 3],
    [9, 7, 9, 7, 9, 7],
    [3, 4, 3, 4, 3, 4],
    [7, 9, 7, 9, 7, 9],
    [4, 3, 4, 3, 4, 3]]}]}
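
To get a feel for the data, each task can be summarized by the shapes of its grids. The helper below is a sketch and not part of the original pipeline: `grid_shape` and `summarize_task` are hypothetical names, and `example_task` simply copies the test input and the first training pair shown above.

```python
def grid_shape(grid):
    """Return (rows, columns) of a grid given as a list of lists."""
    return (len(grid), len(grid[0]) if grid else 0)

def summarize_task(task):
    """Return the (input, output) shapes of each training pair and the test input shapes."""
    train_shapes = [(grid_shape(p['input']), grid_shape(p['output'])) for p in task['train']]
    test_shapes = [grid_shape(item['input']) for item in task['test']]
    return train_shapes, test_shapes

# Stand-in for test_challenges[sample_task] (first training pair only)
example_task = {
    'test': [{'input': [[3, 2], [7, 8]]}],
    'train': [{'input': [[8, 6], [6, 4]],
               'output': [[8, 6, 8, 6, 8, 6],
                          [6, 4, 6, 4, 6, 4],
                          [6, 8, 6, 8, 6, 8],
                          [4, 6, 4, 6, 4, 6],
                          [8, 6, 8, 6, 8, 6],
                          [6, 4, 6, 4, 6, 4]]}],
}

train_shapes, test_shapes = summarize_task(example_task)
print(train_shapes)  # [((2, 2), (6, 6))]
print(test_shapes)   # [(2, 2)]
```

Here the output is three times the input size along each axis, which is the kind of regularity the analysis helpers later in the notebook try to exploit.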

# Implementation

## Define our DSL

The class below represents an individual ARC puzzle.

It will contain attributes about the puzzle (size, components, etc.) along with various operations that may be applied to the puzzle to transform it.
These operations comprise our domain-specific language.

In [None]:
"""
Adapted from Knowledge-Based AI (CS 4635/7637) Homework 2 at Georgia Institute of Technology.

Defines a class to represent an ARC Puzzle, and a list of possible operations that may be applied to the grid to transform it.
This represents our domain-specific language (DSL).

Instructor: Christopher J. MacLellan. https://chrismaclellan.com/
"""

from random import choice

class ARCPuzzle():
    """
    An ARC puzzle class that can be used to test different search algorithms.
    When first created, the puzzle state is the input grid.

    Contains basic, full-puzzle operations (rotate, mirror, compress, etc.) along with component-specific operations.
    """

    def __init__(self, input_grid):
        self.state = tuple((tuple(row) for row in input_grid))
        self.path = []
        # super().__init__(self.state, None)

    # def __add_goal__(self, output_grid):
    #     # Add goal state to the current puzzle (in the form of another ARCPuzzle instance)
    #     self.goal_node = Node(ARCPuzzle(output_grid))

    def __hash__(self):
        return hash(self.state)

    def __eq__(self, other):
        if isinstance(other, ARCPuzzle):
            return self.state == other.state
        return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def __repr__(self):
        return str(self)

    def __str__(self):
        out = ""
        for row in self.state:
            out += str(row) + "\n"
        return out

    def copy(self):
        """
        Makes a deep copy of an ARCPuzzle object.
        """
        new_puzzle = ARCPuzzle(self.state)
        new_puzzle.path = self.path.copy()
        return new_puzzle

    def randomize(self, num_shuffles):
        """
        Randomizes an ARCPuzzle by executing a random action `num_shuffles`
        times.
        """
        for _ in range(num_shuffles):
            a = choice(list(self.legalActions()))
            self.executeAction(a)

        return self

    def tophalf(self, grid):
        """ upper half """
        return grid[:len(grid) // 2]


    def rot90(self, grid):
        """ clockwise rotation by 90 degrees """
        return tuple(zip(*grid[::-1]))


    def hmirror(self, grid):
        """ mirroring along horizontal """
        return grid[::-1]

    def vmirror(self, grid):
        """ mirroring along vertical """
        return tuple(row[::-1] for row in grid)

    def lshift(self, grid):
        return tuple([tuple([e for e in row if e != 0] + [0]*row.count(0))
                      for row in grid])

    def compress(self, grid):
        """ removes frontiers """
        ri = [i for i, r in enumerate(grid) if len(set(r)) == 1]
        ci = [j for j, c in enumerate(zip(*grid)) if len(set(c)) == 1]
        return tuple([tuple([v for j, v in enumerate(r) if j not in ci])
                      for i, r in enumerate(grid) if i not in ri])

    def mapcolor(self, grid, a, b):
        return tuple(tuple(b if e == a else e for e in row) for row in grid)

    def trim(self, grid):
        """ removes border """
        return tuple(r[1:-1] for r in grid[1:-1])

    def executeAction(self, action):
        """
        Executes an action on the ARCPuzzle object.

        :param action: the action to execute
        :type action: one of the strings yielded by legalActions(), e.g. 'rot90' or 'mapcolor(1,2)'
        """
        if action == 'tophalf':
            self.state = self.tophalf(self.state)
        elif action == 'rot90':
            self.state = self.rot90(self.state)
        elif action == 'hmirror':
            self.state = self.hmirror(self.state)
        elif action == 'vmirror':
            self.state = self.vmirror(self.state)
        elif action == 'lshift':
            self.state = self.lshift(self.state)
        elif action == 'compress':
            self.state = self.compress(self.state)
        elif action[:8] == 'mapcolor':
            args = action[9:-1].split(',')
            self.state = self.mapcolor(self.state, int(args[0]), int(args[1]))

    def legalActions(self):
        """
        Returns an iterator to the legal actions that can be executed in the
        current state.
        """
        for action in ['tophalf', 'rot90', 'hmirror', 'vmirror', 'lshift', 'compress']:
            yield action

        for a in set(e for row in self.state for e in row):
            for b in range(10):
                if a == b:
                    continue
                yield f'mapcolor({a},{b})'
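
As a quick sanity check of how these operations compose, the sketch below applies short action paths to a small grid. It uses standalone copies of three operations rather than the `ARCPuzzle` class so that it runs on its own. `apply_path` and the `OPS` table are hypothetical names introduced for illustration, and `vmirror` here is assumed to mean a left-right flip (each row reversed).

```python
def rot90(grid):
    """Clockwise rotation by 90 degrees."""
    return tuple(zip(*grid[::-1]))

def hmirror(grid):
    """Flip top to bottom (mirror along the horizontal axis)."""
    return grid[::-1]

def vmirror(grid):
    """Flip left to right (mirror along the vertical axis)."""
    return tuple(row[::-1] for row in grid)

OPS = {'rot90': rot90, 'hmirror': hmirror, 'vmirror': vmirror}

def apply_path(grid, path):
    """Apply a list of action names to a grid, in order."""
    state = tuple(tuple(row) for row in grid)
    for action in path:
        state = OPS[action](state)
    return state

grid = [[1, 2], [3, 4]]
print(apply_path(grid, ['rot90']))               # ((3, 1), (4, 2))
print(apply_path(grid, ['hmirror', 'vmirror']))  # ((4, 3), (2, 1))
print(apply_path(grid, ['rot90', 'rot90']))      # ((4, 3), (2, 1))
```

The last two lines give the same grid: a top-bottom flip followed by a left-right flip is a 180 degree rotation, so different action paths can reach the same state. This is why the search keeps a visited set.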


## Define our search algorithms

Let's start with a basic breadth-first-search.

**Best score:** 0%

In [None]:
# We need this for imposing a time limit on the search
import time

In [None]:
def breadth_first_search(input_grid, output_grid, max_depth, max_seconds) -> list[str]:
    """
    Searches for a sequence of transformations to convert the input_grid into
    the output_grid.

    Parameters:
    - input_grid: The initial grid that needs to be transformed.
    - output_grid: The target grid to achieve.
    - max_depth: The max depth of the search.
    - max_seconds: The max number of seconds allowed before the search times out.

    Returns:
    - A list of actions that transforms input_grid into output_grid, an empty list if no solution is found, or None if the search times out.
    """
    # Create initial problem puzzle and goal puzzle
    start_problem = ARCPuzzle(input_grid)
    goal_problem = ARCPuzzle(output_grid)

    # Set up BFS
    queue = []
    visited = set([start_problem.state])
    init_time = time.time()

    queue.append(start_problem)
    while (len(queue) > 0):
        # Break if we've hit the time limit
        if time.time() - init_time > max_seconds:
            # print(print(time.time() - init_time))

            # Returning None signifies timeout
            return None

        curr_prob = queue.pop(0)
        # print(curr_prob.path)

        if curr_prob == goal_problem:
            return curr_prob.path

        if len(curr_prob.path) > max_depth:
            # we've applied more than the max_depth number of transformations
            # move on to next possibility
            continue

        visited.add(curr_prob.state)

        # Explore frontier of possible transformations
        legal_actions = curr_prob.legalActions()
        for action in legal_actions:
            # print(action)
            new_prob = curr_prob.copy()
            new_prob.path.append(action)
            # if len(new_prob.path) > 1:
            #     print(new_prob.path)
            new_prob.executeAction(action)

            # print(new_prob)
            # print(new_prob.path)

            if new_prob.state not in visited:
                queue.append(new_prob)

    # Return empty if no solution found
    return []
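
Popping from the front of a Python list with `queue.pop(0)` takes time proportional to the queue length, whereas `collections.deque.popleft()` runs in constant time. The sketch below shows the same breadth-first pattern on a toy problem. `bfs_path`, `neighbors`, and the integer state space are hypothetical and only meant to illustrate the queue handling; here states are marked as visited when they are enqueued, which keeps duplicates out of the queue.

```python
from collections import deque

def bfs_path(start, goal, neighbors, max_depth):
    """Return the shortest list of actions from start to goal, or [] if none exists within max_depth."""
    queue = deque([(start, [])])
    visited = {start}  # mark states when they are enqueued to avoid duplicates
    while queue:
        state, path = queue.popleft()  # O(1), unlike list.pop(0)
        if state == goal:
            return path
        if len(path) >= max_depth:
            continue
        for action, nxt in neighbors(state):
            if nxt not in visited:
                visited.add(nxt)
                queue.append((nxt, path + [action]))
    return []

# Toy state space: integers with two actions
def neighbors(n):
    yield 'double', n * 2
    yield 'inc', n + 1

print(bfs_path(1, 6, neighbors, max_depth=4))  # ['double', 'inc', 'double']
```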

Next let's implement best-first-search using a heuristic. This should help reduce our search time and avoid timing out.

The heuristic is defined below:
- Select the operation that minimizes the total number of differing pixels
between the current state and the output state.
- Each pixel that differs between the current and output grids counts as
one difference, regardless of how much the actual values differ.
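
Counting differing pixels needs a cell-by-cell comparison: comparing two nested tuples with `!=` returns a single boolean for the whole grid, not one value per cell. The sketch below is one way to write the count in plain Python. `pixel_difference` is a hypothetical helper, and charging the larger cell count when the two shapes differ is an assumption; the heuristic as described above does not say how to handle grids of different sizes.

```python
def pixel_difference(state, goal):
    """Number of cells that differ between two grids (tuples of tuples)."""
    same_shape = len(state) == len(goal) and all(
        len(r) == len(g) for r, g in zip(state, goal)
    )
    if not same_shape:
        # Assumption: grids of different shape count as maximally different
        return max(sum(len(r) for r in state), sum(len(g) for g in goal))
    return sum(a != b for r, g in zip(state, goal) for a, b in zip(r, g))

a = ((1, 2, 3), (4, 5, 6))
b = ((1, 0, 3), (4, 5, 0))
print(a != b)                          # True: one boolean for the whole grid
print(pixel_difference(a, b))          # 2
print(pixel_difference(a, a))          # 0
print(pixel_difference(a, ((1, 2),)))  # 6
```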

**Best score**: _%

In [None]:
# from queue import PriorityQueue

# We'll use heapq since it's more efficient
import heapq
from itertools import count

In [None]:
def best_first_search(input_grid, output_grid, max_depth, max_seconds) -> list[str]:
    """
    Searches for a sequence of transformations to convert the input_grid into
    the output_grid.

    Relies on a heuristic: select the action which minimizes the number of different pixels between the current state and goal state.

    Parameters:
    - input_grid: The initial grid that needs to be transformed.
    - output_grid: The target grid to achieve.
    - max_depth: The max depth of the search.
    - max_seconds: The max number of seconds allowed before the search times out.

    Returns:
    - A list of actions that transforms input_grid into output_grid, an empty list if no solution is found, or None if the search times out.
    """
    # Create initial problem puzzle and goal puzzle
    start_problem = ARCPuzzle(input_grid)
    goal_problem = ARCPuzzle(output_grid)

    # Set up the search
    priority_queue = [] # elements are of form (# of different pixels from goal, tie-breaking counter, current puzzle)
    visited = set([start_problem.state])
    init_time = time.time()
    counter = count() # unique counter for each entry in the priority queue (in case heuristics are tied)

    heapq.heappush(priority_queue, (0, next(counter), start_problem))
    while (len(priority_queue) > 0):
        # Break if we've hit the time limit
        if time.time() - init_time > max_seconds:
            # print(print(time.time() - init_time))

            # Returning None signifies timeout
            return None

        _, _, curr_prob = heapq.heappop(priority_queue)
        # print(curr_prob.path)

        if curr_prob == goal_problem:
            # Solution found
            return curr_prob.path

        if len(curr_prob.path) > max_depth:
            # we've applied more than the max_depth number of transformations
            # move on to next possibility
            continue

        visited.add(curr_prob.state)

        # Explore frontier of possible transformations
        legal_actions = curr_prob.legalActions()
        for action in legal_actions:
            # print(action)
            new_prob = curr_prob.copy()
            new_prob.path.append(action)
            # if len(new_prob.path) > 1:
            #     print(new_prob.path)
            new_prob.executeAction(action)

            # print(new_prob)
            # print(new_prob.path)

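            if new_prob.state not in visited:
                # Calculate heuristic (# of different pixels), cell by cell.
                # Comparing the two tuples directly with != would yield a single bool.
                s, g = new_prob.state, goal_problem.state
                if len(s) == len(g) and all(len(r) == len(q) for r, q in zip(s, g)):
                    h = sum(a != b for r, q in zip(s, g) for a, b in zip(r, q))
                else:
                    # Assumption: grids of different shape count as maximally different
                    h = max(sum(len(r) for r in s), sum(len(q) for q in g))
                heapq.heappush(priority_queue, (h, next(counter), new_prob))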

    # Return empty if no solution found
    return []

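Finally, we'll implement a bidirectional version of our best-first-search above, which expands from the input grid and from the output grid in turn and stops when the two frontiers meet. Hopefully, this will reduce search time significantly. One caveat: the goal-side search applies the same forward operations to the output grid, so the stitched path (the start path plus the reversed goal path) is only guaranteed to be valid when the goal-side actions are self-inverse, such as `hmirror` and `vmirror`.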

**Best score:** _%

In [None]:
def bidirectional_best_first_search(input_grid, output_grid, max_depth, max_seconds) -> list[str]:
    """
    Searches for a sequence of transformations to convert the input_grid into
    the output_grid using a bidirectional heuristic approach.

    Parameters:
    - input_grid: The initial grid that needs to be transformed.
    - output_grid: The target grid to achieve.
    - max_depth: The max depth of the search.
    - max_seconds: The max number of seconds allowed before the search times out.

    Returns:
    - A list of actions that transforms input_grid into output_grid, or None if it times out.
    """
    # Initialize problems from both start and goal states
    start_problem = ARCPuzzle(input_grid)
    goal_problem = ARCPuzzle(output_grid)

    # Initialize priority queues and visited sets for both directions
    start_queue = []
    goal_queue = []
    counter = count() # unique counter for each entry in the priority queue (in case heuristics are tied)
    heapq.heappush(start_queue, (0, next(counter), start_problem))
    heapq.heappush(goal_queue, (0, next(counter), goal_problem))

    # Here, we need to keep track of both states and paths in the visited set, since we'll need to stitch the start and goal paths together
    start_visited = {start_problem.state: start_problem.path}
    goal_visited = {goal_problem.state: goal_problem.path}

    # Keep track of initial time to check if the search times out
    init_time = time.time()

    while start_queue and goal_queue:
        # Break if time limit is reached
        if time.time() - init_time > max_seconds:
            return None  # Timeout

        # Expand from the start side
        _, _, curr_start_prob = heapq.heappop(start_queue)

        if curr_start_prob.state in goal_visited:
            # Paths meet; combine and return
            return curr_start_prob.path + goal_visited[curr_start_prob.state][::-1]

        # Check depth limit for start side
        if len(curr_start_prob.path) <= max_depth:

            # This is where we iterate through the possible operations in the DSL
            for action in curr_start_prob.legalActions():
                new_start_prob = curr_start_prob.copy()
                new_start_prob.path.append(action)
                new_start_prob.executeAction(action)

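                # If the state is unvisited, compute heuristic and add to queue
                if new_start_prob.state not in start_visited:
                    # heuristic: # of differing pixels, counted cell by cell
                    # (comparing the tuples directly with != would yield a single bool)
                    s, g = new_start_prob.state, goal_problem.state
                    if len(s) == len(g) and all(len(r) == len(q) for r, q in zip(s, g)):
                        h_start = sum(a != b for r, q in zip(s, g) for a, b in zip(r, q))
                    else:
                        # Assumption: grids of different shape count as maximally different
                        h_start = max(sum(len(r) for r in s), sum(len(q) for q in g))
                    heapq.heappush(start_queue, (h_start, next(counter), new_start_prob))
                    start_visited[new_start_prob.state] = new_start_prob.path # add to visited set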

        # Expand from the goal side
        _, _, curr_goal_prob = heapq.heappop(goal_queue)

        if curr_goal_prob.state in start_visited:
            # Paths meet; combine and return
            return start_visited[curr_goal_prob.state] + curr_goal_prob.path[::-1]

        # Check depth limit for goal side
        if len(curr_goal_prob.path) <= max_depth:
            for action in curr_goal_prob.legalActions():
                new_goal_prob = curr_goal_prob.copy()
                new_goal_prob.path.append(action)
                new_goal_prob.executeAction(action)

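                # If the state is unvisited, compute heuristic and add to queue
                if new_goal_prob.state not in goal_visited:
                    # heuristic: # of differing pixels, counted cell by cell
                    # (comparing the tuples directly with != would yield a single bool)
                    s, g = new_goal_prob.state, start_problem.state
                    if len(s) == len(g) and all(len(r) == len(q) for r, q in zip(s, g)):
                        h_goal = sum(a != b for r, q in zip(s, g) for a, b in zip(r, q))
                    else:
                        # Assumption: grids of different shape count as maximally different
                        h_goal = max(sum(len(r) for r in s), sum(len(q) for q in g))
                    heapq.heappush(goal_queue, (h_goal, next(counter), new_goal_prob))
                    goal_visited[new_goal_prob.state] = new_goal_prob.path # add to visited set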

    # Return empty if no solution found
    return []
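
Stitching a goal-side path onto a start-side path is only sound if each goal-side action is replaced by its inverse. The sketch below shows what that could look like for the invertible operations. `inverse_actions` and `invert_path` are hypothetical helpers, and the action strings follow the names used by `legalActions`. Operations such as `tophalf`, `compress`, `lshift`, and `mapcolor` can discard information and have no general inverse, so the helper returns `None` for them.

```python
def inverse_actions(action):
    """Return the actions that undo `action`, or None if it has no general inverse."""
    if action in ('hmirror', 'vmirror'):
        return [action]       # mirrors are their own inverse
    if action == 'rot90':
        return ['rot90'] * 3  # three more quarter turns complete a full turn
    return None               # tophalf, compress, lshift, mapcolor can lose information

def invert_path(path):
    """Invert a whole path: reverse the order and invert each action."""
    inverted = []
    for action in reversed(path):
        inv = inverse_actions(action)
        if inv is None:
            return None
        inverted.extend(inv)
    return inverted

print(invert_path(['rot90', 'hmirror']))  # ['hmirror', 'rot90', 'rot90', 'rot90']
print(invert_path(['tophalf']))           # None
```

A goal-side path that cannot be inverted would have to be discarded rather than stitched onto the start-side path.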


## Solve the ARC tasks (generate hypotheses)

This function takes the train and test inputs for a single ARC task and predicts an answer using search.

Possible search algorithms:
- Breadth-first-search
- Best-first-search (# of different pixels heuristic)
- *Bidirectional* best-first-search (default)

### Analyze the input and output grids
To better understand each search problem, and possibly narrow down the space of candidate transformations, we need to:
- Determine the size of the output grid
- Detect components in each grid and common components in both
- Use this information to narrow down the list of possible (or likely) DSL operations

In this section, helper functions will be implemented to accomplish each of these tasks.
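
For the component-detection step, one possible starting point is a flood fill that labels connected regions of the same color. The sketch below is an illustration only: the function name `find_components`, the choice of 4-connectivity, and treating color 0 as background are all assumptions.

```python
def find_components(grid, background=0):
    """Return a list of (color, cells) for each 4-connected region of non-background color."""
    rows, cols = len(grid), len(grid[0])
    seen = set()
    components = []
    for r in range(rows):
        for c in range(cols):
            color = grid[r][c]
            if color == background or (r, c) in seen:
                continue
            # Flood fill from (r, c) over same-colored neighbors
            stack, cells = [(r, c)], []
            seen.add((r, c))
            while stack:
                y, x = stack.pop()
                cells.append((y, x))
                for ny, nx in ((y - 1, x), (y + 1, x), (y, x - 1), (y, x + 1)):
                    if (0 <= ny < rows and 0 <= nx < cols
                            and (ny, nx) not in seen and grid[ny][nx] == color):
                        seen.add((ny, nx))
                        stack.append((ny, nx))
            components.append((color, sorted(cells)))
    return components

grid = [[1, 1, 0],
        [0, 0, 2],
        [3, 0, 2]]
for color, cells in find_components(grid):
    print(color, cells)
# 1 [(0, 0), (0, 1)]
# 2 [(1, 2), (2, 2)]
# 3 [(2, 0)]
```

Comparing the components found in an input grid with those in its output grid is one way to decide which DSL operations are worth trying.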

In [None]:
"""
Determine the size of the output grid for a given ARC problem.

Calculated from the output-to-input size ratio of the training examples.
"""

def determine_output_grid_size(train_inputs, train_outputs, test_input):
  # Assumption: the output:input size ratio is the same for every example in a task
  # (this holds for many ARC tasks, but not all), so the first training pair is used
  input_grid = np.array(train_inputs[0])
  output_grid = np.array(train_outputs[0])
  test_input = np.array(test_input)

  # Ratio of output size to input size along each axis
  y_ratio = output_grid.shape[0] / input_grid.shape[0]
  x_ratio = output_grid.shape[1] / input_grid.shape[1]

  output_shape = (round(test_input.shape[0] * y_ratio), round(test_input.shape[1] * x_ratio))
  return output_shape


In [None]:
"""
Given pairs of training inputs and outputs, and the test input, predict the output.
"""

def make_prediction(train_inputs, train_outputs, test_input, search=bidirectional_best_first_search, max_search_depth=3, max_search_seconds=5):
    hypothesis = None
    secondary_hypothesis = None
    timed_out = False

    # print(train_inputs)
    # print(train_outputs)

    for input_grid, output_grid in zip(train_inputs, train_outputs):
        if hypothesis is None:
            # Generate initial hypothesis
            hypothesis = search(input_grid, output_grid, max_depth=max_search_depth, max_seconds=max_search_seconds)
            # print(hypothesis)
        else:
            # Test existing hypothesis
            input_puzzle = ARCPuzzle(input_grid)
            output_puzzle = ARCPuzzle(output_grid)

            for action in hypothesis:
                input_puzzle.executeAction(action)

            if input_puzzle == output_puzzle:
                # Hypothesis is correct, keep it
                continue
            else:
                # Save secondary hypothesis
                secondary_hypothesis = hypothesis

                # Regenerate primary hypothesis
                hypothesis = search(input_grid, output_grid, max_depth=max_search_depth, max_seconds=max_search_seconds)

    # After iterating through the training examples, apply the final hypotheses to the test input
    test_puzzle_1 = ARCPuzzle(test_input)
    test_puzzle_2 = test_puzzle_1.copy()

    if hypothesis is None:
        timed_out = True
    else:
        # Generate primary prediction
        for action in hypothesis:
            try:
                test_puzzle_1.executeAction(action)
            except Exception:
                print(f'Action {action} failed')

        if secondary_hypothesis is not None:
            # Generate secondary prediction
            for action in secondary_hypothesis:
                try:
                    test_puzzle_2.executeAction(action)
                except Exception:
                    print(f'Action {action} failed')

    # Will return initial state if solution wasn't found or search timed out
    primary_prediction = test_puzzle_1.state
    secondary_prediction = test_puzzle_2.state

    return primary_prediction, secondary_prediction, timed_out

# Generating a submission

To generate a submission you need to output a file called `submission.json` that has the following format:

```
{"00576224": [{"attempt_1": [[0, 0], [0, 0]], "attempt_2": [[0, 0], [0, 0]]}],
 "009d5c81": [{"attempt_1": [[0, 0], [0, 0]], "attempt_2": [[0, 0], [0, 0]]}],
 "12997ef3": [{"attempt_1": [[0, 0], [0, 0]], "attempt_2": [[0, 0], [0, 0]]},
              {"attempt_1": [[0, 0], [0, 0]], "attempt_2": [[0, 0], [0, 0]]}],
 ...
}
```
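
Before writing the file, it can help to check that the dictionary really has this shape. The checker below is a sketch: `validate_submission` is a hypothetical helper, `challenges` stands in for `test_challenges`, and the task ids and grids in the example are made up.

```python
def validate_submission(submission, challenges):
    """Return a list of problems found; an empty list means the structure looks right."""
    problems = []
    for task_id, task in challenges.items():
        entries = submission.get(task_id)
        if entries is None:
            problems.append(f'{task_id}: missing task')
            continue
        expected = len(task['test'])
        if len(entries) != expected:
            problems.append(f'{task_id}: expected {expected} entries, got {len(entries)}')
        for i, entry in enumerate(entries):
            for attempt in ('attempt_1', 'attempt_2'):
                if attempt not in entry:
                    problems.append(f'{task_id}[{i}]: missing {attempt}')
    return problems

# Made-up example: task_b has two test items but only one incomplete entry
challenges = {
    'task_a': {'test': [{'input': [[3, 2], [7, 8]]}]},
    'task_b': {'test': [{'input': [[0]]}, {'input': [[1]]}]},
}
submission = {
    'task_a': [{'attempt_1': [[0, 0], [0, 0]], 'attempt_2': [[0, 0], [0, 0]]}],
    'task_b': [{'attempt_1': [[0]]}],
}
for problem in validate_submission(submission, challenges):
    print(problem)
# task_b: expected 2 entries, got 1
# task_b[0]: missing attempt_2
```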

In this case, the task ids come from `test_challenges`. There may be multiple (i.e., >1) test items per task, so the dictionary holds a list of dicts for each task. These submission dictionaries should appear in the same order as the test items from `test_challenges`. Additionally, you can provide two attempts for each test item. In fact, you **MUST** provide two attempts. If you only want to generate a single attempt, then just submit the same answer for both attempts (or submit an empty submission like the ones shown in the example snippet just above).

Here is how we build the submission, running the chosen search on each task:

In [None]:
# Create an empty submission dict for output
submission = {}

# Define hyperparameters and search type
timed_out_count = 0
max_search_depth = 3
max_search_seconds = 5

# Options: breadth_first_search, best_first_search, bidirectional_best_first_search
search_type = bidirectional_best_first_search

# Iterate over the test items and build up submission answers
for key, task in tqdm(test_challenges.items()):
    # Here are the task's training inputs and outputs
    train_inputs = [item['input'] for item in task['train']]
    train_outputs = [item['output'] for item in task['train']]

    # Here we generate outputs for each test item.
    submission[key] = []
    task_timed_out = False

    # A task may have several test items; predict each one separately,
    # passing a single grid (not the list of all test grids) to make_prediction
    for item in task['test']:
        primary_prediction, secondary_prediction, timed_out = make_prediction(train_inputs, train_outputs, item['input'], search_type, max_search_depth, max_search_seconds)
        task_timed_out = task_timed_out or timed_out

        # add predictions to submission json
        # NOTE: if a primary or secondary hypothesis wasn't found, predictions will be the initial state of the grid
        submission[key].append({'attempt_1': primary_prediction, 'attempt_2': secondary_prediction})

    # Count each task at most once, even if several of its test items timed out
    if task_timed_out:
        timed_out_count += 1

# Print number of tasks that timed out
print(f'{timed_out_count} tasks timed out')
print(f'max search depth: {max_search_depth}')
print(f'max search seconds: {max_search_seconds}')

# Here we write the submissions to file, so that they will get evaluated
with open('submission.json', 'w') as fp:
    json.dump(submission, fp)

  6%|▋         | 25/400 [02:10<15:23,  2.46s/it]

Action compress failed
[... intermediate progress updates and repeated "Action compress failed" messages omitted ...]

100%|██████████| 400/400 [26:25<00:00,  3.96s/it]


64 tasks timed out
max search depth: 3
max search seconds: 5


Here is what our submission for the test task above looks like:

In [None]:
submission[sample_task]

[{'attempt_1': (), 'attempt_2': ()}]

# Scoring Your Submission

In [None]:
def score_submission():
    if colab:
      sol_path = '/content/drive/MyDrive/Colab Notebooks/Knowledge-Based AI (ARC Prize)/ARC Prize Submission/input/arc-prize-2024/arc-agi_evaluation_solutions.json'
    else:
      sol_path = './input/arc-prize-2024/arc-agi_evaluation_solutions.json'

    with open(sol_path, 'r') as sol_file:
        solutions = json.load(sol_file)

    with open('submission.json', 'r') as sub_file:
        submission = json.load(sub_file)

    overall_score = 0

    for task in solutions:
        score = 0
        for i, answer in enumerate(solutions[task]):
            attempt1_correct = submission[task][i]['attempt_1'] == answer
            attempt2_correct = submission[task][i]['attempt_2'] == answer
            score += int(attempt1_correct or attempt2_correct)

        score /= len(solutions[task])

        overall_score += score


    print(overall_score)
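
The same scoring rule can be tried on in-memory data. This also shows why tuple predictions still compare correctly after a round trip through JSON: `json` serializes tuples as lists. The function `score` and the toy dictionaries are hypothetical, and dividing by the number of tasks to report a fraction is an assumption about how one might want to present the result; `score_submission` above prints the raw sum.

```python
import json

def score(solutions, submission):
    """Average per-task score, where a test item counts if either attempt matches."""
    total = 0.0
    for task_id, answers in solutions.items():
        correct = sum(
            int(submission[task_id][i]['attempt_1'] == answer
                or submission[task_id][i]['attempt_2'] == answer)
            for i, answer in enumerate(answers)
        )
        total += correct / len(answers)
    return total / len(solutions)

# Made-up solutions and predictions (predictions are tuples, as ARCPuzzle states are)
solutions = {'task_a': [[[1, 2], [3, 4]]], 'task_b': [[[0]], [[5]]]}
raw = {
    'task_a': [{'attempt_1': ((1, 2), (3, 4)), 'attempt_2': ((0, 0), (0, 0))}],
    'task_b': [{'attempt_1': ((0,),), 'attempt_2': ((0,),)},
               {'attempt_1': ((9,),), 'attempt_2': ((9,),)}],
}
submission = json.loads(json.dumps(raw))  # tuples come back as lists
print(score(solutions, submission))  # 0.75
```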

Score the most recently run submission.

In [None]:
score_submission()

0.0


# Confused about where to get started?

If you're not sure what an initial solution might look like, then consider looking at public notebooks here: https://www.kaggle.com/competitions/arc-prize-2024/code or joining the public discussion here: https://www.kaggle.com/competitions/arc-prize-2024/discussion.

One example notebook that uses a very simple knowledge-based approach is this one: https://www.kaggle.com/code/michaelhodel/program-synthesis-starter-notebook/notebook, which conducts search over a space of domain-specific block languages to form hypotheses and then applies these to test items.