# ARC Prize 2025
URL: https://www.kaggle.com/competitions/arc-prize-2025

In [17]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [18]:
import json
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import math
import copy
import itertools
from torch.utils.data import Dataset, DataLoader

In [19]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

In [20]:
# train_solutions = json.loads("/kaggle/input/arc-prize-2025/arc-agi_training_solutions.json")
# train_challenges = json.loads("/kaggle/input/arc-prize-2025/arc-agi_training_challenges.json")
# eval_solutions = json.loads("/kaggle/input/arc-prize-2025/arc-agi_evaluation_solutions.json")
# eval_challenges = json.loads("/kaggle/input/arc-prize-2025/arc-agi_evaluation_challenges.json")

# file_path
train_solutions = json.loads(open("data/ARC_Prize_2025/arc-agi_training_solutions.json").read())
train_challenges = json.loads(open("data/ARC_Prize_2025/arc-agi_training_challenges.json").read())
eval_solutions = json.loads(open("data/ARC_Prize_2025/arc-agi_evaluation_solutions.json").read())
eval_challenges = json.loads(open("data/ARC_Prize_2025/arc-agi_evaluation_challenges.json").read())

- LightGBM
- Transformers
- Graph Neural Networks
- Program Synthesis / Sysbolic AI
- Neuro-Symbolic AI

1. LightGBM
2. First of all, CNN + Transformers is the base line.
3. Second of all, Vision Transformer + Attension.
4. Third of all, Graph Neural Network.
5. Last of all, Neuro-Symbolic like hybrid approach.

# Transformers

In [21]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0

        # Initialize dimensions
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        # Linear layers for transforming inputs
        self.W_q = nn.Linear(d_model, d_model) # Query
        self.W_k = nn.Linear(d_model, d_model) # Key
        self.W_v = nn.Linear(d_model, d_model) # Value
        self.W_o = nn.Linear(d_model, d_model) # Output

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        # Calculate attention scores
        attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)

        # Apply mask if provided
        if mask is not None:
            attn_scores = attn_scores.masked_fill(mask == 0, -1e9)

        # Softmax is applied to obtain attention probabilities
        attn_probs = torch.softmax(attn_scores, dim=-1)

        # Multiply by values to obtain the final output
        output = torch.matmul(attn_probs, V)
        return output
    
    def split_heads(self, x):
        # Reshape the input to have num_heads for multi-head attention
        bathch_size, seq_len, d_model = x.size()
        return x.view(bathch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)
    
    def combine_heads(self, x):
        # Combine the heads back to the original shape
        batch_size, _, seq_len, d_k = x.size()
        return x.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)
    
    def forward(self, Q, K, V, mask=None):
        # Apply linear transformations and split heads
        Q = self.split_heads(self.W_q(Q))
        K = self.split_heads(self.W_k(K))
        V = self.split_heads(self.W_v(V))

        # Perform scaled dot-product attention
        attn_output = self.scaled_dot_product_attention(Q, K, V, mask)

        # Combine heads and apply output transformation
        output = self.W_o(self.combine_heads(attn_output))
        return output

In [22]:
class PositionWiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(PositionWiseFeedForward, self).__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        return self.fc2(self.relu(self.fc1(x)))

In [23]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_seq_length=5000):
        super(PositionalEncoding, self).__init__()

        pe = torch.zeros(max_seq_length, d_model)
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)  # (max_seq_length,1)

        # compute for even indices only to match sin/cos pairs
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        pe[:, 0::2] = torch.sin(position * div_term)  # even dims
        pe[:, 1::2] = torch.cos(position * div_term)  # odd dims

        self.register_buffer('pe', pe.unsqueeze(0))  # (1, max_seq_length, d_model)

    def forward(self, x):
        # x: (batch, seq_len, d_model) - ensure pe matches dtype/device
        return x + self.pe[:, :x.size(1)].to(dtype=x.dtype, device=x.device)

In [24]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x, mask):
        attn_output = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))
        return x

In [25]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x, enc_output, src_mask, tgt_mask):
        attn_output = self.self_attn(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout(attn_output))
        attn_output = self.cross_attn(x, enc_output, enc_output, src_mask)
        x = self.norm2(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm3(x + self.dropout(ff_output))
        return x

In [26]:
class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout):
        super(Transformer, self).__init__()
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_seq_length)

        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])

        self.fc = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)

    def generate_mask(self, src, tgt):
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2)
        tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(3)
        seq_length = tgt.size(1)
        nopeak_mask = (1 - torch.triu(torch.ones(1, seq_length, seq_length), diagonal=1)).bool()
        tgt_mask = tgt_mask & nopeak_mask
        return src_mask, tgt_mask

    def forward(self, src, tgt):
        src_mask, tgt_mask = self.generate_mask(src, tgt)
        src_embedded = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
        tgt_embedded = self.dropout(self.positional_encoding(self.decoder_embedding(tgt)))

        enc_output = src_embedded
        for enc_layer in self.encoder_layers:
            enc_output = enc_layer(enc_output, src_mask)

        dec_output = tgt_embedded
        for dec_layer in self.decoder_layers:
            dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)

        output = self.fc(dec_output)
        return output

In [27]:
src_vocab_size = 5000
tgt_vocab_size = 5000
d_model = 512
num_heads = 8
num_layers = 6
d_ff = 2048
max_seq_length = 100
dropout = 0.1

transformer = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout)

In [29]:
max_hight = 0
max_width = 0

datasets_list = [train_challenges, eval_challenges]

for dataset in datasets_list:
    for data in dataset.keys():
        item = dataset[data]["train"]
        itme_len = len(item)
        for i in range(itme_len):
            # input
            max_hight = max(max_hight, len(item[i]["input"]))
            max_width = max(max_width, len(item[i]["input"][0]))
            # output
            max_hight = max(max_hight, len(item[i]["output"]))
            max_width = max(max_width, len(item[i]["output"][0]))
    
    print(f"{dataset} ==> max_hight: {max_hight}, max_width: {max_width}")

print(f"max_hight: {max_hight}, max_width: {max_width}")

{'00576224': {'train': [{'input': [[7, 9], [4, 3]], 'output': [[7, 9, 7, 9, 7, 9], [4, 3, 4, 3, 4, 3], [9, 7, 9, 7, 9, 7], [3, 4, 3, 4, 3, 4], [7, 9, 7, 9, 7, 9], [4, 3, 4, 3, 4, 3]]}, {'input': [[8, 6], [6, 4]], 'output': [[8, 6, 8, 6, 8, 6], [6, 4, 6, 4, 6, 4], [6, 8, 6, 8, 6, 8], [4, 6, 4, 6, 4, 6], [8, 6, 8, 6, 8, 6], [6, 4, 6, 4, 6, 4]]}], 'test': [{'input': [[3, 2], [7, 8]]}]}, '007bbfb7': {'train': [{'input': [[6, 6, 0], [6, 0, 0], [0, 6, 6]], 'output': [[6, 6, 0, 6, 6, 0, 0, 0, 0], [6, 0, 0, 6, 0, 0, 0, 0, 0], [0, 6, 6, 0, 6, 6, 0, 0, 0], [6, 6, 0, 0, 0, 0, 0, 0, 0], [6, 0, 0, 0, 0, 0, 0, 0, 0], [0, 6, 6, 0, 0, 0, 0, 0, 0], [0, 0, 0, 6, 6, 0, 6, 6, 0], [0, 0, 0, 6, 0, 0, 6, 0, 0], [0, 0, 0, 0, 6, 6, 0, 6, 6]]}, {'input': [[4, 0, 4], [0, 0, 0], [0, 4, 0]], 'output': [[4, 0, 4, 0, 0, 0, 4, 0, 4], [0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 4, 0, 0, 0, 0, 0, 4, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 4, 0, 4, 0, 0, 0], [0, 0, 0, 0,

In [30]:
# --- special tokens ---
PAD = 0
SOS = 1
EOS = 2
_reserved_start = 3

def flatten_grid(grid):
    # grid は list[list[int]] を想定（行優先 flatten）
    return [int(v) for row in grid for v in row]

# --- build vocab from train_challenges ---
unique_vals = set()
max_src_len = 0
max_tgt_len = 0

for cid, chal in train_challenges.items():
    for ex in chal.get("train", []):
        src_seq = flatten_grid(ex["input"])
        tgt_seq = flatten_grid(ex["output"])
        unique_vals.update(src_seq)
        unique_vals.update(tgt_seq)
        max_src_len = max(max_src_len, len(src_seq))
        # tgt will include SOS/EOS during encoding
        max_tgt_len = max(max_tgt_len, len(tgt_seq) + 2)

# deterministic ordering
unique_vals = sorted(unique_vals)
value2idx = {v: i + _reserved_start for i, v in enumerate(unique_vals)}
vocab_size = _reserved_start + len(unique_vals)

# --- encode helpers ---
def encode_src(grid, pad_len):
    seq = flatten_grid(grid)
    ids = [value2idx[v] for v in seq]
    ids = ids + [PAD] * (pad_len - len(ids))
    return ids

def encode_tgt(grid, pad_len):
    seq = flatten_grid(grid)
    ids = [SOS] + [value2idx[v] for v in seq] + [EOS]
    ids = ids + [PAD] * (pad_len - len(ids))
    return ids

# --- prepare tensors ---
src_list = []
tgt_list = []
for cid, chal in train_challenges.items():
    for ex in chal.get("train", []):
        src_list.append(encode_src(ex["input"], max_src_len))
        tgt_list.append(encode_tgt(ex["output"], max_tgt_len))

src_tensor = torch.tensor(src_list, dtype=torch.long)
tgt_tensor = torch.tensor(tgt_list, dtype=torch.long)

class ARCDataset(Dataset):
    def __init__(self, src, tgt):
        self.src = src
        self.tgt = tgt
    def __len__(self):
        return self.src.size(0)
    def __getitem__(self, idx):
        return self.src[idx], self.tgt[idx]

batch_size = 32
dataset = ARCDataset(src_tensor, tgt_tensor)
loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# --- recreate Transformer with correct vocab and seq length ---
max_seq_length = max(max_src_len, max_tgt_len)
# reuse hyperparams d_model, num_heads, num_layers, d_ff, dropout if defined
transformer = Transformer(vocab_size, vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout).to(device)

# --- training snippet ---
criterion = nn.CrossEntropyLoss(ignore_index=PAD)
optimizer = optim.Adam(transformer.parameters(), lr=1e-4)

transformer.train()
for epoch in range(3):
    epoch_loss = 0.0
    for src_batch, tgt_batch in loader:
        src_batch = src_batch.to(device)
        tgt_batch = tgt_batch.to(device)
        optimizer.zero_grad()
        # input to decoder: all tokens except last
        decoder_input = tgt_batch[:, :-1]
        output = transformer(src_batch, decoder_input)  # (B, L, vocab)
        # collapse and compute loss against tgt_batch[:,1:]
        loss = criterion(output.contiguous().view(-1, vocab_size), tgt_batch[:, 1:].contiguous().view(-1))
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    print(f"epoch {epoch+1} avg_loss {epoch_loss/len(loader):.4f}")

# --- quick eval example (no teacher forcing loop here, uses same batching) ---
transformer.eval()
with torch.no_grad():
    src_batch, tgt_batch = next(iter(loader))
    src_batch = src_batch.to(device)
    tgt_batch = tgt_batch.to(device)
    out = transformer(src_batch, tgt_batch[:, :-1])
    val_loss = criterion(out.contiguous().view(-1, vocab_size), tgt_batch[:, 1:].contiguous().view(-1))
    print("val loss", val_loss.item())

RuntimeError: Expected all tensors to be on the same device, but found at least two devices, cuda:0 and cpu!

In [None]:
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)

transformer.train()

for epoch in range(5):
    optimizer.zero_grad()
    output = transformer(src_data, tgt_data[:, :-1])
    loss = criterion(output.contiguous().view(-1, tgt_vocab_size), tgt_data[:, 1:].contiguous().view(-1))
    loss.backward()
    optimizer.step()
    print(f"Epoch: {epoch+1}, Loss: {loss.item()}")

In [None]:
transformer.eval()

# Generate random sample validation data
val_src_data = torch.randint(1, src_vocab_size, (64, max_seq_length))  # (batch_size, seq_length)
val_tgt_data = torch.randint(1, tgt_vocab_size, (64, max_seq_length))  # (batch_size, seq_length)

with torch.no_grad():

    val_output = transformer(val_src_data, val_tgt_data[:, :-1])
    val_loss = criterion(val_output.contiguous().view(-1, tgt_vocab_size), val_tgt_data[:, 1:].contiguous().view(-1))
    print(f"Validation Loss: {val_loss.item()}")

# Reference

- [1] https://www.datacamp.com/tutorial/building-a-transformer-with-py-torch
- [2] https://medium.com/data-science/transformers-explained-visually-part-3-multi-head-attention-deep-dive-1c1ff1024853
- [3] https://www.ibm.com/think/topics/positional-encoding
- [4] https://qiita.com/Uking/items/d7bb7da33d2bbe3eeb71