In [6]:
# Load the MathQA dataset from the Hugging Face Hub
from datasets import load_dataset
math_qa = load_dataset("allenai/math_qa")

# Overview of the returned DatasetDict (splits, features, row counts)
print(math_qa)

# Report how many examples each split holds
for split, split_data in math_qa.items():
    print(f"{split} split: {len(split_data)} examples")

# Show one raw training record so the available fields are visible
print("\nExample structure:", math_qa["train"][1], sep="\n")


DatasetDict({
    train: Dataset({
        features: ['Problem', 'Rationale', 'options', 'correct', 'annotated_formula', 'linear_formula', 'category'],
        num_rows: 29837
    })
    test: Dataset({
        features: ['Problem', 'Rationale', 'options', 'correct', 'annotated_formula', 'linear_formula', 'category'],
        num_rows: 2985
    })
    validation: Dataset({
        features: ['Problem', 'Rationale', 'options', 'correct', 'annotated_formula', 'linear_formula', 'category'],
        num_rows: 4475
    })
})
train split: 29837 examples
test split: 2985 examples
validation split: 4475 examples

Example structure:
{'Problem': 'average age of students of an adult school is 40 years . 120 new students whose average age is 32 years joined the school . as a result the average age is decreased by 4 years . find the number of students of the school after joining of the new students .', 'Rationale': '"explanation : let the original no . of students be x . according to situation , 40

In [5]:
# Load MathQA and export the probability problems of every split to CSV
from pathlib import Path

from datasets import load_dataset
import pandas as pd

# Load the dataset
math_qa = load_dataset("allenai/math_qa")

# Convert to DataFrame for easier filtering
train_df = pd.DataFrame(math_qa["train"])
test_df = pd.DataFrame(math_qa["test"])
dev_df = pd.DataFrame(math_qa["validation"])

# Filter probability problems from each split
train_probability = train_df[train_df["category"] == "probability"]
test_probability = test_df[test_df["category"] == "probability"]
dev_probability = dev_df[dev_df["category"] == "probability"]

# Print statistics
print(f"Train set: {len(train_probability)} probability problems")
print(f"Test set: {len(test_probability)} probability problems")
print(f"Validation set: {len(dev_probability)} probability problems")
print(f"Total: {len(train_probability) + len(test_probability) + len(dev_probability)} probability problems")

# Look at a sample problem
print("\nSample probability problem:")
print(train_probability.iloc[0][["Problem", "Rationale", "correct"]])

# Save to CSV. pandas refuses to write into a directory that does not exist,
# so create the output directory first (a no-op when it is already there).
output_dir = Path("data")
output_dir.mkdir(parents=True, exist_ok=True)
train_probability.to_csv(output_dir / "train_probability_problems.csv", index=False)
test_probability.to_csv(output_dir / "test_probability_problems.csv", index=False)
dev_probability.to_csv(output_dir / "dev_probability_problems.csv", index=False)

print("\nProbability problems saved to CSV files")

Train set: 450 probability problems
Test set: 52 probability problems
Validation set: 74 probability problems
Total: 576 probability problems

Sample probability problem:
Problem      a gardener is going to plant 2 red rosebushes ...
Rationale    we are asked to find the probability of one pa...
correct                                                      b
Name: 89, dtype: object

Probability problems saved to CSV files


In [10]:
from datasets import load_dataset, concatenate_datasets

# Load the full dataset, using the same namespaced Hub ID ("allenai/math_qa")
# as the other loading cells so every cell reads the same repository.
dataset = load_dataset("allenai/math_qa")
# Merge all splits into a single dataset
full_dataset = concatenate_datasets([dataset["train"], dataset["validation"], dataset["test"]])
# Keep only the problems in the "probability" category
prob_dataset = full_dataset.filter(lambda x: x["category"] == "probability")


Filter: 100%|██████████| 37297/37297 [00:00<00:00, 197819.11 examples/s]


In [20]:
# Tags the model must wrap around its reasoning and around its final answer.
reasoning_start, reasoning_end = "<start_working_out>", "<end_working_out>"
solution_start, solution_end = "<SOLUTION>", "</SOLUTION>"

# System prompt, one instruction per line; the tag names are interpolated so
# the prompt always agrees with the constants above.
system_prompt = "\n".join([
    "You are given a multiple-choice problem with options labeled a, b, c, d, or e.",
    "Think about the problem and provide your working out step by step.",
    f"Place your reasoning between {reasoning_start} and {reasoning_end}.",
    f"Then, provide your answer as a SINGLE LETTER corresponding to the correct option (a, b, c, d, or e) between {solution_start} and {solution_end}.",
    "Do not include any other text in your solution - just the letter of the correct option.",
])

# Prepare the data
def prepare_data(example):
    """Build the chat prompt and reference answer for one dataset record.

    Args:
        example: mapping with at least the keys 'Problem', 'options' and
            'correct'.

    Returns:
        A dict with two entries: "prompt", a two-message chat (the shared
        system prompt followed by a user message holding the problem text and
        its options), and "answer", the value of the record's 'correct' field.
    """
    # The user turn shows the problem first, then the options after a blank line.
    user_content = f"{example['Problem']}\n\nOptions: {example['options']}"
    # The reference answer is taken verbatim from the 'correct' field.
    reference = example['correct']

    system_turn = {"role": "system", "content": system_prompt}
    user_turn = {"role": "user", "content": user_content}
    return {"prompt": [system_turn, user_turn], "answer": reference}

# Apply the conversion: run prepare_data over every record of the filtered
# probability dataset.
prepared_dataset = prob_dataset.map(prepare_data)

Map: 100%|██████████| 576/576 [00:00<00:00, 3231.38 examples/s]


In [13]:
# Inspect one prepared record; it carries the original columns plus the new
# "prompt" and "answer" fields.
# NOTE(review): the output saved with this cell shows a different system prompt
# than the one defined in this notebook — presumably the cell was last run
# before the prompt was edited; re-run it to refresh the output.
prepared_dataset[1]

{'Problem': 'in a throw of dice what is the probability of ge æ « ng number greater than 3',
 'Rationale': '"explanation : number greater than 3 is 4 , 5 , 6 , so only 3 number total cases of dice = [ 1,2 , 3,4 , 5,6 ] so probability = 3 / 6 = 1 / 2 answer : a"',
 'options': 'a ) 1 / 2 , b ) 1 / 3 , c ) 1 / 5 , d ) 1 / 6 , e ) none of these',
 'correct': 'a',
 'annotated_formula': 'divide(subtract(const_6, 3), const_6)',
 'linear_formula': 'subtract(const_6,n0)|divide(#0,const_6)|',
 'category': 'probability',
 'prompt': [{'content': 'You are given a problem.\nThink about the problem and provide your working out.\nPlace it between <start_working_out> and <end_working_out>.\nThen, provide your solution between <SOLUTION></SOLUTION>',
   'role': 'system'},
  {'content': 'in a throw of dice what is the probability of ge æ « ng number greater than 3\n\nOptions: a ) 1 / 2 , b ) 1 / 3 , c ) 1 / 5 , d ) 1 / 6 , e ) none of these',
   'role': 'user'}],
 'answer': 'a'}

In [14]:
import re

# Expected completion layout: optional whitespace, one reasoning block, any
# text, then one solution block (its content is capture group 1), optional
# whitespace. DOTALL lets "." span newlines; MULTILINE makes "^" and "$" anchor
# at line boundaries rather than only at the very start/end of the text.
match_format = re.compile(
    (
        rf"^[\s]{{0,}}"
        rf"{reasoning_start}.+?{reasoning_end}.*?"
        rf"{solution_start}(.+?){solution_end}"
        rf"[\s]{{0,}}$"
    ),
    flags=re.DOTALL | re.MULTILINE,
)

In [16]:
# Sanity check: a minimal completion with both tagged sections should match.
match_format.search(
    "<start_working_out>Let me think!<end_working_out>"\
    "<SOLUTION>c</SOLUTION>",
)

<re.Match object; span=(0, 71), match='<start_working_out>Let me think!<end_working_out>>

In [17]:
def match_format_exactly(completions, **kwargs):
    """Score each completion on whether it follows the full expected format.

    Args:
        completions: sequence of completions; the text of each one is read
            from ``completion[0]["content"]``.
        **kwargs: accepted and ignored.

    Returns:
        One score per completion, in order: 3.0 when ``match_format`` finds
        the expected layout in the text, otherwise 0.
    """
    def format_score(completion):
        text = completion[0]["content"]
        # Full reward only when the complete layout is found.
        return 3.0 if match_format.search(text) is not None else 0

    return [format_score(completion) for completion in completions]

In [18]:
def match_format_approximately(completions, **kwargs):
    """Give partial credit for each format tag that appears exactly once.

    Args:
        completions: sequence of completions; the text of each one is read
            from ``completion[0]["content"]``.
        **kwargs: accepted and ignored.

    Returns:
        One score per completion, in order. Each of the four tags contributes
        +0.5 when it occurs exactly once in the text and -0.5 otherwise
        (missing or repeated), so scores range from -2.0 to 2.0.
    """
    def tag_score(text):
        tags = (reasoning_start, reasoning_end, solution_start, solution_end)
        # Zero or several occurrences of a tag are both penalised.
        return sum(0.5 if text.count(tag) == 1 else -0.5 for tag in tags)

    return [tag_score(completion[0]["content"]) for completion in completions]

In [19]:
def check_option(prompts, completions, answer, **kwargs):
    """Reward completions whose solution block holds the correct option letter.

    The text between the first ``solution_start`` / ``solution_end`` pair must
    consist of a single option letter a-e (any case), optionally written as
    "(c)", "c)" or "c."; surrounding whitespace is ignored. Any other content
    in the block counts as "no option found", so a letter is never picked out
    of the middle of a word or sentence.

    Args:
        prompts: batch of chat prompts; the last message of the first prompt
            is printed for monitoring.
        completions: batch of completions; the text of each one is read from
            ``completion[0]["content"]``.
        answer: reference option letters, aligned with ``completions``.
        **kwargs: accepted and ignored.

    Returns:
        One score per (completion, answer) pair: 3.0 when the extracted letter
        equals the reference (case-insensitive), -1.0 when it differs or no
        option could be extracted. An empty batch yields an empty list.
    """
    responses = [completion[0]["content"] for completion in completions]

    # Content of the solution block; the tags are escaped so they are always
    # matched literally.
    solution_block = re.compile(
        rf"{re.escape(solution_start)}(.*?){re.escape(solution_end)}",
        flags=re.DOTALL,
    )
    # The whole block must be exactly one option letter (optionally decorated).
    option_letter = re.compile(
        r"\s*\(?\s*([a-e])\s*\)?\s*\.?\s*",
        flags=re.IGNORECASE,
    )

    extracted_responses = []
    for response in responses:
        letter = None
        block = solution_block.search(response)
        if block is not None:
            letter = option_letter.fullmatch(block.group(1))
        extracted_responses.append(letter.group(1).lower() if letter else None)

    # Log the first sample of the batch for monitoring; skipped for an empty
    # batch, where there is nothing to index.
    if len(prompts) and len(responses) and len(answer):
        question = prompts[0][-1]["content"]
        print('*'*20, f"Question:\n{question}", f"\nAnswer:\n{answer[0]}",
              f"\nResponse:\n{responses[0]}", f"\nExtracted:\n{extracted_responses[0]}")

    scores = []
    for guess, true_answer in zip(extracted_responses, answer):
        if guess is None:
            scores.append(-1.0)  # no option letter found
            continue
        # Direct string comparison, ignoring case and stray whitespace.
        scores.append(3.0 if guess == true_answer.strip().lower() else -1.0)

    return scores