# LLM-Based DSL Program Search for ARC-AGI-2

This notebook implements an LLM-based approach to search through DSL (Domain Specific Language) programs and apply them to ARC (Abstraction and Reasoning Corpus) tasks.

## Overview
- Uses an open-source LLM to analyze ARC tasks
- Searches through available DSL programs (originally designed for ARC-AGI-1)
- Sequentially applies DSL transformations based on task requirements
- Supports both training and evaluation datasets from ARC-AGI-2

## Configuration

Set these paths before running on Kaggle or other environments:

In [None]:
# ===== CONFIGURATION - Update these paths for your environment =====

# Path to the DSL module directory. It is prepended to sys.path so that the
# `dsl`, `constants`, `arc_types` and `solvers` modules can be imported.
DSL_MODULE_PATH = "./arc-dsl"

# Path to ARC-AGI-2 data. Each split is a directory of per-task *.json files.
ARC_DATA_PATH = "./ARC-AGI-2-main/data"
TRAINING_DATA_PATH = f"{ARC_DATA_PATH}/training"
EVALUATION_DATA_PATH = f"{ARC_DATA_PATH}/evaluation"

# LLM Configuration
# Use a smaller model that fits in P100 GPU (16GB)
LLM_MODEL_NAME = "microsoft/Phi-3-mini-4k-instruct"  # ~7.5GB, good reasoning
# Alternative models:
# "meta-llama/Llama-2-7b-chat-hf"  # Requires Hugging Face token
# "mistralai/Mistral-7B-Instruct-v0.2"  # Good general purpose
# "TinyLlama/TinyLlama-1.1B-Chat-v1.0"  # Very small, faster

USE_GPU = True  # Set to False if no GPU available; CUDA is only used when it is also detected
MAX_NEW_TOKENS = 512  # Upper bound on tokens generated per LLM query
TEMPERATURE = 0.1  # Lower for more deterministic outputs; sampling is enabled only when > 0

print("Configuration loaded successfully!")

## Install Dependencies

In [None]:
# Install required packages
# Uncomment the following lines if running on Kaggle or fresh environment
# !pip install transformers torch accelerate bitsandbytes matplotlib numpy

import sys
import os
import json
import importlib
from pathlib import Path
from typing import List, Dict, Tuple, Any

print("Basic imports successful!")

## Load DSL Module

In [None]:
# Add DSL module to path
# Make the DSL directory importable (only once, even if the cell is re-run).
if DSL_MODULE_PATH not in sys.path:
    sys.path.insert(0, DSL_MODULE_PATH)

# Import the DSL modules; report a failure instead of raising so the
# notebook keeps running.
try:
    import dsl
    import constants
    import arc_types
    import solvers
except ImportError as e:
    print(f"Error loading DSL modules: {e}")
    print("Please ensure DSL_MODULE_PATH is correct")
else:
    print("✓ DSL modules loaded successfully")

## Extract DSL Function Catalog

In [None]:
import inspect

def get_dsl_function_catalog():
    """Build a catalog of the public functions defined in the `dsl` module.

    Returns:
        A dict mapping each function name to a dict with the keys
        'signature' (stringified signature), 'docstring' (cleaned docstring
        or a placeholder text) and 'full_name' (name followed by signature).
    """
    def _describe(func_name, func):
        signature = str(inspect.signature(func))
        return {
            'signature': signature,
            'docstring': inspect.getdoc(func) or "No description available",
            'full_name': f"{func_name}{signature}",
        }

    return {
        func_name: _describe(func_name, func)
        for func_name, func in inspect.getmembers(dsl)
        if inspect.isfunction(func) and not func_name.startswith('_')
    }

def get_solver_catalog():
    """Collect the source code of every `solve_<task_id>` function in `solvers`.

    Returns:
        A dict mapping task id (the function name without its `solve_`
        prefix) to a dict with the keys 'name' (full function name) and
        'source' (source text of the function).
    """
    prefix = 'solve_'
    solver_catalog = {}

    for name, obj in inspect.getmembers(solvers):
        if not (inspect.isfunction(obj) and name.startswith(prefix)):
            continue

        try:
            source = inspect.getsource(obj)
        except (OSError, TypeError):
            # Source text is unavailable for this function; skip it rather
            # than abort the whole catalog.
            continue

        # Strip only the leading prefix; str.replace would also remove any
        # later occurrence of 'solve_' inside the name.
        solver_catalog[name[len(prefix):]] = {
            'name': name,
            'source': source
        }

    return solver_catalog

# Build catalogs
dsl_catalog = get_dsl_function_catalog()
solver_catalog = get_solver_catalog()

print(f"✓ Found {len(dsl_catalog)} DSL functions")
print(f"✓ Found {len(solver_catalog)} pre-built solvers")

# Preview the first ten catalog entries with a truncated docstring.
print("\nSample DSL functions:")
for name in list(dsl_catalog)[:10]:
    summary = dsl_catalog[name]['docstring'][:50]
    print(f"  {name}: {summary}...")

## Load ARC Tasks

In [None]:
def load_arc_tasks(data_path: str, limit: int = None) -> Dict[str, Dict]:
    """Load ARC tasks from the `*.json` files directly inside `data_path`.

    Files are processed in sorted filename order so that `limit` always
    selects the same tasks; the raw glob order depends on the filesystem.

    Args:
        data_path: Directory containing one JSON file per task.
        limit: Maximum number of files to load. `None` (or 0) loads all.

    Returns:
        A dict mapping task id (the filename without extension) to the
        parsed JSON content of that file.
    """
    json_files = sorted(Path(data_path).glob("*.json"))

    if limit:
        json_files = json_files[:limit]

    tasks = {}
    for json_file in json_files:
        # JSON text is UTF-8 by specification; do not rely on the locale default.
        with open(json_file, 'r', encoding='utf-8') as f:
            tasks[json_file.stem] = json.load(f)

    return tasks

def visualize_task(task_data: Dict, max_examples: int = 3):
    """Plot the input/output grids of a task's first training examples.

    One figure row is drawn per example (input on the left, output on the
    right). Prints a message and returns without plotting when the plotting
    libraries cannot be imported or the task has no training examples.

    Args:
        task_data: Task dict; its 'train' list holds 'input'/'output' grids.
        max_examples: Maximum number of training examples to draw.
    """
    try:
        import matplotlib.pyplot as plt
        import matplotlib.colors as mcolors
        import numpy as np
    except ImportError:
        print("matplotlib not available for visualization")
        return

    # ARC color palette (0-9)
    palette = [
        '#000000', '#0074D9', '#FF4136', '#2ECC40', '#FFDC00',
        '#AAAAAA', '#F012BE', '#FF851B', '#7FDBFF', '#870C25'
    ]
    cmap = mcolors.ListedColormap(palette)
    norm = mcolors.Normalize(vmin=0, vmax=9)

    shown = task_data.get('train', [])[:max_examples]
    if not shown:
        print("No training examples to visualize")
        return

    def _draw(ax, grid, title):
        # Render a single grid on the given axes.
        ax.imshow(np.array(grid), cmap=cmap, norm=norm)
        ax.set_title(title)
        ax.axis('off')
        ax.grid(True, which='both', color='lightgray', linewidth=0.5)

    fig, axes = plt.subplots(len(shown), 2, figsize=(8, 3*len(shown)))
    if len(shown) == 1:
        # A single row comes back as a flat pair of axes; wrap it so that
        # rows can be indexed uniformly below.
        axes = [axes]

    for number, (example, row_axes) in enumerate(zip(shown, axes), start=1):
        _draw(row_axes[0], example['input'], f'Training Example {number} - Input')
        _draw(row_axes[1], example['output'], f'Training Example {number} - Output')

    plt.tight_layout()
    plt.show()

# Load sample tasks
print("Loading training tasks...")
training_tasks = load_arc_tasks(TRAINING_DATA_PATH, limit=10)
print(f"✓ Loaded {len(training_tasks)} training tasks")

# Summarize the first loaded task, if any were found.
if training_tasks:
    sample_task_id = next(iter(training_tasks))
    sample_task = training_tasks[sample_task_id]
    print(f"\nSample task: {sample_task_id}")
    print(f"  Train examples: {len(sample_task['train'])}")
    print(f"  Test examples: {len(sample_task['test'])}")

## Visualize Sample Task

In [None]:
# Visualize the first task
if training_tasks:
    # Draw the first loaded task.
    task_id = next(iter(training_tasks))
    print(f"Visualizing task: {task_id}")
    visualize_task(training_tasks[task_id], max_examples=3)

## Initialize LLM

In [None]:
# Initialize the LLM
# Build the global `llm_pipeline` used by query_llm(). Any failure (missing
# packages, download error, out of memory, ...) is reported and leaves
# `llm_pipeline` set to None so the rest of the notebook can still run.
try:
    from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
    import torch
    
    print(f"Loading model: {LLM_MODEL_NAME}...")
    
    # Check GPU availability
    # CUDA is used only when it is both requested (USE_GPU) and detected.
    device = "cuda" if USE_GPU and torch.cuda.is_available() else "cpu"
    print(f"Using device: {device}")
    
    if device == "cuda":
        print(f"GPU: {torch.cuda.get_device_name(0)}")
        print(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")
    
    # Load tokenizer and model with memory optimization
    # NOTE(review): trust_remote_code=True presumably lets code shipped in the
    # model repository run locally - only use model names you trust.
    tokenizer = AutoTokenizer.from_pretrained(LLM_MODEL_NAME, trust_remote_code=True)
    
    # Half precision on GPU, full precision on CPU.
    # NOTE(review): device_map/low_cpu_mem_usage presumably rely on the
    # `accelerate` package listed in the install cell - confirm.
    model = AutoModelForCausalLM.from_pretrained(
        LLM_MODEL_NAME,
        torch_dtype=torch.float16 if device == "cuda" else torch.float32,
        device_map="auto" if device == "cuda" else None,
        trust_remote_code=True,
        low_cpu_mem_usage=True
    )
    
    # Without a device map the model is moved to the CPU explicitly.
    if device == "cpu":
        model = model.to(device)
    
    # Create pipeline
    # Sampling is switched on only for a positive temperature; the EOS token
    # doubles as the padding token.
    llm_pipeline = pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer,
        max_new_tokens=MAX_NEW_TOKENS,
        temperature=TEMPERATURE,
        do_sample=True if TEMPERATURE > 0 else False,
        pad_token_id=tokenizer.eos_token_id
    )
    
    print("✓ LLM initialized successfully")
    
except Exception as e:
    print(f"Error initializing LLM: {e}")
    print("\nYou can continue with a mock LLM for testing purposes.")
    # query_llm() returns a placeholder response when the pipeline is None.
    llm_pipeline = None

## DSL Program Analysis and Selection

In [None]:
def format_dsl_catalog_for_prompt(catalog: Dict, max_functions: int = 50) -> str:
    """Render the DSL catalog as a numbered list for inclusion in a prompt.

    Args:
        catalog: Mapping of function name to an info dict with a 'docstring' key.
        max_functions: Number of leading catalog entries to list.

    Returns:
        A newline-joined string: a header, one numbered line per listed
        function and, if entries were left out, a line with how many.
    """
    listed = list(catalog.items())[:max_functions]

    lines = ["Available DSL Functions:"]
    lines.extend(
        f"{position}. {name}: {info['docstring']}"
        for position, (name, info) in enumerate(listed, start=1)
    )

    omitted = len(catalog) - max_functions
    if omitted > 0:
        lines.append(f"... and {omitted} more functions")

    return "\n".join(lines)

def format_task_for_prompt(task_data: Dict, max_examples: int = 3, max_rows: int = 3) -> str:
    """Render a task's training examples as text for an LLM prompt.

    Args:
        task_data: Task dict whose 'train' list holds 'input'/'output' grids.
        max_examples: Number of leading training examples to include;
            `None` includes all of them.
        max_rows: Number of leading rows printed per grid; `None` prints
            whole grids.

    Returns:
        A newline-joined description with the shape and (possibly truncated)
        contents of every included example. A trailing "..." is added to a
        grid only when rows were actually omitted, so the reader is not
        told that a fully shown grid continues.
    """
    def _shape(grid):
        # An empty grid has no first row to measure; report width 0.
        width = len(grid[0]) if grid else 0
        return f"{len(grid)}x{width}"

    def _preview(grid):
        if max_rows is None or len(grid) <= max_rows:
            return f"{grid}"
        return f"{grid[:max_rows]}..."

    train = task_data['train']
    shown = train if max_examples is None else train[:max_examples]

    lines = ["ARC Task:"]

    # Training examples (the header reports the total, not just those shown)
    lines.append(f"\nTraining Examples ({len(train)}):")
    for number, example in enumerate(shown, start=1):
        lines.append(f"\nExample {number}:")
        lines.append(f"Input shape: {_shape(example['input'])}")
        lines.append(f"Output shape: {_shape(example['output'])}")
        lines.append(f"Input: {_preview(example['input'])}")
        lines.append(f"Output: {_preview(example['output'])}")

    return "\n".join(lines)

def create_analysis_prompt(task_data: Dict, dsl_catalog: Dict) -> str:
    """Assemble the prompt asking the LLM for a DSL program that solves a task.

    The prompt contains the formatted task, the first 30 catalog functions,
    step-by-step analysis instructions and the expected answer format
    (a fenced Python `solve(I)` function).
    """
    task_section = format_task_for_prompt(task_data)
    catalog_section = format_dsl_catalog_for_prompt(dsl_catalog, max_functions=30)

    return f"""You are an expert in solving ARC (Abstraction and Reasoning Corpus) tasks using DSL operations.

{task_section}

{catalog_section}

Analyze the transformation pattern in the training examples above.

Think step by step:
1. What visual patterns do you see in the inputs?
2. How do the outputs differ from the inputs?
3. What DSL operations would achieve this transformation?

Provide a sequence of DSL function calls that would solve this task.
Format your answer as Python code that takes input grid I and returns output grid O.

Example format:
```python
def solve(I):
    # Step 1: operation
    x1 = operation1(I)
    # Step 2: another operation
    O = operation2(x1)
    return O
```

Your solution:"""

def query_llm(prompt: str) -> str:
    """Send `prompt` to the global LLM pipeline and return only its completion.

    Args:
        prompt: Full prompt text.

    Returns:
        The generated continuation with surrounding whitespace removed.
        When no pipeline is available, or the call fails, a bracketed
        placeholder/error message is returned instead of raising.
    """
    if llm_pipeline is None:
        return "[Mock LLM Response: Unable to generate solution without initialized LLM]"

    try:
        # Ask the pipeline for the completion only. Cutting the prompt off by
        # character count assumes the decoded output repeats the prompt
        # byte-for-byte; when it does not, part of the prompt (including its
        # example code block) leaks into the "response".
        outputs = llm_pipeline(
            prompt,
            max_new_tokens=MAX_NEW_TOKENS,
            return_full_text=False,
        )
        return outputs[0]['generated_text'].strip()
    except Exception as e:
        return f"[Error querying LLM: {e}]"

print("✓ Analysis functions ready")

## Extract and Execute DSL Code

In [None]:
import re

def extract_python_code(llm_response: str) -> str:
    """Extract the first Python snippet from an LLM response.

    Candidates are tried in this order:
      1. a fenced block tagged `python` / `py` (any letter case),
      2. an untagged fenced block,
      3. unfenced text from `def solve` through the end of the first
         `return` line (also covers a fence that was never closed).

    Args:
        llm_response: Raw text returned by the LLM.

    Returns:
        The stripped code, or an empty string when nothing was found.
    """
    # Fence openers may carry trailing spaces and Windows line endings.
    tagged = re.search(
        r'```(?:python|py)[ \t]*\r?\n(.*?)```',
        llm_response,
        re.DOTALL | re.IGNORECASE,
    )
    if tagged:
        return tagged.group(1).strip()

    untagged = re.search(r'```[ \t]*\r?\n(.*?)```', llm_response, re.DOTALL)
    if untagged:
        return untagged.group(1).strip()

    # Capture the whole return line: stopping after the first identifier
    # would turn `return vmirror(I)` into `return vmirror`.
    # Limitation: a return expression spanning several lines is cut short.
    bare = re.search(r'(def solve.*?return [^\n]*)', llm_response, re.DOTALL)
    if bare:
        return bare.group(1).strip()

    return ""

def execute_dsl_code(code: str, input_grid: List[List[int]]) -> Any:
    """Run `code` and apply the `solve` function it defines to `input_grid`.

    Args:
        code: Python source expected to define `solve(I)`.
        input_grid: Grid as a list of rows.

    Returns:
        The result of `solve`, converted to a list of lists when it is a
        tuple. None when `code` is empty, defines no `solve`, or anything
        raises (the error is printed).
    """
    if not code:
        return None

    try:
        # The DSL works on tuple-of-tuples grids.
        I = tuple(tuple(row) for row in input_grid)

        namespace = {'__builtins__': __builtins__, 'I': I}

        # Expose every public name of the DSL, then of the constants module
        # (on a name clash the later module wins).
        for module in (dsl, constants):
            namespace.update(
                (name, getattr(module, name))
                for name in dir(module)
                if not name.startswith('_')
            )

        # NOTE(review): exec runs the given (possibly LLM-generated) source
        # with full builtins - only use in a sandboxed environment.
        exec(code, namespace)

        if 'solve' not in namespace:
            return None

        output = namespace['solve'](I)
        if isinstance(output, tuple):
            # Convert back to list format
            return [list(row) for row in output]
        return output

    except Exception as e:
        print(f"Error executing code: {e}")
        return None

def evaluate_solution(predicted: Any, expected: List[List[int]]) -> bool:
    """Return whether `predicted` equals the `expected` grid.

    A missing prediction (None) never matches. A tuple prediction is
    converted to a list of lists before the comparison.
    """
    if predicted is None:
        return False

    candidate = (
        [list(row) for row in predicted]
        if isinstance(predicted, tuple)
        else predicted
    )
    return candidate == expected

print("✓ Execution functions ready")

## Main Solver Pipeline

In [None]:
def solve_task_with_llm(task_id: str, task_data: Dict, verbose: bool = True) -> Dict:
    """Solve an ARC task with a pre-built solver or an LLM-generated program.

    Args:
        task_id: Task identifier, also used to look up a pre-built solver.
        task_data: Task dict with 'train' and 'test' example lists.
        verbose: Print progress and per-example results.

    Returns:
        A dict with 'task_id', 'solved' (all training examples reproduced),
        'code', 'predictions' (one per test input), 'accuracy' (fraction of
        training examples reproduced) and, once a code source was chosen,
        'using_prebuilt' (plus 'llm_response' for LLM solutions).
    """
    result = {
        'task_id': task_id,
        'solved': False,
        'code': None,
        'predictions': [],
        'accuracy': 0.0
    }

    # Check if there's a pre-built solver
    if task_id in solver_catalog:
        if verbose:
            print(f"Found pre-built solver for {task_id}")
        entry = solver_catalog[task_id]
        # The stored source defines `solve_<task_id>`, but the executor only
        # calls a function named `solve`; rename the definition so the
        # pre-built solver is actually run.
        code = entry['source'].replace(f"def {entry['name']}(", "def solve(", 1)
        result['code'] = code
        result['using_prebuilt'] = True
    else:
        # Use LLM to generate solution
        if verbose:
            print(f"Analyzing task {task_id} with LLM...")

        prompt = create_analysis_prompt(task_data, dsl_catalog)
        llm_response = query_llm(prompt)

        if verbose:
            print(f"LLM Response:\n{llm_response[:500]}...\n")

        code = extract_python_code(llm_response)
        result['code'] = code
        result['llm_response'] = llm_response
        result['using_prebuilt'] = False

    if not code:
        if verbose:
            print("No code generated")
        return result

    # Test on training examples
    correct = 0
    total = len(task_data['train'])

    for i, example in enumerate(task_data['train']):
        predicted = execute_dsl_code(code, example['input'])
        is_correct = evaluate_solution(predicted, example['output'])

        if verbose:
            print(f"  Training example {i+1}: {'✓' if is_correct else '✗'}")

        if is_correct:
            correct += 1

    result['accuracy'] = correct / total if total > 0 else 0
    result['solved'] = (correct == total)

    # Generate predictions for test examples
    for example in task_data['test']:
        predicted = execute_dsl_code(code, example['input'])
        result['predictions'].append(predicted)

    if verbose:
        print(f"Task {task_id}: {'SOLVED' if result['solved'] else 'FAILED'} ({correct}/{total})")

    return result

print("✓ Main solver pipeline ready")

## Test on Sample Tasks

In [None]:
# Test on a few tasks
num_tasks_to_test = 3
results = []
banner = '=' * 60

# Run the solver on the first few loaded tasks.
for task_id in list(training_tasks)[:num_tasks_to_test]:
    print(f"\n{banner}")
    print(f"Testing task: {task_id}")
    print(banner)

    result = solve_task_with_llm(task_id, training_tasks[task_id], verbose=True)
    results.append(result)

    # Report whether any test prediction was produced
    if result['predictions']:
        print("\nGenerated prediction for test input")

# Summary
print(f"\n{banner}")
print("Summary")
print(banner)
solved = sum(r['solved'] for r in results)
print(f"Solved: {solved}/{len(results)}")
avg_accuracy = sum(r['accuracy'] for r in results) / len(results) if results else 0
print(f"Average accuracy: {avg_accuracy:.2%}")

## Batch Processing

In [None]:
def batch_solve_tasks(tasks: Dict[str, Dict], max_tasks: int = None) -> List[Dict]:
    """Run solve_task_with_llm on several tasks and collect the results.

    Args:
        tasks: Mapping of task id to task data.
        max_tasks: Process only this many leading tasks; a falsy value
            (None or 0) processes all of them.

    Returns:
        The per-task result dicts, in processing order.
    """
    task_ids = list(tasks.keys())
    if max_tasks:
        task_ids = task_ids[:max_tasks]
    total = len(task_ids)

    results = []
    for position, task_id in enumerate(task_ids, start=1):
        print(f"\nProcessing task {position}/{total}: {task_id}")
        outcome = solve_task_with_llm(task_id, tasks[task_id], verbose=False)
        results.append(outcome)
        status = 'SOLVED' if outcome['solved'] else 'FAILED'
        print(f"  Result: {status} (accuracy: {outcome['accuracy']:.2%})")

    return results

def _is_valid_grid(candidate) -> bool:
    """Return True if `candidate` is a non-empty rectangular grid of ints."""
    if not isinstance(candidate, (list, tuple)) or not candidate:
        return False
    width = None
    for row in candidate:
        if not isinstance(row, (list, tuple)) or not row:
            return False
        if width is None:
            width = len(row)
        elif len(row) != width:
            return False
        if not all(isinstance(cell, int) for cell in row):
            return False
    return True


def generate_submission(results: List[Dict], output_path: str = "submission.json"):
    """Write a submission JSON file from solver results.

    Each task id maps to one entry per test input, holding the same grid as
    'attempt_1' and 'attempt_2'. A prediction that is missing (None) or is
    not a rectangular grid of ints - e.g. a set returned by a faulty
    program - is replaced by the placeholder grid [[0]], so the file always
    serializes and never contains null attempts.

    Args:
        results: Result dicts with 'task_id' and 'predictions' keys.
        output_path: Destination file path.
    """
    placeholder = [[0]]
    submission = {}

    for result in results:
        attempts = []
        for pred in result['predictions']:
            if _is_valid_grid(pred):
                grid = [list(row) for row in pred]
            else:
                grid = placeholder
            attempts.append({'attempt_1': grid, 'attempt_2': grid})
        submission[result['task_id']] = attempts

    with open(output_path, 'w') as f:
        json.dump(submission, f, indent=2)

    print(f"✓ Submission saved to {output_path}")

print("✓ Batch processing functions ready")

## Run Full Pipeline (Optional)

Uncomment and run to process a batch of training tasks (the example is capped at 20; raise or remove `max_tasks` to process all of them):

In [None]:
# # Load all training tasks
# all_training_tasks = load_arc_tasks(TRAINING_DATA_PATH)
# print(f"Loaded {len(all_training_tasks)} training tasks")

# # Process tasks (limit for testing)
# all_results = batch_solve_tasks(all_training_tasks, max_tasks=20)

# # Statistics
# solved = sum(1 for r in all_results if r['solved'])
# total = len(all_results)
# avg_acc = sum(r['accuracy'] for r in all_results) / total

# print(f"\nFinal Statistics:")
# print(f"  Solved: {solved}/{total} ({solved/total:.2%})")
# print(f"  Average accuracy: {avg_acc:.2%}")

# # Generate submission for evaluation set
# # eval_tasks = load_arc_tasks(EVALUATION_DATA_PATH)
# # eval_results = batch_solve_tasks(eval_tasks)
# # generate_submission(eval_results, "arc_submission.json")

## Training-Based DSL Program Synthesis

This section implements a training-based approach that:
1. Searches through DSL program combinations
2. Validates programs against training examples
3. Selects the best program for each task
4. Applies learned programs to test cases

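This is more systematic than free-form LLM generation: every returned program has been verified against all training examples of its task. However, it only covers tasks solvable by a pre-built solver or by a chain of at most two simple grid transforms.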

In [None]:
class DSLProgramSynthesizer:
    """Find a DSL program that reproduces all training examples of a task.

    Strategies are tried in this order: the pre-built solver for the task
    id, a single simple grid transform, then a chain of two simple
    transforms. A program is only returned when it reproduces every
    training example; an empty or malformed example list never matches.
    """

    # Transforms tried by the search, in priority order. Names missing from
    # the DSL are dropped; entries that cannot be called with a single grid
    # argument raise when tried and are simply skipped.
    _CANDIDATE_TRANSFORMS = (
        'identity',
        'vmirror', 'hmirror', 'dmirror', 'cmirror',
        'rot90', 'rot180', 'rot270',
        'upscale', 'downscale',
        'trim', 'compress',
    )

    # Only this many leading simple transforms take part in pair search.
    _MAX_PAIR_CANDIDATES = 10

    def __init__(self, max_depth=3):
        # Stored for callers; the search below always stops at two chained
        # transforms regardless of this value.
        self.max_depth = max_depth
        self.dsl_functions = self._extract_dsl_functions()
        self.simple_transforms = self._get_simple_transforms()

    def _extract_dsl_functions(self):
        """Return a dict of every public callable in the `dsl` module."""
        functions = {}
        for name in dir(dsl):
            if name.startswith('_'):
                continue
            obj = getattr(dsl, name)
            if callable(obj):
                functions[name] = obj
        return functions

    def _get_simple_transforms(self):
        """Return the candidate transform names that exist in the DSL."""
        return [t for t in self._CANDIDATE_TRANSFORMS if t in self.dsl_functions]

    @staticmethod
    def _to_grid_pairs(train_examples):
        """Convert examples to (input, expected) tuple-grid pairs.

        Returns None when there are no examples or one of them is malformed,
        so that callers treat the task as "nothing can be validated".
        """
        try:
            pairs = [
                (
                    tuple(tuple(row) for row in example['input']),
                    tuple(tuple(row) for row in example['output']),
                )
                for example in train_examples
            ]
        except (KeyError, TypeError):
            return None
        return pairs or None

    @staticmethod
    def _fits_all(func, pairs):
        """Return True if `func` maps every input in `pairs` to its expected grid.

        A candidate that raises is treated as a non-match. Only Exception is
        caught, so KeyboardInterrupt still stops a long search.
        """
        try:
            return all(func(grid) == expected for grid, expected in pairs)
        except Exception:
            return False

    def _try_simple_transform(self, train_examples):
        """Return a one-call program matching all examples, or None."""
        pairs = self._to_grid_pairs(train_examples)
        if pairs is None:
            return None

        for func_name in self.simple_transforms:
            if self._fits_all(self.dsl_functions[func_name], pairs):
                return f"def solve(I):\n    O = {func_name}(I)\n    return O"

        return None

    def _try_prebuilt_solver(self, task_id, train_examples):
        """Return the pre-built solver's source (renamed to `solve`) if it works.

        Returns None when no `solve_<task_id>` exists in `solvers`, when it
        fails on any training example, or when its source is unavailable.
        """
        import inspect

        solver_name = f"solve_{task_id}"
        solver_func = getattr(solvers, solver_name, None)
        if solver_func is None:
            return None

        pairs = self._to_grid_pairs(train_examples)
        if pairs is None or not self._fits_all(solver_func, pairs):
            return None

        try:
            source = inspect.getsource(solver_func)
        except (OSError, TypeError):
            return None

        # Program executors look for a function named `solve`.
        return source.replace(f"def {solver_name}(", "def solve(", 1)

    def _try_combined_transforms(self, train_examples):
        """Return a two-call program matching all examples, or None."""
        pairs = self._to_grid_pairs(train_examples)
        if pairs is None:
            return None

        candidates = self.simple_transforms[:self._MAX_PAIR_CANDIDATES]
        for func1_name in candidates:
            func1 = self.dsl_functions[func1_name]
            # Apply the first transform once per example instead of once per
            # (first, second) pair. If it fails on any example, no chain
            # starting with it can match all of them.
            try:
                intermediates = [(func1(grid), expected) for grid, expected in pairs]
            except Exception:
                continue

            for func2_name in candidates:
                if self._fits_all(self.dsl_functions[func2_name], intermediates):
                    return f"""def solve(I):
    x1 = {func1_name}(I)
    O = {func2_name}(x1)
    return O"""

        return None

    def synthesize_program(self, task_id, train_examples):
        """Synthesize a DSL program for the given task.

        Args:
            task_id: Task identifier used to look up a pre-built solver.
            train_examples: List of dicts with 'input' and 'output' grids.

        Returns:
            A `(program, method)` tuple. `program` is Python source defining
            `solve(I)`, or None when nothing matched; `method` is one of
            "prebuilt", "simple_transform", "combined_transforms", "none".
        """
        strategies = (
            ("prebuilt", lambda: self._try_prebuilt_solver(task_id, train_examples)),
            ("simple_transform", lambda: self._try_simple_transform(train_examples)),
            ("combined_transforms", lambda: self._try_combined_transforms(train_examples)),
        )
        for method, attempt in strategies:
            program = attempt()
            if program:
                return program, method

        return None, "none"

print("✓ DSL Program Synthesizer ready")

## Training on All Tasks

In [None]:
def execute_dsl_program(code, input_grid):
    """Run a DSL program on an input grid, silently returning None on failure.

    Args:
        code: Python source expected to define `solve(I)`.
        input_grid: Grid as a list of rows.

    Returns:
        The result of `solve`, converted to a list of lists when it is a
        tuple. None when `code` is empty, defines no `solve`, or anything
        raises.
    """
    if not code:
        return None

    try:
        I = tuple(map(tuple, input_grid))

        namespace = {'__builtins__': __builtins__, 'I': I}

        # Public names of the DSL first, then of the constants module.
        for module in (dsl, constants):
            for name in dir(module):
                if name.startswith('_'):
                    continue
                namespace[name] = getattr(module, name)

        exec(code, namespace)

        if 'solve' not in namespace:
            return None

        output = namespace['solve'](I)
        return [list(row) for row in output] if isinstance(output, tuple) else output

    except Exception:
        return None


def train_on_all_tasks(tasks, limit=None):
    """Search a DSL program for every task and predict its test outputs.

    Args:
        tasks: Mapping of task id to task data ('train' and 'test' lists).
        limit: Process only this many leading tasks; a falsy value
            (None or 0) processes all of them.

    Returns:
        A `(results, stats)` tuple. `results` maps task id to a dict with
        'program', 'method', 'train_accuracy' and 'test_predictions' (an
        all-zero grid of the input's shape stands in for a failed
        prediction). `stats` holds 'total', 'solved', per-method counts in
        'methods' and 'avg_accuracy'. With no tasks, both are returned
        empty/zeroed instead of dividing by zero.
    """
    from collections import defaultdict

    def _zero_grid_like(grid):
        # Fallback prediction; an empty input grid yields an empty grid
        # instead of failing on grid[0].
        width = len(grid[0]) if grid else 0
        return [[0] * width for _ in grid]

    task_ids = list(tasks.keys())[:limit] if limit else list(tasks.keys())
    results = {}

    stats = {
        'total': len(task_ids),
        'solved': 0,
        'methods': defaultdict(int),
        'avg_accuracy': 0.0
    }

    print(f"Training on {len(task_ids)} tasks...")

    if not task_ids:
        # Nothing to average; the percentages below would divide by zero.
        print("\nTraining complete! (no tasks to process)")
        return results, stats

    synthesizer = DSLProgramSynthesizer()

    for i, task_id in enumerate(task_ids):
        task_data = tasks[task_id]
        train_examples = task_data['train']

        # Synthesize program
        program, method = synthesizer.synthesize_program(task_id, train_examples)

        # Validate on training data
        train_correct = sum(
            1 for example in train_examples
            if execute_dsl_program(program, example['input']) == example['output']
        )
        train_accuracy = train_correct / len(train_examples) if train_examples else 0

        # Generate test predictions
        test_predictions = []
        for test_example in task_data['test']:
            predicted = execute_dsl_program(program, test_example['input'])
            if predicted is None:
                predicted = _zero_grid_like(test_example['input'])
            test_predictions.append(predicted)

        results[task_id] = {
            'program': program,
            'method': method,
            'train_accuracy': train_accuracy,
            'test_predictions': test_predictions
        }

        # Update stats
        if train_accuracy == 1.0:
            stats['solved'] += 1
        stats['methods'][method] += 1
        stats['avg_accuracy'] += train_accuracy

        if (i + 1) % 100 == 0:
            print(f"  Processed {i + 1}/{len(task_ids)} tasks...")

    stats['avg_accuracy'] /= len(task_ids)

    print(f"\nTraining complete!")
    print(f"  Solved: {stats['solved']}/{stats['total']} ({stats['solved']/stats['total']:.1%})")
    print(f"  Average accuracy: {stats['avg_accuracy']:.1%}")
    print(f"  Methods used:")
    for method, count in stats['methods'].items():
        print(f"    {method}: {count}")

    return results, stats

print("✓ Training functions ready")

## Run Training on All 1000 Tasks

This cell trains DSL programs on all 1000 training tasks.

In [None]:
# Load all training tasks
print("Loading all training tasks...")
all_training_tasks = load_arc_tasks(TRAINING_DATA_PATH)
print(f"Loaded {len(all_training_tasks)} training tasks")

# Search a program for every loaded task
trained_results, training_stats = train_on_all_tasks(all_training_tasks)

# Persist the programs together with the run statistics
import json
trained_payload = {
    'results': trained_results,
    'stats': training_stats
}
with open('trained_programs.json', 'w') as f:
    json.dump(trained_payload, f, indent=2)

print("\n✓ Trained programs saved to trained_programs.json")

## Generate Predictions for Evaluation Data

In [None]:
# Load evaluation tasks
# Load evaluation tasks
print("Loading evaluation tasks...")
eval_tasks = load_arc_tasks(EVALUATION_DATA_PATH)
print(f"Loaded {len(eval_tasks)} evaluation tasks")

# Generate predictions for evaluation tasks.
# `trained_results` is keyed by training task ids, so an evaluation task is
# normally not in it. Looking programs up by id alone would therefore predict
# all-zero grids for every evaluation task. Instead, a program is searched
# from each evaluation task's own demonstration ('train') pairs; a program
# already found for the same id is reused.
eval_synthesizer = DSLProgramSynthesizer()
eval_results = {}
for task_id, task_data in eval_tasks.items():
    cached = trained_results.get(task_id)
    if cached is not None and cached['program']:
        program = cached['program']
    else:
        program, _ = eval_synthesizer.synthesize_program(task_id, task_data['train'])

    predictions = []
    for test_example in task_data['test']:
        predicted = execute_dsl_program(program, test_example['input'])
        if predicted is None:
            # No program, or it failed: fall back to an all-zero grid with
            # the shape of the test input.
            grid = test_example['input']
            width = len(grid[0]) if grid else 0
            predicted = [[0] * width for _ in grid]
        predictions.append(predicted)

    eval_results[task_id] = predictions

print(f"\nGenerated predictions for {len(eval_results)} evaluation tasks")

## Generate Kaggle Submission File

In [None]:
# Generate submission file in Kaggle format
# ARC submission format: every test case carries two attempts; the same
# prediction is submitted for both.
submission = {
    task_id: [{'attempt_1': pred, 'attempt_2': pred} for pred in predictions]
    for task_id, predictions in eval_results.items()
}

with open('arc_submission.json', 'w') as f:
    json.dump(submission, f, indent=2)

print("✓ Submission file saved to arc_submission.json")
print(f"  Ready for Kaggle submission with {len(submission)} tasks")

## Additional DSL Programs for ARC-AGI-2

Since the original DSL was designed for ARC-AGI-1, here are some additional useful programs that can be added to handle ARC-AGI-2 tasks:

In [None]:
def extended_dsl_operations():
    """
    Additional DSL-style operations that might be useful for ARC-AGI-2.
    These can be added to the dsl.py module if needed.

    Grids are sequences of equally long rows (lists or tuples) of ints.

    Returns:
        A dict mapping operation name to the function implementing it.
    """
    from collections import Counter, deque

    def find_repeating_pattern(grid):
        """Return the smallest tile whose repetition reproduces the grid.

        The tile is returned as a tuple of tuples. It equals the whole grid
        when the grid has no smaller period. Returns None for an empty grid.
        """
        if not grid or not grid[0]:
            return None
        height, width = len(grid), len(grid[0])

        def _tiles(tile_h, tile_w):
            return all(
                grid[r][c] == grid[r % tile_h][c % tile_w]
                for r in range(height)
                for c in range(width)
            )

        # Only tile sizes dividing the grid size can tile it exactly; try the
        # smallest areas first. The full grid is always a valid last resort.
        sizes = sorted(
            (
                (tile_h, tile_w)
                for tile_h in range(1, height + 1) if height % tile_h == 0
                for tile_w in range(1, width + 1) if width % tile_w == 0
            ),
            key=lambda size: (size[0] * size[1], size[0]),
        )
        for tile_h, tile_w in sizes:
            if _tiles(tile_h, tile_w):
                return tuple(tuple(row[:tile_w]) for row in grid[:tile_h])
        return None

    def symmetry_detection(grid):
        """Detect if grid has horizontal, vertical, or diagonal symmetry.

        Returns a dict of booleans:
          'horizontal' - unchanged when flipped top-to-bottom,
          'vertical'   - unchanged when flipped left-to-right,
          'diagonal'   - equal to its transpose (square grids only).
        """
        rows = [list(row) for row in grid]
        return {
            'horizontal': rows == rows[::-1],
            'vertical': all(row == row[::-1] for row in rows),
            'diagonal': rows == [list(column) for column in zip(*rows)],
        }

    def color_frequency_analysis(grid):
        """Analyze frequency of each color in the grid."""
        flat = [cell for row in grid for cell in row]
        return Counter(flat)

    def find_largest_region(grid, color):
        """Find the largest contiguous region of a specific color.

        Cells are connected through their 4 direct neighbours. Returns a
        frozenset of (row, column) cells; it is empty when the color does
        not occur. Among equally large regions the first one found in
        row-major order wins.
        """
        seen = set()
        best = frozenset()
        for r, row in enumerate(grid):
            for c, cell in enumerate(row):
                if cell != color or (r, c) in seen:
                    continue
                # Breadth-first flood fill from this unvisited cell.
                region = {(r, c)}
                seen.add((r, c))
                queue = deque([(r, c)])
                while queue:
                    cr, cc = queue.popleft()
                    for nr, nc in ((cr - 1, cc), (cr + 1, cc), (cr, cc - 1), (cr, cc + 1)):
                        if (
                            0 <= nr < len(grid)
                            and 0 <= nc < len(grid[nr])
                            and (nr, nc) not in seen
                            and grid[nr][nc] == color
                        ):
                            seen.add((nr, nc))
                            region.add((nr, nc))
                            queue.append((nr, nc))
                if len(region) > len(best):
                    best = frozenset(region)
        return best

    def grid_interpolation(grid, factor):
        """Interpolate grid to increase resolution.

        Colors are categorical, so nearest-neighbour interpolation is used:
        every cell becomes a factor x factor block of the same color.
        Returns a tuple of tuples. Raises ValueError when factor is not a
        positive integer.
        """
        if not isinstance(factor, int) or factor < 1:
            raise ValueError("factor must be a positive integer")
        return tuple(
            tuple(cell for cell in row for _ in range(factor))
            for row in grid
            for _ in range(factor)
        )

    return {
        'find_repeating_pattern': find_repeating_pattern,
        'symmetry_detection': symmetry_detection,
        'color_frequency_analysis': color_frequency_analysis,
        'find_largest_region': find_largest_region,
        'grid_interpolation': grid_interpolation,
    }

print("✓ Extended DSL operations defined")
print("\nNote: These are generic helpers. Extend them as needed for specific task types.")

## Usage Instructions

### On Kaggle:

1. Upload this notebook to Kaggle
2. Add the ARC-AGI-2 dataset as input data
3. Update configuration paths at the top:
   ```python
   DSL_MODULE_PATH = "/kaggle/input/arc-dsl/arc-dsl"
   ARC_DATA_PATH = "/kaggle/input/arc-agi-2/data"
   ```
4. Enable GPU accelerator (P100)
5. Install dependencies (uncomment the pip install cell)
6. Run all cells

### Local Usage:

1. Ensure you have the DSL module and ARC-AGI-2 data
2. Update paths in configuration cell
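3. Install dependencies: `pip install transformers torch accelerate matplotlib numpy` (`accelerate` is needed for the automatic device placement used when loading the model on a GPU)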
4. Run cells sequentially

### Customization:

- Adjust `LLM_MODEL_NAME` for different models
- Modify `MAX_NEW_TOKENS` and `TEMPERATURE` for different generation behavior
- Extend DSL operations as needed for specific task types
- Modify prompts in `create_analysis_prompt` for better task understanding