In [None]:
#!pip install vllm

In [1]:
import gc
import keyword
import os
import random
import re
import subprocess
import sys
import tempfile
from collections import Counter

import polars as pl
import torch
from transformers import set_seed
from vllm import LLM, SamplingParams

# 1. Load the Model and Test Data, and Initialize Settings

- Requires the AIMO2 reference file at `../data/reference.csv`.

In [2]:
# Seed handed to `set_seed` further down.
SEED = 42
# Model identifier passed to `LLM(...)` when the engine is built.
model_name = "Qwen/Qwen2.5-Math-1.5B-Instruct"
# Reference problems read from ../data/reference.csv; the `problem` and `answer`
# columns are used by the evaluation loop.
test_data = pl.read_csv(os.path.join(os.path.pardir, "data", "reference.csv"))

In [None]:
# Preview the problem statements loaded from the reference file.
test_data['problem']


In [4]:
# Seed the RNGs through transformers' `set_seed` helper.
# NOTE(review): confirm whether the vLLM sampler needs its own seed for reproducible generations.
set_seed(SEED)

In [5]:
def clean_memory(deep: bool = False) -> None:
    """Run the Python garbage collector, then empty PyTorch's CUDA cache.

    Args:
        deep: Currently unused; the same cleanup runs regardless of its value.
    """
    gc.collect()
    torch.cuda.empty_cache()

In [None]:
# Build the vLLM engine for `model_name`; this is the object every generation call goes through.
model = LLM(model_name,
            trust_remote_code=True,
            dtype="bfloat16",
            # dtype="half", # When using AWQ Model
            max_num_seqs=8,               # vLLM scheduling limit on sequences processed together
            max_model_len=4096,           # context length (prompt + generated tokens) given to the engine
            tensor_parallel_size=1,       # single-GPU setup
            gpu_memory_utilization=0.8,   # fraction of GPU memory vLLM may reserve
            )

In [7]:
# Tokenizer of the loaded model; used by `generate_batch_message` to render chat templates.
tokenizer = model.get_tokenizer()

In [8]:
# Sampling configuration shared by every `model.generate` call in this notebook.
sampling_params = SamplingParams(
    temperature=0.7,              # randomness of the sampling
    min_p=0.01,
    top_p=0.8,
    skip_special_tokens=True,     # Whether to skip special tokens in the output.
    max_tokens=2400,              # upper bound on tokens generated per call
    # stop=["```\n"],
    include_stop_str_in_output=True,
)

# 2. Define required functions

In [9]:
def extract_python_code(text):
    """Collect the bodies of all ```python fenced blocks found in `text`.

    Whitespace directly inside the fences is dropped. The bodies are returned
    in order of appearance, joined by a blank line; an empty string is
    returned when `text` contains no such block.
    """
    fenced_block = re.compile(r'```python\s*(.*?)\s*```', re.DOTALL)
    snippets = [found.group(1) for found in fenced_block.finditer(text)]
    return "\n\n".join(snippets)

def extract_boxed_text(text):
    """Return the content of the last non-empty ``\\boxed{...}`` in `text`.

    The final answer normally comes at the end of a generation, and models
    often echo an empty ``\\boxed{}`` placeholder from the instructions first.
    Taking the first match would therefore hide the real answer, so the
    matches are scanned from the end and empty ones are skipped.

    Returns:
        The boxed text, or "" when no non-empty box is present.
    """
    matches = re.findall(r'oxed{(.*?)}', text)
    for match in reversed(matches):
        if match:
            return match
    return ""

def select_answer(answers):
    """Pick the majority answer among integer-valued candidates.

    Each candidate that converts to an integer adds one vote, plus a tiny
    random amount that breaks ties between equally popular answers.
    Candidates that cannot be converted are ignored.

    Args:
        answers: Iterable of candidate answers (typically strings).

    Returns:
        The winning answer modulo 1000, or 210 when no candidate is usable.
    """
    counter = Counter()
    for answer in answers:
        try:
            value = int(answer)
            if value == float(answer):
                counter[value] += 1 + random.random() / 1_000
        except (TypeError, ValueError, OverflowError):
            # Only conversion failures are skipped; KeyboardInterrupt and
            # other unrelated exceptions are allowed to propagate.
            continue
    if not counter:
        return 210
    # Highest weight wins; the answer value itself settles an exact weight tie.
    best_answer, _ = max(counter.items(), key=lambda item: (item[1], item[0]))
    return best_answer % 1000

In [10]:
# Python code execution tool
class PythonREPL:
    """Run a snippet of Python source in a separate interpreter process.

    Args:
        timeout: Maximum number of seconds the snippet may run.
    """

    def __init__(self, timeout=5):
        self.timeout = timeout

    def __call__(self, query):
        """Execute `query` and report the outcome.

        Args:
            query: Python source code to run.

        Returns:
            A ``(success, text)`` tuple. On exit code 0, `text` is the stripped
            stdout. On a non-zero exit, `text` is stdout (if any) followed by
            stderr, with the temporary script path replaced by
            ``<temporary_file>``. On timeout, `text` is a timeout message.
        """
        # Use the interpreter running this notebook so the child process sees
        # the same installed packages; a bare "python3" from PATH may belong
        # to another environment.
        interpreter = sys.executable or "python3"

        with tempfile.TemporaryDirectory() as temp_dir:
            temp_file_path = os.path.join(temp_dir, "tmp.py")
            with open(temp_file_path, "w", encoding="utf-8") as f:
                f.write(query)

            try:
                result = subprocess.run(
                    [interpreter, temp_file_path],
                    capture_output=True,
                    check=False,
                    text=True,
                    timeout=self.timeout,
                )
            except subprocess.TimeoutExpired:
                return False, f"Execution timed out after {self.timeout} seconds."

        stdout = result.stdout.strip()
        stderr = result.stderr.strip()

        if result.returncode == 0:
            return True, stdout

        # Hide the temporary file path so the error message is cleaner.
        cleaned_error_msg = stderr.replace(temp_file_path, "<temporary_file>")
        # Keep whatever was printed before the failure.
        combined_output = f"{stdout}\n{cleaned_error_msg}" if stdout else cleaned_error_msg
        return False, combined_output
            

def process_python_code(query):
    """Prepare extracted code for execution.

    Prepends imports of math, numpy (as np) and sympy (as sp). After every
    top-level single-line assignment, it appends a guarded ``print`` of each
    assigned variable, truncated to 100 characters.

    Echo statements are only injected when it is safe to do so:
    * indented rows (spaces or tabs) are skipped, since a top-level ``try:``
      inside a nested block would break it;
    * rows with unbalanced ``()``, ``[]`` or ``{}`` are skipped, since they
      start a multi-line expression.

    Args:
        query: Python source code.

    Returns:
        The augmented source code as a single string.
    """
    query = "import math\nimport numpy as np\nimport sympy as sp\n" + query
    new_rows = []
    for row in query.strip().split("\n"):
        new_rows.append(row)
        if row.startswith((" ", "\t")) or "=" not in row:
            continue
        # TODO: use some AST to parse code
        if any(row.count(opener) != row.count(closer) for opener, closer in ("()", "[]", "{}")):
            continue
        # Everything left of the first "=" is treated as the assignment targets.
        for variable_to_print in row.split("=")[0].split(","):
            variable_to_print = variable_to_print.strip()
            if variable_to_print.isidentifier() and not keyword.iskeyword(variable_to_print):
                new_rows.append(f'\ntry:\n    print(f"{variable_to_print}={{str({variable_to_print})[:100]}}")\nexcept:\n    pass\n')
    return "\n".join(new_rows)

In [11]:
def create_first_msg(question, i):
    """Build the opening chat messages for sample number `i`.

    Samples alternate between two prompting styles: an even `i` gets the TIR
    (Tool-Integrated Reasoning) prompt, an odd `i` gets the CoT prompt.

    Args:
        question: Problem statement placed in the user message.
        i: Index of the sample within the batch.

    Returns:
        A two-element list: a system message and a user message.
    """
    cycle_len = 2
    use_tir = i % cycle_len == 0

    if use_tir:
        system_prompt = "Please integrate natural language reasoning with programs to solve the problem above, and, put your final answer within \\boxed{}."
        user_prompt = question + "\n\nBegin your answer by importing sympy."
    else:
        system_prompt = "Please reason step by step, and put your final answer within \\boxed{}."
        user_prompt = question

    return [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

In [12]:
def generate_batch_message(messages):
    """Generate one assistant reply for every conversation in `messages`.

    Each conversation is rendered to a prompt with the tokenizer's chat
    template and all prompts are sent to the model in one call. The first
    completion of each result is appended to its conversation in place.

    NOTE(review): conversations grow on every round, so prompts may approach
    the engine's context limit — confirm how the installed vLLM version
    handles over-length prompts.

    Args:
        messages: List of conversations (lists of role/content dicts).

    Returns:
        The same `messages` list, with an assistant message appended to each.
    """
    prompts = []
    for conversation in messages:
        rendered = tokenizer.apply_chat_template(
            conversation,
            tokenize=False,
            add_generation_prompt=True,
        )
        prompts.append(rendered)

    outputs = model.generate(prompts=prompts, sampling_params=sampling_params)
    for conversation, request_output in zip(messages, outputs):
        reply = request_output.outputs[0].text
        conversation.append({"role" : "assistant", "content" : reply})

    return messages

In [13]:
def filter_batch_message(messages):
    """Split conversations into answered and still-open ones.

    A conversation counts as answered when `extract_boxed_text` returns a
    non-empty string for its last message.

    Args:
        messages: List of conversations (lists of role/content dicts).

    Returns:
        ``(messages_to_keep, extracted_answer)``: the conversations without an
        answer yet, and the answers taken from the others.
    """
    extracted_answer = []
    messages_to_keep = []
    for conversation in messages:
        boxed = extract_boxed_text(conversation[-1]["content"])
        if not boxed:
            messages_to_keep.append(conversation)
            continue
        extracted_answer.append(boxed)
    return messages_to_keep, extracted_answer

In [14]:
def execute_batch_message(messages):
    """Run the code from each conversation's last message and feed back the output.

    For every conversation, the python blocks of the last message are
    extracted, preprocessed, and executed with `PythonREPL`. A progress marker
    is printed: 'c' when execution starts, then 'o' on success, 'e' on a failed
    run, or 'f' when the runner itself raised. The output is appended as a
    user message asking the model to double-check it.

    Args:
        messages: List of conversations (lists of role/content dicts).

    Returns:
        The same `messages` list, with a user message appended to each.
    """
    for conversation in messages:
        python_code = process_python_code(extract_python_code(conversation[-1]['content']))

        print("Python Excution State : ", end='')
        try:
            print('c', end='')
            is_success, output = PythonREPL()(python_code)
            print('o' if is_success else 'e', end='')
        except Exception as e:
            print('f', end='')
            output = str(e)

        print(f"\n{python_code}\n")
        print(f"Excution Result : {output}\n")

        # Ask the model to double-check the execution result.
        feedback = "```output\n"+output+"\n```\n\nPlease rigorously check whether the output makes sense."
        conversation.append({"role" : "user", "content" : feedback})

    return messages


In [15]:
def predict_single_question(question : str, maj_n: int = 32, call_n: int = 4):
    """Answer one problem by majority vote over several sampled conversations.

    Args:
        question: Problem statement.
        maj_n: Number of conversations sampled in parallel; even indices use
            the TIR prompt and odd indices the CoT prompt.
        call_n: Maximum number of generation rounds. With CoT only, a value of
            1 skips the code-execution feedback step in practice.

    Returns:
        The answer chosen by `select_answer` from all extracted boxed answers.
    """
    # Fixed typo in the instruction sent to the model ("ther" -> "the").
    question += "\nIf the final answer is a number larger than 1 million, take modulo 1000."
    messages = [create_first_msg(question, i) for i in range(maj_n)] # odd `i` is CoT / even `i` is TIR

    extracted_answers = []
    for _ in range(call_n):
        messages = generate_batch_message(messages)
        # Conversations that produced a boxed answer are finished; the rest continue.
        messages, answers = filter_batch_message(messages)
        extracted_answers.extend(answers)
        if not messages:
            break
        # Run any generated code and append a double-check instruction with its output.
        messages = execute_batch_message(messages)
    print(extracted_answers)
    answer = select_answer(extracted_answers)
    print(answer)

    print("\n\n")
    return answer

In [None]:
# Solve every reference problem and collect the predictions in order.
predicted = []
for test_question, test_gt in zip(test_data['problem'], test_data['answer']):
    print(test_question)
    answer = predict_single_question(test_question, maj_n=64, call_n=4)
    print(f"GT : {test_gt}, Predict : {answer}")
    predicted.append(answer)

In [None]:
# Display the collected predictions.
predicted

In [23]:
# Wrap the predictions in a single-column frame so they can be joined to the test data.
predicted_df = pl.DataFrame({"predicted": predicted})

In [25]:
# Put predictions next to the reference rows and report the share of exact matches.
concatenated = pl.concat([test_data, predicted_df], how="horizontal")
n_correct = len(concatenated.filter(pl.col('answer') == pl.col('predicted')))
print(f"Accuracy : {n_correct / len(concatenated) * 100}%")

In [None]:
# Build the file name outside the f-string: reusing double quotes inside an
# f-string replacement field is a SyntaxError before Python 3.12.
model_short_name = model_name.split("/")[-1]
predictions_dir = os.path.join(os.path.pardir, "predictions")
# Create the output directory if it is missing so the write cannot fail on it.
os.makedirs(predictions_dir, exist_ok=True)
concatenated.write_csv(os.path.join(predictions_dir, f"{model_short_name}_prediction.csv"))

# Gold Generation

1.
To solve the problem, we need to find the greatest possible length of segment \(CD\), where \(D\) is the foot of the perpendicular from \(C\) to the line \(AB\). Given that \(AB = 120\) and the circumradius \(R = 100\), we can use the properties of the circumcircle and the triangle to find the maximum length of \(CD\).

The length \(CD\) is maximized when \(C\) is as far from the line \(AB\) as possible. This happens when \(C\) lies on the line through the circumcenter \(O\) perpendicular to \(AB\), on the same side of \(AB\) as \(O\), so that \(O\) lies between \(C\) and \(AB\).

The distance from the circumcenter \(O\) to the chord \(AB\) can be calculated using the formula:
\[ d = \sqrt{R^2 - \left(\frac{AB}{2}\right)^2} \]
where \(R\) is the circumradius and \(AB\) is the length of the chord.

Let's calculate \(d\):
\[ d = \sqrt{100^2 - \left(\frac{120}{2}\right)^2} = \sqrt{100^2 - 60^2} = \sqrt{10000 - 3600} = \sqrt{6400} = 80 \]

The maximum length of \(CD\) is then the sum of the circumradius \(R\) and the distance \(d\):
\[ CD = R + d = 100 + 80 = 180 \]

Let's confirm this with sympy:

```python
import sympy as sp

# Given values
AB = 120
R = 100

# Calculate the distance from the circumcenter to the chord AB
d = sp.sqrt(R**2 - (AB/2)**2)

# Calculate the maximum length of CD
CD_max = R + d

print(CD_max)
```
```output
180.000000000000
```
The greatest possible length of segment \(CD\) is \(\boxed{180}\).


2.