# AI Mathematical Olympiad - Progress Prize 3
## SC-TIR (Self-Consistency with Tool-Integrated Reasoning) Solution

In [None]:
# Setup: package installation is attempted only outside of competition reruns
# (submission mode is treated as having no internet access).
import os

IS_SUBMISSION = bool(os.getenv('KAGGLE_IS_COMPETITION_RERUN'))

if not IS_SUBMISSION:
    import subprocess

    # Best-effort installs: with check=False a failing pip call does not raise.
    for requirement in ('protobuf==3.20.3', 'sympy'):
        subprocess.run(['pip', 'install', '-q', requirement], check=False)

In [None]:
import os
import re
import gc
import time
import traceback
from collections import Counter
from typing import Optional, List
from io import StringIO
from contextlib import redirect_stdout, redirect_stderr

import polars as pl
import torch

# True when the KAGGLE_IS_COMPETITION_RERUN environment variable is set to a
# non-empty value (presumably done by Kaggle during the scored rerun).
IS_SUBMISSION = bool(os.environ.get('KAGGLE_IS_COMPETITION_RERUN'))
print(f"Is submission: {IS_SUBMISSION}")

In [None]:
# Configuration for running Qwen2.5-Math-7B on an H100
class Config:
    """Static settings for the SC-TIR pipeline, read as class attributes."""

    # --- Model -----------------------------------------------------------
    # The 7B checkpoint is used because (per the author's sizing notes):
    #   * 72B needs ~144GB in bfloat16, while an H100 has 80GB
    #   * bitsandbytes quantization is NOT available in Kaggle
    #   * 7B (~14GB) fits easily and is much faster
    model_id: str = "/kaggle/input/qwen2.5-math/transformers/7b-instruct/1"

    # Quantization flag; kept False because the 7B model does not need it.
    # Not read by the code visible in this notebook.
    use_4bit: bool = False

    # --- Sampling --------------------------------------------------------
    # Number of independent solutions generated per problem before voting.
    num_samples: int = 32
    temperature: float = 0.7
    top_p: float = 0.95
    # Token budget for the first generation of each sample.
    max_new_tokens: int = 2048

    # --- Code execution --------------------------------------------------
    # Intended per-snippet time limit; not read by the code visible in
    # this notebook.
    code_timeout: int = 10
    # Upper bound on code blocks executed per feedback turn.
    max_code_executions: int = 3

    # --- Feedback loop and self-verification ------------------------------
    enable_feedback_loop: bool = True
    # How many execute-then-continue rounds a single sample may use.
    max_feedback_turns: int = 2
    enable_verification: bool = True

In [None]:
# Chain-of-thought system prompt, prepended verbatim to every problem.
# It tells the model to emit ```python fenced code (which extract_code_blocks
# looks for) and to finish with \boxed{...} (which extract_answer looks for).
# The string is not raw, so each "\\boxed" below is a single-backslash
# "\boxed" at runtime.
SYSTEM_PROMPT = """You are a world-class mathematician competing in the International Mathematical Olympiad.

## Problem-Solving Strategy

1. **Understand**: Read carefully. Identify what is given and what is asked.
2. **Plan**: Consider multiple approaches (algebraic, geometric, combinatorial, number-theoretic).
3. **Execute**: Work through the solution step-by-step with rigorous logic.
4. **Verify**: Check your answer by substitution or alternative method.
5. **Compute**: Use Python code for complex calculations.

## Code Execution Rules

- Write Python code inside ```python and ``` tags
- Use sympy for symbolic math, fractions for exact arithmetic
- Always print the final numerical result
- The code will be executed and results provided back to you

## Answer Format

- Your final answer must be a non-negative integer
- If the problem asks for remainder when divided by 10^5, compute that
- Put your FINAL answer inside \\boxed{} at the very end
- Double-check before giving the final answer

## Example Solution

Problem: Find the remainder when 2^100 is divided by 7.

Let me solve this step-by-step.

First, I'll find the pattern of powers of 2 modulo 7:
- 2^1 ≡ 2 (mod 7)
- 2^2 ≡ 4 (mod 7)
- 2^3 ≡ 8 ≡ 1 (mod 7)

The pattern repeats with period 3. Since 100 = 33×3 + 1, we have 2^100 ≡ 2^1 ≡ 2 (mod 7).

Let me verify with Python:

```python
result = pow(2, 100, 7)
print(result)
```

The code confirms our answer is 2.

Therefore, the answer is \\boxed{2}
"""

# Follow-up prompt appended after a finished solution to request a self-check.
# It asks for the (possibly corrected) answer to be boxed again.
VERIFY_PROMPT = """Review your solution and verify the answer is correct.

1. Check all calculations
2. Verify the logic is sound
3. If you used code, confirm the code is correct
4. Make sure the answer format matches what was asked

If you find an error, provide the corrected answer.
Put your FINAL verified answer inside \\boxed{}"""

In [None]:
def extract_code_blocks(text: str) -> List[str]:
    """Return the body of every ```python fenced block found in *text*.

    Blocks are returned in order of appearance with surrounding whitespace
    removed. A fence that is never closed produces no entry.
    """
    fence = re.compile(r'```python\s*(.*?)\s*```', re.DOTALL)
    return [hit.group(1) for hit in fence.finditer(text)]

def execute_code_safely(code: str, timeout: Optional[float] = 10.0) -> tuple:
    """Execute model-generated Python with a reduced set of globals.

    The snippet runs via ``exec`` in a fresh namespace that exposes a
    whitelist of builtins, the standard math-oriented modules and, when
    installed, sympy and numpy. ``import`` statements are permitted for a
    whitelist of modules only; without an ``__import__`` entry every import
    in the snippet would fail.

    NOTE: this is NOT a security sandbox. ``exec`` on untrusted text can be
    escaped; the restrictions only keep well-behaved snippets on a small API.

    Args:
        code: Python source to run.
        timeout: Wall-clock limit in seconds. It is enforced with SIGALRM, so
            it only takes effect on platforms that provide that signal and
            when called from the main thread; otherwise (or when ``None`` or
            non-positive) the snippet runs without a limit.

    Returns:
        ``(True, captured_stdout)`` on success, or
        ``(False, "ExceptionType: message")`` if the snippet raised.
    """
    import builtins
    import cmath
    import fractions
    import functools
    import itertools
    import math
    import signal
    import threading
    from decimal import Decimal, getcontext

    # Set high precision for decimal calculations (affects the current
    # thread's decimal context, not just the snippet).
    getcontext().prec = 50

    try:
        import sympy
    except ImportError:
        sympy = None

    try:
        import numpy as np
    except ImportError:
        np = None

    # Top-level packages the snippet may import.
    allowed_modules = frozenset({
        "math", "cmath", "fractions", "decimal", "itertools", "functools",
        "collections", "heapq", "bisect", "operator", "random", "statistics",
        "string", "re", "sympy", "numpy",
    })

    def restricted_import(name, glob=None, loc=None, fromlist=(), level=0):
        """Delegate to the real import only for whitelisted packages."""
        if level != 0 or name.partition(".")[0] not in allowed_modules:
            raise ImportError(f"import of '{name}' is not allowed")
        return builtins.__import__(name, glob, loc, fromlist, level)

    safe_builtins = {
        "abs": abs, "all": all, "any": any, "bin": bin,
        "bool": bool, "dict": dict, "divmod": divmod,
        "enumerate": enumerate, "filter": filter, "float": float,
        "format": format, "frozenset": frozenset,
        "hash": hash, "hex": hex, "int": int, "isinstance": isinstance,
        "iter": iter, "len": len, "list": list, "map": map,
        "max": max, "min": min, "next": next, "oct": oct,
        "ord": ord, "chr": chr, "pow": pow, "print": print, "range": range,
        "repr": repr, "reversed": reversed, "round": round,
        "set": set, "slice": slice, "sorted": sorted,
        "str": str, "sum": sum, "tuple": tuple, "type": type,
        "zip": zip, "True": True, "False": False, "None": None,
        "complex": complex, "bytes": bytes, "bytearray": bytearray,
        # Needed for `import` and `class` statements inside the snippet.
        "__import__": restricted_import,
        "__build_class__": builtins.__build_class__,
        # Exception types so snippets can use try/except and raise.
        "Exception": Exception, "ArithmeticError": ArithmeticError,
        "AssertionError": AssertionError, "IndexError": IndexError,
        "KeyError": KeyError, "NotImplementedError": NotImplementedError,
        "OverflowError": OverflowError, "RuntimeError": RuntimeError,
        "StopIteration": StopIteration, "TypeError": TypeError,
        "ValueError": ValueError, "ZeroDivisionError": ZeroDivisionError,
    }

    safe_globals = {
        "__builtins__": safe_builtins,
        # Class bodies read __name__ from globals to fill in __module__.
        "__name__": "__main__",
        "math": math,
        "cmath": cmath,
        "fractions": fractions,
        "Fraction": fractions.Fraction,
        "itertools": itertools,
        "functools": functools,
        "Decimal": Decimal,
        "reduce": functools.reduce,
    }

    # Pre-bind commonly used sympy names so snippets can skip the import.
    if sympy:
        safe_globals["sympy"] = sympy
        sympy_names = (
            "Symbol", "symbols", "solve", "simplify", "expand", "factor",
            "sqrt", "Rational", "pi", "E", "I", "oo",
            "sin", "cos", "tan", "log", "exp",
            "gcd", "lcm", "mod_inverse", "factorial", "binomial",
            "isprime", "prime", "primerange", "factorint", "divisors",
            "Matrix", "floor", "ceiling", "Abs",
        )
        for attr in sympy_names:
            if hasattr(sympy, attr):
                safe_globals[attr] = getattr(sympy, attr)

    if np is not None:
        safe_globals["np"] = np
        safe_globals["numpy"] = np

    use_alarm = (
        timeout is not None
        and timeout > 0
        and hasattr(signal, "SIGALRM")
        and threading.current_thread() is threading.main_thread()
    )

    def raise_timeout(signum, frame):
        raise TimeoutError(f"code execution exceeded {timeout} seconds")

    stdout_capture = StringIO()
    previous_handler = None
    previous_timer = (0.0, 0.0)

    try:
        if use_alarm:
            previous_handler = signal.signal(signal.SIGALRM, raise_timeout)
            previous_timer = signal.setitimer(signal.ITIMER_REAL, timeout)
        with redirect_stdout(stdout_capture):
            exec(code, safe_globals)
        return True, stdout_capture.getvalue()
    except Exception as e:
        # Include the exception type: str(e) alone is often ambiguous
        # (e.g. a KeyError renders as just the quoted key).
        return False, f"{type(e).__name__}: {e}"
    finally:
        if use_alarm:
            # Disarm our timer before swapping handlers, then re-arm any
            # timer that was already pending when we were called.
            signal.setitimer(signal.ITIMER_REAL, 0)
            signal.signal(
                signal.SIGALRM,
                signal.SIG_DFL if previous_handler is None else previous_handler,
            )
            if previous_timer[0] > 0:
                signal.setitimer(signal.ITIMER_REAL, *previous_timer)

In [None]:
def extract_answer(text: str) -> Optional[int]:
    """Extract the final integer answer from a model response.

    Strategies are tried in order and the first hit wins:

    1. The last ``\\boxed{...}`` expression (digit-only boxes are preferred).
       Thousands separators such as ``1,234`` or LaTeX ``1{,}234`` are
       removed before the number is read.
    2. Explicit phrases such as "the answer is N" (case-insensitive).
    3. A number closing the last 1000 characters, accepted only if it is
       already in ``[0, 100000)``.
    4. The last integer appearing anywhere in the last 1000 characters.

    Except for strategy 3, the value is reduced modulo 100000.

    Args:
        text: Full response text.

    Returns:
        The extracted answer, or ``None`` if no number could be found.
    """
    # Method 1: Look for \boxed{...} pattern (highest priority)
    boxed_patterns = [
        r'\\boxed\{(\d+)\}',  # \boxed{123}
        r'\\boxed\{([^}]+)\}',  # \boxed{anything}
        r'boxed\{(\d+)\}',  # boxed{123} without backslash
    ]

    # LaTeX writes a spacing-free thousands separator as "{,}"; turn it into a
    # plain comma so the brace does not cut the boxed content short.
    boxed_text = text.replace('{,}', ',')

    for pattern in boxed_patterns:
        matches = re.findall(pattern, boxed_text)
        if not matches:
            continue
        answer_str = matches[-1].strip()
        # Drop commas used as thousands separators ("1,234" -> "1234") so the
        # number is not split into "1" and "234". Commas followed by anything
        # other than exactly three digits (e.g. "3,4") are left alone.
        answer_str = re.sub(r'(?<=\d),(?=\d{3}(?!\d))', '', answer_str)
        numbers = re.findall(r'-?\d+', answer_str)
        if numbers:
            try:
                return int(numbers[-1]) % 100000  # Ensure 5 digits max
            except ValueError:
                # int() can reject extremely long digit strings; try the
                # next strategy instead of failing.
                pass

    # Method 2: Look for explicit answer statements
    answer_patterns = [
        r'(?:the\s+)?(?:final\s+)?answer\s+is\s*[:\s]*(\d+)',
        r'(?:the\s+)?remainder\s+is\s*[:\s]*(\d+)',
        r'(?:the\s+)?result\s+is\s*[:\s]*(\d+)',
        r'(?:therefore|thus|hence|so)[,\s]+(?:the\s+)?(?:answer\s+is\s+)?(\d+)',
        r'=\s*(\d+)\s*(?:\(mod\s*\d+\))?$',
        r'answer[:\s]+(\d+)',
    ]

    lowered = text.lower()
    for pattern in answer_patterns:
        matches = re.findall(pattern, lowered)
        if matches:
            try:
                return int(matches[-1]) % 100000
            except ValueError:
                pass

    # Method 3: Look for a number that closes the response
    last_section = text[-1000:]  # Last 1000 characters
    trimmed_tail = last_section.strip()

    final_patterns = [
        r'(?:^|\n)\s*(\d+)\s*$',  # Number on its own line at the end
        r'(?:is|=|:)\s*(\d+)\s*\.?\s*$',  # Number after is/=/: at end
    ]

    for pattern in final_patterns:
        matches = re.findall(pattern, trimmed_tail)
        if matches:
            try:
                val = int(matches[-1])
            except ValueError:
                continue
            if 0 <= val < 100000:  # Reasonable answer range
                return val

    # Method 4: Last resort - the last integer in the tail of the response
    numbers = re.findall(r'\b(\d+)\b', last_section)
    if numbers:
        try:
            return int(numbers[-1]) % 100000
        except ValueError:
            pass

    return None

In [None]:
# Load model using transformers - 7B fits easily without quantization
from transformers import AutoModelForCausalLM, AutoTokenizer
import os

print("Loading model...")

# Confirm the weights directory exists before attempting to load. If it does
# not, print what IS mounted to make the path problem easy to diagnose, then
# abort.
model_path = Config.model_id
if os.path.exists(model_path):
    print(f"✓ Model path verified: {model_path}")
else:
    print(f"WARNING: Model path does not exist: {model_path}")
    print("Available inputs:")
    input_dir = "/kaggle/input"
    if os.path.exists(input_dir):
        for item in os.listdir(input_dir):
            item_path = os.path.join(input_dir, item)
            print(f"  - {item_path}")
            if not os.path.isdir(item_path):
                continue
            # Show at most five entries per input directory
            for sub in os.listdir(item_path)[:5]:
                print(f"      - {sub}")

    # List whichever Qwen variants are present, if any
    qwen_dir = "/kaggle/input/qwen2.5-math/transformers"
    if os.path.exists(qwen_dir):
        print(f"\nAvailable Qwen variants in {qwen_dir}:")
        for variant in os.listdir(qwen_dir):
            print(f"  - {variant}")

    raise FileNotFoundError(f"Model not found at {model_path}")

# Tokenizer
print("\nLoading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)

# Model weights in bfloat16, with device placement chosen automatically
print("Loading model (7B fits easily without quantization)...")
model = AutoModelForCausalLM.from_pretrained(
    model_path,
    torch_dtype=torch.bfloat16,
    device_map="auto",
    trust_remote_code=True,
    low_cpu_mem_usage=True,
)

# Fall back to EOS for padding when the tokenizer defines no pad token
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

print(f"\n✓ Model loaded successfully!")
print(f"  Path: {model_path}")
print(f"  Device: {model.device}")
# The figure below is a fixed estimate, not a measurement.
print(f"  VRAM usage: ~14GB (plenty of room on H100)")

In [None]:
def generate_response(prompt: str, max_tokens: Optional[int] = None) -> str:
    """Sample one completion for *prompt* from the loaded model.

    Args:
        prompt: Full text fed to the tokenizer as-is.
        max_tokens: Cap on newly generated tokens; ``None`` falls back to
            ``Config.max_new_tokens``.

    Returns:
        Only the newly generated text (prompt tokens are sliced off), decoded
        with special tokens skipped.
    """
    token_budget = Config.max_new_tokens if max_tokens is None else max_tokens

    encoded = tokenizer(prompt, return_tensors="pt").to(model.device)
    prompt_length = encoded['input_ids'].shape[1]

    sampling_options = dict(
        max_new_tokens=token_budget,
        temperature=Config.temperature,
        top_p=Config.top_p,
        do_sample=True,
        pad_token_id=tokenizer.eos_token_id,
    )

    with torch.no_grad():
        generated = model.generate(**encoded, **sampling_options)

    # The output sequence starts with the prompt; keep what follows it.
    completion_ids = generated[0][prompt_length:]
    return tokenizer.decode(completion_ids, skip_special_tokens=True)

def solve_with_feedback(problem: str) -> Optional[int]:
    """Produce one candidate answer using code-execution feedback.

    Steps:
      1. Generate an initial solution for ``problem``.
      2. If ``Config.enable_feedback_loop`` is set, for up to
         ``Config.max_feedback_turns`` rounds: run the code blocks found in
         the most recent generation (at most ``Config.max_code_executions``
         of them), append their output to the transcript and let the model
         continue.
      3. If ``Config.enable_verification`` is set and an answer was found,
         ask the model to re-check it; a boxed answer in that check replaces
         the original.

    Args:
        problem: Problem statement text.

    Returns:
        The extracted answer, or ``None`` if none could be extracted.
    """
    # Initial prompt
    prompt = f"{SYSTEM_PROMPT}\n\nProblem: {problem}\n\nSolution:"
    response = generate_response(prompt)

    # Feedback loop - execute code and provide results back
    if Config.enable_feedback_loop:
        # Only the newest generation is scanned for code. Scanning the whole
        # transcript would re-run blocks from earlier turns, and the
        # max_code_executions slice could then exclude the new blocks.
        latest = response
        for _ in range(Config.max_feedback_turns):
            code_blocks = extract_code_blocks(latest)
            if not code_blocks:
                break

            code_results = []
            for i, code in enumerate(code_blocks[:Config.max_code_executions], start=1):
                success, output = execute_code_safely(code)
                if not success:
                    code_results.append(f"Code block {i} error:\n{output}")
                elif output.strip():
                    code_results.append(f"Code block {i} output:\n{output.strip()}")

            # Nothing was printed and nothing failed: no feedback to give
            if not code_results:
                break

            feedback = "\n\n".join(code_results)
            feedback_prompt = (
                f"{prompt}\n\n{response}\n\n**Code Execution Results:**\n{feedback}"
                "\n\nBased on these results, continue your solution and provide"
                " the final answer in \\boxed{}:"
            )

            latest = generate_response(feedback_prompt, max_tokens=1024)
            response = response + "\n\n**Code Results:**\n" + feedback + "\n\n" + latest

    answer = extract_answer(response)

    # Self-verification
    if Config.enable_verification and answer is not None:
        verify_prompt = f"{prompt}\n\n{response}\n\n{VERIFY_PROMPT}"
        verification = generate_response(verify_prompt, max_tokens=512)
        # Accept the re-check only when it actually boxes an answer.
        # extract_answer falls back to "any number in the text", which would
        # otherwise let a stray number in the review replace a good answer.
        if "boxed{" in verification:
            verified_answer = extract_answer(verification)
            if verified_answer is not None:
                return verified_answer

    return answer

def solve_problem(problem: str) -> int:
    """Answer *problem* by majority vote over independently sampled solutions.

    ``Config.num_samples`` candidates are requested from
    ``solve_with_feedback``; samples that raise or yield no answer are
    skipped. The most frequent answer is returned regardless of how large its
    vote share is. If no sample produced an answer, 0 is returned.
    """
    total = Config.num_samples
    collected = []

    for sample_no in range(1, total + 1):
        try:
            candidate = solve_with_feedback(problem)
            if candidate is not None:
                collected.append(candidate)

            # Report progress after every eighth sample
            if sample_no % 8 == 0:
                print(f"    Progress: {sample_no}/{total} samples")
        except Exception as e:
            # One failed sample must not abort the whole vote
            print(f"    Sample {sample_no} error: {e}")

    if not collected:
        print("  Warning: No valid answers found!")
        return 0

    # Tally the votes; the ranking is sorted from most to least frequent
    ranking = Counter(collected).most_common()
    print(f"  Votes: {dict(ranking[:5])}")

    # The confidence figure is informational only
    top_answer, top_count = ranking[0]
    confidence = top_count / len(collected)
    print(f"  Confidence: {confidence:.1%} ({top_count}/{len(collected)})")

    return top_answer

In [None]:
# Load test data: read the competition's test CSV into a polars DataFrame.
# The solving loop further down reads the 'id' and 'problem' columns of each row.
test_path = '/kaggle/input/ai-mathematical-olympiad-progress-prize-3/test.csv'
test_df = pl.read_csv(test_path)
print(f"Loaded {len(test_df)} problems")

In [None]:
# Run the solver over every test problem and collect one answer per id
results = []

for row in test_df.iter_rows(named=True):
    problem_id, problem = row['id'], row['problem']

    print(f"\n{'='*50}")
    print(f"Solving: {problem_id}")
    print(f"Problem: {problem[:100]}...")  # preview: first 100 characters

    start = time.time()
    answer = solve_problem(problem)
    elapsed = time.time() - start

    print(f"Answer: {answer} (took {elapsed:.1f}s)")
    results.append(dict(id=problem_id, answer=answer))

    # Free Python garbage and cached CUDA memory between problems
    gc.collect()
    torch.cuda.empty_cache()

print(f"\n{'='*50}")
print(f"Completed all {len(results)} problems")

In [None]:
# Save submission: turn the collected {'id', 'answer'} records into a polars
# DataFrame and write it as CSV to the Kaggle working directory.
# NOTE(review): confirm that this competition scores a submission.csv file;
# some Kaggle code competitions expect answers to be served through an
# evaluation API instead - TODO verify against the competition rules.
submission_df = pl.DataFrame(results)
submission_df.write_csv('/kaggle/working/submission.csv')
print("Submission saved!")
print(submission_df)