In [2]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load
import json
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input mount and print the full path of every file found.
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        # Join the directory currently being visited with each file name in it
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline, TextStreamer
import torch
from accelerate import infer_auto_device_map
import gc
from IPython.display import Markdown, display

#Boolean Face
from dataclasses import dataclass
from enum import Enum
import logging
from typing import List, Dict, Tuple, Optional, Set
from collections import defaultdict
import itertools

# Configure the root logger at INFO level and create this notebook's logger.
# NOTE: logging.basicConfig only has an effect while the root logger has no
# handlers, so any later basicConfig call in this session is a no-op unless it
# passes force=True.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

from src.utils.utils import load_and_log_first_task


# Allow the Hugging Face tokenizers library to tokenize in parallel.
# NOTE(review): tokenization presumably runs on the CPU, so this setting should
# be independent of the GPU type — confirm.
os.environ["TOKENIZERS_PARALLELISM"] = "true"

# Permit TF32 math for CUDA matmuls and cuDNN convolutions.
# NOTE(review): TF32 is presumably only available on Ampere-or-newer GPUs, in
# which case these flags have no effect on T4s — confirm.
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True

# Run the garbage collector, then release cached CUDA memory, before loading the model
gc.collect()
torch.cuda.empty_cache()


ModuleNotFoundError: No module named 'transformers'

### Transformers

In [None]:
# %%capture
# %pip install -U transformers accelerate

### Load Data

In [None]:
# Alternative dataset, kept for reference:
# practice_path = '/kaggle/input/practice/practice_training_challenge.json'
# NOTE(review): this path is relative (no leading "/"), unlike the other
# /kaggle/input paths used in this notebook — confirm it resolves from the
# notebook's working directory.
kaggle_path = 'kaggle/input/arc-prize-2024/arc-agi_training_challenges.json'

# Read the challenge file through the project helper
# (presumably it also logs the first task, as its name suggests).
challenge_tasks = load_and_log_first_task(kaggle_path)

if not challenge_tasks:
    print("Failed to load challenge tasks.")
else:
    # Keep every task ID, and single out the first task for quick experiments.
    task_ids = list(challenge_tasks.keys())
    first_task_id = task_ids[0]
    first_task_data = challenge_tasks[first_task_id]

    print(f"Working with task ID: {first_task_id}")

    # To process every task instead of only the first one:
    # for task_id, task_data in challenge_tasks.items():
    #     result = solve_arc_task(task_data)
    #     # Process or store the result as needed

### Set up Llama 3.2

In [None]:
# Local Kaggle dataset directory that holds the Llama 3.2 3B Instruct checkpoint.
base_model = "/kaggle/input/llama-3.2/transformers/3b-instruct/1"


# Load the tokenizer from the local checkpoint directory.
# NOTE(review): `truncation` and `use_threading` do not look like documented
# `from_pretrained` options (truncation is normally chosen per call) — confirm
# they have any effect here.
tokenizer = AutoTokenizer.from_pretrained(
    base_model,
    use_fast=True,  # Request the fast tokenizer implementation
    cache_dir="./cache",
    padding_side="left",  # Pad on the left side of each sequence
    truncation=True,
    use_threading=True  # NOTE(review): see the note above — confirm this kwarg is honored
)

# Load the causal LM in half precision and let accelerate place it across the
# available devices, within the per-device memory caps given below.
model = AutoModelForCausalLM.from_pretrained(
    base_model,
    torch_dtype=torch.float16,  # Load the weights as FP16
    device_map="auto",  # Let accelerate handle dual GPU distribution
    low_cpu_mem_usage=True,
    use_cache=True,
    max_memory={
        0: "11GiB",  # Cap for GPU 0, leaving headroom for CUDA overhead
        1: "11GiB",  # Cap for GPU 1 (original note: T4s have 16GB each)
        "cpu": "24GiB"  # Cap for CPU RAM
    },
    offload_folder="offload",
    # NOTE(review): trust_remote_code permits running Python code shipped with
    # the checkpoint; presumably unnecessary for a stock Llama checkpoint —
    # confirm and drop if so.
    trust_remote_code=True
)

In [None]:
# Fall back to the EOS token for padding when the tokenizer defines no pad token.
if tokenizer.pad_token_id is None:
    tokenizer.pad_token_id = tokenizer.eos_token_id

# Mirror the tokenizer's pad id into the model. The tokenizer value is a single
# int, whereas model.config.eos_token_id may be a *list* of ids for instruct
# checkpoints, which is not a valid pad_token_id.
if model.config.pad_token_id is None:
    model.config.pad_token_id = tokenizer.pad_token_id

# generate() reads its defaults from generation_config, so keep it in sync too.
generation_config = getattr(model, "generation_config", None)
if generation_config is not None and generation_config.pad_token_id is None:
    generation_config.pad_token_id = tokenizer.pad_token_id

In [None]:

# Wrap the already-loaded model and tokenizer in a text-generation pipeline.
# NOTE(review): `model` is already instantiated with its own dtype and device
# map, so `torch_dtype` / `device_map` here are presumably redundant — confirm.
pipe = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    torch_dtype=torch.float16,
    device_map="auto",
    batch_size=2,  # Batch size used by the pipeline (original note: tuned for dual T4s)
    max_length=2048  # Default generation length cap — NOTE(review): presumably counts prompt + new tokens; confirm
)

In [None]:

from IPython.display import Markdown, display

# Two-message conversation: a system persona followed by one user question.
messages = [
    {"role": "system", "content": "You are an https://arcprize.org/guide expert"},
    {"role": "user", "content": "Who is Francois Chollet?"},
]

# Render the conversation to a prompt string (not token ids) with the
# generation prompt appended.
prompt = tokenizer.apply_chat_template(
    messages,
    tokenize=False,
    add_generation_prompt=True,
)

outputs = pipe(prompt, truncation=True, do_sample=True)

# Show only the text that follows the first assistant header, rendered as Markdown.
assistant_header = "<|start_header_id|>assistant<|end_header_id|>"
assistant_reply = outputs[0]["generated_text"].split(assistant_header)[1]
display(Markdown(assistant_reply))

### Boolean Face

In [None]:
def boolean_solver(input_data):
    """Combine several integer grids into one by bitwise OR, cell by cell.

    Args:
        input_data: Iterable of grids given as nested lists (assumed to all
            have the same shape so they can be reduced together).

    Returns:
        The OR-combined grid as a nested list.
    """
    # One numpy array per incoming grid.
    layers = []
    for raw_grid in input_data:
        layers.append(np.array(raw_grid))

    # Fold the stack of grids along the first axis with bitwise OR.
    combined = np.bitwise_or.reduce(layers)
    return combined.tolist()


class BooleanOperator(Enum):
    """Boolean operations that are tried when matching an input pattern to its output."""
    # Each value is the lowercase operator name; it is what appears in log
    # messages and in the LLM prompt via `operator.value`.
    AND = "and"
    OR = "or"
    XOR = "xor"
    NAND = "nand"
    NOR = "nor"
    NOT = "not"
    XNOR = "xnor"
    
@dataclass
class TruthTableEntry:
    """Outcome of matching one boolean operator against an input/output pattern."""
    # Boolean input grid(s) the operator was applied to
    input_states: Tuple[np.ndarray, ...]
    # Grid recorded as the outcome (TruthTableAnalyzer stores the operator's result here)
    output_state: np.ndarray
    # Operator that produced `output_state`
    operator: BooleanOperator
    # Match score assigned to this operator (TruthTableAnalyzer uses the fraction of agreeing cells)
    score: float
    # Optional free-text description of the pattern; empty by default
    description: str = ""

class TruthTableAnalyzer:
    """Matches small boolean grid patterns against a fixed set of boolean operators.

    An input/output pair is converted to booleans, every ``BooleanOperator`` is
    applied to the input, and the operator whose result agrees with the output
    on the largest fraction of cells is kept. Results are memoized per pattern.
    """

    _LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    _LOG_FILE = 'boolean_analysis.log'

    def __init__(self, logging_level=logging.INFO):
        """Create an analyzer with empty caches and a configured logger.

        Args:
            logging_level: Level applied to the ``BooleanAnalyzer`` logger.
        """
        self.setup_logging(logging_level)
        self.truth_table_cache = {}
        self.score_history = defaultdict(list)
        self.best_rules = {}

    def setup_logging(self, level):
        """Configure the ``BooleanAnalyzer`` logger with a file and a stream handler.

        Handlers are attached to the named logger itself rather than through
        ``logging.basicConfig``: basicConfig does nothing once the root logger
        already has handlers, which would silently drop the log file, the
        format and the requested level.

        Args:
            level: Logging level for the analyzer's logger.
        """
        analyzer_logger = logging.getLogger('BooleanAnalyzer')
        analyzer_logger.setLevel(level)

        # Attach handlers only once; every analyzer instance shares this logger.
        if not analyzer_logger.handlers:
            formatter = logging.Formatter(self._LOG_FORMAT)
            handlers = [logging.StreamHandler()]
            file_error = None
            try:
                handlers.insert(0, logging.FileHandler(self._LOG_FILE))
            except OSError as exc:
                # A read-only working directory must not break the analysis;
                # fall back to stream-only logging and report it below.
                file_error = exc

            for handler in handlers:
                handler.setFormatter(formatter)
                analyzer_logger.addHandler(handler)

            # The logger owns its handlers now; avoid duplicate lines through root.
            analyzer_logger.propagate = False

            if file_error is not None:
                analyzer_logger.warning(
                    "Could not open %s for logging: %s", self._LOG_FILE, file_error
                )

        self.logger = analyzer_logger

    def analyze_2x2_pattern(self, input_grid: np.ndarray, output_grid: np.ndarray) -> "TruthTableEntry":
        """Find (or fetch from cache) the best-matching operator for one pattern.

        Args:
            input_grid: Input window; non-zero cells are treated as True.
            output_grid: Expected output window; non-zero cells are treated as True.

        Returns:
            The ``TruthTableEntry`` for the highest-scoring operator.
        """
        self.logger.info(
            "\nAnalyzing 2x2 pattern:\nInput:\n%s\nOutput:\n%s", input_grid, output_grid
        )

        # Convert to boolean
        input_bool = np.asarray(input_grid).astype(bool)
        output_bool = np.asarray(output_grid).astype(bool)

        cache_key = self._generate_cache_key(input_bool, output_bool)
        cached = self.truth_table_cache.get(cache_key)
        if cached is not None:
            self.logger.info("Found pattern in cache")
            return cached

        # Try all boolean operators and remember the winner
        best_entry = self._find_best_operator(input_bool, output_bool)
        self.truth_table_cache[cache_key] = best_entry
        return best_entry

    def _generate_cache_key(self, input_bool: np.ndarray, output_bool: np.ndarray) -> str:
        """Build a cache key from the shape and raw bytes of both grids.

        The shapes are part of the key because ``tobytes`` alone cannot tell a
        1x4 grid from a 2x2 grid with the same cell values.
        """
        return (
            f"{input_bool.shape}:{input_bool.tobytes()}"
            f"-{output_bool.shape}:{output_bool.tobytes()}"
        )

    def _find_best_operator(self, input_bool: np.ndarray, output_bool: np.ndarray) -> "TruthTableEntry":
        """Apply every operator to ``input_bool`` and keep the best match.

        Ties are resolved in favour of the operator that appears first in
        ``BooleanOperator``. An entry is always returned, even when every
        operator scores 0.0.
        """
        # Start below the minimum possible score so the first operator is always
        # accepted and ``best_entry`` can never stay None.
        best_score = -1.0
        best_entry = None

        for operator in BooleanOperator:
            result = self._apply_operator(input_bool, operator)
            score = self._calculate_match_score(result, output_bool)

            self.logger.debug("Operator %s:", operator.value)
            self.logger.debug("Result:\n%s", result)
            self.logger.debug("Score: %s", score)

            if score > best_score:
                best_score = score
                best_entry = TruthTableEntry(
                    input_states=(input_bool,),
                    output_state=result,
                    operator=operator,
                    score=score
                )

        self.logger.info(
            "Best operator: %s with score %s", best_entry.operator.value, best_entry.score
        )
        return best_entry

    def _apply_operator(self, grid: np.ndarray, operator: "BooleanOperator") -> np.ndarray:
        """Apply ``operator`` to ``grid``.

        ``NOT`` inverts the grid. Every other operator combines the grid with a
        copy of itself rolled down by one row (``np.roll(grid, 1, axis=0)``).

        Args:
            grid: Boolean array; the ``~``/``&``/``|``/``^`` operators are used
                as-is, so integer input would be combined bitwise.
            operator: Operator to apply.

        Raises:
            ValueError: If ``operator`` is not a known ``BooleanOperator``.
        """
        if operator == BooleanOperator.NOT:
            return ~grid

        shifted = np.roll(grid, 1, axis=0)
        if operator == BooleanOperator.AND:
            return grid & shifted
        if operator == BooleanOperator.OR:
            return grid | shifted
        if operator == BooleanOperator.XOR:
            return grid ^ shifted
        if operator == BooleanOperator.NAND:
            return ~(grid & shifted)
        if operator == BooleanOperator.NOR:
            return ~(grid | shifted)
        if operator == BooleanOperator.XNOR:
            return ~(grid ^ shifted)

        # Fail loudly instead of implicitly returning None.
        raise ValueError(f"Unsupported boolean operator: {operator!r}")

    def _calculate_match_score(self, result: np.ndarray, target: np.ndarray) -> float:
        """Return the fraction of cells on which ``result`` and ``target`` agree.

        Grids with different shapes, or with no cells at all, score 0.0 rather
        than being broadcast against each other or producing NaN.
        """
        result = np.asarray(result)
        target = np.asarray(target)
        if result.shape != target.shape or result.size == 0:
            return 0.0
        return float(np.mean(result == target))

class BooleanAlgebraFace:
    """Predicts ARC outputs by learning a boolean operator from 2x2 windows.

    The first training pair is cut into 2x2 windows, each window is analysed by
    a ``TruthTableAnalyzer``, and the operator of the highest-scoring window is
    applied to the test input.
    """

    def __init__(self, llm_pipeline=None):
        """Create the face.

        Args:
            llm_pipeline: Optional text-generation callable used to describe the
                best pattern in natural language.
        """
        self.analyzer = TruthTableAnalyzer()
        self.llm_pipeline = llm_pipeline
        self.learning_rate = 0.1
        self.min_confidence = 0.6

    def analyze_task(self, task: Dict) -> Tuple[List[List[int]], float]:
        """Analyze a task and return a prediction with its confidence score.

        Args:
            task: Dict with a ``'train'`` list of ``{'input', 'output'}`` pairs
                and a ``'test'`` dict holding an ``'input'`` grid.

        Returns:
            ``(prediction, confidence)``; the prediction is an all-zero grid with
            confidence 0.0 when no pattern is confident enough.
        """
        train_pairs = task['train']

        # Only the first training pair is analysed; an empty train list simply
        # yields no patterns instead of raising IndexError.
        truth_tables = []
        if train_pairs:
            first_pair = train_pairs[0]
            windows = self._extract_2x2_patterns(
                np.array(first_pair['input']), np.array(first_pair['output'])
            )
            truth_tables = [
                self.analyzer.analyze_2x2_pattern(in_window, out_window)
                for in_window, out_window in windows
            ]

        # Generate a description with the LLM if the best pattern is confident enough
        if self.llm_pipeline and truth_tables:
            best_entry = max(truth_tables, key=lambda entry: entry.score)
            if best_entry.score > self.min_confidence:
                best_entry.description = self._generate_llm_description(best_entry)

        # Apply learned patterns to test input
        test_input = np.array(task['test']['input'])
        prediction, confidence = self._apply_patterns(test_input, truth_tables)

        return prediction.tolist(), confidence

    def _extract_2x2_patterns(self, input_grid: np.ndarray, output_grid: np.ndarray) -> List[Tuple[np.ndarray, np.ndarray]]:
        """Extract aligned 2x2 windows from the input and output grids.

        Only the region covered by both grids is scanned, so every returned
        window pair is a full 2x2 block even when the grids differ in size.
        Grids that are not two-dimensional yield no windows.
        """
        input_grid = np.asarray(input_grid)
        output_grid = np.asarray(output_grid)
        if input_grid.ndim != 2 or output_grid.ndim != 2:
            return []

        rows = min(input_grid.shape[0], output_grid.shape[0])
        cols = min(input_grid.shape[1], output_grid.shape[1])

        return [
            (input_grid[i:i+2, j:j+2], output_grid[i:i+2, j:j+2])
            for i in range(rows - 1)
            for j in range(cols - 1)
        ]

    def _generate_llm_description(self, entry: "TruthTableEntry") -> str:
        """Ask the LLM for a natural language description of a boolean pattern.

        Returns:
            The generated text with surrounding whitespace stripped.
        """
        prompt = f"""
        Describe this boolean pattern transformation:
        Input matrix:
        {entry.input_states[0]}
        
        Operation applied: {entry.operator.value}
        
        Output matrix:
        {entry.output_state}
        
        Score: {entry.score}
        
        Describe the pattern in natural language:
        """

        # max_new_tokens bounds only the completion (max_length would also count
        # the prompt, which is itself close to 100 tokens), and
        # return_full_text=False keeps the echoed prompt out of the description.
        response = self.llm_pipeline(
            prompt, max_new_tokens=100, return_full_text=False
        )[0]['generated_text']
        return response.strip()

    def _apply_patterns(self, test_input: np.ndarray, truth_tables: List["TruthTableEntry"]) -> Tuple[np.ndarray, float]:
        """Apply the best learned pattern to the test input.

        Args:
            test_input: Test grid; non-zero cells are treated as True.
            truth_tables: Analysed patterns. The list is not modified.

        Returns:
            ``(output, confidence)``. ``output`` has the shape and dtype of
            ``test_input``; it is all zeros with confidence 0.0 when no entry
            scores above ``self.min_confidence``.
        """
        confident = [entry for entry in truth_tables if entry.score > self.min_confidence]
        if not confident:
            return np.zeros_like(test_input), 0.0

        # max() keeps the first of equally scored entries.
        best_entry = max(confident, key=lambda entry: entry.score)

        # The operator implementation lives on the analyzer and expects booleans.
        transformed = self.analyzer._apply_operator(test_input.astype(bool), best_entry.operator)

        # Cast back so the prediction serializes as 0/1 integers, not booleans.
        return transformed.astype(test_input.dtype), float(best_entry.score)

    def update_learning(self, task_id: str, prediction: List[List[int]], actual: List[List[int]], score: float):
        """Record a task score and adjust the learning rate.

        The learning rate grows by 10% when ``score`` beats the previous score
        for the same task and shrinks by 10% otherwise. ``prediction`` and
        ``actual`` are accepted but not used.
        """
        history = self.analyzer.score_history[task_id]
        history.append(score)

        # Adjust learning rate based on the previous score, once there is one
        if len(history) > 1:
            prev_score = history[-2]
            if score > prev_score:
                self.learning_rate *= 1.1
            else:
                self.learning_rate *= 0.9

        self.analyzer.logger.info(f"Task {task_id} - Score: {score}, Learning rate: {self.learning_rate}")

### Usage

In [None]:
class ARCProcessor:
    """Runs a boolean face over ARC tasks and writes the submission file."""

    def __init__(self, boolean_face=None, llm_pipeline=None):
        """Create the processor.

        Args:
            boolean_face: Object exposing ``analyze_task(task)``; a new
                ``BooleanAlgebraFace`` is created when omitted.
            llm_pipeline: Passed to the default ``BooleanAlgebraFace``.
        """
        self.boolean_face = boolean_face or BooleanAlgebraFace(llm_pipeline)

    def process_tasks(self, tasks: Dict) -> Dict:
        """Process every task and return ``{task_id: [prediction, ...]}``."""
        results = {}
        for task_id, task_data in tasks.items():
            print(f"Processing task {task_id}")
            results[task_id] = self.process_single_task(task_data)
        return results

    def process_single_task(self, task_data: Dict) -> List[Dict]:
        """Predict every test input of one task.

        Args:
            task_data: Dict with ``'train'`` pairs and a ``'test'`` entry that is
                either one ``{'input': ...}`` dict or a list of them.

        Returns:
            One ``{'attempt_1', 'attempt_2'}`` dict per test input; an empty
            list when the task has no test inputs.
        """
        test_inputs = task_data.get('test')
        # A missing or empty 'test' entry means there is nothing to predict;
        # wrapping an empty dict would only fail later with KeyError('input').
        if not test_inputs:
            return []
        # Handle both single and multiple test inputs
        if isinstance(test_inputs, dict):
            test_inputs = [test_inputs]

        predictions = []
        for test_input in test_inputs:
            task = {
                'train': task_data['train'],
                'test': test_input
            }
            # The confidence score is not part of the submission format.
            pred_grid, _confidence = self.boolean_face.analyze_task(task)

            predictions.append({
                'attempt_1': pred_grid,
                'attempt_2': self._create_alternative_attempt(pred_grid)
            })

        return predictions

    def _create_alternative_attempt(self, pred_grid: List[List[int]]) -> List[List[int]]:
        """Return a copy of the prediction with one randomly chosen cell flipped.

        A zero cell becomes 1 and any non-zero cell becomes 0, so the result
        never contains negative values. Empty or non-2-D grids are returned
        unchanged.
        """
        grid = np.array(pred_grid)
        if grid.ndim == 2 and grid.size > 0:
            # Randomly modify one cell for variety
            i = np.random.randint(0, grid.shape[0])
            j = np.random.randint(0, grid.shape[1])
            grid[i, j] = 0 if grid[i, j] else 1
        return grid.tolist()

    def save_submission(self, results: Dict, output_path: str = 'submission.json'):
        """Write ``results`` as indented JSON to ``output_path``."""
        with open(output_path, 'w') as f:
            json.dump(results, f, indent=2)
            
def run_arc_processor(test_file_path: str, llm_pipeline=None):
    """Load tasks from a JSON file, predict them all, and save the submission.

    Args:
        test_file_path: Path of the JSON file holding the tasks.
        llm_pipeline: Optional text-generation pipeline handed to both the
            boolean face and the processor.

    Returns:
        The results mapping produced by ``ARCProcessor.process_tasks``.
    """
    # Read the whole task file up front.
    with open(test_file_path, 'r') as task_file:
        tasks = json.loads(task_file.read())

    print("Initializing Boolean Algebra face...")
    face = BooleanAlgebraFace(llm_pipeline)

    print("Creating processor...")
    arc_processor = ARCProcessor(face, llm_pipeline)

    print("Processing tasks...")
    results = arc_processor.process_tasks(tasks)

    # Written to the processor's default output path.
    arc_processor.save_submission(results)
    return results