In [None]:
import json
import re
# from data_process import split_and_sample_steps
from data_process import extract_numerical_answer,eval_answer
import random

In [None]:
def split_and_sample_steps(text):
    """Split a solution into steps and return a random prefix of them.

    The text is split on ``[Step N]`` tags (case-insensitive, optional
    trailing colon) when at least one such tag is present; otherwise it is
    split on newlines. Blank pieces are dropped and the rest are stripped.

    Returns:
        ``(restored_text, k_step)`` when more than three steps were found:
        ``k`` is drawn uniformly from ``1 .. step_count - 1`` (so the final
        step is never selected), ``restored_text`` holds steps ``1 .. k-1``
        re-tagged as ``"[Step i]:\\n<step>"`` and joined by blank lines, and
        ``k_step`` is step ``k`` formatted as ``"[Step k]:\\n <step>"``.
        ``(None, None)`` when three or fewer steps were found.
    """
    # Prefer explicit [Step N] tags; fall back to line breaks without them.
    if re.search(r"\[Step\s*\d+\]", text, re.IGNORECASE) is not None:
        pieces = re.split(r"\[Step\s*\d+\]:?", text, flags=re.IGNORECASE)
    else:
        pieces = re.split(r"\n\n|\n", text)

    # Keep only non-blank pieces, with surrounding whitespace removed.
    cleaned = []
    for piece in pieces:
        piece = piece.strip()
        if piece:
            cleaned.append(piece)

    total = len(cleaned)
    if total <= 3:
        return None, None

    # Pick how many leading steps to keep; the last step is never included.
    k = random.randint(1, total - 1)
    *earlier, current = cleaned[:k]

    numbered = []
    for position, body in enumerate(earlier, start=1):
        numbered.append(f"[Step {position}]:\n{body}")

    # The k-th step is returned separately (note the space after the newline).
    k_step = f"[Step {k}]:\n {current}"
    return "\n\n".join(numbered), k_step

In [None]:
def _parse_perturbed_number(num_str, as_integer):
    """Parse a perturbed numeric string back into an int or a float."""
    # int() first, float() as fallback: this also covers scientific notation
    # such as '1e-05', which contains no '.' yet is not a valid int literal.
    try:
        value = int(num_str)
    except ValueError:
        value = float(num_str)
    return int(value) if as_integer else value


def generate_similar_number(number):
    """Return a number that differs from ``number`` but looks similar.

    One of three perturbations is chosen at random:

    * ``modify_digit`` - replace one digit character of ``str(number)`` with
      a different digit.
    * ``add_prefix``   - insert a random digit 1-9 in front of the number
      (after the minus sign for negative values).
    * ``value_float``  - shift the value: by +1 (ints) or +0.1 (floats) when
      ``abs(number) < 10``, otherwise by 10% of the value with a random sign.
      Float results are rounded to two decimals.

    Args:
        number: an ``int`` or ``float``.

    Returns:
        An ``int`` when ``number`` is an ``int``, otherwise the parsed or
        shifted value (a ``float`` for float input).

    Raises:
        ValueError: if ``modify_digit`` is chosen and ``str(number)`` has no
            digit characters (e.g. ``nan``/``inf``).
    """
    # Randomly choose the perturbation strategy.
    choice = random.choice(['modify_digit', 'add_prefix', 'value_float'])
    is_integer = isinstance(number, int)  # remember whether the input was an int

    if choice == 'modify_digit':
        num_str = str(number)

        # Only digit characters may be replaced (never '-', '.', 'e', '+').
        digit_positions = [i for i, ch in enumerate(num_str) if ch.isdigit()]
        if not digit_positions:
            raise ValueError(f"no digit to modify in {num_str!r}")
        index = random.choice(digit_positions)

        # Pick a replacement that is guaranteed to differ from the original.
        original_digit = num_str[index]
        new_digit = random.choice([d for d in '0123456789' if d != original_digit])

        num_str = num_str[:index] + new_digit + num_str[index + 1:]
        return _parse_perturbed_number(num_str, is_integer)

    elif choice == 'add_prefix':
        # Prepend one random non-zero digit.
        prefix = str(random.randint(1, 9))
        num_str = str(number)

        # For negative numbers the prefix goes after the minus sign.
        if num_str[0] == '-':
            num_str = '-' + prefix + num_str[1:]
        else:
            num_str = prefix + num_str

        return _parse_perturbed_number(num_str, is_integer)

    elif choice == 'value_float':
        # The shift size depends on the magnitude. Note that the random sign
        # only applies to the 10% branch; small values are always increased.
        if is_integer:
            fluctuation = 1 if abs(number) < 10 else int(number * 0.1) * random.choice([-1, 1])
            return int(number + fluctuation)
        else:
            fluctuation = 0.1 if abs(number) < 10 else number * 0.1 * random.choice([-1, 1])
            return round(number + fluctuation, 2)  # keep two decimals

In [None]:
def extract_last_equation_rhs(data):
    """Return the number that follows the last ``=`` sign in ``data``.

    Only optionally signed integers and decimals directly after ``=``
    (whitespace allowed) are recognised. The value is returned as a
    ``float`` when its text contains a decimal point and as an ``int``
    otherwise; ``None`` is returned when no such number exists.
    """
    last_rhs = None
    # Walk over every "= <number>" occurrence and remember the final one.
    for hit in re.finditer(r'=\s*([-+]?\d+(\.\d+)?)', data):
        last_rhs = hit.group(1)

    if last_rhs is None:
        return None
    if '.' in last_rhs:
        return float(last_rhs)
    return int(last_rhs)

In [None]:
def replace_last_rhs_number(k_step):
    """Swap the number after the last ``=`` in ``k_step`` for a similar one.

    The right-hand-side value is read with ``extract_last_equation_rhs`` and
    perturbed with ``generate_similar_number``; the digits of the last
    ``= <number>`` occurrence are then replaced by the perturbed value.
    The input is returned unchanged when it contains no such equation.
    """
    rhs_number = extract_last_equation_rhs(k_step)
    if rhs_number is None:
        return k_step

    similar_number = generate_similar_number(rhs_number)

    # Locate the final "= <number>" occurrence; group 2 is the number itself.
    last_match = None
    for last_match in re.finditer(r'(=\s*)([-+]?\d+(\.\d+)?)', k_step):
        pass
    if last_match is None:
        return k_step

    start, end = last_match.span(2)
    # Splice the perturbed number in place of the original digits.
    return ''.join((k_step[:start], str(similar_number), k_step[end:]))

In [None]:
# Load the input records into `datas`.
# The file is opened as UTF-8 explicitly so that decoding does not depend on
# the platform's default encoding.
file =  '/data/lyz/math_dpo/datas/gsmhard_select_2.json'
with open(file, encoding='utf-8') as f:
    datas = json.load(f)

In [None]:
# Build one record per usable solution: a sampled step prefix, the k-th step,
# and a copy of that step whose last "= <number>" has been perturbed.
datas_inner = []
for data in datas:
    question = data['question']
    solutions = data['solutions']
    is_answer = data['is_answer']
    target = data['final_answer']

    for i, sol in enumerate(solutions):
        # Skip solutions whose is_answer flag is falsy.
        if not is_answer[i]:
            continue

        # (None, None) comes back when the solution has too few steps.
        r, k = split_and_sample_steps(sol)
        if not k:
            continue

        # Keep the sample only if the perturbation actually changed the step.
        false_k = replace_last_rhs_number(k)
        if false_k == k:
            continue

        datas_inner.append({
            'question': question,
            'final_answer': target,
            'solution': sol,
            'front_step': r,
            'k_step': k,
            'false_k_step': false_k,
        })

In [None]:
# Show how many records ended up in datas_inner.
len(datas_inner)

In [None]:
# Write datas_inner to disk as JSON, indented by 4 spaces.
# Note: opening with 'w' overwrites any existing file at this path.
file = '/data/lyz/math_dpo/datas/innerstep/gsmhard_innerstep_sample.json'
with open(file,'w')as f:
    json.dump(datas_inner,f,indent = 4)

In [None]:
# Load the records to evaluate into `datas` (this rebinds `datas`).
# The file is opened as UTF-8 explicitly so that decoding does not depend on
# the platform's default encoding.
file = '/data/lyz/math_dpo/output/gsm8k_test_naive.json'
with open(file, encoding='utf-8') as f:
    datas = json.load(f)
    

In [None]:
# Show how many records were loaded.
len(datas)

In [None]:
# Fraction of records for which eval_answer reports at least one truthy flag
# among the record's solutions.
cnt = 0
for ind, data in enumerate(datas):
    solutions = data['solutions']
    target = data['final_answer']
    answers = extract_numerical_answer(solutions, target)
    is_answer = eval_answer(answers, target)
    # Uncomment to inspect a record:
    # print(ind)
    # print(target)
    # print(answers)
    # print(is_answer)
    # Count the record once if any of its flags is truthy.
    if any(is_answer):
        cnt += 1
print(cnt / len(datas))

In [2]:
import json
# Load the records into `datas` (this rebinds `datas`).
# The file is opened as UTF-8 explicitly so that decoding does not depend on
# the platform's default encoding.
file = '/data/lyz/math_dpo/datas/gsm8k_select_2.json'
with open(file, encoding='utf-8') as f:
    datas = json.load(f)

In [None]:
# Show how many records were loaded.
len(datas)

In [4]:
# Print each solution of the record at index 1, prefixed with the first step
# tag, followed by a line of 20 '#' characters as a separator.
for sol in datas[1]['solutions']:
    print('[Step 1]: ' + sol, '#' * 20, sep='\n')
    

[Step 1]: Betty needs $100 for the wallet and currently has half of that amount. So, she has \( \frac{1}{2} \times 100 = 50 \) dollars.

[Step 2]: Betty's parents gave her $15.

[Step 3]: Betty's grandparents gave her twice as much as her parents, which is \( 2 \times 15 = 30 \) dollars.

[Step 4]: To find out how much more money Betty needs, we first need to calculate the total amount she has now. That's the sum of the money she has, the money from her parents, and the money from her grandparents. So, she has \( 50 + 15 + 30 = 95 \) dollars now.

[Step 5]: Finally, to find out how much more money Betty needs to buy the wallet, we subtract the amount she has from the total cost of the wallet. That's \( 100 - 95 = 5 \) dollars.

So, the numerical answer is: [5]
####################
[Step 1]: First, let's determine how much money Betty currently has. Since she has half of the $100 wallet cost, we calculate:

\[ \text{Betty's current savings} = \frac{1}{2} \times 100 = 50 \]

[Step 2]: Ne

In [None]:
from collections import defaultdict
def get_gsm(datas):
    """Group per-prompt pairs, responses and an SFT target (unfinished draft).

    For every item in ``datas`` a prompt is built from a fixed instruction,
    the item's ``'question'`` and a "[Step 1]" lead-in; entries are then
    accumulated under that prompt in a nested ``defaultdict`` with the keys
    ``'pairs'``, ``'responses'`` and ``'sft_target'``.

    NOTE(review): this function is incomplete as written.
      * ``chosen``, ``reject`` and ``responses`` are never assigned here, so
        unless they exist as notebook globals the bare ``chosen`` line raises
        NameError on the first item.
      * ``n_responses`` is computed but never used.
      * There is no ``return`` statement, so the function returns None and
        the accumulated ``data`` is discarded.
    """
    # prompt -> {'pairs': [...], 'responses': [...], 'sft_target': ...}
    data = defaultdict(lambda: defaultdict(list))
    instruct = """
                You need to continue writing based on the math problem I provided and the unfinished steps to get the final answer. And strictly follow the format of [Step].
                And at the end of the output, you should give your final numerical answer use \'So, the numerical answer is: \'.

    """
    for d in datas:
        # Instruction + question + lead-in that opens the first step.
        prompt = instruct + d['question'] + 'Let\'s think step by step.\n[Step 1]:\n'
        # NOTE(review): placeholder expression; `chosen` is not defined in this function.
        chosen
        # NOTE(review): unused; presumably meant as an index offset for new responses - confirm.
        n_responses = len(data[prompt]['responses'])
        # NOTE(review): `reject` and `responses` are not defined in this function either.
        data[prompt]['pairs'].append((chosen, reject))
        data[prompt]['responses'].extend(responses)
        data[prompt]['sft_target'] = chosen

In [2]:
import re

def is_high_quality_text(text):
    """Check that a solution follows the expected step/answer format.

    A text passes when both conditions hold:

    1. It contains ``[Step N]`` tags and their numbers form exactly the
       sequence 1, 2, 3, ... in order of appearance.
    2. After stripping, it ends with
       ``So, the numerical answer is: [<number>]`` where ``<number>`` is an
       optionally signed integer or decimal (e.g. ``5``, ``-3``, ``2.5``).

    Args:
        text: the generated solution text.

    Returns:
        ``True`` if both conditions hold, ``False`` otherwise.
    """
    # 1. Every step must be tagged as [Step X] and numbered consecutively.
    step_numbers = [int(n) for n in re.findall(r'\[Step (\d+)\]', text)]
    if not step_numbers:
        return False  # no step tag found at all

    if step_numbers != list(range(1, len(step_numbers) + 1)):
        return False  # numbering does not run 1, 2, 3, ...

    # 2. The text must end with the final-answer sentence. The bracket may
    #    hold any signed integer or decimal, not only a non-negative integer,
    #    so negative or fractional answers are not rejected.
    answer_match = re.search(
        r"So, the numerical answer is:\s*\[([-+]?\d+(?:\.\d+)?)\]\s*$",
        text.strip(),
    )
    return answer_match is not None

# Example usage.
# The sample is a raw string: it contains LaTeX such as \text and \frac, and
# in a normal string literal "\t" and "\f" would be turned into TAB and
# form-feed characters (and "\[" is an invalid escape sequence).
text = r"""
[Step 1]: First, we find out how much money Betty has initially. Since she has half of the $100 wallet, she has:

\[ \text{Betty's initial money} = \frac{100}{2} = 50 \]

[Step 2]: 
Next, we determine how much money her grandparents gave her. They gave her twice as much as her parents, who gave her $15. So, the grandparents gave:

\[ \text{Grandparents' money} = 2 \times 15 = 30 \]

[Step 3]:
Now, we calculate the total amount of money Betty has after receiving the money from her parents and grandparents:

\[ \text{Total money Betty has} = \text{Betty's initial money} + \text{Parents' money} + \text{Grandparents' money} \]
\[ \text{Total money Betty has} = 50 + 15 + 30 = 95 \]

[Step 4]:
Finally, we find out how much more money Betty needs to buy the wallet by subtracting the total money she has from the cost of the wallet:

\[ \text{Money needed} = \text{Wallet cost} - \text{Total money Betty has} \]
\[ \text{Money needed} = 100 - 95 = 5 \]

So, the numerical answer is: [5]
"""

print(is_high_quality_text(text))  # True means the text passes the format check


True


  text = """
