<a href="https://colab.research.google.com/github/MinhTuanDang/ARC-neuro-vector-symbolic-model/blob/main/Neuro_vector_symbolic_Architecture_to_solve_ARC.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import json
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

# Check for GPU and set device.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

###############################################
# 1. Data Loading and Preprocessing for ARC Tasks
###############################################

# Function to pad a grid (list of lists) to a fixed size.
def pad_grid(grid, max_height=30, max_width=30):
    """Zero-pad a 2-D grid to a fixed ``(max_height, max_width)`` array.

    The grid is copied into the top-left corner of the result; every other
    cell is 0. Note that 0 is therefore used both for padding and for any
    original cell whose value is 0.

    Args:
        grid: Rectangular grid of integer cell values (list of lists or
            array-like) with at most ``max_height`` rows and ``max_width``
            columns.
        max_height: Number of rows in the padded result.
        max_width: Number of columns in the padded result.

    Returns:
        An ``np.int64`` array of shape ``(max_height, max_width)``.

    Raises:
        ValueError: If ``grid`` is not 2-D or is larger than the target size.
    """
    grid = np.array(grid, dtype=np.int64)
    # Validate up front so a malformed task file yields a readable message
    # instead of an opaque unpacking or broadcasting error further down.
    if grid.ndim != 2:
        raise ValueError(
            f"pad_grid expects a 2-D grid, got an array with shape {grid.shape}"
        )
    h, w = grid.shape
    if h > max_height or w > max_width:
        raise ValueError(
            f"grid of size {h}x{w} does not fit into {max_height}x{max_width}"
        )
    padded = np.zeros((max_height, max_width), dtype=np.int64)
    # Copy original grid into top-left corner.
    padded[:h, :w] = grid
    return padded

# Custom Dataset for ARC tasks.
# Expects each JSON file (in the given directory) to follow the ARC format:
# { "train": [ {"input": grid, "output": grid}, ... ],
#   "test":  [ {"input": grid, "output": grid}, ... ] }
class ARCDataset(Dataset):
    """Flattened, zero-padded input/output grid pairs read from ARC task files.

    Every ``*.json`` file in ``dataset_dir`` is expected to follow the ARC
    layout::

        {"train": [{"input": grid, "output": grid}, ...],
         "test":  [{"input": grid, "output": grid}, ...]}

    All pairs found under the requested split, across all files, are loaded
    eagerly into ``self.examples`` as ``(input, output)`` tuples of 1-D arrays
    of length ``max_height * max_width``.
    """

    def __init__(self, dataset_dir, split="train", max_height=30, max_width=30):
        """
        dataset_dir: path to the folder containing ARC JSON files.
        split: "train" or "test" (each file contains a list under these keys).
        max_height: number of rows every grid is padded to.
        max_width: number of columns every grid is padded to.
        """
        self.examples = []
        self.max_height = max_height
        self.max_width = max_width
        # os.listdir returns entries in arbitrary order; sorting makes the
        # example order (and thus unshuffled evaluation batches) reproducible
        # across runs and machines.
        for filename in sorted(os.listdir(dataset_dir)):
            if not filename.endswith(".json"):
                continue
            filepath = os.path.join(dataset_dir, filename)
            # JSON is UTF-8 by specification; do not depend on the locale.
            with open(filepath, "r", encoding="utf-8") as f:
                task = json.load(f)
            # A file without the requested split simply contributes nothing.
            for pair in task.get(split, []):
                inp_grid = pad_grid(pair["input"], max_height, max_width)
                out_grid = pad_grid(pair["output"], max_height, max_width)
                # Flatten to 1D vectors of length max_height*max_width.
                self.examples.append((inp_grid.flatten(), out_grid.flatten()))

        print(f"Loaded {len(self.examples)} examples from {dataset_dir} [{split}] split.")

    def __len__(self):
        """Return the number of loaded (input, output) pairs."""
        return len(self.examples)

    def __getitem__(self, idx):
        """Return pair ``idx`` as two ``torch.long`` tensors (token indices)."""
        inp, out = self.examples[idx]
        return torch.tensor(inp, dtype=torch.long), torch.tensor(out, dtype=torch.long)

###############################################
# 2. Model Definition: Modified LARS-VSA for ARC (Sequence-to-Sequence)
###############################################

# Custom binary activation with straight-through estimator.
class BinaryActivation(torch.autograd.Function):
    """Sign non-linearity whose backward pass is the identity.

    Forward returns the element-wise sign of the input (-1, 0 or +1);
    backward returns the incoming gradient unchanged, i.e. a
    straight-through estimator.
    """

    @staticmethod
    def forward(ctx, input):
        # Nothing is stored on ctx: the backward pass needs no forward state.
        return torch.sign(input)

    @staticmethod
    def backward(ctx, grad_output):
        # Straight-through: treat the forward pass as if it were the identity.
        upstream = grad_output
        return upstream

def binary_activation(x):
    """Run ``x`` through ``BinaryActivation`` and return the result."""
    binarized = BinaryActivation.apply(x)
    return binarized

# Bundling operation: element-wise sign of the sum.
def bundling(h1, h2):
    """Bundle two hypervectors: ``binary_activation`` of their sum.

    The operands are added with ``+``, so the usual broadcasting rules of
    the operand type apply.
    """
    superposed = h1 + h2
    return binary_activation(superposed)

# A single head of the HDSymbolicAttention module.
class HDSymbolicAttentionHead(nn.Module):
    """One attention head operating on binarized high-dimensional projections.

    NOTE: ``forward`` materializes a ``[B, N, N, hyperdim]`` intermediate (the
    pairwise bundles), so its memory use grows quadratically with the
    sequence length N.
    """

    def __init__(self, input_dim, hyperdim, seq_len):
        """
        input_dim: size of each incoming token embedding.
        hyperdim: size of the high-dimensional space tokens are projected into.
        seq_len: number of positions that receive a learned symbolic vector
                 (e.g. 900 for 30x30 grids).
        """
        super().__init__()
        self.hyperdim = hyperdim
        self.seq_len = seq_len
        # Linear map from embedding space into the hyperdimensional space.
        self.proj = nn.Linear(input_dim, hyperdim)
        # One learned vector per position, initialised from a standard normal.
        self.symbolic = nn.Parameter(torch.randn(seq_len, hyperdim))

    def forward(self, x):
        """
        x: Tensor of shape [B, seq_len, input_dim]
        Returns a tensor of shape [B, seq_len, hyperdim].
        """
        # Unpacking three sizes also rejects inputs that are not 3-D.
        batch, _, _ = x.shape

        # Project, then binarize with the straight-through sign.
        hv = binary_activation(self.proj(x))  # [B, N, hyperdim]

        # Broadcast every token against every other token and bundle the pair.
        queries = hv.unsqueeze(2)  # [B, N, 1, hyperdim]
        keys = hv.unsqueeze(1)  # [B, 1, N, hyperdim]
        pair_bundle = bundling(queries, keys)  # [B, N, N, hyperdim]

        # Score token i against its bundle with token j: dot product over the
        # feature axis, scaled by the hyperdimension.
        scores = (queries * pair_bundle).sum(dim=-1) / self.hyperdim  # [B, N, N]
        weights = F.softmax(scores, dim=-1)  # each row sums to 1 over j

        # Attention-weighted mixture of the binarized token vectors.
        mixed = torch.bmm(weights, hv)  # [B, N, hyperdim]

        # Bind with the per-position symbolic vectors by element-wise product.
        position_codes = self.symbolic.unsqueeze(0).expand(batch, -1, -1)
        return mixed * position_codes  # [B, N, hyperdim]

# Full LARS-VSA model with multi-head attention adapted for sequence-to-sequence grid transformation.
class LARS_VSA(nn.Module):
    """Multi-head symbolic-attention model mapping a token grid to per-token logits.

    Pipeline: token embedding -> ``num_heads`` attention heads (outputs
    summed) -> batch normalization over the flattened
    ``seq_len * hyperdim`` features -> linear classifier applied to every
    token.
    """

    def __init__(self, vocab_size, embed_dim, hyperdim, num_heads, seq_len, num_classes):
        """
        vocab_size: number of distinct input tokens (colors), e.g. 10.
        embed_dim: size of each token embedding.
        hyperdim: size of the high-dimensional space used by the heads.
        num_heads: how many attention heads to build.
        seq_len: fixed sequence length (e.g., 30*30=900).
        num_classes: number of output classes (colors), e.g., 10.
        """
        super().__init__()
        self.seq_len = seq_len
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        # Independent heads; their outputs are summed in forward().
        head_modules = []
        for _ in range(num_heads):
            head_modules.append(HDSymbolicAttentionHead(embed_dim, hyperdim, seq_len))
        self.heads = nn.ModuleList(head_modules)
        # Normalizes each (position, feature) entry separately across the
        # batch, hence seq_len * hyperdim features.
        self.batch_norm = nn.BatchNorm1d(seq_len * hyperdim)
        # Shared linear layer that scores every token independently.
        self.token_classifier = nn.Linear(hyperdim, num_classes)

    def forward(self, x):
        """
        x: Tensor of shape [B, seq_len] with token indices.
        Returns logits of shape [B, seq_len, num_classes].
        """
        batch, n_tokens = x.shape
        embedded = self.embedding(x)  # [B, N, embed_dim]

        # Run every head on the same embeddings and add the results.
        per_head = [head(embedded) for head in self.heads]  # each [B, N, hyperdim]
        combined = torch.stack(per_head, dim=0).sum(dim=0)  # [B, N, hyperdim]

        # BatchNorm1d wants [B, features]: flatten, normalize, restore shape.
        flat = combined.view(batch, -1)
        normed = self.batch_norm(flat).view(batch, n_tokens, -1)

        return self.token_classifier(normed)  # [B, N, num_classes]

###############################################
# 3. Hyperparameters and Dataset Setup
###############################################

# Parameters for ARC grids: every grid is zero-padded to this size.
max_height = 30
max_width = 30
seq_len = max_height * max_width  # 900 tokens per flattened grid

# Model parameters.
vocab_size = 10      # Colors 0-9 (input tokens).
embed_dim = 32       # Size of each token embedding.
hyperdim = 1024      # Size of the high-dimensional space used by the heads.
num_heads = 4        # Number of attention heads whose outputs are summed.
num_classes = 10     # Output classes per token (one per color).

# Training parameters.
batch_size = 2
num_epochs = 5
learning_rate = 1e-3

# Directories containing ARC task JSON files, one directory per role.
# They must exist before this cell runs (e.g. upload or mount them in Colab).
# NOTE(review): an earlier version of this comment referred to a
# 'data/evaluation' folder, but the paths below point at ./sample_data;
# confirm which location is intended.
dataset_dir_train = "./sample_data/train"  # adjust path as needed for training split
dataset_dir_test  = "./sample_data/test"  # adjust path as needed for test split

# For demonstration, we use the "train" split examples (from the training
# directory) for training and the "test" split examples (from the test
# directory) for evaluation. Both constructors read all files eagerly.
train_dataset = ARCDataset(dataset_dir_train, split="train", max_height=max_height, max_width=max_width)
eval_dataset  = ARCDataset(dataset_dir_test, split="test", max_height=max_height, max_width=max_width)

# Training batches are reshuffled every epoch; evaluation keeps dataset order.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
eval_loader  = DataLoader(eval_dataset, batch_size=batch_size)

###############################################
# 4. Training and Evaluation Functions
###############################################

def train_epoch(model, dataloader, optimizer, criterion, device):
    """Run one optimization pass over ``dataloader`` and return the mean batch loss.

    Args:
        model: Module mapping ``[B, seq_len]`` token indices to
            ``[B, seq_len, C]`` logits. It is switched to training mode.
        dataloader: Iterable yielding ``(inputs, targets)`` tensor pairs,
            both of shape ``[B, seq_len]``.
        optimizer: Optimizer that updates ``model``'s parameters.
        criterion: Loss taking ``[M, C]`` logits and ``[M]`` class indices.
        device: Device the batches are moved to before the forward pass.

    Returns:
        The unweighted mean of the per-batch losses as a float, or NaN when
        ``dataloader`` yields no batches.
    """
    model.train()
    total_loss = 0.0
    num_batches = 0
    for inputs, targets in dataloader:
        inputs, targets = inputs.to(device), targets.to(device)  # shape: [B, seq_len]
        optimizer.zero_grad()
        outputs = model(inputs)  # shape: [B, seq_len, C]
        # Flatten for the token-wise loss. The class count is read from the
        # logits themselves so this function does not depend on a
        # module-level constant; reshape also copes with non-contiguous
        # tensors, which view would reject.
        outputs_flat = outputs.reshape(-1, outputs.size(-1))
        targets_flat = targets.reshape(-1)
        loss = criterion(outputs_flat, targets_flat)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        num_batches += 1
    if num_batches == 0:
        # No data means the mean loss is undefined; report NaN rather than
        # failing with a ZeroDivisionError.
        return float("nan")
    return total_loss / num_batches

def evaluate_epoch(model, dataloader, criterion, device):
    """Compute the mean batch loss over ``dataloader`` without updating the model.

    Args:
        model: Module mapping ``[B, seq_len]`` token indices to
            ``[B, seq_len, C]`` logits. It is switched to evaluation mode.
        dataloader: Iterable yielding ``(inputs, targets)`` tensor pairs,
            both of shape ``[B, seq_len]``.
        criterion: Loss taking ``[M, C]`` logits and ``[M]`` class indices.
        device: Device the batches are moved to before the forward pass.

    Returns:
        The unweighted mean of the per-batch losses as a float, or NaN when
        ``dataloader`` yields no batches.
    """
    model.eval()
    total_loss = 0.0
    num_batches = 0
    # Gradients are not needed for evaluation; skipping them saves memory.
    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)  # shape: [B, seq_len, C]
            # Read the class count from the logits rather than a
            # module-level constant; reshape tolerates non-contiguous input.
            outputs_flat = outputs.reshape(-1, outputs.size(-1))
            targets_flat = targets.reshape(-1)
            loss = criterion(outputs_flat, targets_flat)
            total_loss += loss.item()
            num_batches += 1
    if num_batches == 0:
        # No data means the mean loss is undefined; report NaN rather than
        # failing with a ZeroDivisionError.
        return float("nan")
    return total_loss / num_batches

###############################################
# 5. Model Initialization and Training Loop
###############################################

# Build the model on the selected device, with Adam and a token-wise
# cross-entropy loss. The targets are the padded, flattened output grids, so
# every one of the seq_len positions (padding included) contributes to the loss.
model = LARS_VSA(vocab_size, embed_dim, hyperdim, num_heads, seq_len, num_classes).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss()

print("Starting training...")
# One training pass followed by one evaluation pass per epoch; both losses
# are means over batches.
for epoch in range(num_epochs):
    train_loss = train_epoch(model, train_loader, optimizer, criterion, device)
    eval_loss = evaluate_epoch(model, eval_loader, criterion, device)
    print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {train_loss:.4f} | Eval Loss: {eval_loss:.4f}")

print("Training complete.")

# Example inference: take one batch from evaluation and show predicted output grid.
model.eval()
with torch.no_grad():
    # Only the first batch is used (see the break below); if the evaluation
    # loader is empty, nothing is printed.
    for inputs, targets in eval_loader:
        inputs = inputs.to(device)
        outputs = model(inputs)  # [B, seq_len, num_classes]
        # Most likely class per token.
        preds = outputs.argmax(dim=-1)  # [B, seq_len]
        # For the first example in the batch, reshape to grid. This is the
        # full padded max_height x max_width grid, not cropped to the size of
        # the original task grid.
        pred_grid = preds[0].cpu().numpy().reshape(max_height, max_width)
        print("Predicted grid (first example):")
        print(pred_grid)
        break

Using device: cpu
Loaded 1302 examples from ./sample_data/train [train] split.
Loaded 419 examples from ./sample_data/test [test] split.
Starting training...
