In [1]:
from math_verify import parse, verify

a = parse("\\frac{1}{2}")
b = parse("1/2")

print(verify(a, b))

True


In [2]:
import json
import re

def extract_last_num(text: str) -> float:
    text = re.sub(r"(\d),(\d)", r"\g<1>\g<2>", text)  # 处理形如 123,456
    res = re.findall(r"\\boxed\{(\d+(\.\d+)?)", text)  # 匹配 \\boxed
    if len(res) == 0:
        res = re.findall(r"(\d+(\.\d+)?)", text)  # 匹配 123456.789
    if len(res) > 0:
        num_str = res[-1][0]
        return float(num_str)
    else:
        return 0.0

path = "/home/nfs02/laizj/experiment/uncertainty_analysis/analysis_unknown/results/qwen72b_instruct-generate_template_and_code-orca_10k_train.json"
with open(path, "r") as f:
    data = json.load(f)
    for i in range(len(data)):
        data[i] = {
            "query": data[i]["question"],
            "generated_texts": [data[i]["response"]],
            "response": data[i]["answer"],
            "answer": extract_last_num(data[i]["answer"])
        }

with open(path, "w") as f:
    json.dump(data, f, ensure_ascii=False, indent=4)
    print(f"Data saved to {path}")

Data saved to /home/nfs02/laizj/experiment/uncertainty_analysis/analysis_unknown/results/qwen72b_instruct-generate_template_and_code-orca_10k_train.json


In [6]:

import contextlib
from io import StringIO
import re
import subprocess
import json
from tqdm import tqdm
import os
from contextlib import contextmanager
import signal
import builtins
import threading
input_path = "/home/nfs02/laizj/experiment/uncertainty_analysis/analysis_unknown/results/qwen72b_instruct-generate_template_and_code-orca_10k_train.json"
output_path = f"{os.path.splitext(input_path)[0]}-parsed.json"

success = 0
template_generation_fault_count = 0 # 无法生成模板
template_generation_mistake_count = 0 # 生成的模板与原始问题不匹配
python_generation_fault_count = 0 # 无法生成 Python 代码
python_run_fault_count = 0 # 无法运行的 Python 代码
python_run_mistake_count = 0 # Python 代码运行结果错误
template_python_not_algined_count = 0 # 模板中的变量与 Python 代码中的变量不对齐

# def runcode(code):
#     """执行 Python 代码并返回输出"""
#     try:
#         result = subprocess.run(['python', '-c', code], capture_output=True, text=True)
#         output = float(result.stdout.strip())
#     except Exception as e:
#         return None
#     return output

def mock_input(prompt=""):
    return "" 
builtins.input = mock_input

@contextmanager
def time_limit(seconds):
    def signal_handler(signum, frame):
        raise Exception("Timed out!")
    signal.signal(signal.SIGALRM, signal_handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)

def runcode(code):
    """执行 Python 代码并返回输出"""
    output = ""
    try:
        with time_limit(10):
            # 捕获标准输出
            with StringIO() as buf, contextlib.redirect_stdout(buf):
                exec(code, {}, {})
                output = buf.getvalue().strip()
    except Exception as e:
        return None
    
    try:
        return float(output)
    except ValueError:
        # 处理字符串答案
        if re.search(r'\d', output):
             output = None
        else:
            return output

def extract_last_num(text: str) -> float:
    text = re.sub(r"(\d),(\d)", r"\g<1>\g<2>", text)  # 处理形如 123,456
    res = re.findall(r"\\boxed\{(\d+(\.\d+)?)", text)  # 匹配 \\boxed
    if len(res) == 0:
        res = re.findall(r"(\d+(\.\d+)?)", text)  # 匹配 123456.789
    if len(res) > 0:
        num_str = res[-1][0]
        return float(num_str)
    else:
        return 0.0

c = 0

def extract_content(item):
    
    global success, python_run_fault_count, python_run_mistake_count, python_generation_fault_count, template_generation_fault_count, template_generation_mistake_count, template_python_not_algined_count
    
    generation = item["generated_texts"][0]
    # print(generation)
    # 提取第一次出现的### template后的内容
    template_match = re.search(r'### (?:Query|Query Template|Template):(.*?)(?=###|$)', generation, re.DOTALL | re.IGNORECASE)
    template_content = template_match.group(1).strip() if template_match else None
    
    # 提取### python代码块中的代码
    python_code_match = re.search(r'### Python Code:\s*```(?:python)?\s*(.*?)\s*```', generation, re.DOTALL | re.IGNORECASE)
    python_code = python_code_match.group(1).strip() if python_code_match else None
    
    if python_code is None:
        python_generation_fault_count += 1
    elif template_content is None:
        template_generation_fault_count += 1 
    elif abs(template_content.count(' ') - item["query"].count(' ')) / item["query"].count(' ') > 1:
        template_generation_mistake_count += 1
    else:
        python_result = runcode(python_code)
        if python_result is None:
            global c
            # c += 1
            # if c == 31:
            #     print(python_code)
            #     print(item["query"])
            #     print(item["response"])
            #     raise Exception
            python_run_fault_count += 1
        elif isinstance(python_result, str):
            item["answer"] = python_result
        elif abs(python_result - item["answer"]) > 1e-2:
            python_run_mistake_count += 1
        else:
            variables = re.findall(r'<([^>]+?)>', template_content)
            for var in variables:
                pattern = r'\b' + re.escape(var) + r'\s*?='
                if re.search(pattern, python_code) is None:
                    template_python_not_algined_count += 1
                    return False
            else:
                success += 1
                item["template"] = template_content
                item["python"] = python_code
            return True
        
    return False

with open(input_path, "r") as f:
    data = json.load(f)
    results = []
    for item in tqdm(data):
        if extract_content(item):
            results.append(item)
    with open(output_path, "w") as f:
        json.dump(results, f, ensure_ascii=False, indent=4)
        
    print(f"成功生成：{success} / {len(data)}, 成功率: {success / len(data)}")
    print(f"无法生成模板：{template_generation_fault_count}")
    print(f"生成的模板与原始问题不匹配：{template_generation_mistake_count}")
    print(f"无法生成 Python 代码：{python_generation_fault_count}")
    print(f"无法运行的 Python 代码：{python_run_fault_count}")
    print(f"Python 代码运行结果错误：{python_run_mistake_count}")
    print(f"模板中的变量与 Python 代码中的变量不对齐：{template_python_not_algined_count}")


100%|██████████| 10000/10000 [00:01<00:00, 5389.00it/s]


成功生成：5715 / 10000, 成功率: 0.5715
无法生成模板：0
生成的模板与原始问题不匹配：1
无法生成 Python 代码：18
无法运行的 Python 代码：2519
Python 代码运行结果错误：1314
模板中的变量与 Python 代码中的变量不对齐：93


In [None]:
import sys
import json
import os
import re
import random
import ast
import string
import subprocess
from logging import exception
from tqdm.contrib.concurrent import process_map
from functools import partial
from transformers import AutoTokenizer
from fractions import Fraction
from contextlib import contextmanager
import signal
import builtins
import time

max_fluct=1.0
sample_times=16
time_out_seconds=30
dataset="orca_10k_train"
model_name="qwen72b_instruct"
tokenizer = AutoTokenizer.from_pretrained("/home/nfs05/laizj/model/models--Qwen--Qwen2.5-Math-7B-Instruct/snapshots/ef9926d75ab1d54532f6a30dd5e760355eb9aa4d")
input_path = f"/home/nfs02/laizj/experiment/uncertainty_analysis/analysis_unknown/results/{model_name}-generate_template_and_code-{dataset}-parsed.json"
output_path = f"/home/nfs02/laizj/experiment/uncertainty_analysis/analysis_unknown/data/synthetic_data/{model_name}-generate_template_and_code-{dataset}-disturbed_{max_fluct}_{sample_times}.json"

os.environ["TOKENIZERS_PARALLELISM"] = "false"
def mock_input(prompt=""):
    return "" 
builtins.input = mock_input

def extract_last_num(text) -> float:
    if isinstance(text, str):
        text = re.sub(r"(\d),(\d)", r"\g<1>\g<2>", text)  # 处理形如 123,456
        res = re.findall(r"\\boxed\{(\d+(\.\d+)?)", text)  # 匹配 \\boxed
        if len(res) == 0:
            res = re.findall(r"(\d+(\.\d+)?)", text)  # 匹配 123456.789
        if len(res) > 0:
            num_str = res[-1][0]
            return float(num_str)
        else:
            return 0.0
    else:
        return text

def check_validity(value, old_value):
    try:
        if isinstance(old_value, str):
            return old_value.strip().lower() == value.strip().lower()
        
        old_value = int(old_value) if float(old_value).is_integer() else float(old_value)
        value = float(value)
        if isinstance(old_value, int):
            if value.is_integer():
                value = int(value)
            else:
                return False
        return value * old_value >= 0
    except ValueError:
        return False
    
def python_run(code):
    local_env = {
        '__result__': "",  # 存储 print 的内容
    }
    def custom_print(*args, **kwargs):
    # 将所有参数转换为字符串并拼接
        sep = kwargs.get('sep', ' ')
        end = kwargs.get('end', '\n')
        output = sep.join(str(arg) for arg in args) + end
        local_env['__result__'] += output
    local_env['print'] = custom_print
    exec(code, local_env)
    return local_env["__result__"]

def randomize_value(original_value, max_fluct=1.0, upper_bound=10**9):
    """
    Returns a value randomly fluctuated within ±(max_fluct * original_value).
    If original_value is int, the result is rounded back to int.
    """

    lower_bound = original_value * (1 - max_fluct)
    if original_value > 0:
        lower_bound = max(1 if isinstance(original_value, int) else 0.01, lower_bound)
    upper_bound = min(original_value * (1 + max_fluct), upper_bound)
    if original_value < 0:
        upper_bound = min(-1 if isinstance(original_value, int) else -0.01, upper_bound)
    if isinstance(original_value, float) and 0 < original_value < 1:
        lower_bound = max(0.01, lower_bound)
        upper_bound = min(0.99, upper_bound)

    if random.random() < 0.5 and original_value != 0:
        for _ in range(50):
            new_value = random.uniform(lower_bound, upper_bound)
            
            if isinstance(original_value, int):
                new_value =  int(round(new_value))
            else:
                new_value = round(new_value, 2)
                
            if new_value != original_value:
                break
            
            max_fluct += 0.1
            lower_bound = original_value * (1 - max_fluct)
            if original_value > 0:
                lower_bound = max(1 if isinstance(original_value, int) else 0.01, lower_bound)
            upper_bound = original_value * (1 + max_fluct)
            if original_value < 0:
                upper_bound = min(-1 if isinstance(original_value, int) else -0.01, upper_bound)
            if isinstance(original_value, float) and 0 < original_value < 1:
                lower_bound = max(0.01, lower_bound)
                upper_bound = min(0.99, upper_bound)
    else:
        new_value = random.uniform(lower_bound, upper_bound)
        
        if isinstance(original_value, int):
            new_value =  int(round(new_value))
        else:
            new_value = round(new_value, 2)
        
    return new_value

def randomize_code(original_code: str, original_query=None, original_ans=None):

    # Split the code by lines
    lines = original_code.split('\n')

    # We’ll collect lines until we hit the first consecutive blank line
    # (i.e., an empty line).
    variable_lines = []

    for i, line in enumerate(lines):
        if variable_lines == [] and line.strip() == "":
            continue
        # Detect if the line is empty
        if line.strip() == "":
            # This is the first consecutive newline => stop collecting variable lines
            break
        else:
            variable_lines.append(line)
    
    # Use a regex to match lines of the form: name = number
    
    pattern = re.compile(r'^(\s*\w+)\s*=\s*(\d[\d/ ]*|\d*\.\d*)\s*(#.*)?$')
    pattern2 = re.compile(r'^(\s*\w+)\s*=\s*(.*?)\s*(#.*)?$')
    
    # Filter the template not aligned sample
    for line in variable_lines:
        match = pattern.match(line)
        if match:
            prefix = match.group(1)
            if f"<{prefix}>" not in original_query:
                return {"template_python_not_algined_count": None}
            
    variable_num = 0
    for line in variable_lines:
        match = pattern.match(line)
        if match:
            variable_num += 1
    
    
    def signal_handler(signum, frame):
        raise TimeoutError("Timed out!")
    signal.signal(signal.SIGALRM, signal_handler)
    signal.alarm(time_out_seconds)
    try:
        for variable_limits in range(variable_num, 0, -1):
            for _ in range(100):
                variable_count = 0
                new_variable_lines = []
                replaced_variables = []
                for idx, line in enumerate(variable_lines):
                    match = pattern.match(line)
                    if match:
                        # Extract the variable name, the original numeric value, and any trailing spaces
                        prefix = match.group(1)  # e.g. "variables_a"
                        original_value_str = match.group(2)  # e.g. "150"
                        suffix = match.group(3)  # trailing spaces if any
                        suffix = suffix if suffix else ""
                        
                        if variable_limits == variable_count:
                            new_value_str = original_value_str
                        else:
                            # Determine if it’s int or float
                            if '/' in original_value_str:
                                original_value_str = original_value_str.replace('//', '/')
                                original_value_str = original_value_str.replace(' ', '')
                                try:
                                    original_value = Fraction(original_value_str)
                                except:
                                    raise Exception(line)
                                numerator = original_value.numerator    # 分子
                                denominator = original_value.denominator  # 分母
                                if numerator == 1:
                                    denominator = randomize_value(denominator, max_fluct=max_fluct)
                                else:
                                    numerator = randomize_value(numerator, max_fluct=max_fluct)
                                new_value_str = f"{numerator}/{denominator}"
                            else:
                                if abs(float(original_value_str) - int(float(original_value_str))) < 1e-6:
                                    original_value = int(float(original_value_str))
                                else:
                                    original_value = float(original_value_str)
                                new_val = randomize_value(original_value, max_fluct=max_fluct, upper_bound=100 if 'percentage' in prefix else 10 ** 9)
                                new_value_str = str(new_val)
                                
                        replaced_variables.append((prefix, str(new_value_str)))
                        # Rebuild the line
                        new_variable_lines.append(prefix + " = " + new_value_str + suffix)
                        variable_count += 1
                    else:
                        new_variable_lines.append(line)
                    
                    match = pattern2.match(line)
                    if match:
                        prefix = match.group(1)  # e.g. "variables_a"
                        original_value_str = match.group(2)  # e.g. "150"
                        suffix = match.group(3)  # trailing spaces if any
                        suffix = suffix if suffix else ""

                        replaced_variables.append((prefix, str(original_value_str)))
                        
                final_code = '\n'.join(new_variable_lines) + '\n' + '\n'.join(lines[i:])
                try:
                    # 捕获标准输出
                    result = python_run(final_code)
                    if check_validity(result, original_ans):
                        if original_query:
                            for var, new_value in replaced_variables:
                                original_query = original_query.replace(f"<{var}>", str(new_value))
                            return {
                                "new_query": original_query, 
                                "new_code": final_code, 
                                "new_ans": float(result)
                                }
                        else:
                            return final_code
                    else:
                        # raise exception("The code is not valid.")
                        pass
                except TimeoutError as e:
                    raise e
                except:
                    pass
    except:
        pass
    finally:
        signal.alarm(0)

    return {"random_generate_difficult_count": None}


# messages = [
#     {"role": "system", "content": "Below is an instruction that describes a task. Write a response that appropriately completes the request. Output each step in a separate line, and explicitly state the final answer after the final step following the format \"The answer is\""},
#     {"role": "user", "content": prompt}
# ]


def process_item(item, tokenizer):
    
    disturbed_results = []
    for _ in range(sample_times):
        r = randomize_code(item["python"], item["template"],extract_last_num(item["answer"]))
        if 'new_ans' in r:
            messages = [
                {"role": "system", "content": "Below is an instruction that describes a task. Write a response that appropriately completes the request. Output each step in a separate line, and explicitly state the final answer after the final step within \\boxed{}."},
                {"role": "user", "content": r["new_query"]} # type: ignore
            ]

            r["prompt"] = tokenizer.apply_chat_template(
                messages,
                tokenize=False,
                add_generation_prompt=True)
            disturbed_results.append(r)
            # 将r的内容均添加到item中
    if disturbed_results:
        item.update({"disturbed": disturbed_results})
    else:
        item.update({"random_generate_difficult_count": None})
    return item

# with open(input_path, "r") as f:
#     data = json.load(f)

# # 使用偏函数固定tokenizer参数
# process_func = partial(process_item, tokenizer=tokenizer)

# # 多进程处理
# process_results = process_map(
#     process_func,
#     data,
#     max_workers=os.cpu_count() // 2,
#     chunksize=15,
# )

# 过滤掉None结果
results = []
template_python_not_algined_count = []
random_generate_difficult_count = []
for item in process_results:
    if "disturbed" in item:
        results.append(item)
    else:
        if "template_python_not_algined_count" in item:
            template_python_not_algined_count.append(item.get("id", item.get("idx", None)))
        elif "random_generate_difficult_count" in item:
            random_generate_difficult_count.append(item.get("id", item.get("idx", None)))

with open(output_path, "w") as f:
    json.dump(results, f, ensure_ascii=False, indent=4)

print(f"生成数量：{len(results)} / {len(data)}, 成功率：{len(results) / len(data) * 100:.2f}%")
print(f"Python 代码中的变量与模板中的变量不对齐数量：{len(template_python_not_algined_count)}, {template_python_not_algined_count[:5]}")
print(f"随机生成困难数量：{len(random_generate_difficult_count)}, {random_generate_difficult_count[:5]}")

生成数量：5436 / 5715, 成功率：95.12%
Python 代码中的变量与模板中的变量不对齐数量：0, []
随机生成困难数量：279, [None, None, None, None, None]


In [None]:

from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained("/home/nfs05/laizj/model/models--Qwen--Qwen2.5-Math-7B/snapshots/b101308fe89651ea5ce025f25317fea6fc07e96e")


In [14]:
data[2]

{'query': 'You wanted to subtract 46 from a number, but you accidentally subtract 59 and get 43. How much do you get from the correct calculation?',
 'generated_texts': ['### Query Template:\nYou wanted to subtract <incorrect_subtrahend> from a number, but you accidentally subtract <correct_subtrahend> and get <result>. How much do you get from the correct calculation?\n\n### Python Code:\n```python\n# Variable definitions\nincorrect_subtrahend = 59\ncorrect_subtrahend = 46\nresult = 43\n\n# Calculation\noriginal_number = result + incorrect_subtrahend\ncorrect_result = original_number - correct_subtrahend\n\n# Output\nprint(correct_result)\n```'],
 'response': 'If you accidentally subtracted 59 instead of 46 and got 43, you can find the original number by adding 59 back to 43:\n\n43 + 59 = 102\n\nNow, to find the result of the correct calculation, subtract 46 from the original number:\n\n102 - 46 = 56\n\nSo, if you subtract 46 from the original number, you would get 56.',
 'answer': 56

In [None]:
errors = []

for item in results:
    if "<" in item["disturbed"][0]["new_query"]:
        errors.append(item)

In [None]:
len(errors)

130

In [None]:


def decode_qwen_tokens(tokens):
    text = ""
    for token in tokens:
        if token.startswith("Ġ"):
            # 处理 Ġ 开头的 token（前面加空格）
            text += " " + token.replace("Ġ", "")
        else:
            # 直接拼接其他情况
            text += token
        text += "|"
    return text.strip()  # 去除首尾多余空格

decoded_text = decode_qwen_tokens(tokens)
print(decoded_text)

| |0|.|0|.| [|8|9|0|,| |5|6|7|,| |2|3|4|,| |9|0|1|,| |6|7|8|,| |3|4|5|,| |1|2|]| *| |2|1|0|
