In [None]:
# ARGA refactoring project

In [None]:
import json
import time
import numpy as np

from task import Task
from utils import plot_sample, set_types, process_task

In [None]:
# Locations of the ARC Prize 2024 input files.
train_ch_path = '../input/arc-prize-2024/arc-agi_training_challenges.json'
train_sol_path = '../input/arc-prize-2024/arc-agi_training_solutions.json'
eval_ch_path = '../input/arc-prize-2024/arc-agi_evaluation_challenges.json'
eval_sol_path = '../input/arc-prize-2024/arc-agi_evaluation_solutions.json'
test_path = '../input/arc-prize-2024/arc-agi_test_challenges.json'
sample_path = '../input/arc-prize-2024/sample_submission.json'


def _read_json(path):
    """Parse the UTF-8 encoded JSON file at ``path`` and return its content."""
    with open(path, 'r', encoding='utf-8') as handle:
        return json.load(handle)


# Challenge sets (task name -> task data).
train_tasks = _read_json(train_ch_path)
eval_tasks = _read_json(eval_ch_path)
test_tasks = _read_json(test_path)

# Reference solutions for the training and evaluation sets, merged into one
# lookup; on a duplicate key the evaluation entry wins.
truth_t = _read_json(train_sol_path)
truth_e = _read_json(eval_sol_path)
truth = truth_t | truth_e

# Sanity check: number of entries loaded from each source.
print(*(len(loaded) for loaded in (train_tasks, eval_tasks, test_tasks, truth)))

In [None]:
def solve_task(name, task, limit=10, stats=None):
    """Wrap raw task data in a ``Task`` and run its solver.

    Args:
        name: Task identifier handed to the ``Task`` constructor.
        task: Raw task data handed to the ``Task`` constructor.
        limit: Forwarded to ``Task.solve`` as ``time_limit``.
        stats: Forwarded unchanged to ``Task.solve`` as ``stats``.

    Returns:
        Whatever ``Task.solve`` returns.
    """
    solver = Task(name, task)
    return solver.solve(time_limit=limit, stats=stats)

def solve_all(tasks, limit=10):
    """Produce a prediction for every task, solving only those of type 1.

    Tasks whose ``'type'`` is not 1 receive one 2x2 zero grid per test input
    as a placeholder. For type-1 tasks a progress line is printed, a stats
    entry is seeded with ``'<count> <index> <name>'``, ``process_task`` is
    applied and the task is handed to ``solve_task``.

    Args:
        tasks: Mapping of task name to task data; every task needs the keys
            ``'type'`` and ``'test'``.
        limit: Per-task limit forwarded to ``solve_task``.

    Returns:
        A ``(predictions, stats)`` tuple of dicts keyed by task name. ``stats``
        only contains the tasks that were actually attempted.
    """
    started = time.time()
    total = len(tasks)
    predictions, stats = {}, {}
    attempted = 0

    for position, (name, task) in enumerate(tasks.items(), start=1):
        # Anything that is not type 1 is skipped with a placeholder answer.
        if task['type'] != 1:
            predictions[name] = [[[0, 0], [0, 0]]] * len(task['test'])
            continue

        # Progress line: percentage, position and elapsed wall time (hh:mm:ss).
        elapsed = time.time() - started
        hh = int(elapsed / 3600)
        mm = int(elapsed % 3600 / 60)
        ss = int(elapsed % 60)
        print(f'{int(100*position/total)}% | {position}/{total} [{hh:02}:{mm:02}:{ss:02}]')

        attempted += 1
        # Seed the entry before solving; the same dict is passed to the solver
        # (presumably it extends the entry -- confirm in Task.solve).
        stats[name] = f'{attempted:3} {position:3} {name}'
        process_task(task)
        predictions[name] = solve_task(name, task, limit, stats)

    return predictions, stats

def run_main(tasks, limit=10):
    """Solve ``tasks`` and write the result to ``submission.json``.

    ``set_types`` is applied to ``tasks`` first, then ``solve_all`` produces
    the predictions. Each test input gets ``attempt_1`` from the prediction
    and a 2x2 zero grid as ``attempt_2``.

    Args:
        tasks: Mapping of task name to task data (each with a ``'test'`` list).
        limit: Per-task limit forwarded to ``solve_all``.

    Returns:
        The stats dict returned by ``solve_all``.
    """
    set_types(tasks)
    predictions, stats = solve_all(tasks, limit)

    # One {'attempt_1', 'attempt_2'} record per test input of every task.
    submission = {
        name: [
            {'attempt_1': predictions[name][test_idx],
             'attempt_2': [[0, 0], [0, 0]]}
            for test_idx in range(len(task['test']))
        ]
        for name, task in tasks.items()
    }

    with open('submission.json', 'w', encoding='utf-8') as sub_file:
        json.dump(submission, sub_file)
    return stats

def observe_results(tasks, stats=None):
    """Score the saved submission against ``truth`` and plot real predictions.

    Reads ``submission.json`` from the working directory, compares every
    ``attempt_1`` grid with the matching grid in the module-level ``truth``
    mapping, prints how many tasks were solved, optionally prints ``stats``
    together with a tally, and plots every task whose prediction is not the
    all-placeholder answer.

    Args:
        tasks: Mapping of task name to task data. Every name found in
            ``submission.json`` is looked up here and in ``truth``.
        stats: Optional mapping whose values are printable summary strings
            (as returned by ``run_main``). Skipped when ``None``.

    Returns:
        None. All results are printed or plotted.
    """
    with open('submission.json', 'r', encoding='utf-8') as file:
        submission = json.load(file)

    # Only the first attempt of every test input is evaluated.
    predictions = {
        name: [attempt['attempt_1'] for attempt in attempts]
        for name, attempts in submission.items()
    }

    # One 0/1 flag per test grid: 1 only if shape and all cells match.
    scores = []
    for name, prediction in predictions.items():
        score = [0 if np.shape(true_grid) != np.shape(grid) else int(
            np.equal(true_grid, grid).all()) for true_grid, grid in zip(truth[name], prediction)]
        scores.append(score)

    # A task counts as solved as soon as any of its test grids matches.
    solved = [any(score) for score in scores]
    # Guard against an empty submission, which would otherwise divide by zero.
    percent_solved = 100 * sum(solved) / len(solved) if solved else 0.0
    print(f'Solved: {sum(solved)} from {len(solved)} ({percent_solved:g}%)', end='\n\n')

    if stats is not None:
        # Substring in a stats entry -> tally bucket. The two flags presumably
        # are the "Train" and "Test" columns of the header below -- confirm
        # against the code that writes the entries.
        markers = {
            'False False': 'tn',
            'False True': 'fn',
            'True  False': 'fp',
            'True  True': 'tp',
        }
        summary = {'tn': 0, 'fn': 0, 'fp': 0, 'tp': 0}
        print('  # IDX  ID/Name   Time Train  Test')
        for entry in stats.values():
            print(entry)
            for marker, bucket in markers.items():
                if marker in entry:
                    summary[bucket] += 1
        print('', summary, '', sep='\n')

    count = 0
    for index, ((name, prediction), right) in enumerate(zip(predictions.items(), solved), start=1):
        task = tasks[name]
        # Skip tasks that only carry the 2x2 zero placeholder for every test.
        if prediction != [[[0, 0], [0, 0]]] * len(task['test']):
            count += 1
            print(count, index, name, right)
            for i in range(len(task['test'])):
                plot_sample(task['test'][i]['input'], truth[name][i], prediction[i])

In [None]:
# Task control: the task set to solve and the per-task limit handed to run_main.
time_limit = 10
the_tasks = test_tasks
# Stays None until the run cell stores the stats returned by run_main.
the_stats = None

In [None]:
# Solve the selected tasks, write submission.json and keep the per-task stats.
the_stats = run_main(tasks=the_tasks, limit=time_limit)

In [None]:
# Only review results when the loaded test file contains task '007bbfb7'
# (presumably a marker for the public placeholder data, for which solutions
# are available in `truth` -- confirm).
if '007bbfb7' in test_tasks:
    observe_results(tasks=the_tasks, stats=the_stats)