# PROMETHEUS v4.2 - NF4 Quantized Edition

**Model:** Qwen2.5-Math-72B-Instruct (NF4 4-bit)  
**Framework:** transformers + bitsandbytes  
**Budget:** 280 minutes  

---

**Required:** Upload your NF4 model to Kaggle (as a dataset or model) first, attach it to this notebook, and set `MODEL_BASE` in Cell 2 to its mounted path.

In [None]:
# CELL 1: Install Dependencies + GPU Check
import sys
import subprocess
import os

print("=== CELL 1: Setup ===")

# Collect, then drop, every sys.path entry that mentions "utility_script"
# (case-insensitive). Collecting first avoids mutating sys.path while
# iterating over it.
bad_paths = []
for entry in sys.path:
    if 'utility_script' in entry.lower():
        bad_paths.append(entry)
for entry in bad_paths:
    sys.path.remove(entry)
    print(f"Removed from sys.path: {entry}")

# Directory expected to hold the offline wheels; report whether it is
# present and, if so, what it contains.
WHEEL_PATH = "/kaggle/input/triad-dev/utility_wheels"
_wheel_dir_present = os.path.exists(WHEEL_PATH)
print(f"Wheel path exists: {_wheel_dir_present}")
if _wheel_dir_present:
    print(f"Wheels available: {os.listdir(WHEEL_PATH)}")

# Install bitsandbytes + accelerate from wheels
def install_wheel(name_prefix, wheel_dir=None):
    """Install one offline wheel whose file name starts with ``name_prefix``.

    Args:
        name_prefix: Leading part of the wheel file name (e.g. the package
            name).
        wheel_dir: Directory to search. Defaults to the module-level
            ``WHEEL_PATH`` when omitted.

    Returns:
        ``True`` when pip exited successfully, ``False`` when the directory
        is missing, no matching ``.whl`` file exists, or pip failed.
    """
    if wheel_dir is None:
        wheel_dir = WHEEL_PATH

    # isdir (not exists): a plain file at this path would make listdir raise.
    if not os.path.isdir(wheel_dir):
        print(f"ERROR: {wheel_dir} not found!")
        return False

    # Only real wheel files are candidates; sorting makes the pick
    # deterministic because os.listdir returns entries in arbitrary order.
    candidates = sorted(
        entry for entry in os.listdir(wheel_dir)
        if entry.startswith(name_prefix) and entry.endswith(".whl")
    )
    if not candidates:
        print(f"  Wheel not found for {name_prefix}")
        return False

    wheel_name = candidates[0]
    wheel = os.path.join(wheel_dir, wheel_name)
    print(f"Installing {wheel_name}...")
    # --no-index / --no-deps: install exactly this file without contacting
    # a package index or resolving dependencies.
    result = subprocess.run(
        [sys.executable, "-m", "pip", "install", "-q", "--no-index", "--no-deps", wheel],
        capture_output=True, text=True
    )
    if result.returncode != 0:
        print(f"  Error: {result.stderr[:200]}")
        return False
    print("  ✓ Installed")
    return True

# Install both offline wheels (return values are informational only; the
# import checks below report whether the packages are actually usable).
for _pkg_prefix in ("bitsandbytes", "accelerate"):
    install_wheel(_pkg_prefix)

# Verify that each package imports and report its version.
try:
    import bitsandbytes as bnb
except ImportError as exc:
    print(f"✗ bitsandbytes import failed: {exc}")
else:
    print(f"✓ bitsandbytes {bnb.__version__}")

try:
    import accelerate
except ImportError as exc:
    print(f"✗ accelerate import failed: {exc}")
else:
    print(f"✓ accelerate {accelerate.__version__}")

# GPU check: abort the cell when CUDA is unavailable, otherwise report the
# name and total memory of device 0.
import torch
assert torch.cuda.is_available(), "GPU NOT ENABLED"
print(f"✓ GPU: {torch.cuda.get_device_name(0)}")
_gpu0_total_bytes = torch.cuda.get_device_properties(0).total_memory
print(f"✓ VRAM: {_gpu0_total_bytes / 1e9:.1f} GB")


In [None]:
# CELL 2: Imports & Configuration
import os, sys, gc, time, random, warnings, re, traceback
from pathlib import Path
from typing import Optional, List, Dict, Tuple, Any
from dataclasses import dataclass
from enum import Enum
from collections import Counter

# Suppress every Python warning for the rest of the session.
warnings.filterwarnings('ignore')
# NOTE(review): presumably turns off the Hugging Face tokenizers library's
# internal parallelism — confirm against the tokenizers documentation.
os.environ['TOKENIZERS_PARALLELISM'] = 'false'

import polars as pl

# ============== YOUR PATHS ==============
# Root directory handed to find_model_path() to locate the model's config.json.
MODEL_BASE = "/kaggle/input/m/ryancardwell/qwen-72b-math-nf4/transformers/v1/1"

# Find config.json - could be in base or a subfolder
def find_model_path(base):
    """Locate the directory at or below ``base`` that contains ``config.json``.

    Search order: ``base`` itself, then every immediate sub-directory, then
    every sub-directory one level further down. The first hit is returned.
    When nothing is found a warning is printed and ``base`` is returned
    unchanged.
    """
    def _holds_config(folder):
        return os.path.exists(f"{folder}/config.json")

    # Level 0: the base directory itself.
    if _holds_config(base):
        return base

    if os.path.isdir(base):
        # Level 1: direct children of base.
        for name in os.listdir(base):
            child = f"{base}/{name}"
            if os.path.isdir(child) and _holds_config(child):
                print(f"Found config.json in subfolder: {name}")
                return child

        # Level 2: grandchildren, visited only after every child was checked.
        for name in os.listdir(base):
            child = f"{base}/{name}"
            if not os.path.isdir(child):
                continue
            for inner in os.listdir(child):
                grandchild = f"{child}/{inner}"
                if os.path.isdir(grandchild) and _holds_config(grandchild):
                    print(f"Found config.json in: {name}/{inner}")
                    return grandchild

    print(f"WARNING: No config.json found! Using base: {base}")
    return base

# Resolve the directory that actually holds the model files.
MODEL_PATH = find_model_path(MODEL_BASE)
print(f"Model path: {MODEL_PATH}")

# Sanity check: report either the model_type from config.json or, when the
# file is missing, what the resolved path contains instead.
_config_file = f"{MODEL_PATH}/config.json"
if not os.path.exists(_config_file):
    print(f"✗ config.json NOT FOUND at {MODEL_PATH}")
    if os.path.isdir(MODEL_PATH):
        _listing = os.listdir(MODEL_PATH)
    else:
        _listing = 'NOT A DIR'
    print(f"  Contents: {_listing}")
else:
    print("✓ config.json found")
    import json as _json
    with open(_config_file) as f:
        cfg = _json.load(f)
    print(f"✓ model_type: {cfg.get('model_type', 'NOT FOUND')}")

# Input locations. TEST_CSV is passed to the local gateway in Cell 10; the
# two LOCAL_* files are read by validate_local() in Cell 9.
COMPETITION_DIR = "/kaggle/input/ai-mathematical-olympiad-progress-prize-3"
TRIAD_DEV = "/kaggle/input/triad-dev"
TEST_CSV = f"{COMPETITION_DIR}/test.csv"
LOCAL_TEST_JSONL = f"{TRIAD_DEV}/test_dataset/test.jsonl"
LOCAL_ANSWERS_JSON = f"{TRIAD_DEV}/test_dataset/answers.json"

# Wall-clock budget for the whole run, measured from the moment this cell
# executes.
BUDGET_SECONDS = 280 * 60
START_TIME = time.time()

def time_remaining() -> float:
    """Return the seconds left in the budget (negative once it is exceeded)."""
    elapsed = time.time() - START_TIME
    return BUDGET_SECONDS - elapsed

def time_str(seconds: Optional[float] = None) -> str:
    """Format a duration as ``"<minutes>m<seconds>s"``.

    Args:
        seconds: Duration to format. When omitted, the remaining budget from
            ``time_remaining()`` is used.

    Returns:
        The duration truncated to whole seconds. A negative duration (budget
        overrun) is rendered as the magnitude with a leading ``-``, e.g.
        ``-30`` becomes ``"-0m30s"``.
    """
    if seconds is None:
        seconds = time_remaining()
    # Format the magnitude and add the sign separately: floor division and
    # modulo on a negative value would otherwise yield e.g. "-1m30s" for -30s.
    sign = "-" if seconds < 0 else ""
    minutes, secs = divmod(int(abs(seconds)), 60)
    return f"{sign}{minutes}m{secs}s"

# Report the configured budget in whole minutes.
print(f"✓ Budget: {BUDGET_SECONDS//60} minutes")


In [None]:
# CELL 3: Load NF4 Model with bitsandbytes
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
import torch

print(f"Loading NF4 model from {MODEL_PATH}...")
print(f"Time: {time_str()}")

# Taken right before loading so the report below measures the load itself
# rather than the time elapsed since START_TIME was set in Cell 2.
_load_started = time.time()

# BitsAndBytes 4-bit config: NF4 quantization type, double quantization,
# bfloat16 compute dtype.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.bfloat16,
    bnb_4bit_use_double_quant=True,
)

tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(
    MODEL_PATH,
    quantization_config=bnb_config,
    device_map="auto",
    trust_remote_code=True,
    torch_dtype=torch.bfloat16,
)
# Switch to inference mode (disables dropout and similar training behavior).
model.eval()

print(f"✓ Model loaded in {time.time() - _load_started:.1f}s")
# device_map="auto" may place weights on more than one GPU, so add up the
# allocation of every visible device instead of reporting only the current one.
_allocated_bytes = sum(
    torch.cuda.memory_allocated(_idx) for _idx in range(torch.cuda.device_count())
)
print(f"✓ Memory: {_allocated_bytes/1e9:.1f}GB allocated")

In [None]:
# CELL 4: Problem Classification
class ProblemType(Enum):
    """Coarse topic label assigned to a problem by keyword matching."""
    NUMBER_THEORY = "number_theory"
    COMBINATORICS = "combinatorics"
    ALGEBRA = "algebra"
    GEOMETRY = "geometry"
    MIXED = "mixed"

@dataclass
class ProblemProfile:
    """Result of :func:`classify_problem` for a single problem statement."""
    # Topic with the most keyword hits (MIXED when no keyword matched).
    ptype: ProblemType
    # True when the text mentions a modulus or a remainder.
    has_modulo: bool = False
    # Leading integer literal of the detected modulus, if one was found.
    modulo_target: Optional[int] = None
    # True when the text contains "how many" or "count".
    is_counting: bool = False

# Characters that may sit between "mod"/"by" and the number when the problem
# is typeset in LaTeX, e.g. "\pmod{7}", "(mod $7$)", "divided by $1000$".
_MOD_RE = re.compile(r'mod(?:ulo)?[\s$({}\\]*(\d+)')
# DOTALL: the phrase may be wrapped over several lines of the statement.
_REMAINDER_RE = re.compile(
    r'remainder.*?(?:divided by|when.*?by)[\s$({\\]*(\d+)', re.DOTALL
)

# Substring keywords per topic. Dict order doubles as the tie-break order,
# because max() keeps the first entry among equal scores.
_TYPE_KEYWORDS = {
    ProblemType.NUMBER_THEORY: ('prime', 'divisor', 'gcd', 'lcm', 'modulo', 'factorial'),
    ProblemType.COMBINATORICS: ('how many', 'count', 'ways', 'permutation', 'combination'),
    ProblemType.GEOMETRY: ('triangle', 'circle', 'angle', 'area', 'perimeter'),
    ProblemType.ALGEBRA: ('polynomial', 'roots', 'equation', 'coefficient'),
}

def classify_problem(text: str) -> ProblemProfile:
    """Build a :class:`ProblemProfile` from the problem statement.

    The topic is chosen by counting case-insensitive substring keyword hits.
    Modulus detection understands plain text ("mod 7", "remainder when N is
    divided by 1000") as well as LaTeX-delimited numbers ("\\pmod{7}",
    "divided by $1000$") and phrases broken across lines. Only the leading
    integer literal is captured, so a modulus written as an expression such
    as ``10^9+7`` yields just ``10``.

    Args:
        text: Raw problem statement.

    Returns:
        The profile for ``text``.
    """
    t = text.lower()

    # Modulo detection: an explicit "mod"/"modulo" wins over a remainder phrase.
    mod_match = _MOD_RE.search(t)
    remainder_match = _REMAINDER_RE.search(t)
    has_mod = bool(mod_match or remainder_match or 'modulo' in t)
    if mod_match:
        mod_target = int(mod_match.group(1))
    elif remainder_match:
        mod_target = int(remainder_match.group(1))
    else:
        mod_target = None

    # Topic scoring: one point per keyword present in the text.
    scores = {
        ptype: sum(1 for k in keywords if k in t)
        for ptype, keywords in _TYPE_KEYWORDS.items()
    }
    best = max(scores.items(), key=lambda x: x[1])
    ptype = best[0] if best[1] > 0 else ProblemType.MIXED

    return ProblemProfile(ptype=ptype, has_modulo=has_mod, modulo_target=mod_target,
                         is_counting=any(k in t for k in ['how many', 'count']))

print("✓ Classifier ready")

In [None]:
# CELL 5: Code Execution
# Prelude prepended to every generated program: common math imports plus a
# few helper functions. sympy is optional and skipped when unavailable.
STDLIB = '''
import math
from math import gcd, factorial, comb, isqrt, sqrt, ceil, floor
from itertools import permutations, combinations, product
from functools import reduce, lru_cache
from collections import Counter, defaultdict
from fractions import Fraction
try:
    from sympy import *
    from sympy.ntheory import factorint, divisors, totient, isprime
except: pass

def lcm(a, b): return abs(a * b) // gcd(a, b)
def is_prime(n):
    if n < 2: return False
    if n < 4: return True
    if n % 2 == 0: return False
    for i in range(3, isqrt(n) + 1, 2):
        if n % i == 0: return False
    return True
def C(n, k): return comb(n, k) if 0 <= k <= n else 0
'''

# Epilogue appended to every generated program: prints the first of the
# well-known result variables that converts to an integer. Conversion via
# int() (instead of an isinstance check) also accepts integer-like objects
# such as the sympy numbers that the star-import above can produce.
SNOOP = '''
for _v in ["answer", "ans", "result", "res", "total", "count"]:
    if _v in globals():
        try:
            print(f"EXTRACTED:{int(globals()[_v])}")
            break
        except Exception:
            pass
'''

def execute_code(code: str, timeout: int = 30) -> Tuple[Optional[int], str]:
    """Run generated Python code in a child interpreter and extract its answer.

    The program is ``STDLIB + code + SNOOP`` and runs in a separate process
    instead of an in-process ``exec``. This keeps the timeout effective no
    matter which thread calls this function (``signal.signal`` raises
    ``ValueError`` outside the main thread), no matter what the code is doing
    (a signal handler cannot interrupt a long-running C call), and regardless
    of ``except:`` clauses in the code. ``sys.exit()`` in the generated code
    can no longer terminate the caller.

    SECURITY NOTE: ``code`` is untrusted, model-generated text. A child
    process isolates crashes and exits but is NOT a sandbox; the code runs
    with the same filesystem and environment access as this notebook.

    Args:
        code: Python source to execute.
        timeout: Wall-clock limit in seconds for the child process.

    Returns:
        ``(answer, "")`` on success, where the answer comes from the
        ``EXTRACTED:`` marker printed by ``SNOOP`` or, failing that, from the
        last integer in the program's stdout. ``(None, message)`` when the
        run timed out (``"Timeout"``), the program failed (last stderr line),
        or no number was printed (``"No answer found"``).
    """
    import os
    import subprocess
    import sys

    full_code = STDLIB + '\n' + code + '\n' + SNOOP
    # Force UTF-8 pipes so non-ASCII output cannot crash the child's print().
    child_env = dict(os.environ, PYTHONIOENCODING="utf-8")

    try:
        proc = subprocess.run(
            [sys.executable, "-c", full_code],
            stdin=subprocess.DEVNULL,  # input() in generated code hits EOF at once
            capture_output=True,
            encoding="utf-8",
            errors="replace",
            timeout=timeout,
            env=child_env,
        )
    except subprocess.TimeoutExpired:
        return None, "Timeout"
    except (OSError, ValueError) as e:
        # E.g. argument list too long, or a NUL byte inside the code.
        return None, f"{type(e).__name__}: {str(e)[:50]}"

    if proc.returncode != 0:
        # The last non-blank stderr line of a traceback is "ExcType: message".
        err_lines = [ln for ln in proc.stderr.splitlines() if ln.strip()]
        if err_lines:
            return None, err_lines[-1].strip()[:80]
        return None, f"Exit code {proc.returncode}"

    output = proc.stdout
    # "-?" keeps the sign; without it a negative answer missed this branch
    # and was picked up below as its absolute value.
    match = re.search(r'EXTRACTED:(-?\d+)', output)
    if match:
        return int(match.group(1)), ""
    # Fallback: last standalone integer the program printed.
    nums = re.findall(r'\b(\d+)\b', output)
    if nums:
        return int(nums[-1]), ""
    return None, "No answer found"

def extract_code(text: str) -> Optional[str]:
    """Return the first fenced code block found in ``text``.

    Blocks tagged as Python are preferred (``python``, ``python3`` or ``py``
    in any letter case, with optional surrounding spaces and either LF or
    CRLF line endings). If none exists, the first untagged block is used.

    Args:
        text: Model response that may contain Markdown code fences.

    Returns:
        The block's content with surrounding whitespace stripped, or ``None``
        when no complete (opened and closed) fence is present.
    """
    fence_patterns = (
        r'```[ \t]*(?:python3?|py)[ \t]*\r?\n(.*?)```',
        r'```[ \t]*\r?\n(.*?)```',
    )
    for pat in fence_patterns:
        m = re.search(pat, text, re.DOTALL | re.IGNORECASE)
        if m:
            return m.group(1).strip()
    return None

print("✓ Executor ready")

In [None]:
# CELL 6: Generation
# System prompt inserted into every request built by generate(). It asks for
# Python code that leaves the result in a variable named 'answer'.
SYSTEM = """You are a math solver. Write Python code to solve the problem.
Store the final answer in a variable called 'answer'. Answer must be 0-99999."""

def generate(question: str, temperature: float = 0.7) -> str:
    """Generate a single model response for ``question``.

    The prompt is assembled by hand from ``SYSTEM`` and the question using
    ``<|im_start|>`` / ``<|im_end|>`` markers.
    NOTE(review): presumably this matches the loaded model's chat template;
    ``tokenizer.apply_chat_template`` would remove the assumption — confirm.

    Args:
        question: Problem statement placed in the user turn.
        temperature: Sampling temperature. Values ``<= 0`` select greedy
            decoding, because sampling requires a strictly positive
            temperature.

    Returns:
        The decoded continuation only (prompt tokens are cut off), with
        special tokens removed.
    """
    prompt = f"<|im_start|>system\n{SYSTEM}<|im_end|>\n<|im_start|>user\n{question}<|im_end|>\n<|im_start|>assistant\n"

    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    prompt_len = inputs['input_ids'].shape[1]

    gen_kwargs = {
        "max_new_tokens": 2048,
        "pad_token_id": tokenizer.eos_token_id,
    }
    if temperature > 0:
        gen_kwargs.update(do_sample=True, temperature=temperature, top_p=0.95)
    else:
        # Sampling parameters are omitted on purpose: they are meaningless
        # for greedy decoding.
        gen_kwargs["do_sample"] = False

    with torch.no_grad():
        outputs = model.generate(**inputs, **gen_kwargs)

    # outputs[0] holds prompt + continuation; keep only the new tokens.
    return tokenizer.decode(outputs[0][prompt_len:], skip_special_tokens=True)

def generate_batch(questions: List[str], temperature: float = 0.7) -> List[str]:
    """Generate one response per question, one after another.

    Each question is passed to ``generate`` individually with the same
    ``temperature``; results are returned in input order. (The original
    author's note: sequential because bitsandbytes doesn't batch well.)
    """
    responses = []
    for q in questions:
        responses.append(generate(q, temperature))
    return responses

print("✓ Generator ready")

In [None]:
# CELL 7: Main Solver
@dataclass
class Candidate:
    """One proposed answer together with its voting weight."""
    # Proposed integer answer.
    answer: int
    # Weight added to this answer's tally in predict_for_question's vote.
    confidence: float = 0.5

def normalize_answer(x: Any) -> Optional[int]:
    """Convert ``x`` to an integer clamped to the range 0..99999.

    Args:
        x: Anything ``float()`` accepts (number or numeric string).

    Returns:
        The truncated, clamped integer, or ``None`` when ``x`` is not
        numeric, is NaN, or is infinite.
    """
    try:
        val = int(float(x))
    except (TypeError, ValueError, OverflowError):
        # TypeError: unsupported type (e.g. None); ValueError: non-numeric
        # string or NaN; OverflowError: infinity or an int too large for float.
        return None
    return max(0, min(99999, val))

def extract_text_answer(text: str) -> Optional[int]:
    """Pull a numeric answer out of free-form response text.

    Patterns are tried in priority order: a ``\\boxed{N}`` expression, the
    word "Answer"/"answer" followed by a number, then ``= N`` at the end of
    the text. If none matches, the last standalone integer in the text is
    used. The value is passed through ``normalize_answer``; ``None`` is
    returned when the text contains no integer at all.
    """
    priority_patterns = (r'\\boxed\{(\d+)\}', r'[Aa]nswer[:\s]+(\d+)', r'= (\d+)$')
    for pattern in priority_patterns:
        hit = re.search(pattern, text)
        if hit is not None:
            return normalize_answer(hit.group(1))

    standalone = re.findall(r'\b(\d+)\b', text)
    if not standalone:
        return None
    return normalize_answer(standalone[-1])

def predict_for_question(question: str) -> int:
    """Solve one problem by sampling several solutions and voting.

    Up to three responses are generated at decreasing temperatures. When a
    response contains a code block, the code is executed and its result
    becomes a candidate (weight 0.9, 0.8, 0.7 by attempt). Otherwise a number
    is extracted from the response text (weight 0.4). Candidates with the
    same answer pool their weights and the heaviest answer wins.

    Args:
        question: Problem statement.

    Returns:
        The selected answer, or ``0`` when no attempt produced a candidate.
    """
    profile = classify_problem(question)
    print(f"  Type: {profile.ptype.value}")

    candidates = []

    # Generate 3 solutions (fewer than vLLM version - slower inference)
    for i, temp in enumerate([0.7, 0.5, 0.3]):
        # Keep a one-minute reserve of the global budget.
        if time_remaining() < 60:
            print("  Low time, stopping early")
            break

        print(f"  Gen {i+1} @ temp={temp}...")
        resp = generate(question, temp)

        code = extract_code(resp)
        if code:
            result, err = execute_code(code)
            if result is not None:
                candidates.append(Candidate(result, 0.9 - i*0.1))
                print(f"    → {result}")
            else:
                print(f"    ✗ {err[:30]}")
        else:
            ans = extract_text_answer(resp)
            # "is not None" rather than truthiness: 0 is a valid answer and
            # must not be discarded.
            if ans is not None:
                candidates.append(Candidate(ans, 0.4))
                print(f"    → {ans} (text)")

    if not candidates:
        return 0

    # Weighted vote: sum the confidences per distinct answer. On a tie the
    # answer that appeared first wins (max keeps the first maximum).
    votes = {}
    for c in candidates:
        votes[c.answer] = votes.get(c.answer, 0) + c.confidence

    best = max(votes.items(), key=lambda x: x[1])[0]
    print(f"  Selected: {best}")
    return best

print("✓ Solver ready")

In [None]:
# CELL 8: API Interface
def predict(id_: pl.DataFrame, question: pl.DataFrame) -> pl.DataFrame:
    """Answer one problem for the inference server.

    Unwraps the single id and question values, logs a short preview, solves
    the problem with ``predict_for_question`` (any exception is logged and
    turned into the answer 0), clamps the answer to 0..99999 and returns it
    as a frame with the columns ``id`` and ``answer``.
    """
    id_val, q_text = id_.item(), question.item()

    print(f"\n{'='*50}")
    print(f"Problem: {id_val} | Time: {time_str()}")
    # Log at most the first 150 characters of the statement.
    if len(q_text) > 150:
        print(f"Q: {q_text[:150]}...")
    else:
        print(f"Q: {q_text}")

    answer = 0  # stays 0 when solving fails
    try:
        answer = predict_for_question(q_text)
    except Exception as e:
        print(f"  ERROR: {e}")

    answer = min(99999, max(0, int(answer)))
    print(f"  FINAL: {answer}")

    return pl.DataFrame({'id': id_val, 'answer': answer})

print("✓ API ready")

In [None]:
# CELL 9: Local Validation (optional)
def validate_local(n: int = 5):
    """Score the solver on the first ``n`` problems of the local test set.

    Reads the expected answers from ``LOCAL_ANSWERS_JSON`` and the problems
    (one JSON object per line) from ``LOCAL_TEST_JSONL``, runs
    ``predict_for_question`` on each selected problem and prints a per-problem
    verdict followed by the overall score.

    Args:
        n: Maximum number of problems to evaluate. The score is computed over
            the number actually evaluated, which is smaller when the file
            holds fewer than ``n`` problems.
    """
    import json as _json

    with open(LOCAL_ANSWERS_JSON) as f:
        answers = _json.load(f)
    # Context manager closes the file; blank lines (e.g. a trailing newline)
    # are skipped instead of crashing the JSON parser.
    with open(LOCAL_TEST_JSONL) as f:
        problems = [_json.loads(line) for line in f if line.strip()]

    subset = problems[:n]
    correct = 0
    for prob in subset:
        pred = predict_for_question(prob['problem'])
        # NOTE(review): assumes answers.json stores integers keyed by problem
        # id; string values would never compare equal to pred — confirm.
        exp = answers[prob['id']]
        ok = pred == exp
        correct += ok
        print(f"{'✓' if ok else '✗'} {prob['id']}: {pred} vs {exp}")

    evaluated = len(subset)
    if evaluated == 0:
        # Avoid dividing by zero when nothing was evaluated.
        print("\nScore: 0/0 (no problems evaluated)")
        return
    print(f"\nScore: {correct}/{evaluated} ({100*correct/evaluated:.0f}%)")

# Uncomment to test:
# validate_local(5)

In [None]:
# CELL 10: Start Server
import kaggle_evaluation.aimo_3_inference_server

# Startup banner with the configured budget and the time still remaining.
print(f"\nPROMETHEUS v4.2 NF4 | Budget: {BUDGET_SECONDS//60}m | Time: {time_str()}")

# Wrap predict() in the competition's inference server.
server = kaggle_evaluation.aimo_3_inference_server.AIMO3InferenceServer(predict)

# When KAGGLE_IS_COMPETITION_RERUN is set (non-empty), serve requests;
# otherwise drive predict() through the local gateway using TEST_CSV.
if os.getenv('KAGGLE_IS_COMPETITION_RERUN'):
    print("MODE: Competition")
    server.serve()
else:
    print("MODE: Local")
    server.run_local_gateway((TEST_CSV,))