In [1]:
"""https://iliao2345.github.io/blog_posts/arc_agi_without_pretraining/arc_agi_without_pretraining.html"""

'https://iliao2345.github.io/blog_posts/arc_agi_without_pretraining/arc_agi_without_pretraining.html'

In [2]:
import argparse, json, os, re
import numpy as np
import pandas as pd
import sys
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import colors
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Any, Union
from PIL import Image, ImageDraw, ImageFont
import torch
import torch.nn as nn
import torch.nn.functional as F
from dataclasses import dataclass
from enum import Enum
import warnings

import time, multiprocessing
import multiT_sys
import layers
import init
warnings.filterwarnings("ignore")



In [3]:
class ARCVisualizer:
    """Enhanced ARC task vis w/ mult display modes

    provides utilities for conv int grids to RGB images, w/ optionality
    to add grid liones/titles and assmbling comp. vis of trainig and test pairs """

    PALETTE = np.array([
        [0,   0,   0],      #0 black
        [0,   116, 217],    #1 blue
        [255, 65,  54],     #2 red
        [46,  204, 64],     #3 green
        [255, 220, 0],      #4 yellow
        [128, 128, 128],    #5 gray
        [240, 18,  190],    #6 magenta/pink
        [255, 133, 27],     #7 orange
        [0,   255, 255],    #8 cyan/sky
        [135, 12,  37],     #9 maroon/brown
    ], dtype=np.uint8)


    def __init__(self, scale: int = 30, draw_grid: bool = True):
        self.scale = scale
        self.draw_grid = draw_grid
        self.cmap = colors.ListedColormap([
            "#000000", "#0074D9", "#FF4136", "#2ECC40", "#FFDC00",
            "#AAAAAA", "#F012BE", "#FF851B", "#7FDBFF", "#870C25"])
        self.norm = colors.Normalize(vmin=0, vmax=9)
    def grid_to_image(self, grid: Union[List[List[int]], np.ndarray],
                      title: str = None) -> Image.Image:
        """conv grid to PIL image w/ optional title

        input grid must be 2d-list or np array w/ 0-9 ints inclusive

        returns nearest-neighbor resize to enlarge each cell by config scale"""
        arr = np.array(grid, dtype=np.int16)
        if arr.ndim != 2:
            raise ValueError("Grid must be 2D")
        if arr.min() < 0 or arr.max() > 9:
            raise ValueError("Grid values must be in 0-9 inclusive")
        rgb = self.PALETTE[arr]
        img = Image.fromarray(rgb.astype(np.uint8), mode="RGB")
        if self.scale != 1:
            img = img.resize((img.width * self.scale, img.height * self.scale),
                             resample=Image.NEAREST)
        if self.draw_grid and self.scale >= 10:
            self._add_gridlines(img)
        if title:
            img = self._add_title_banner(img, title)
        return img
    def _add_gridlines(self, img: Image.Image):
        """draw grid lines on image"""
        draw = ImageDraw.Draw(img)
        grid_color = (40, 40, 40)
        for x in range(0, img.width + 1, self.scale):
            draw.line([(x, 0), (x, img.height)], fill=grid_color)
        for y in range(0, img.height + 1, self.scale):
            draw.line([(0, y), (img.width, y)], fill=grid_color)

    def _add_title_banner(self, img: Image.Image, title: str) -> Image.Image:
        """prepend small banner w/ title above image"""
        banner_height = 30
        padding = 5
        new_img = Image.new("RGB", (img.width, banner_height + padding + img.height),
                            (255, 255, 255))
        draw = ImageDraw.Draw(new_img)
        draw.rectangle([0, 0, img.width, banner_height], fill=(245, 245, 245))
        try:
            font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 14)
        except Exception:
            font = ImageFont.load_default()
        draw.text((10, 8), title, fill=(30, 30, 30), font=font)
        new_img.paste(img, (0, banner_height + padding))
        return new_img

    def visualize_task_matplotlib(self, task: Dict, task_id: str = None, 
                                  show_predictions: List = None):
        """vis full task using matplotlib

        training and test pairs arr in grid of subplots.  
        if predictions shown next to corresponding test inputs if provided
          Unused axes are hidden +  returned figure can be
        saved or further customized by caller
        """
        n_train = len(task["train"])
        n_test = len(task["test"])
        n_rows = max(2, n_test + (1 if show_predictions else 0))
        n_cols = max(n_train, 2)
        fig, axes = plt.subplots(n_rows, n_cols * 2, figsize=(n_cols * 4, n_rows * 2))
        if n_rows == 1:
            axes = axes.reshape(1, -1)
        #plot training examples
        for i, example in enumerate(task["train"]):
            ax_in = axes[0, i * 2]
            ax_out = axes[0, i * 2 + 1]
            ax_in.imshow(example["input"], cmap=self.cmap, norm=self.norm)
            ax_in.set_title(f"Train {i+1} Input")
            ax_in.axis("off")
            ax_in.grid(True, which="both", color="lightgrey", linewidth=0.5)
            ax_out.imshow(example["output"], cmap=self.cmap, norm=self.norm)
            ax_out.set_title(f"Train {i+1} Output")
            ax_out.axis("off")
            ax_out.grid(True, which="both", color="lightgrey", linewidth=0.5)
        #plot test inputs
        for i, test in enumerate(task["test"]):
            ax = axes[1, i * 2]
            ax.imshow(test["input"], cmap=self.cmap, norm=self.norm)
            ax.set_title(f"Test {i+1} Input")
            ax.axis("off")
            ax.grid(True, which="both", color="lightgrey", linewidth=0.5)
        #plot predictions if provided
        if show_predictions:
            for i, pred in enumerate(show_predictions):
                if i < n_test:
                    ax = axes[1, i * 2 + 1]
                    ax.imshow(pred, cmap=self.cmap, norm=self.norm)
                    ax.set_title(f"Test {i+1} Prediction")
                    ax.axis("off")
                    ax.grid(True, which="both", color="lightgrey", linewidth=0.5)
        #hide unused subplots
        for i in range(n_rows):
            for j in range(n_cols * 2):
                idx = i * n_cols * 2 + j
                if idx >= len(axes.flat):
                    continue
                if i == 0 and j >= n_train * 2:
                    axes[i, j].axis("off")
                elif i == 1 and j >= n_test * 2:
                    axes[i, j].axis("off")
        if task_id:
            fig.suptitle(f"Task: {task_id}", fontsize=16)
        plt.tight_layout()
        return fig

    def create_composite_image(self, task: Dict, task_id: str = None,
                               solutions: List = None) -> Image.Image:
        """generate composite image for all training and test pairs.

        Training examples are displayed as input/output pairs
        test inputs are optionally paired w/ provided solutions
        Images stacked vert to produce single overview of entire task.
        """
        images = []
        #Training pairs
        for i, example in enumerate(task["train"]):
            in_img = self.grid_to_image(example["input"], f"Train {i+1} Input")
            out_img = self.grid_to_image(example["output"], f"Train {i+1} Output")
            pair = self._hstack([in_img, out_img], gap=10)
            images.append(pair)
        #Test inputs and solutions
        for i, test in enumerate(task["test"]):
            test_img = self.grid_to_image(test["input"], f"Test {i+1} Input")
            if solutions and i < len(solutions):
                sol_img = self.grid_to_image(solutions[i], f"Test {i+1} Solution")
                pair = self._hstack([test_img, sol_img], gap=10)
                images.append(pair)
            else:
                images.append(test_img)
        composite = self._vstack(images, gap=15)
        if task_id:
            composite = self._add_title_banner(composite, f"Task: {task_id}")
        return composite

    def _hstack(self, images: List[Image.Image], gap: int = 10) -> Image.Image:
        """horz stack list of images w/ optional gaps"""
        if not images:
            raise ValueError("No images to stack")
        height = max(img.height for img in images)
        width = sum(img.width for img in images) + gap * (len(images) - 1)
        result = Image.new("RGB", (width, height), (255, 255, 255))
        x = 0
        for img in images:
            result.paste(img, (x, 0))
            x += img.width + gap
        return result

    def _vstack(self, images: List[Image.Image], gap: int = 10) -> Image.Image:
        """vert stack list of images w/ optional gaps"""
        if not images:
            raise ValueError("No images to stack")
        width = max(img.width for img in images)
        height = sum(img.height for img in images) + gap * (len(images) - 1)
        result = Image.new("RGB", (width, height), (255, 255, 255))
        y = 0
        for img in images:
            x = (width - img.width) // 2  #center horizontally
            result.paste(img, (x, y))
            y += img.height + gap
        return result

In [4]:
class DSLOperations:
    """coll of pure funcs to manip int grids
    
    ops form building blocks of symbolic search + incl basic geo transforms(rot, flip trans), color repl, cropping, padding, flood filling and symm checks"""
    
    @staticmethod
    def rotate(grid: np.ndarray, k: int = 1) -> np.ndarray:
        """rotate grid by 90 deg * k steps using np.rot90"""
        return np.rot90(grid, k)

    @staticmethod
    def flip(grid: np.ndarray, axis: int = 0) -> np.ndarray:
        """flip grid along vert (axis=0) or horz (axis=1) axis"""
        return np.flip(grid, axis=axis)

    @staticmethod
    def transpose(grid: np.ndarray) -> np.ndarray:
        """swap x and y axes of grid"""
        return np.swapaxes(grid, 0, 1)

    @staticmethod
    def color_replace(grid: np.ndarray, from_color: int, to_color: int) -> np.ndarray:
        """repl all occ of from_color w/ to_color"""
        result = grid.copy()
        result[result == from_color] = to_color
        return result

    @staticmethod
    def crop(grid: np.ndarray, x1: int, y1: int, x2: int, y2: int) -> np.ndarray:
        """extr rectangular subgrid defined by (x1,y1) to (x2,y2) inclusive"""
        return grid[y1:y2+1, x1:x2+1]

    @staticmethod
    def pad(grid: np.ndarray, top: int = 0, bottom: int = 0,
            left: int = 0, right: int = 0, fill: int = 0) -> np.ndarray:
        """pad grid on all sides w/ specified count of rows/columns"""
        return np.pad(grid, ((top, bottom), (left, right)), constant_values=fill)

    @staticmethod
    def flood_fill(grid: np.ndarray, x: int, y: int, new_color: int) -> np.ndarray:
        """perf flood fill starting at (x,y) using new color"""
        grid = grid.copy()
        target_color = grid[y, x]
        if target_color == new_color:
            return grid
        h, w = grid.shape
        stack = [(x, y)]
        while stack:
            cx, cy = stack.pop()
            if cx < 0 or cy < 0 or cx >= w or cy >= h:
                continue
            if grid[cy, cx] != target_color:
                continue
            grid[cy, cx] = new_color
            stack.extend([(cx+1, cy), (cx-1, cy), (cx, cy+1), (cx, cy-1)])
        return grid

    @staticmethod
    def bounding_box(grid: np.ndarray, color: int) -> Optional[Tuple[int, int, int, int]]:
        """ret bounding box of all cells matching given color"""
        yx = np.argwhere(grid == color)
        if len(yx) == 0:
            return None
        ys, xs = yx[:, 0], yx[:, 1]
        return np.min(xs), np.min(ys), np.max(xs), np.max(ys)

    @staticmethod
    def extract_object(grid: np.ndarray, color: int,
                       background: int = 0) -> Optional[np.ndarray]:
        """extract smallest subgrid containing specified color"""
        bbox = DSLOperations.bounding_box(grid, color)
        if bbox is None:
            return None
        x1, y1, x2, y2 = bbox
        return grid[y1:y2+1, x1:x2+1]

    @staticmethod
    def count_colors(grid: np.ndarray) -> Dict[int, int]:
        """count occurrences of each color in grid"""
        unique, counts = np.unique(grid, return_counts=True)
        return dict(zip(unique, counts))

    @staticmethod
    def get_symmetry(grid: np.ndarray) -> Dict[str, bool]:
        """return dict indicating presence of various symmetries"""
        return {
            "horizontal": np.array_equal(grid, np.flipud(grid)),
            "vertical": np.array_equal(grid, np.fliplr(grid)),
            "diagonal": np.array_equal(grid, grid.T),
            "rotational_90": np.array_equal(grid, np.rot90(grid, 2))
        }




In [5]:
class DataAugmentation:
    """generate simple augmented versions of given ARC task.

    Augmentations include transpose, rotations and random color permutations
    variants can expose novel patterns to symbolic search or
    neural models and potentially improve generalization
    """

    @staticmethod
    def augment_task(task: Dict, include_transpose: bool = True,
                    include_rotation: bool = True,
                    include_color_permutation: bool = True) -> List[Dict]:
        """ret list containing original task and its augmentations"""
        augmented = [task]
        if include_transpose:
            augmented.append(DataAugmentation._transpose_task(task))
        if include_rotation:
            for k in [1, 2, 3]:
                augmented.append(DataAugmentation._rotate_task(task, k))
        if include_color_permutation:
            perm = DataAugmentation._generate_color_permutation()
            augmented.append(DataAugmentation._permute_colors_task(task, perm))
        return augmented

    @staticmethod
    def _transpose_task(task: Dict) -> Dict:
        """ret new task w/ all grids transposed"""
        new_task = {'train': [], 'test': []}
        for example in task['train']:
            new_task['train'].append({
                'input': np.transpose(example['input']).tolist(),
                'output': np.transpose(example['output']).tolist()
            })
        for test in task['test']:
            new_task['test'].append({
                'input': np.transpose(test['input']).tolist()
            })
        return new_task

    @staticmethod
    def _rotate_task(task: Dict, k: int) -> Dict:
        """ret new task w/ all grids rotated by 90 deg * k"""
        new_task = {'train': [], 'test': []}
        for example in task['train']:
            new_task['train'].append({
                'input': np.rot90(example['input'], k).tolist(),
                'output': np.rot90(example['output'], k).tolist()
            })
        for test in task['test']:
            new_task['test'].append({
                'input': np.rot90(test['input'], k).tolist()
            })
        return new_task

    @staticmethod
    def _generate_color_permutation() -> np.ndarray:
        """Generate random permutation of colors, preserving background"""
        perm = np.arange(10)
        perm[1:] = np.random.permutation(9) + 1
        return perm

    @staticmethod
    def _permute_colors_task(task: Dict, permutation: np.ndarray) -> Dict:
        """Apply color permutation to all grids in task"""
        new_task = {'train': [], 'test': []}
        for example in task['train']:
            new_task['train'].append({
                'input': permutation[np.array(example['input'])].tolist(),
                'output': permutation[np.array(example['output'])].tolist()
            })
        for test in task['test']:
            new_task['test'].append({
                'input': permutation[np.array(test['input'])].tolist()
            })
        return new_task



In [6]:
class SymbolicProgramSearch:
    """search space of small prog composed of DSL ops

    DFS used to explore comp of primitive ops - when prog transforms all training
    inputs to corr. outputs, deemed VALID solution - max prog depth can be config at instantiation"""
    def __init__(self, max_depth: int = 3):
        self.max_depth = max_depth
        self.dsl = DSLOperations()
        #define candidate ops as (name, function) pairs
        self.operations = [
            ("rotate_90", lambda g: self.dsl.rotate(g, 1)),
            ("rotate_180", lambda g: self.dsl.rotate(g, 2)),
            ("rotate_270", lambda g: self.dsl.rotate(g, 3)),
            ("flip_h", lambda g: self.dsl.flip(g, 1)),
            ("flip_v", lambda g: self.dsl.flip(g, 0)),
            ("transpose", lambda g: self.dsl.transpose(g)),
        ]
        #include simple color replacements for first few colors
        for i in range(1, 4):
            for j in range(1, 4):
                if i != j:
                    self.operations.append(
                        (f"color_{i}_to_{j}", lambda g, fi=i, fj=j: self.dsl.color_replace(g, fi, fj))
                    )


    def search(self, task: Dict) -> Optional[List[Tuple[str, Any]]]:
        """attempt to discover program that solves training examples"""
        train_examples = task["train"]
        return self._dfs_search(train_examples, [])

    def _dfs_search(self, examples: List[Dict],
                    current_program: List[Tuple[str, Any]]) -> Optional[List[Tuple[str, Any]]]:
        """DFS for program that matches all training pairs"""
        if len(current_program) > self.max_depth:
            return None
        #check current program
        if current_program and self._program_matches(examples, current_program):
            return current_program
        #explore further operations
        for op_name, op_func in self.operations:
            new_program = current_program + [(op_name, op_func)]
            result = self._dfs_search(examples, new_program)
            if result is not None:
                return result
        return None

    def _program_matches(self, examples: List[Dict],
                         program: List[Tuple[str, Any]]) -> bool:
        """check whether candidate prog produces all expected outputs"""
        for example in examples:
            input_grid = np.array(example["input"])
            expected_output = np.array(example["output"])
            result = self._apply_program(input_grid, program)
            if result is None or not np.array_equal(result, expected_output):
                return False
        return True

    def _apply_program(self, grid: np.ndarray,
                       program: List[Tuple[str, Any]]) -> Optional[np.ndarray]:
        """apply seq of ops to grid"""
        try:
            result = grid.copy()
            for _, op_func in program:
                result = op_func(result)
            return result
        except Exception:
            return None

    def apply(self, grid: np.ndarray,
              program: List[Tuple[str, Any]]) -> Optional[np.ndarray]:
        """apply discovered program to new grid"""
        return self._apply_program(grid, program)

In [7]:
class SimpleARCNet(nn.Module):
    """light cnn for recog grid patterns

    net takes one-hot input and output grids stacked along channel dim and predicts one of
    transform labels - NOT TRAINED YET - simple building base"""
    def __init__(self, max_grid_size: int = 30, hidden_dim: int = 256):
        super().__init__()
        self.max_grid_size = max_grid_size
        #encoder: two conv layers w/ pooling and adaptive pooling to fixed size
        self.encoder = nn.Sequential(
            nn.Conv2d(20, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d(4)
        )
        #fully connected classifier
        self.fc = nn.Sequential(
            nn.Linear(128 * 4 * 4, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Linear(hidden_dim // 2, 10)
        )

    def forward(self, input_grid: torch.Tensor, output_grid: torch.Tensor) -> torch.Tensor:
        """Forward pass: classify relation between input and output grid"""
        x = torch.cat([input_grid, output_grid], dim=1)
        x = self.encoder(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

    @staticmethod
    def grid_to_tensor(grid: np.ndarray, num_colors: int = 10,
                       max_size: int = 30) -> torch.Tensor:
        """conv an integer grid to one‑hot tensor w/ padding"""
        h, w = grid.shape
        padded = np.zeros((max_size, max_size), dtype=int)
        padded[:h, :w] = grid
        one_hot = np.zeros((num_colors, max_size, max_size), dtype=np.float32)
        for i in range(num_colors):
            one_hot[i] = (padded == i).astype(np.float32)
        return torch.tensor(one_hot)

In [8]:

class ARCCompressor:
    """
    main model class for VAE Decoder in ARC soln
    """

    #def channel dimensions that all layers use
    n_layers = 4
    share_up_dim = 16
    share_down_dim = 8
    decoding_dim = 4
    softmax_dim = 2
    cummax_dim = 4
    shift_dim = 4
    nonlinear_dim = 16

    # This function gives channel dimension of residual stream depending on
    # which dimensions are present, for every tensor in multitensor.
    def channel_dim_fn(self, dims):
        return 16 if dims[2] == 0 else 8

    def __init__(self, task):
        """
        Create model that is tailored to given task, and initialize all weights.
        weights are symmetrized such that swapping x and y dimension ordering should
        make output's dimension ordering also swapped, for same weights. This may not
        be exactly correct since symmetrizing all operations is difficult.
        Args:
            task (preprocessing.Task): task which model is to be made for solving.
        """
        self.multitensor_system = task.multitensor_system

        # Initialize weights
        initializer = init.Initializer(self.multitensor_system, self.channel_dim_fn)

        self.multiposteriors = initializer.initialize_multiposterior(self.decoding_dim)
        self.decode_weights = initializer.initialize_multilinear([self.decoding_dim, self.channel_dim_fn])
        initializer.symmetrize_xy(self.decode_weights)
        self.target_capacities = initializer.initialize_multizeros([self.decoding_dim])

        self.share_up_weights = []
        self.share_down_weights = []
        self.softmax_weights = []
        self.cummax_weights = []
        self.shift_weights = []
        self.direction_share_weights = []
        self.nonlinear_weights = []

        for layer_num in range(self.n_layers):
            self.share_up_weights.append(initializer.initialize_multiresidual(self.share_up_dim, self.share_up_dim))
            self.share_down_weights.append(initializer.initialize_multiresidual(self.share_down_dim, self.share_down_dim))
            output_scaling_fn = lambda dims: self.softmax_dim * (2 ** (dims[1] + dims[2] + dims[3] + dims[4]) - 1)
            self.softmax_weights.append(initializer.initialize_multiresidual(self.softmax_dim, output_scaling_fn))
            self.cummax_weights.append(initializer.initialize_multiresidual(self.cummax_dim, self.cummax_dim))
            self.shift_weights.append(initializer.initialize_multiresidual(self.shift_dim, self.shift_dim))
            self.direction_share_weights.append(initializer.initialize_multidirection_share())
            self.nonlinear_weights.append(initializer.initialize_multiresidual(self.nonlinear_dim, self.nonlinear_dim))

        self.head_weights = initializer.initialize_head()
        self.mask_weights = initializer.initialize_linear(
            [1, 0, 0, 1, 0], [self.channel_dim_fn([1, 0, 0, 1, 0]), 2]
        )

        # Symmetrize weights so that their behavior is equivariant to swapping x and y dimension ordering
        for weight_list in [
            self.share_up_weights,
            self.share_down_weights,
            self.softmax_weights,
            self.cummax_weights,
            self.shift_weights,
            self.nonlinear_weights,
        ]:
            for layer_num in range(self.n_layers):
                initializer.symmetrize_xy(weight_list[layer_num])

        for layer_num in range(self.n_layers):
            initializer.symmetrize_direction_sharing(self.direction_share_weights[layer_num])

        self.weights_list = initializer.weights_list


    def forward(self):
        """
        Compute forward pass of VAE decoder. Start w/ internally stored latents,
        and process => Output an [example, color, x, y, channel] tensor for colors,
        and  [example, x, channel] and [example, y, channel] tensor for masks.
        Returns:
            Tensor: An [example, color, x, y, channel] tensor, where for every example,
                    input/output (picked by channel dimension), and every pixel (picked
                    by x and y dimensions), we have vector full of logits for that
                    pixel being each possible color.
            Tensor: An [example, x, channel] tensor, where for every example, input/output
                    (picked by channel dimension), and every x, we assign score that
                    contributes to likelihood that that index of x dimension is not
                    masked out in prediction.
            Tensor: An [example, y, channel] tensor, used in same way as above.
            list[Tensor]: list of tensors indicating amount of KL contributed by each component
                    tensor in layers.decode_latents() step.
            list[str]: list of tensor names that correspond to each tensor in aforementioned output.
        """
        #decoding layer
        x, KL_amounts, KL_names = layers.decode_latents(
            self.target_capacities, self.decode_weights, self.multiposteriors
        )

        for layer_num in range(self.n_layers):
            #multitensor communication layer
            x = layers.share_up(x, self.share_up_weights[layer_num])

            #softmax layer
            x = layers.softmax(x, self.softmax_weights[layer_num], pre_norm=True, post_norm=False, use_bias=False)

            #directional layers
            x = layers.cummax(
                x, self.cummax_weights[layer_num], self.multitensor_system.task.masks,
                pre_norm=False, post_norm=True, use_bias=False
            )
            x = layers.shift(
                x, self.shift_weights[layer_num], self.multitensor_system.task.masks,
                pre_norm=False, post_norm=True, use_bias=False
            )

            #directional communication layer
            x = layers.direction_share(x, self.direction_share_weights[layer_num], pre_norm=True, use_bias=False)

            #nonlinear layer
            x = layers.nonlinear(x, self.nonlinear_weights[layer_num], pre_norm=True, post_norm=False, use_bias=False)

            #nultitensor communication layer
            x = layers.share_down(x, self.share_down_weights[layer_num])

            #normalization layer
            x = layers.normalize(x)

        #linear Heads
        output = (
            layers.affine(x[[1, 1, 0, 1, 1]], self.head_weights, use_bias=False)
            + 100 * self.head_weights[1]
        )
        x_mask = layers.affine(x[[1, 0, 0, 1, 0]], self.mask_weights, use_bias=True)
        y_mask = layers.affine(x[[1, 0, 0, 0, 1]], self.mask_weights, use_bias=True)

        #postprocessing
        x_mask, y_mask = layers.postprocess_mask(self.multitensor_system.task, x_mask, y_mask)

        return output, x_mask, y_mask, KL_amounts, KL_names


In [9]:
class ParallelTaskScheduler:
    """coord exec of mult tasks across CPUs GPUs

    scheduler assigns tasks to GPUs by avail memory quotas,
    spawns worker process solvers per task, and aggregates memory usage
    and solution results
    impl greedy scheduling algo similar
    """


    def __init__(self, time_budget_hours: float = 12.0):
        self.n_cpus = multiprocessing.cpu_count()
        self.n_gpus = torch.cuda.device_count() if torch.cuda.is_available() else 1
        self.time_budget = time_budget_hours * 3600
        self.start_time = time.time()
        self.end_time = self.start_time + self.time_budget - 300  # reserve small buffer
        self.gpu_memory_quotas = self._get_gpu_memory_quotas()
        self.task_memory_usage = {}
        self.task_time_estimates = {}

    def _get_gpu_memory_quotas(self) -> List[float]:
        """ret list of available memory (in GB) for each GPU"""
        quotas = []
        if torch.cuda.is_available():
            for i in range(self.n_gpus):
                mem_info = torch.cuda.mem_get_info(i)
                available_gb = mem_info[0] / (1024**3)
                quotas.append(available_gb)
        else:
            quotas = [8.0]
        return quotas

    def parallelize_runs(self, tasks: Dict, n_iterations: int,
                          verbose: bool = False) -> Tuple[Dict, Dict, float]:
        """run solver processes for all tasks in parallel given resource limits"""
        n_tasks = len(tasks)
        task_names = list(tasks.keys())
        gpu_quotas = self.gpu_memory_quotas.copy()
        tasks_started = [False] * n_tasks
        tasks_finished = [False] * n_tasks
        processes = [None] * n_tasks
        process_gpu_ids = [None] * n_tasks
        with multiprocessing.Manager() as manager:
            memory_dict = manager.dict()
            solutions_dict = manager.dict()
            error_queue = manager.Queue()
            t_start = time.time()
            while not all(tasks_finished):
                if not error_queue.empty():
                    error = error_queue.get()
                    if verbose:
                        print(f"Error occurred: {error}")
                    raise ValueError(error)
                #check finished processes
                for i in range(n_tasks):
                    if tasks_started[i] and not tasks_finished[i]:
                        if processes[i] is not None:
                            processes[i].join(timeout=0)
                            if not processes[i].is_alive():
                                tasks_finished[i] = True
                                if process_gpu_ids[i] is not None:
                                    task_usage = self.task_memory_usage.get(task_names[i], 1.0)
                                    gpu_quotas[process_gpu_ids[i]] += task_usage
                                    if verbose:
                                        print(f"{task_names[i]} finished on GPU {process_gpu_ids[i]}")
                #schedule new tasks
                for gpu_id in range(self.n_gpus):
                    for i in range(n_tasks):
                        if tasks_started[i]:
                            continue
                        task_name = task_names[i]
                        task_usage = self.task_memory_usage.get(task_name, 1.0)
                        enough_quota = gpu_quotas[gpu_id] >= task_usage
                        active_processes = sum(tasks_started) - sum(tasks_finished)
                        enough_cpus = active_processes < self.n_cpus
                        if enough_quota and enough_cpus:
                            gpu_quotas[gpu_id] -= task_usage
                            args = (
                                task_name,
                                tasks[task_name],
                                n_iterations,
                                gpu_id,
                                memory_dict,
                                solutions_dict,
                                error_queue
                            )
                            p = multiprocessing.Process(target=self._solve_task_process, args=args)
                            p.start()
                            processes[i] = p
                            tasks_started[i] = True
                            process_gpu_ids[i] = gpu_id
                            if verbose:
                                print(f"{task_name} started on GPU {gpu_id}")
                time.sleep(0.1)
            #convert manager dicts
            memory_dict = dict(memory_dict)
            solutions_dict = dict(solutions_dict)
            time_taken = time.time() - t_start
            if verbose:
                print(f"All tasks finished in {time_taken:.2f} seconds")
            return memory_dict, solutions_dict, time_taken

    def _solve_task_process(self, task_name: str, task_data: Dict,
                             n_iterations: int, gpu_id: int,
                             memory_dict: Dict, solutions_dict: Dict,
                             error_queue: multiprocessing.Queue):
        """worker process entry point used by parallelize_runs"""
        try:
            if torch.cuda.is_available():
                torch.cuda.set_device(gpu_id)
            solver = ARCCompressor(device=f"cuda:{gpu_id}")
            memory_used = solver.estimate_memory_usage(task_name, n_iterations)
            memory_dict[task_name] = memory_used
            solutions = solver.solve_with_compression(task_name, task_data, n_iterations, gpu_id)
            solutions_dict[task_name] = solutions
        except Exception as e:
            error_queue.put(f"Error in {task_name}: {str(e)}")

    def profile_tasks(self, tasks: Dict, profile_steps: int = 2) -> Dict:
        """run short profiling pass to est memory usage per task"""
        memory_dict, _, _ = self.parallelize_runs(tasks, n_iterations=profile_steps, verbose=False)
        self.task_memory_usage = memory_dict
        sorted_tasks = sorted(memory_dict.items(), key=lambda x: x[1], reverse=True)
        return dict(sorted_tasks)

    def estimate_time_per_step(self, tasks: Dict, test_steps: int = 20) -> float:
        """est time per iteration by running fixed number of steps"""
        safe_quotas = [q - 6.0 for q in self.gpu_memory_quotas]
        original_quotas = self.gpu_memory_quotas.copy()
        self.gpu_memory_quotas = safe_quotas
        _, _, time_taken = self.parallelize_runs(tasks, n_iterations=test_steps, verbose=False)
        self.gpu_memory_quotas = original_quotas
        return time_taken / test_steps

    def calculate_optimal_steps(self, time_per_step: float) -> int:
        """comp how many iter fit in remaining time budget"""
        time_left = self.end_time - time.time()
        n_steps = int(time_left // time_per_step)
        return n_steps

In [10]:
#main solver class combines all appr
class EnhancedARCSolver:

    def __init__(self, use_augmentation: bool = True,
                 use_symbolic_search: bool = True,
                 use_neural: bool = False,
                 use_compressarc: bool = False,
                 use_parallel: bool = False,
                 time_budget_hours: float = 12.0):
        self.visualizer = ARCVisualizer()
        self.dsl = DSLOperations()
        self.augmenter = DataAugmentation()
        self.symbolic_searcher = SymbolicProgramSearch(max_depth=3)
        self.use_augmentation = use_augmentation
        self.use_symbolic_search = use_symbolic_search
        self.use_neural = use_neural
        self.use_compressarc = use_compressarc
        self.use_parallel = use_parallel
        if self.use_neural:
            self.neural_model = SimpleARCNet()
        if self.use_compressarc:
            self.compress_solver = ARCCompressor()
        if self.use_parallel:
            self.scheduler = ParallelTaskScheduler(time_budget_hours)

    def solve_task(self, task: Dict, task_id: str = None) -> Dict:
        """attempt to solve single ARC task using enabled strats"""
        solutions = []
        method_used = None
        #strat 1: CompressARC
        if self.use_compressarc and not solutions:
            try:
                solutions = self.compress_solver.solve_with_compression(
                    task_id or "unknown", task, n_iterations=100, gpu_id=0)
                if solutions and len(solutions) == len(task["test"]):
                    method_used = "compressarc"
                    print(f"Solved {task_id} using CompressARC method")
            except Exception as e:
                print(f"CompressARC failed for {task_id}: {e}")
                solutions = []
        #strat 2: symbolic search
        if self.use_symbolic_search and not solutions:
            program = self.symbolic_searcher.search(task)
            if program:
                print(f"Found symbolic program for {task_id}: {[p[0] for p in program]}")
                for test in task["test"]:
                    result = self.symbolic_searcher.apply(np.array(test["input"]), program)
                    if result is not None:
                        solutions.append(result.tolist())
                if len(solutions) == len(task["test"]):
                    method_used = "symbolic"
        #strat 3: augmentation + search
        if self.use_augmentation and not solutions:
            augmented_tasks = self.augmenter.augment_task(task)
            for aug_task in augmented_tasks:
                program = self.symbolic_searcher.search(aug_task)
                if program:
                    print(f"Found program w/ augmentation for {task_id}")
                    for test in task["test"]:
                        result = np.array(test["input"])
                        solutions.append(result.tolist())
                    method_used = "augmented"
                    break
        #strat 4: fallback copy
        if not solutions:
            print(f"Using fallback for {task_id}")
            for test in task["test"]:
                solutions.append(test["input"])
            method_used = "fallback"
        return {"predictions": solutions, "method": method_used}

    def visualize_solution(self, task: Dict, solutions: List,
                           task_id: str = None, save_path: str = None):
        """displ task alongside its soln using matplotlib"""
        fig = self.visualizer.visualize_task_matplotlib(task, task_id, show_predictions=solutions)
        if save_path:
            fig.savefig(save_path, dpi=100, bbox_inches="tight")
        return fig

    def export_solution_images(self, task: Dict, solutions: List,
                               output_dir: str, task_id: str):
        """write composite and indiv solution images to disk"""
        output_path = Path(output_dir) / task_id
        output_path.mkdir(parents=True, exist_ok=True)
        composite = self.visualizer.create_composite_image(task, task_id, solutions)
        composite.save(output_path / f"{task_id}_composite.png")
        for i, solution in enumerate(solutions):
            img = self.visualizer.grid_to_image(solution, f"Test {i+1} Solution")
            img.save(output_path / f"{task_id}_test{i+1}_solution.png")

    def process_dataset(self, challenges_path: str,
                        output_dir: str = "./arc_output",
                        save_visualizations: bool = True,
                        optimize_for_competition: bool = False) -> Dict:
        """Solve all tasks in specified JSON file and save results"""
        with open(challenges_path, "r") as f:
            challenges = json.load(f)
        all_solutions = {}
        if self.use_parallel and optimize_for_competition:
            print("=" * 60)
            print("Running in competition mode w/ parallel processing")
            print(f"CPUs: {self.scheduler.n_cpus}, GPUs: {self.scheduler.n_gpus}")
            print(f"Time budget: {self.scheduler.time_budget/3600:.1f} hours")
            print("=" * 60)
            sorted_tasks = self.scheduler.profile_tasks(challenges, profile_steps=2)
            time_per_step = self.scheduler.estimate_time_per_step(challenges, test_steps=10 if len(challenges) > 10 else 2)
            n_steps = self.scheduler.calculate_optimal_steps(time_per_step)
            n_steps = min(n_steps, 2500)
            print(f"Running {n_steps} steps per task...")
            _, solutions_dict, time_taken = self.scheduler.parallelize_runs(challenges, n_iterations=n_steps, verbose=True)
            for task_id, solutions in solutions_dict.items():
                all_solutions[task_id] = solutions
            print(f"Completed in {time_taken/3600:.2f} hours")
            print(f"Tasks solved: {len(all_solutions)}")
            print(f"Average steps per task: {n_steps}")
        else:
            for task_id, task in challenges.items():
                print(f"Processing task: {task_id}")
                result = self.solve_task(task, task_id)
                all_solutions[task_id] = result["predictions"]
                if save_visualizations:
                    self.export_solution_images(task, result["predictions"], output_dir, task_id)
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        with open(output_path / "solutions.json", "w") as f:
            json.dump(all_solutions, f, indent=2)
        return all_solutions

In [11]:

def get_gpu_memory_info(gpu_id: int = 0) -> Dict[str, float]:
    """ret detailed memory information for GPU"""
    if not torch.cuda.is_available():
        return {"available": 0, "total": 0, "used": 0, "free": 0}
    mem_info = torch.cuda.mem_get_info(gpu_id)
    total = mem_info[1] / (1024**3)
    available = mem_info[0] / (1024**3)
    used = total - available
    return {
        "available": available,
        "total": total,
        "used": used,
        "free": available,
        "gpu_name": torch.cuda.get_device_name(gpu_id)
    }

def print_system_info():
    """print CPU and GPU config information"""
    print("=" * 60)
    print("System information")
    print("=" * 60)
    print(f"CPUs: {multiprocessing.cpu_count()}")
    if torch.cuda.is_available():
        n_gpus = torch.cuda.device_count()
        print(f"GPUs: {n_gpus}")
        for i in range(n_gpus):
            info = get_gpu_memory_info(i)
            print(f"  GPU {i}: {info['gpu_name']}")
            print(f"    Memory: {info['used']:.1f}/{info['total']:.1f} GB used")
    else:
        print("GPUs: None (CPU mode)")
    print(f"PyTorch: {torch.__version__}")
    print(f"CUDA available: {torch.cuda.is_available()}")
    if torch.cuda.is_available():
        print(f"CUDA version: {torch.version.cuda}")
    print("=" * 60)





def main():
    """entry when running script as standalone program"""
    import argparse
    parser = argparse.ArgumentParser(description="Enhanced ARC‑AGI2 solver and visualizer")
    parser.add_argument("--challenges", type=str, required=True, help="Path to challenges JSON file")
    parser.add_argument("--output", type=str, default="./arc_output", help="Output directory for results")
    parser.add_argument("--visualize", action="store_true", help="Save visualization images")
    parser.add_argument("--no‑augmentation", action="store_true", help="Disable data augmentation")
    parser.add_argument("--no‑symbolic", action="store_true", help="Disable symbolic search")
    parser.add_argument("--use‑compressarc", action="store_true", help="Enable CompressARC neural method")
    parser.add_argument("--parallel", action="store_true", help="Enable parallel GPU processing")
    parser.add_argument("--competition‑mode", action="store_true", help="Run w/ all optimizations for competition")
    parser.add_argument("--time‑budget", type=float, default=12.0, help="Time budget in hours")
    parser.add_argument("--system‑info", action="store_true", help="Print system information")
    args = parser.parse_args()
    if args.system_info or args.competition_mode:
        print_system_info()
    if args.competition_mode:
        print("\nCompetition mode enabled")
        print("Enabling: CompressARC, parallel processing, all optimizations")
        args.use_compressarc = True
        args.parallel = True
    solver = EnhancedARCSolver(
        use_augmentation=not args.no_augmentation,
        use_symbolic_search=not args.no_symbolic,
        use_neural=False,
        use_compressarc=args.use_compressarc,
        use_parallel=args.parallel,
        time_budget_hours=args.time_budget
    )
    solutions = solver.process_dataset(
        args.challenges,
        args.output,
        save_visualizations=args.visualize,
        optimize_for_competition=args.competition_mode or args.parallel
    )
    print(f"\nProcessed {len(solutions)} tasks")
    print(f"Results saved to: {args.output}")
    #create Kaggle submission format
    submission = {}
    for task_id, task_solutions in solutions.items():
        submission[task_id] = []
        if isinstance(task_solutions, list) and len(task_solutions) > 0:
            if isinstance(task_solutions[0], list):
                for solution in task_solutions:
                    submission[task_id].append({
                        "attempt_1": solution,
                        "attempt_2": solution
                    })
            else:
                submission[task_id].append({
                    "attempt_1": task_solutions,
                    "attempt_2": task_solutions
                })
    with open(Path(args.output) / "submission.json", "w") as f:
        json.dump(submission, f, indent=2)
    print("Submission file created: submission.json")
    if args.competition_mode or args.parallel:
        print("\n" + "=" * 60)
        print("Performance summary")
        print("=" * 60)
        if hasattr(solver, "scheduler"):
            elapsed = time.time() - solver.scheduler.start_time
            print(f"Total time: {elapsed/3600:.2f} hours")
            print(f"Tasks completed: {len(solutions)}")
            if len(solutions) > 0:
                print(f"Average time per task: {elapsed/len(solutions):.2f} seconds")
        print("=" * 60)


if __name__ == "__main__":
    #on some systems using CUDA spawn start method is necessary
    if torch.cuda.is_available():
        multiprocessing.set_start_method("spawn", force=True)
    main()

usage: ipykernel_launcher.py [-h] --challenges CHALLENGES [--output OUTPUT]
                             [--visualize] [--no‑augmentation] [--no‑symbolic]
                             [--use‑compressarc] [--parallel]
                             [--competition‑mode] [--time‑budget TIME‑BUDGET]
                             [--system‑info]
ipykernel_launcher.py: error: the following arguments are required: --challenges


SystemExit: 2