# AIMO3 Complete Submission - All Phases (1-4)
## AI Mathematical Olympiad Progress Prize 3

**Features:**
- Phase 1-3: LLM reasoning with chain-of-thought
- Phase 4: Symbolic verification + fallback validation + edge case handling
- Competition API integration (aimo)
- Comprehensive metrics tracking

**Answer Format:** Integer (0-99,999)
**Evaluation:** Penalized accuracy (2 runs per problem)

## 1. Install Dependencies

In [None]:
# Install required packages
!pip install --quiet sympy torch transformers peft pandas tqdm

## 2. Import Libraries

In [None]:
import os
import re
import json
import logging
from typing import Dict, List, Any, Optional, Union, Tuple
from datetime import datetime

import torch
import pandas as pd
import sympy as sp
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForCausalLM

# Configure logging once: every record is rendered as "time - level - message".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Probe CUDA a single time and derive the compute device from the answer.
_cuda_ready = torch.cuda.is_available()
print("PyTorch: " + str(torch.__version__))
print("CUDA Available: " + str(_cuda_ready))
DEVICE = "cuda" if _cuda_ready else "cpu"
print("Using device: " + DEVICE)

## 3. Phase 4: Computation Components (Inline)

These classes provide symbolic verification, fallback validation, and metrics tracking.

In [None]:
class SymbolicCompute:
    """SymPy-based symbolic computation and verification.

    All members are static; the class is a namespace for helpers that
    re-check a numeric answer against the text an LLM produced.
    """

    @staticmethod
    def evaluate_expression(expr_str: str) -> Optional[Union[int, float]]:
        """Evaluate a mathematical expression string using SymPy.

        Args:
            expr_str: Expression text such as ``"2 + 3*5"``.

        Returns:
            An ``int`` when the value is a whole number, a ``float``
            otherwise, or ``None`` when the text cannot be parsed or does not
            reduce to a finite real number.
        """
        try:
            # NOTE(security): sympify() parses with eval(); the strings passed
            # here originate from model output, so treat them as untrusted.
            expr = sp.sympify(expr_str)
            # Convert to a built-in float so the whole-number test below does
            # not depend on how SymPy compares its Float type with int.
            value = float(expr.evalf())
        except Exception as e:
            logger.debug(f"Failed to evaluate expression '{expr_str}': {str(e)}")
            return None

        # NaN / infinity cannot be a usable answer.
        if value != value or value in (float("inf"), float("-inf")):
            return None
        return int(value) if value.is_integer() else value

    @staticmethod
    def parse_llm_output_for_expressions(llm_text: str) -> Dict[str, Any]:
        """Parse LLM output to extract mathematical expressions.

        Args:
            llm_text: Raw reasoning text produced by the model.

        Returns:
            A dict with three keys:
            ``"expressions"`` - up to 5 strings found after ``=``, ``equals``
            or ``is``;
            ``"values"`` - up to 10 numbers (int or float) in order of
            appearance;
            ``"final_value"`` - the last number in the text, or ``None``.
            On any parsing error the defaults (empty lists, ``None``) are
            returned.
        """
        result = {
            "expressions": [],
            "values": [],
            "final_value": None
        }

        def _to_number(token: str) -> Union[int, float]:
            # Tokens containing a dot are decimals; everything else is an int.
            return float(token) if '.' in token else int(token)

        try:
            # Extract expressions
            expr_pattern = r"(?:=|equals|is)\s*(\d+(?:\.\d+)?|[\w\s\+\-\*/\(\)\.]+)"
            expr_matches = re.findall(expr_pattern, llm_text, re.IGNORECASE)

            # Extract numeric values
            num_pattern = r"\b(\d+(?:\.\d+)?)\b"
            num_matches = re.findall(num_pattern, llm_text)

            result["expressions"] = expr_matches[:5]
            result["values"] = [_to_number(n) for n in num_matches[:10]]

            if num_matches:
                result["final_value"] = _to_number(num_matches[-1])

        except Exception as e:
            logger.debug(f"Failed to parse LLM output: {str(e)}")

        return result

    @staticmethod
    def verify_symbolic_result(
        llm_answer: int,
        llm_output: str,
        tolerance: float = 0.01
    ) -> Tuple[bool, float]:
        """Verify LLM answer by symbolic computation.

        Args:
            llm_answer: The answer extracted from the model output.
            llm_output: The reasoning text the answer came from.
            tolerance: Maximum relative difference accepted when an evaluated
                expression is a non-integer float.

        Returns:
            ``(is_valid, confidence)``. ``(True, 1.0)`` (or slightly less for
            a float within tolerance) when an extracted expression evaluates
            to the answer, ``(True, 0.8)`` when only the last number in the
            text equals the answer, and ``(False, 0.5)`` otherwise.
        """
        try:
            parsed = SymbolicCompute.parse_llm_output_for_expressions(llm_output)

            # Try to evaluate extracted expressions
            for expr_str in parsed["expressions"]:
                try:
                    result = SymbolicCompute.evaluate_expression(expr_str)
                    if result is None:
                        continue
                    if isinstance(result, float):
                        diff_percent = abs(result - llm_answer) / max(abs(llm_answer), 1)
                        if diff_percent <= tolerance:
                            return True, 1.0 - diff_percent
                    elif int(result) == llm_answer:
                        return True, 1.0
                except Exception:
                    # Skip this candidate only; never swallow
                    # KeyboardInterrupt / SystemExit as a bare except would.
                    continue

            # Check final value
            if parsed["final_value"] is not None and parsed["final_value"] == llm_answer:
                return True, 0.8

        except Exception as e:
            logger.debug(f"Verification failed: {str(e)}")

        return False, 0.5

print("✅ SymbolicCompute class loaded")

In [None]:
class AnswerValidator:
    """Validates and enforces answer format constraints.

    Answers must be integers in the closed range ``[AIMO_MIN, AIMO_MAX]``.
    All members are static.
    """

    AIMO_MIN = 0
    AIMO_MAX = 99999

    @staticmethod
    def validate_integer(value: Any) -> Optional[int]:
        """Validate and convert value to valid AIMO integer.

        Args:
            value: Anything whose ``str()`` form parses as a number.

        Returns:
            The value truncated toward zero and clamped into
            ``[AIMO_MIN, AIMO_MAX]``, or ``None`` when it is not a number
            (including NaN).
        """
        try:
            number = float(str(value).strip())

            # Clamp on the float BEFORE converting to int: huge inputs parse
            # to +/-inf, and int(inf) would raise OverflowError instead of
            # being clamped like any other out-of-range value.
            if number < AnswerValidator.AIMO_MIN:
                return AnswerValidator.AIMO_MIN

            if number > AnswerValidator.AIMO_MAX:
                return AnswerValidator.AIMO_MAX

            # NaN fails both comparisons and raises ValueError here.
            return int(number)
        except (ValueError, TypeError) as e:
            logger.debug(f"Failed to validate answer: {str(e)}")
            return None

    @staticmethod
    def extract_and_validate_answer(text: str) -> Optional[int]:
        """Extract numeric answer from text and validate it.

        The latest explicit answer statement in the text wins, so an
        intermediate "result: 5" does not shadow a later "the answer is 42".
        When no phrase matches, the last number in the text is used.

        Args:
            text: Model output to search.

        Returns:
            The clamped answer, or ``None`` when the text has no digits.
        """
        if not text:
            return None

        patterns = [
            r"(?:answer|result|final answer)\s*:?\s*(\d+)",
            r"(?:the answer is|equals)\s*(\d+)",
            r"(\d{1,5})\s*(?:is the answer|is correct)"
        ]

        # Keep the match whose number ends furthest into the text.
        latest = None
        for pattern in patterns:
            for match in re.finditer(pattern, text, re.IGNORECASE):
                if latest is None or match.end(1) > latest.end(1):
                    latest = match

        if latest is not None:
            try:
                validated = AnswerValidator.validate_integer(int(latest.group(1)))
                if validated is not None:
                    return validated
            except ValueError:
                pass  # unparsable digit run: fall through to the fallback

        # Fallback: find all numbers and validate the last one
        numbers = re.findall(r"\d+", text)
        if numbers:
            return AnswerValidator.validate_integer(numbers[-1])

        return None

    @staticmethod
    def validate_with_fallback_strategies(
        llm_answer: Optional[int],
        llm_text: str
    ) -> Dict[str, Any]:
        """Validate answer with multiple fallback strategies.

        Args:
            llm_answer: Answer already extracted from the model, or ``None``.
            llm_text: Model output used for re-extraction.

        Returns:
            A dict with ``"final_answer"`` (int), ``"confidence"`` (float),
            ``"strategy_used"`` (``"primary_llm_answer"``,
            ``"text_reextraction"`` or ``"default_fallback"``) and
            ``"fallback_applied"`` (bool).
        """
        result = {
            "final_answer": 0,
            "confidence": 0.0,
            "strategy_used": "default_fallback",
            "fallback_applied": False
        }

        try:
            # Strategy 1: Use primary answer if valid
            if llm_answer is not None:
                validated = AnswerValidator.validate_integer(llm_answer)
                if validated is not None:
                    result["final_answer"] = validated
                    result["confidence"] = 0.9
                    result["strategy_used"] = "primary_llm_answer"
                    return result

            # NOTE: a former "symbolic_verification" step sat here. It could
            # only succeed when validate_integer(llm_answer) succeeded, which
            # Strategy 1 already returns on, so it never produced a result
            # and was removed.

            # Strategy 2: Re-extract from text
            extracted = AnswerValidator.extract_and_validate_answer(llm_text)
            if extracted is not None:
                result["final_answer"] = extracted
                result["confidence"] = 0.75
                result["strategy_used"] = "text_reextraction"
                result["fallback_applied"] = True
                return result

            # Strategy 3: Default fallback (answer 0, confidence 0.0)
            result["fallback_applied"] = True

        except Exception as e:
            logger.debug(f"Fallback validation failed: {str(e)}")

        return result

    @staticmethod
    def handle_edge_cases(answer: int, problem_context: str = "") -> Tuple[int, str]:
        """Handle edge cases in answer validation.

        Args:
            answer: Candidate answer.
            problem_context: Problem statement, scanned for the substring
                ``"mod"`` to decide how to reduce very large answers.

        Returns:
            ``(answer, note)`` where ``note`` describes the adjustment made,
            or is empty when the answer only went through normal validation.
        """
        edge_case_note = ""

        try:
            # Negative answers
            if answer < 0:
                edge_case_note = f"Negative answer {answer} converted to 0"
                return 0, edge_case_note

            # Very large answers
            if answer > AnswerValidator.AIMO_MAX * 10:
                # NOTE(review): substring test, so words that merely contain
                # "mod" also select this branch; the modulus is fixed at 1000.
                if "mod" in problem_context.lower():
                    modulus = 1000
                    answer = answer % modulus
                    edge_case_note = f"Large answer modded to {answer}"
                    return answer, edge_case_note
                else:
                    answer = AnswerValidator.AIMO_MAX
                    edge_case_note = f"Very large answer capped at {AnswerValidator.AIMO_MAX}"
                    return answer, edge_case_note

            validated = AnswerValidator.validate_integer(answer)
            return validated if validated is not None else 0, edge_case_note

        except Exception as e:
            logger.debug(f"Edge case handling failed: {str(e)}")
            return 0, f"Edge case error: {str(e)}"

print("✅ AnswerValidator class loaded")

In [None]:
class ExecutionMetrics:
    """Track execution metrics for the pipeline.

    Counters live in ``self.metrics``; ``average_confidence`` is a running
    mean over successful records only.
    """

    def __init__(self):
        self.metrics = {
            "total_processed": 0,
            "successful": 0,
            "failed": 0,
            "fallback_used": 0,
            "verified": 0,
            "average_confidence": 0.0
        }

    def record_result(
        self,
        success: bool,
        fallback_used: bool = False,
        verified: bool = False,
        confidence: float = 0.0
    ) -> None:
        """Record a single result.

        Args:
            success: Whether the problem was solved without error.
            fallback_used: Whether a fallback strategy supplied the answer.
            verified: Whether the answer passed verification.
            confidence: Confidence of the answer; only counted toward the
                running average when ``success`` is true.
        """
        self.metrics["total_processed"] += 1

        if success:
            self.metrics["successful"] += 1
        else:
            self.metrics["failed"] += 1

        if fallback_used:
            self.metrics["fallback_used"] += 1

        if verified:
            self.metrics["verified"] += 1

        # Update the running mean only for a NEW successful record. Running
        # the update on a failure would re-weight the old mean with a stale
        # count and fold in the failure's confidence (0.0 by default).
        if success:
            successes = self.metrics["successful"]
            previous_total = self.metrics["average_confidence"] * (successes - 1)
            self.metrics["average_confidence"] = (previous_total + confidence) / successes

    def get_summary(self) -> Dict[str, Any]:
        """Get execution summary.

        Returns:
            Counts plus ``success_rate``, ``fallback_rate`` and
            ``verification_rate`` as fractions of ``total_processed`` (0.0
            when nothing was processed), and ``average_confidence``.
        """
        total = self.metrics["total_processed"]

        return {
            "total_processed": total,
            "successful": self.metrics["successful"],
            "failed": self.metrics["failed"],
            "success_rate": self.metrics["successful"] / total if total > 0 else 0.0,
            "fallback_used_count": self.metrics["fallback_used"],
            "fallback_rate": self.metrics["fallback_used"] / total if total > 0 else 0.0,
            "verified_count": self.metrics["verified"],
            "verification_rate": self.metrics["verified"] / total if total > 0 else 0.0,
            "average_confidence": self.metrics["average_confidence"]
        }

print("✅ ExecutionMetrics class loaded")

## 4. Load LLM Model

In [None]:
# Model configuration - Change this to use stronger models on Kaggle
# MODEL_NAME is the identifier passed to the tokenizer/model `from_pretrained` loaders.
MODEL_NAME = "gpt2"  # Fast baseline for testing
# For better performance, use: "deepseek-ai/DeepSeek-R1-Distill-Qwen-7B" or similar

# Generation settings: MAX_TOKENS is used as `max_new_tokens` and TEMPERATURE
# as the sampling temperature when the model generates a solution.
MAX_TOKENS = 512
TEMPERATURE = 0.7

# Echo the configuration so the notebook log shows what is about to be loaded.
print(f"Loading model: {MODEL_NAME}")
print(f"Using device: {DEVICE}")

In [None]:
# Load the tokenizer and model once. MODEL_AVAILABLE records the outcome so
# later cells can degrade gracefully instead of crashing on a missing model.
try:
    # Load tokenizer and model
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_NAME,
        # Half precision on GPU, full precision on CPU.
        torch_dtype=torch.float16 if DEVICE == "cuda" else torch.float32
    )

    if DEVICE == "cuda":
        model = model.to(DEVICE)

    model.eval()

    # Reuse the end-of-sequence token when the tokenizer has no pad token.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    print("✅ Model loaded successfully!")
    MODEL_AVAILABLE = True
except Exception as e:
    # Deliberately broad: any load failure switches to preprocessing-only mode.
    print(f"⚠️ Model loading failed: {e}")
    print("Continuing with preprocessing-only mode...")
    MODEL_AVAILABLE = False

## 5. Define Helper Functions

In [None]:
# Plain-text stand-ins for LaTeX commands that carry mathematical meaning.
# Commands not listed here are dropped from the output.
_LATEX_SYMBOLS = {
    "times": "*",
    "cdot": "*",
    "div": "/",
    "pm": "+/-",
    "le": "<=",
    "leq": "<=",
    "ge": ">=",
    "geq": ">=",
    "ne": "!=",
    "neq": "!=",
    "pi": "pi",
    "infty": "infinity",
    "dots": "...",
    "ldots": "...",
    "cdots": "...",
}


def latex_to_text(latex_expr: str) -> str:
    """Convert LaTeX expressions to plain text.

    Math delimiters, sizing commands, environments and grouping braces are
    removed. ``\\frac{a}{b}`` becomes ``(a)/(b)``, ``\\sqrt{a}`` becomes
    ``sqrt(a)``, and operator/relation commands such as ``\\times`` or
    ``\\leq`` are replaced by ASCII equivalents instead of being deleted, so
    ``2 + 3 \\times 5`` stays a meaningful expression.

    Args:
        latex_expr: Text that may contain LaTeX markup.

    Returns:
        The plain-text rendering with whitespace collapsed and trimmed.
    """
    # LaTeX line breaks (a double backslash) separate content: use a space.
    text = re.sub(r"\\\\", " ", latex_expr)
    text = re.sub(r"\$\$|\$", "", text)
    # The lookahead keeps commands like \leftarrow from being cut in half.
    text = re.sub(r"\\(?:left|right)(?![a-zA-Z])", "", text)
    text = re.sub(r"\\begin\{.*?\}|\\end\{.*?\}", "", text)

    # Rewrite fractions and roots from the innermost outwards; repeat until
    # nothing changes so nested constructs are fully converted.
    previous = None
    while previous != text:
        previous = text
        text = re.sub(r"\\frac\s*\{([^{}]*)\}\s*\{([^{}]*)\}", r"(\1)/(\2)", text)
        text = re.sub(r"\\sqrt\s*\{([^{}]*)\}", r"sqrt(\1)", text)
    # Forms the loop could not rewrite (e.g. \frac12) keep the bare name.
    text = re.sub(r"\\frac", "frac", text)
    text = re.sub(r"\\sqrt", "sqrt", text)

    def _replace_command(match):
        # Known symbols become padded ASCII; every other command is dropped.
        symbol = _LATEX_SYMBOLS.get(match.group(1))
        return f" {symbol} " if symbol is not None else ""

    text = re.sub(r"\\([a-z]+)", _replace_command, text)

    # Grouping braces carry no meaning in plain text; escaped braces
    # (\{ and \}) are literal set braces and are kept.
    text = re.sub(r"(?<!\\)[{}]", "", text)
    text = re.sub(r"\\([{}])", r"\1", text)

    text = re.sub(r"\s+", " ", text)
    return text.strip()

def prepare_problem(input_data: str) -> str:
    """Prepare problem for LLM input.

    Args:
        input_data: Raw problem statement; coerced with ``str()`` first.

    Returns:
        The statement converted to plain text by ``latex_to_text``.
    """
    return latex_to_text(str(input_data))

print("✅ Preprocessing functions defined")

In [None]:
def create_prompt(problem_text: str) -> str:
    """Build the chain-of-thought prompt that is fed to the LLM.

    The prompt states the role, embeds the problem, and ends with an open
    "Step 1: " so the model continues the worked solution.
    """
    prompt_lines = (
        "You are a mathematical expert solving Olympiad-level problems.",
        "Solve the following problem step-by-step:",
        "",
        f"Problem: {problem_text}",
        "",
        "Solution:",
        "Let me work through this carefully.",
        "",
        "Step 1: ",
    )
    return "\n".join(prompt_lines)

def llm_solve(problem_text: str) -> Dict[str, Any]:
    """Run the LLM on a problem and return its chain-of-thought text.

    Returns a dict that always carries "problem" and "reasoning". On success
    it also has "raw_output" (the full decoded sequence); when the model is
    unavailable or generation raises, it has "error" and "reasoning" is None.
    """
    if not MODEL_AVAILABLE:
        return {"problem": problem_text, "reasoning": None, "error": "Model not available"}

    try:
        encoded = tokenizer(create_prompt(problem_text), return_tensors="pt").to(DEVICE)

        # Sampling configuration for generation.
        generation_kwargs = dict(
            max_new_tokens=MAX_TOKENS,
            temperature=TEMPERATURE,
            top_p=0.9,
            do_sample=True,
            pad_token_id=tokenizer.eos_token_id,
        )
        with torch.no_grad():
            generated = model.generate(**encoded, **generation_kwargs)

        raw_output = tokenizer.decode(generated[0], skip_special_tokens=True)

        # Keep what follows the last "Solution:" marker; rpartition hands back
        # the entire string as its tail when the marker is absent.
        reasoning = raw_output.rpartition("Solution:")[2]

        return {
            "problem": problem_text,
            "reasoning": reasoning.strip(),
            "raw_output": raw_output
        }
    except Exception as exc:
        return {"problem": problem_text, "error": str(exc), "reasoning": None}

def extract_numeric_answer(text: str) -> Optional[int]:
    """Extract numeric answer from LLM output.

    The latest explicit answer statement in the text wins, so an
    intermediate "result: 5" in the reasoning does not shadow a later
    "the answer is 42". When no phrase matches, the last number is used.

    Args:
        text: Model output, or ``None``.

    Returns:
        The extracted integer (not range-checked), or ``None`` when the text
        is empty/``None`` or contains no usable number.
    """
    if not text:
        return None

    patterns = [
        r"(?:answer|result|final answer)\s*:?\s*(\d+)",
        r"(?:the answer is|equals)\s*(\d+)",
        r"(\d{1,5})\s*(?:is the answer|is correct)"
    ]

    # Keep the match whose number ends furthest into the text.
    latest = None
    for pattern in patterns:
        for match in re.finditer(pattern, text, re.IGNORECASE):
            if latest is None or match.end(1) > latest.end(1):
                latest = match

    if latest is not None:
        try:
            return int(latest.group(1))
        except ValueError:
            pass  # unparsable digit run: fall through to the fallback

    # Fallback: find all numbers and return the last one
    numbers = re.findall(r"\d+", text)
    if numbers:
        try:
            return int(numbers[-1])
        except ValueError:
            return None

    return None

print("✅ LLM functions defined")

## 6. Phase 4 Integrated Solver Function

In [None]:
def solve_with_phase4_verification(
    problem_text: str,
    problem_id: str,
    metrics_tracker: ExecutionMetrics
) -> Dict[str, Any]:
    """
    Solve problem with full Phase 4 verification workflow.

    Workflow:
    1. Preprocess problem
    2. LLM reasoning
    3. Extract answer
    4. Phase 4: Symbolic verification
    5. Phase 4: Fallback validation
    6. Phase 4: Edge case handling
    7. Metrics tracking

    Args:
        problem_text: Raw problem statement.
        problem_id: Identifier copied into the result and used in error logs.
        metrics_tracker: Receives exactly one ``record_result`` call.

    Returns:
        A dict with the same keys on every path: ``problem_id``,
        ``problem_text``, ``final_answer``, ``confidence``, ``strategy_used``,
        ``verified``, ``fallback_applied``, ``edge_case_note`` and ``error``
        (``None`` on success). On failure ``final_answer`` stays 0.
    """
    # Every key is initialised up front so failed and successful records share
    # one schema when the results are serialised.
    result = {
        "problem_id": problem_id,
        "problem_text": problem_text,
        "final_answer": 0,
        "confidence": 0.0,
        "strategy_used": "default",
        "verified": False,
        "fallback_applied": False,
        "edge_case_note": "",
        "error": None
    }

    try:
        # Step 1: Preprocess
        prepared_problem = prepare_problem(problem_text)

        # Step 2-3: LLM Reasoning + Answer Extraction
        llm_result = llm_solve(prepared_problem)

        # llm_solve only includes an "error" key when it failed.
        if "error" in llm_result:
            result["error"] = llm_result["error"]
            metrics_tracker.record_result(success=False)
            return result

        llm_answer = extract_numeric_answer(llm_result["reasoning"])

        # Step 4: Symbolic Verification (0 stands in for a missing answer)
        is_verified, verification_confidence = SymbolicCompute.verify_symbolic_result(
            llm_answer if llm_answer is not None else 0,
            llm_result["reasoning"]
        )

        # Step 5: Fallback Validation
        validation_result = AnswerValidator.validate_with_fallback_strategies(
            llm_answer,
            llm_result["reasoning"]
        )

        # Step 6: Edge Case Handling
        final_answer, edge_case_note = AnswerValidator.handle_edge_cases(
            validation_result["final_answer"],
            problem_text
        )

        # Update result; a verified answer reports the verifier's confidence.
        result["final_answer"] = final_answer
        result["confidence"] = verification_confidence if is_verified else validation_result["confidence"]
        result["strategy_used"] = validation_result["strategy_used"]
        result["verified"] = is_verified
        result["fallback_applied"] = validation_result["fallback_applied"]
        result["edge_case_note"] = edge_case_note

        # Step 7: Track Metrics
        metrics_tracker.record_result(
            success=True,
            fallback_used=validation_result["fallback_applied"],
            verified=is_verified,
            confidence=result["confidence"]
        )

    except Exception as e:
        logger.error(f"Error solving {problem_id}: {str(e)}")
        result["error"] = str(e)
        metrics_tracker.record_result(success=False)

    return result

print("✅ Integrated solver function defined")

## 7. Test on Sample Problems

In [None]:
# Tracker dedicated to this smoke test.
metrics_tracker = ExecutionMetrics()

# Three small problems used to exercise the integrated solver end to end.
sample_problems = [
    {"id": "test_001", "problem": "What is $2 + 3 \\times 5$?"},
    {"id": "test_002", "problem": "Solve $2x + 5 = 13$. What is $x$?"},
    {"id": "test_003", "problem": "Find $7 \\times 8$."}
]

_RULE = "=" * 60

print("\n" + _RULE)
print("TESTING PHASE 4 INTEGRATED SOLVER")
print(_RULE + "\n")

test_results = []
for prob in sample_problems:
    result = solve_with_phase4_verification(
        problem_text=prob["problem"],
        problem_id=prob["id"],
        metrics_tracker=metrics_tracker
    )
    test_results.append(result)

    # One report per problem, followed by a blank separator line.
    report_lines = [
        f"Problem {prob['id']}: {prob['problem']}",
        f"  Answer: {result['final_answer']}",
        f"  Confidence: {result['confidence']:.2f}",
        f"  Strategy: {result['strategy_used']}",
        f"  Verified: {result['verified']}",
    ]
    print("\n".join(report_lines))
    print()

# Display metrics summary: rates as percentages, confidences with two
# decimals, everything else verbatim.
print("\n" + _RULE)
print("PHASE 4 METRICS SUMMARY")
print(_RULE)
summary = metrics_tracker.get_summary()
for key, value in summary.items():
    if 'rate' in key:
        print(f"{key}: {value:.1%}")
    elif 'confidence' in key:
        print(f"{key}: {value:.2f}")
    else:
        print(f"{key}: {value}")
print(_RULE)

## 8. Competition API Integration

**Note:** On Kaggle, you'll use the `aimo` API to iterate through problems.
For local testing, we'll simulate this with a CSV file.

In [None]:
# For Kaggle submission, uncomment and use this:
# from aimo import Inference
# inference = Inference()

# For local testing, load from CSV. Paths are tried in order; the first one
# that exists is used.
test_data_paths = [
    "/kaggle/input/ai-mathematical-olympiad-progress-prize-3/test.csv",  # Kaggle path
    "../datasets/aimo3_test.csv",  # Local path
]

test_df = None
for path in test_data_paths:
    if os.path.exists(path):
        test_df = pd.read_csv(path)
        print(f"✅ Loaded test data from {path}")
        print(f"   Total problems: {len(test_df)}")
        break

# No CSV available: fall back to the in-notebook sample problems.
if test_df is None:
    print("⚠️ No test data found. Using sample problems.")
    test_df = pd.DataFrame(sample_problems)
    test_df.columns = ['id', 'problem']

In [None]:
# Solve all problems with Phase 4 verification
print(f"\nSolving {len(test_df)} problems with Phase 4 verification...\n")

predictions = []       # rows of the submission: {'id', 'answer'}
detailed_results = []  # full per-problem result dicts for later inspection

# Reset metrics for full run
full_metrics = ExecutionMetrics()

for idx, row in tqdm(test_df.iterrows(), total=len(test_df), desc="Solving"):
    problem_id = str(row['id'])
    problem_text = str(row['problem'])

    result = solve_with_phase4_verification(
        problem_text=problem_text,
        problem_id=problem_id,
        metrics_tracker=full_metrics
    )

    predictions.append({
        'id': problem_id,
        'answer': result['final_answer']
    })

    detailed_results.append(result)

print(f"\n✅ Solved {len(predictions)} problems")

# Create submission dataframe
submission_df = pd.DataFrame(predictions)
print("\nSubmission Preview:")
print(submission_df.head(10))

## 9. Phase 4 Metrics Report

In [None]:
print("\n" + "="*80)
print("PHASE 4 EXECUTION METRICS REPORT")
print("="*80 + "\n")

final_summary = full_metrics.get_summary()

print("📊 Overall Statistics:")
print(f"  Total Processed: {final_summary['total_processed']}")
print(f"  Successful: {final_summary['successful']}")
print(f"  Failed: {final_summary['failed']}")
print(f"  Success Rate: {final_summary['success_rate']:.1%}")
print()

print("🔍 Verification Metrics:")
print(f"  Verified Count: {final_summary['verified_count']}")
print(f"  Verification Rate: {final_summary['verification_rate']:.1%}")
print(f"  Average Confidence: {final_summary['average_confidence']:.2f}")
print()

print("🔄 Fallback Statistics:")
print(f"  Fallback Used: {final_summary['fallback_used_count']}")
print(f"  Fallback Rate: {final_summary['fallback_rate']:.1%}")
print()

# Answer statistics. With zero predictions the dataframe has no 'answer'
# column and min()/max()/mean are undefined, so report that instead of
# raising.
answers = submission_df['answer'].tolist() if 'answer' in submission_df.columns else []
print("📈 Answer Statistics:")
if answers:
    print(f"  Min Answer: {min(answers)}")
    print(f"  Max Answer: {max(answers)}")
    print(f"  Mean Answer: {sum(answers)/len(answers):.2f}")
    print(f"  Valid Range (0-99999): {all(0 <= a <= 99999 for a in answers)}")
else:
    print("  No answers to summarize")

print("\n" + "="*80)

## 10. Save Submission

In [None]:
# Save submission CSV
submission_path = "submission.csv"
submission_df.to_csv(submission_path, index=False)
print(f"✅ Submission saved to {submission_path}")

# Save detailed results with Phase 4 metadata.
# The encoding is explicit so the files do not depend on the platform default.
detailed_path = "detailed_results_phase4.json"
with open(detailed_path, 'w', encoding='utf-8') as f:
    json.dump(detailed_results, f, indent=2)
print(f"✅ Detailed results saved to {detailed_path}")

# Save metrics summary together with the time the report was written
metrics_path = "phase4_metrics.json"
with open(metrics_path, 'w', encoding='utf-8') as f:
    json.dump({
        "summary": final_summary,
        "timestamp": datetime.now().isoformat()
    }, f, indent=2)
print(f"✅ Metrics saved to {metrics_path}")

print("\n" + "="*80)
print("✅ SUBMISSION COMPLETE - ALL PHASES (1-4) EXECUTED")
print("="*80)