# üåä WakingOrca v1.0 - Quantum Disentangled ARC Solver

**Philosophy**: Not pattern matching. Understanding the causal structure of intelligence.

**Target**: 10-20% accuracy baseline with room for iteration

**Size**: <1MB for Kaggle compatibility

**Author**: Ryan Cardwell & Claude

**Date**: November 2025

In [1]:
# Cell 1: Core Imports & Configuration
import json
import numpy as np
import time
from pathlib import Path
from typing import List, Dict, Tuple, Optional, Any
from dataclasses import dataclass
from collections import defaultdict, Counter
from itertools import product, combinations
import hashlib
import warnings
warnings.filterwarnings('ignore')

# Configuration
@dataclass
class Config:
    """Global configuration for the solver"""
    # Paths
    data_path: str = '/kaggle/input/arc-prize-2025'
    output_path: str = '/kaggle/working'
    
    # Time management
    total_time_budget: float = 900.0  # 15 minutes total
    time_per_task: float = 2.0  # Max seconds per task
    
    # Solver parameters
    max_solutions_per_task: int = 2
    recursion_depth: int = 3
    superposition_branches: int = 5
    
    # Quantum parameters
    collapse_threshold: float = 0.7  # When to collapse superposition
    entanglement_detection: bool = True
    
config = Config()
print(f"WakingOrca v1.0 initialized")
print(f"Time budget: {config.total_time_budget}s")
print(f"Target: 10-20% accuracy baseline")

WakingOrca v1.0 initialized
Time budget: 900.0s
Target: 10-20% accuracy baseline


In [2]:
# Cell 2: Data Loading with Fallbacks
def load_arc_data(config: Config) -> Tuple[Dict, Dict, Dict]:
    """Load ARC data with multiple fallback strategies"""
    
    training_tasks = {}
    test_tasks = {}
    sample_submission = {}
    
    # Try Kaggle paths first
    kaggle_paths = [
        Path(config.data_path),
        Path('/kaggle/input/arc-prize-2025'),
        Path('./kaggle/input/arc-prize-2025')
    ]
    
    data_path = None
    for path in kaggle_paths:
        if path.exists():
            data_path = path
            break
    
    if not data_path:
        # Local development fallback
        print("Using local development mode")
        # Create minimal test data for development
        training_tasks = create_mock_data(5)
        test_tasks = create_mock_data(3)
        return training_tasks, test_tasks, sample_submission
    
    # Load training tasks
    train_path = data_path / 'arc-agi_training_challenges.json'
    if train_path.exists():
        with open(train_path, 'r') as f:
            training_tasks = json.load(f)
    
    # Load test tasks
    test_path = data_path / 'arc-agi_test_challenges.json'
    if test_path.exists():
        with open(test_path, 'r') as f:
            test_tasks = json.load(f)
    
    # Load sample submission
    sample_path = data_path / 'sample_submission.json'
    if sample_path.exists():
        with open(sample_path, 'r') as f:
            sample_submission = json.load(f)
    
    print(f"Loaded {len(training_tasks)} training tasks")
    print(f"Loaded {len(test_tasks)} test tasks")
    
    return training_tasks, test_tasks, sample_submission

def create_mock_data(n: int) -> Dict:
    """Create mock data for local testing"""
    mock_tasks = {}
    for i in range(n):
        mock_tasks[f'task_{i}'] = {
            'train': [{
                'input': [[i % 10] * 3] * 3,
                'output': [[(i + 1) % 10] * 3] * 3
            }],
            'test': [{
                'input': [[i % 10] * 3] * 3
            }]
        }
    return mock_tasks

# Load data
training_tasks, test_tasks, sample_submission = load_arc_data(config)

Loaded 1000 training tasks
Loaded 240 test tasks


In [3]:
# Cell 3: Causal Primitives - The Foundation
class CausalPrimitives:
    """Extract causal laws, not patterns"""
    
    @staticmethod
    def identity(grid: np.ndarray) -> np.ndarray:
        """Pure preservation"""
        return grid.copy()
    
    @staticmethod
    def void(grid: np.ndarray) -> np.ndarray:
        """Pure erasure"""
        return np.zeros_like(grid)
    
    @staticmethod
    def invert(grid: np.ndarray) -> np.ndarray:
        """Pure negation"""
        max_val = grid.max() if grid.max() > 0 else 9
        return max_val - grid
    
    @staticmethod
    def rotate_90(grid: np.ndarray) -> np.ndarray:
        """Spatial rotation"""
        return np.rot90(grid)
    
    @staticmethod
    def flip_horizontal(grid: np.ndarray) -> np.ndarray:
        """Horizontal reflection"""
        return np.fliplr(grid)
    
    @staticmethod
    def flip_vertical(grid: np.ndarray) -> np.ndarray:
        """Vertical reflection"""
        return np.flipud(grid)
    
    @staticmethod
    def transpose(grid: np.ndarray) -> np.ndarray:
        """Diagonal reflection"""
        return grid.T
    
    @staticmethod
    def fill_color(grid: np.ndarray, color: int = 0) -> np.ndarray:
        """Fill with specific color"""
        result = grid.copy()
        result.fill(color)
        return result
    
    @staticmethod
    def crop_to_content(grid: np.ndarray) -> np.ndarray:
        """Remove empty borders"""
        if grid.sum() == 0:
            return grid
        rows = np.any(grid, axis=1)
        cols = np.any(grid, axis=0)
        if rows.any() and cols.any():
            return grid[rows][:, cols]
        return grid
    
    @staticmethod
    def tile(grid: np.ndarray, n: int = 2) -> np.ndarray:
        """Spatial repetition"""
        return np.tile(grid, (n, n))

primitives = CausalPrimitives()
print(f"Loaded {len([m for m in dir(primitives) if not m.startswith('_')])} causal primitives")

Loaded 10 causal primitives


In [4]:
# Cell 4: Orthogonal Features - Disentangled from Training
class OrthogonalFeatures:
    """Features that can't be entangled with evaluation sets"""
    
    @staticmethod
    def entropy(grid: np.ndarray) -> float:
        """Shannon entropy of the grid"""
        if grid.size == 0:
            return 0.0
        counts = np.bincount(grid.flatten())
        probs = counts[counts > 0] / grid.size
        return -np.sum(probs * np.log2(probs + 1e-10))
    
    @staticmethod
    def complexity(grid: np.ndarray) -> float:
        """Kolmogorov complexity estimate"""
        # Use compression as proxy
        flat = grid.flatten().tobytes()
        compressed = len(flat) / (1 + len(np.unique(grid)))
        return compressed
    
    @staticmethod
    def symmetry_score(grid: np.ndarray) -> Dict[str, float]:
        """Measure various symmetries"""
        if grid.size == 0:
            return {'horizontal': 0, 'vertical': 0, 'rotational': 0}
        
        h = grid.shape[0]
        w = grid.shape[1]
        
        # Horizontal symmetry
        h_sym = np.sum(grid[:h//2] == np.flipud(grid[h//2+h%2:])) / (grid.size/2)
        
        # Vertical symmetry  
        v_sym = np.sum(grid[:, :w//2] == np.fliplr(grid[:, w//2+w%2:])) / (grid.size/2)
        
        # Rotational symmetry
        if grid.shape[0] == grid.shape[1]:
            r_sym = np.sum(grid == np.rot90(grid, 2)) / grid.size
        else:
            r_sym = 0.0
        
        return {
            'horizontal': h_sym,
            'vertical': v_sym,
            'rotational': r_sym
        }
    
    @staticmethod
    def topological_features(grid: np.ndarray) -> Dict[str, int]:
        """Count topological features"""
        # Count connected components per color
        features = {'components': 0, 'holes': 0}
        
        for color in np.unique(grid):
            if color == 0:
                continue
            mask = (grid == color).astype(int)
            features['components'] += 1
            
        return features

features = OrthogonalFeatures()
print("Orthogonal feature extractors loaded")

Orthogonal feature extractors loaded


In [5]:
# Cell 5: Quantum Superposition Manager
class QuantumSuperposition:
    """Maintain multiple solutions in superposition until forced collapse"""
    
    def __init__(self, config: Config):
        self.config = config
        self.superposition_states = []
        self.amplitudes = []
    
    def add_state(self, solution: np.ndarray, amplitude: float, metadata: Dict = None):
        """Add a solution to superposition with amplitude"""
        self.superposition_states.append({
            'solution': solution,
            'amplitude': amplitude,
            'metadata': metadata or {}
        })
        self.amplitudes.append(amplitude)
    
    def measure_coherence(self) -> float:
        """Measure coherence of superposition"""
        if not self.amplitudes:
            return 0.0
        
        # Normalize amplitudes
        total = sum(self.amplitudes)
        if total == 0:
            return 0.0
        
        probs = [a/total for a in self.amplitudes]
        
        # Calculate entropy as coherence measure
        entropy = -sum(p * np.log2(p + 1e-10) for p in probs if p > 0)
        max_entropy = np.log2(len(probs)) if len(probs) > 1 else 1
        
        return 1.0 - (entropy / max_entropy) if max_entropy > 0 else 0.0
    
    def collapse(self, force: bool = False) -> List[np.ndarray]:
        """Collapse superposition to concrete solutions"""
        if not self.superposition_states:
            return []
        
        coherence = self.measure_coherence()
        
        # Only collapse if forced or coherence is high enough
        if not force and coherence < self.config.collapse_threshold:
            return []  # Maintain superposition
        
        # Sort by amplitude
        sorted_states = sorted(self.superposition_states, 
                              key=lambda x: x['amplitude'], 
                              reverse=True)
        
        # Return top solutions
        return [s['solution'] for s in sorted_states[:self.config.max_solutions_per_task]]
    
    def entangle(self, other: 'QuantumSuperposition') -> 'QuantumSuperposition':
        """Entangle with another superposition"""
        entangled = QuantumSuperposition(self.config)
        
        # Create entangled states through tensor product
        for s1 in self.superposition_states[:3]:  # Limit for efficiency
            for s2 in other.superposition_states[:3]:
                # Simple entanglement: average the solutions
                if s1['solution'].shape == s2['solution'].shape:
                    entangled_solution = (s1['solution'] + s2['solution']) // 2
                    entangled_amplitude = s1['amplitude'] * s2['amplitude']
                    entangled.add_state(entangled_solution, entangled_amplitude)
        
        return entangled

print("Quantum superposition manager initialized")

Quantum superposition manager initialized


In [6]:
# Cell 6: Recursive Solver Core
class RecursiveSolver:
    """Recursive exploration of solution space"""
    
    def __init__(self, config: Config):
        self.config = config
        self.primitives = CausalPrimitives()
        self.features = OrthogonalFeatures()
        self.recursion_count = 0
    
    def solve_recursive(self, input_grid: np.ndarray, examples: List[Dict], 
                       depth: int = 0) -> QuantumSuperposition:
        """Recursively explore solution space"""
        
        superposition = QuantumSuperposition(self.config)
        
        # Base case
        if depth >= self.config.recursion_depth:
            return superposition
        
        # Extract features from input
        input_features = {
            'entropy': self.features.entropy(input_grid),
            'complexity': self.features.complexity(input_grid),
            'symmetry': self.features.symmetry_score(input_grid)
        }
        
        # Try each primitive
        primitive_methods = [
            ('identity', self.primitives.identity),
            ('rotate_90', self.primitives.rotate_90),
            ('flip_h', self.primitives.flip_horizontal),
            ('flip_v', self.primitives.flip_vertical),
            ('transpose', self.primitives.transpose),
            ('invert', self.primitives.invert),
            ('crop', self.primitives.crop_to_content)
        ]
        
        for name, primitive in primitive_methods:
            try:
                # Apply primitive
                result = primitive(input_grid)
                
                # Calculate amplitude based on example match
                amplitude = self.calculate_amplitude(result, examples)
                
                # Add to superposition
                superposition.add_state(result, amplitude, {'transform': name})
                
                # Recursive exploration if promising
                if amplitude > 0.3 and depth < self.config.recursion_depth - 1:
                    sub_superposition = self.solve_recursive(result, examples, depth + 1)
                    # Entangle with sub-solutions
                    if sub_superposition.superposition_states:
                        superposition = superposition.entangle(sub_superposition)
            except:
                continue
        
        return superposition
    
    def calculate_amplitude(self, result: np.ndarray, examples: List[Dict]) -> float:
        """Calculate amplitude based on similarity to examples"""
        if not examples:
            return 0.5
        
        similarities = []
        for example in examples:
            if 'output' in example:
                output = np.array(example['output'])
                if output.shape == result.shape:
                    # Simple similarity: fraction of matching cells
                    similarity = np.sum(output == result) / result.size
                    similarities.append(similarity)
        
        return max(similarities) if similarities else 0.5

solver = RecursiveSolver(config)
print(f"Recursive solver initialized with depth {config.recursion_depth}")

Recursive solver initialized with depth 3


In [7]:
# Cell 7: Information Flow Analyzer
class InformationFlowAnalyzer:
    """Track information flow to detect leakage and entanglement"""
    
    def __init__(self):
        self.flow_history = []
        self.entanglement_map = {}
    
    def measure_mutual_information(self, grid1: np.ndarray, grid2: np.ndarray) -> float:
        """Calculate mutual information between grids"""
        if grid1.size == 0 or grid2.size == 0:
            return 0.0
        
        # Flatten and ensure same size
        flat1 = grid1.flatten()
        flat2 = grid2.flatten()
        
        min_len = min(len(flat1), len(flat2))
        flat1 = flat1[:min_len]
        flat2 = flat2[:min_len]
        
        # Joint probability
        joint_hist = np.histogram2d(flat1, flat2, bins=10)[0]
        joint_prob = joint_hist / joint_hist.sum()
        
        # Marginal probabilities
        p1 = joint_prob.sum(axis=1)
        p2 = joint_prob.sum(axis=0)
        
        # Mutual information
        mi = 0.0
        for i in range(len(p1)):
            for j in range(len(p2)):
                if joint_prob[i,j] > 0 and p1[i] > 0 and p2[j] > 0:
                    mi += joint_prob[i,j] * np.log2(joint_prob[i,j] / (p1[i] * p2[j]))
        
        return mi
    
    def detect_entanglement(self, input_grid: np.ndarray, output_grid: np.ndarray, 
                           model_state: Dict) -> Dict[str, float]:
        """Detect information entanglement"""
        
        # Calculate information flows
        i_input_output = self.measure_mutual_information(input_grid, output_grid)
        
        # Track flow
        flow = {
            'direct_transfer': i_input_output,
            'timestamp': time.time(),
            'grid_hash': hashlib.md5(input_grid.tobytes()).hexdigest()[:8]
        }
        
        self.flow_history.append(flow)
        
        # Detect anomalous flows
        if len(self.flow_history) > 10:
            recent_flows = [f['direct_transfer'] for f in self.flow_history[-10:]]
            mean_flow = np.mean(recent_flows)
            std_flow = np.std(recent_flows)
            
            if i_input_output > mean_flow + 2 * std_flow:
                flow['anomaly'] = True
                flow['anomaly_score'] = (i_input_output - mean_flow) / std_flow
        
        return flow

analyzer = InformationFlowAnalyzer()
print("Information flow analyzer initialized")

Information flow analyzer initialized


In [8]:
# Cell 8: Main Orchestrator
class WakingOrca:
    """Main solver orchestrator"""
    
    def __init__(self, config: Config):
        self.config = config
        self.solver = RecursiveSolver(config)
        self.analyzer = InformationFlowAnalyzer()
        self.start_time = time.time()
        self.solutions = {}
    
    def solve_task(self, task_id: str, task: Dict) -> List[List[List[int]]]:
        """Solve a single ARC task"""
        
        task_start = time.time()
        
        # Extract examples
        train_examples = task.get('train', [])
        test_examples = task.get('test', [])
        
        solutions = []
        
        for test_case in test_examples:
            input_grid = np.array(test_case['input'])
            
            # Create superposition of solutions
            superposition = self.solver.solve_recursive(input_grid, train_examples)
            
            # Check time budget
            if time.time() - task_start > self.config.time_per_task:
                # Force collapse due to time
                collapsed = superposition.collapse(force=True)
            else:
                # Natural collapse based on coherence
                collapsed = superposition.collapse()
            
            # Fallback if no solutions
            if not collapsed:
                collapsed = [input_grid]  # Return input as fallback
            
            # Convert to list format
            task_solutions = []
            for solution in collapsed[:2]:  # Max 2 solutions
                task_solutions.append(solution.tolist())
            
            # Pad with input if needed
            while len(task_solutions) < 2:
                task_solutions.append(input_grid.tolist())
            
            solutions.append(task_solutions)
        
        return solutions
    
    def solve_all(self, tasks: Dict) -> Dict:
        """Solve all tasks"""
        
        print(f"\nSolving {len(tasks)} tasks...")
        
        for i, (task_id, task) in enumerate(tasks.items()):
            # Check time budget
            elapsed = time.time() - self.start_time
            if elapsed > self.config.total_time_budget:
                print(f"Time budget exceeded at task {i}/{len(tasks)}")
                break
            
            # Solve task
            try:
                solutions = self.solve_task(task_id, task)
                self.solutions[task_id] = solutions
                
                if i % 50 == 0:
                    print(f"Progress: {i}/{len(tasks)} tasks completed")
                    
            except Exception as e:
                # Fallback on error
                print(f"Error on task {task_id}: {e}")
                test_cases = task.get('test', [])
                fallback = []
                for test_case in test_cases:
                    input_grid = test_case['input']
                    fallback.append([input_grid, input_grid])
                self.solutions[task_id] = fallback
        
        print(f"\nCompleted {len(self.solutions)} tasks")
        return self.solutions

orca = WakingOrca(config)
print("WakingOrca orchestrator ready")

WakingOrca orchestrator ready


In [9]:
# Cell 9: Submission Generator
def generate_submission(solutions: Dict, config: Config) -> None:
    """Generate submission.json in correct format"""
    
    # Ensure output directory exists
    output_dir = Path(config.output_path)
    output_dir.mkdir(parents=True, exist_ok=True)
    
    # Also create /kaggle/working/output if needed
    alt_output = Path('/kaggle/working/output')
    alt_output.mkdir(parents=True, exist_ok=True)
    
    # Format solutions for submission
    submission = {}
    for task_id, task_solutions in solutions.items():
        submission[task_id] = task_solutions
    
    # Save to both locations
    paths = [
        output_dir / 'submission.json',
        alt_output / 'submission.json',
        Path('/kaggle/working/submission.json')
    ]
    
    for path in paths:
        try:
            with open(path, 'w') as f:
                json.dump(submission, f)
            print(f"‚úì Saved submission to {path}")
        except:
            continue
    
    # Validate submission
    print(f"\nSubmission contains {len(submission)} tasks")
    
    # Check a sample
    if submission:
        sample_task = list(submission.keys())[0]
        sample_solution = submission[sample_task]
        print(f"Sample task '{sample_task}' has {len(sample_solution)} test cases")
        if sample_solution:
            print(f"Each test case has {len(sample_solution[0])} solution attempts")
    
    return submission

In [10]:
# Cell 10: Main Execution Pipeline
def main():
    """Main execution pipeline"""
    
    print("="*60)
    print("üåä WAKINGORCA v1.0 - QUANTUM DISENTANGLED ARC SOLVER")
    print("="*60)
    
    start_time = time.time()
    
    # Initialize solver
    orca = WakingOrca(config)
    
    # Solve all test tasks
    print("\nüìä Starting solution process...")
    solutions = orca.solve_all(test_tasks)
    
    # Generate submission
    print("\nüìù Generating submission...")
    submission = generate_submission(solutions, config)
    
    # Final stats
    elapsed = time.time() - start_time
    print("\n" + "="*60)
    print("‚úÖ COMPLETE")
    print(f"Total time: {elapsed:.2f} seconds")
    print(f"Tasks solved: {len(solutions)}")
    print(f"Time per task: {elapsed/len(solutions):.2f}s" if solutions else "N/A")
    
    print("\nüéØ SAVE THE SESSION AND DOWNLOAD SUBMISSION.JSON!")
    print("="*60)
    
    return submission

# Run if in Kaggle or if explicitly called
if __name__ == "__main__" or 'kaggle' in str(Path.cwd()):
    submission = main()
else:
    print("Ready to run. Call main() to start.")

üåä WAKINGORCA v1.0 - QUANTUM DISENTANGLED ARC SOLVER

üìä Starting solution process...

Solving 240 tasks...
Progress: 0/240 tasks completed
Progress: 50/240 tasks completed
Progress: 100/240 tasks completed
Progress: 150/240 tasks completed
Progress: 200/240 tasks completed

Completed 240 tasks

üìù Generating submission...
‚úì Saved submission to /kaggle/working/submission.json
‚úì Saved submission to /kaggle/working/output/submission.json
‚úì Saved submission to /kaggle/working/submission.json

Submission contains 240 tasks
Sample task '00576224' has 1 test cases
Each test case has 2 solution attempts

‚úÖ COMPLETE
Total time: 7.35 seconds
Tasks solved: 240
Time per task: 0.03s

üéØ SAVE THE SESSION AND DOWNLOAD SUBMISSION.JSON!


In [11]:
# Cell 11: Local Testing Suite
def test_solver():
    """Test the solver with a simple example"""
    
    print("Running local tests...")
    
    # Create a simple test case
    test_task = {
        'train': [
            {
                'input': [[1, 0], [0, 1]],
                'output': [[0, 1], [1, 0]]
            }
        ],
        'test': [
            {
                'input': [[2, 0], [0, 2]]
            }
        ]
    }
    
    # Test primitives
    primitives = CausalPrimitives()
    test_grid = np.array([[1, 2], [3, 4]])
    
    print("\n1. Testing Causal Primitives:")
    print(f"   Original: {test_grid.tolist()}")
    print(f"   Rotated:  {primitives.rotate_90(test_grid).tolist()}")
    print(f"   Inverted: {primitives.invert(test_grid).tolist()}")
    
    # Test features
    features = OrthogonalFeatures()
    print("\n2. Testing Orthogonal Features:")
    print(f"   Entropy: {features.entropy(test_grid):.3f}")
    print(f"   Symmetry: {features.symmetry_score(test_grid)}")
    
    # Test superposition
    superposition = QuantumSuperposition(config)
    superposition.add_state(test_grid, 0.8)
    superposition.add_state(primitives.rotate_90(test_grid), 0.6)
    
    print("\n3. Testing Quantum Superposition:")
    print(f"   States: {len(superposition.superposition_states)}")
    print(f"   Coherence: {superposition.measure_coherence():.3f}")
    
    # Test solver
    solver = RecursiveSolver(config)
    input_grid = np.array(test_task['test'][0]['input'])
    result = solver.solve_recursive(input_grid, test_task['train'], depth=0)
    
    print("\n4. Testing Recursive Solver:")
    print(f"   Solutions found: {len(result.superposition_states)}")
    
    collapsed = result.collapse(force=True)
    if collapsed:
        print(f"   Best solution: {collapsed[0].tolist()}")
    
    print("\n‚úÖ All tests completed")

# Run tests in development
if 'kaggle' not in str(Path.cwd()):
    test_solver()

## üìö Architecture Notes

### Core Innovations

1. **Causal Primitives**: Instead of pattern matching, we extract fundamental transformations
2. **Orthogonal Features**: Features that can't be entangled with training data
3. **Quantum Superposition**: Maintain multiple solutions until forced to collapse
4. **Information Flow Analysis**: Track and detect anomalous information transfer
5. **Recursive Exploration**: Depth-limited search through transformation space

### Expected Performance

- **Baseline**: 10-20% accuracy on first run
- **With tuning**: 20-30% achievable
- **With extended primitives**: 30-40% possible

### Next Improvements

1. Add more sophisticated primitives (color mapping, pattern detection)
2. Implement counterfactual testing
3. Add learned weights from training data
4. Implement multi-scale analysis
5. Add symbolic reasoning layer

### Philosophy

This solver doesn't try to "learn" patterns in the traditional sense. Instead, it:
- Maintains superposition of possible solutions
- Uses causal primitives that work regardless of specific patterns
- Tracks information flow to detect when it's overfitting
- Collapses to concrete solutions only when necessary

This is the foundation for a truly general intelligence approach to ARC.