<a href="https://colab.research.google.com/github/AyoRIT/Experiments-on-Test-Time-Scaling/blob/main/TTS_4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
# Sanity check for the runtime: reports whether PyTorch can see a CUDA device.
torch.cuda.is_available()

# TODO: ensure every model used below has a reliable generation stopping condition.

True

In [None]:
from google.colab import userdata
import huggingface_hub

# Read the Hugging Face token from the Colab secret named 'HF_TOKEN' so the
# credential never appears in the notebook source, then authenticate with it.
hf_token = userdata.get('HF_TOKEN')
huggingface_hub.login(token=hf_token)
print("Successfully logged in to Hugging Face.")

Successfully logged in to Hugging Face.


In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer


# Device used for the input tensors passed to `generate`.
device = "cuda" if torch.cuda.is_available() else "cpu"

# solver_name = "meta-llama/Llama-3.2-1B"
solver_name = "TIGER-Lab/general-verifier"

# Load tokenizer
solver_tokenizer = AutoTokenizer.from_pretrained(solver_name)

# Load model
solver = AutoModelForCausalLM.from_pretrained(
    solver_name,
    device_map="auto",      # automatically assigns layers to GPUs/CPU
    # `torch_dtype` is deprecated in the installed transformers release (it
    # emits "`torch_dtype` is deprecated! Use `dtype` instead!"), so use `dtype`.
    dtype=torch.float16     # half precision: saves memory and is faster
)


tokenizer_config.json: 0.00B [00:00, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json:   0%|          | 0.00/11.4M [00:00<?, ?B/s]

added_tokens.json:   0%|          | 0.00/605 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/616 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/746 [00:00<?, ?B/s]

`torch_dtype` is deprecated! Use `dtype` instead!


model.safetensors:   0%|          | 0.00/3.09G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/117 [00:00<?, ?B/s]

In [None]:
from datasets import load_dataset

# Fetch GSM8K (the "main" configuration) and keep a handle on each split.
gsm8k = load_dataset("gsm8k", "main")
train_set, test_set = gsm8k["train"], gsm8k["test"]

print(f"Train size: {len(train_set)}, Test size: {len(test_set)}")

README.md: 0.00B [00:00, ?B/s]

main/train-00000-of-00001.parquet:   0%|          | 0.00/2.31M [00:00<?, ?B/s]

main/test-00000-of-00001.parquet:   0%|          | 0.00/419k [00:00<?, ?B/s]

Generating train split:   0%|          | 0/7473 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/1319 [00:00<?, ? examples/s]

Train size: 7473, Test size: 1319


In [None]:
def make_prompt(problem: str, few_shot: str | None = None) -> str:
    """
    Return a prompt for GSM8K-style problems that requests:
     - numbered deterministic steps "Step 1:", "Step 2:", ...
     - a final numeric answer only inside the exact markers:
         <<<FINAL_ANSWER>>>
         123
         <<<END_FINAL_ANSWER>>>
    few_shot: optional few-shot string (already formatted as steps+final markers) to prepend.
    """
    template = """
Solve the following math word problem step by step.

You MUST:
1. Write your reasoning as an ordered list exactly like "Step 1:", "Step 2:", etc.
2. Keep steps explicit and deterministic for test-time scaling evaluations.
3. Do NOT restate the problem, do NOT repeat the prompt, and do NOT print any text before Step 1.
4. At the end, output the final numeric answer inside a marker **exactly** like:


<<<FINAL_ANSWER>>>
123
<<<END_FINAL_ANSWER>>>
Rules:
- Output this block once and only once.
- Do NOT write "Final Answer:" or any other text outside this block.
- Do NOT repeat the block.
- The block must be the last thing in the response.
- The final answer must contain digits only (no commas, no currency symbols).
- If the true final answer is an integer, print it as integers (e.g. "18"). If it has decimals, use a dot (e.g. "12.5").
- After <<<END_FINAL_ANSWER>>> do not output additional analysis.

Problem:
{problem}
""".strip()
    if few_shot:
        return few_shot.strip() + "\n\n" + template.format(problem=problem)
    return template.format(problem=problem)


In [None]:
from transformers import StoppingCriteria, StoppingCriteriaList

class StopOnMarker(StoppingCriteria):
    """
    Stopping criterion that marks a sequence as finished once its most recent
    tokens equal the token ids of `stop_string`.

    The check is done independently for every row of `input_ids`, so batched
    sampling (`num_return_sequences > 1`) and beam search do not halt all
    sequences as soon as the first row emits the marker.

    NOTE(review): the match is on token ids. If the tokenizer merges the marker
    with neighbouring text (e.g. a preceding newline) the ids may differ from the
    stand-alone encoding and the criterion will not fire; `max_new_tokens`
    remains the fallback limit — confirm against the tokenizer in use.
    """

    def __init__(self, tokenizer, stop_string):
        # Encode without special tokens so a BOS/EOS added by the tokenizer
        # cannot become part of the pattern being matched.
        self.stop_ids = tokenizer(stop_string, add_special_tokens=False)["input_ids"]

    def __call__(self, input_ids, scores, **kwargs):
        """Return a bool tensor of shape (batch,) flagging rows that end with the marker."""
        n_stop = len(self.stop_ids)
        batch_size, seq_len = input_ids.shape[0], input_ids.shape[-1]
        if n_stop == 0 or seq_len < n_stop:
            # Nothing to match against yet: no row is finished.
            return torch.zeros(batch_size, dtype=torch.bool, device=input_ids.device)
        stop = torch.tensor(self.stop_ids, dtype=input_ids.dtype, device=input_ids.device)
        return (input_ids[:, -n_stop:] == stop).all(dim=-1)

stopping = StoppingCriteriaList([
    StopOnMarker(solver_tokenizer, "<<<END_FINAL_ANSWER>>>")
])

In [None]:
def generate_solution(model, tokenizer, prompt, device,
                      temperature=0.0,
                      top_p=1.0,
                      do_sample=False):
    """
    Generate a single continuation for `prompt` and return only the new text.

    Args:
        model: causal LM exposing `.generate`.
        tokenizer: tokenizer matching `model`.
        prompt: full prompt string.
        device: device the input tensors are moved to.
        temperature: sampling temperature; only forwarded when `do_sample` is True.
        top_p: nucleus-sampling threshold; only forwarded when `do_sample` is True.
        do_sample: False for greedy decoding, True for sampling.

    Returns:
        The decoded continuation with the prompt tokens and special tokens removed.

    Generation uses the notebook-level `stopping` criteria and is capped at
    1024 new tokens.
    """
    # tokenize and send attention mask
    enc = tokenizer(prompt, return_tensors="pt")
    input_ids = enc.input_ids.to(device)
    attention_mask = enc.attention_mask.to(device)

    # Fall back to EOS only when no pad token is defined. An explicit `is None`
    # test keeps a legitimate pad id of 0, which `or` would discard.
    pad_token_id = getattr(tokenizer, "pad_token_id", None)
    if pad_token_id is None:
        pad_token_id = tokenizer.eos_token_id

    generation_kwargs = {
        "attention_mask": attention_mask,
        "do_sample": do_sample,
        "pad_token_id": pad_token_id,
        # explicit eos token is fine, but max_new_tokens is the main control
        "eos_token_id": tokenizer.eos_token_id,
        "max_new_tokens": 1024,
        "stopping_criteria": stopping
    }
    if do_sample:
        # Sampling knobs are meaningless for greedy decoding; forwarding the
        # default temperature=0.0 there only triggers generation-config warnings.
        generation_kwargs["temperature"] = temperature
        generation_kwargs["top_p"] = top_p

    output_ids = model.generate(input_ids, **generation_kwargs)

    # slice off the input prefix so decode returns only the model continuation
    gen_ids = output_ids[0, input_ids.shape[-1]:]
    generated_text = tokenizer.decode(gen_ids, skip_special_tokens=True)
    return generated_text


In [None]:
import re
from typing import List, Tuple, Optional

def extract_final_answer(text: str) -> Optional[str]:
    """
    Find the numeric final answer in a model response.

    Lookup order:
      1. the number inside <<<FINAL_ANSWER>>> ... <<<END_FINAL_ANSWER>>>
      2. the number following "Final Answer" (optionally with ":" / "=")
      3. the last number anywhere in the text (least reliable)

    A leading minus sign is preserved, a "$" before the number is ignored, and
    thousands separators are removed, so "1,234" is returned as "1234" rather
    than being cut down to "234".

    Returns:
        The numeric string (optional "-", digits, optional decimal part), or
        None when the text contains no number.
    """
    # digits with optional thousands separators and an optional decimal part
    number = r'(?:\d{1,3}(?:,\d{3})+|\d+)(?:\.\d+)?'

    def clean(raw: str) -> str:
        return raw.replace(",", "")

    # 1) exact marker
    m = re.search(
        r'<<<FINAL_ANSWER>>>\s*\$?\s*(-?' + number + r')\s*<<<END_FINAL_ANSWER>>>',
        text,
    )
    if m:
        return clean(m.group(1))

    # 2) inline "Final Answer:" or "Final Answer ="
    m = re.search(r'Final\s*Answer[:=]*\s*\$?\s*(-?' + number + r')', text, re.I)
    if m:
        return clean(m.group(1))

    # 3) last number in the text as a last resort (less reliable).
    # A "-" counts as a sign only when it does not directly follow a word
    # character or ")", so "10-5" still yields "5" rather than "-5".
    nums = re.findall(r'(?:(?<![\w)])-)?' + number, text)
    if nums:
        return clean(nums[-1])
    return None

def split_steps(text: str) -> Tuple[List[str], Optional[str]]:
    """
    Break a model response into its reasoning steps and final answer.

    Everything before the first "Step 1:" (if present) is discarded so that a
    restated prompt is ignored. Each "Step N:" header then opens a step that
    runs up to the next header (or the end of the text). When no header is
    found, every non-empty line is treated as a step.

    Returns:
        (steps, final_answer) where steps are stripped strings such as
        "Step 1: <text>" and final_answer is the result of
        extract_final_answer on the trimmed text (possibly None).
    """
    body = text.replace("\r\n", "\n").strip()

    # Drop any preamble in front of the first "Step 1:".
    first_step = re.search(r'(Step\s*1\s*:)', body, re.I)
    if first_step is not None:
        body = body[first_step.start():]

    headers = list(re.finditer(r'(Step\s*\d+\s*:)', body, re.I))
    if headers:
        # Each step ends where the next header begins; the last runs to the end.
        stops = [h.start() for h in headers[1:]] + [None]
        steps = [
            f"{h.group().strip()} {body[h.end():stop].strip()}".strip()
            for h, stop in zip(headers, stops)
        ]
    else:
        # No "Step N:" headers: fall back to one step per non-empty line.
        stripped_lines = (ln.strip() for ln in body.splitlines())
        steps = [ln for ln in stripped_lines if ln]

    return steps, extract_final_answer(body)


In [None]:
# Helper functions to extract answer from question and compare predicted answer to actual answer
def extract_gsm8k_answer(gsm8k_answer):
    """
    Pull the reference answer out of a GSM8K solution string.

    Returns the stripped text that follows the first "####" marker; when the
    marker is absent, returns the last line of the stripped solution.
    """
    found = re.search(r"####\s*(.*)", gsm8k_answer)
    if found is None:
        # fallback: use last line
        return gsm8k_answer.strip().rsplit("\n", 1)[-1]
    return found.group(1).strip()

def is_correct(prediction, reference):
    """
    Compare solver prediction to reference answer.

    When both values parse as numbers (after removing whitespace, commas and
    "$") they are compared by numeric value, so "18.0" equals "18" and "12.5"
    does not equal "125". Otherwise both are compared as lower-cased strings
    with punctuation removed.

    Args:
        prediction: predicted answer, or None when nothing could be extracted.
        reference: reference answer.

    Returns:
        bool: True when the answers match; False when either one is None.
    """
    from decimal import Decimal, InvalidOperation

    # A missing prediction can never be correct (and must not crash the run).
    if prediction is None or reference is None:
        return False

    def to_number(ans):
        cleaned = re.sub(r"[,\s$]", "", str(ans))
        try:
            value = Decimal(cleaned)
        except InvalidOperation:
            return None
        # NaN / Infinity are not usable answers; treat them as non-numeric.
        return value if value.is_finite() else None

    # Normalize both answers: remove whitespace, punctuation
    def normalize(ans):
        ans = str(ans).strip().lower()
        ans = re.sub(r"[^\w\d]", "", ans)  # remove punctuation
        return ans

    pred_num = to_number(prediction)
    ref_num = to_number(reference)
    if pred_num is not None and ref_num is not None:
        return pred_num == ref_num
    return normalize(prediction) == normalize(reference)

In [None]:
from transformers import pipeline

# Load the PRM step classifier
# The process reward model is wrapped in a token-classification pipeline;
# prm_score reads the "entity" label of the pipeline's last prediction.
prm_pipe = pipeline(
    "token-classification",
    model="HuggingFaceH4/Qwen2.5-Math-1.5B-Instruct-PRM-0.2",
    device=0  # GPU
)


config.json:   0%|          | 0.00/772 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/3.09G [00:00<?, ?B/s]

tokenizer_config.json: 0.00B [00:00, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json:   0%|          | 0.00/11.4M [00:00<?, ?B/s]

added_tokens.json:   0%|          | 0.00/605 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/613 [00:00<?, ?B/s]

Device set to use cuda:0


In [None]:
# Delimiter placed between the prompt and each reasoning step for the PRM.
separator = "\n\n"

def prm_score(prompt, steps):
    """
    Score a solution with the PRM by counting the steps it labels as good.

    For each step, the prompt plus all steps up to and including it are joined
    with `separator` (and terminated by one), passed to `prm_pipe`, and the
    last prediction is inspected: "LABEL_1" adds 1.0, anything else adds 0.0.

    Returns:
        float: the sum over all steps (0.0 for an empty step list).
    """
    accumulated = 0.0
    prefix = [prompt]
    for step in steps:
        prefix.append(step)
        text = separator.join(prefix) + separator
        last_pred = prm_pipe(text)[-1]   # classification of the newest step
        accumulated += 1.0 if last_pred["entity"] == "LABEL_1" else 0.0
    return accumulated


In [None]:
""" This is a version of best_of_n that returns all the solution,
 not just the best one. this allows us to carry out both majority
 voting and PRM scoring on the same set of generated responses
"""

def best_of_n_pipeline(example, solver, tokenizer, prm_pipe, n=5, device="cuda"):
    """
    Generates N candidate solutions with step extraction & PRM scoring.
    Does NOT select the best — returns all candidates for voting or analysis.

    Args:
        example (dict): GSM8K example with "question" and "answer".
        solver: LLM model.
        tokenizer: Corresponding tokenizer.
        prm_pipe: PRM pipeline. NOTE(review): scoring goes through prm_score,
            which reads the notebook-level `prm_pipe`, so this argument is
            currently not used inside the function.
        n (int): Number of sampled candidates.
        device (str): Device for generation inputs.

    Returns:
        dict: {
            "question": str,
            "reference_answer": str,
            "candidates": List[{"steps", "predicted_answer", "prm_score"}]
        }
    """

    question = example["question"]
    reference_answer = extract_gsm8k_answer(example["answer"])
    prompt = make_prompt(question)

    enc = tokenizer(prompt, return_tensors="pt")
    input_ids = enc.input_ids.to(device)
    # Pass the mask explicitly (as generate_solution does): with pad == eos
    # the model cannot infer it from the input ids.
    attention_mask = enc.attention_mask.to(device)

    outputs = solver.generate(
        input_ids,
        attention_mask=attention_mask,
        temperature=0.7,
        top_p=0.9,
        do_sample=True,
        num_return_sequences=n,
        eos_token_id=tokenizer.eos_token_id,
        pad_token_id=tokenizer.eos_token_id,
        max_new_tokens=1024,
        stopping_criteria=stopping
    )

    results = []  # <-- storing all attempts
    prompt_len = input_ids.shape[1]

    for output_ids in outputs:
        # isolate continuation
        generated_ids = output_ids[prompt_len:]
        continuation = tokenizer.decode(generated_ids, skip_special_tokens=True)

        steps, pred_answer = split_steps(continuation)
        score = prm_score(prompt, steps)

        results.append({
            "steps": steps,
            "predicted_answer": pred_answer,
            "prm_score": score
            # "raw_continuation": continuation
        })

    return {
        # "question_id": example.get("id", None),
        "question": question,
        "reference_answer": reference_answer,
        "candidates": results
    }



In [None]:
"""
This function allows me to vote over the samples, implementing a majority voting pipeline.
"""
def vote_over_samples(candidates, reference_answer, return_counts=False):
    """
    Pick the most frequent predicted answer among the candidates.

    Candidates whose "predicted_answer" is missing or None do not vote.
    Returns None when nobody votes; otherwise:
    {
      "predicted_answer": str,
      "is_correct": bool,
      (optional) "counts": dict   # only when return_counts is True
    }
    """
    from collections import Counter

    votes = Counter(
        c["predicted_answer"]
        for c in candidates
        if c.get("predicted_answer") is not None
    )
    if not votes:
        return None

    winner = votes.most_common(1)[0][0]
    outcome = {
        "predicted_answer": winner,
        "is_correct": is_correct(winner, reference_answer)
    }
    if return_counts:
        outcome["counts"] = dict(votes)
    return outcome



In [None]:
def select_best_of_n(result_dict):
    """
    Given the output from best_of_n_pipeline (which contains candidates),
    return the single best solution based on PRM score.

    result_dict format:
    {
      "question": ...,
      "reference_answer": ...,
      "candidates": [...]   # each with "steps", "predicted_answer", "prm_score"
    }

    Returns:
        dict with "question", "reference_answer", "best_predicted_answer",
        "prm_score", "best_steps" and "is_correct". "is_correct" is False when
        the best candidate has no extracted answer.

    Raises:
        ValueError: if "candidates" is empty (raised by max()).
    """

    best = max(result_dict["candidates"], key=lambda x: x["prm_score"])
    best_answer = best["predicted_answer"]

    return {
        # "question_id": result_dict["question_id"],
        "question": result_dict["question"],
        "reference_answer": result_dict["reference_answer"],

        # "best_solution": best["raw_continuation"],
        "best_predicted_answer": best_answer,
        "prm_score": best["prm_score"],
        "best_steps": best["steps"],

        # Answer extraction can yield None; count that as wrong instead of
        # handing None to is_correct.
        "is_correct": best_answer is not None
        and is_correct(best_answer, result_dict["reference_answer"])
    }


In [None]:

def beam_search_pipeline(example, solver, tokenizer, num_beams=5, device="cuda"):
    """
    Runs beam search decoding on a GSM8K example without PRM.

    Args:
        example (dict): GSM8K example with "question" and "answer".
        solver: LLM model.
        tokenizer: Corresponding tokenizer.
        num_beams (int): Number of beams to keep.
        device (str): Device for generation.

    Returns:
        dict: {
            "question": str,
            "best_solution": str,
            "predicted_answer": str or None,
            "steps": List[str],
            "reference_answer": str,
            "is_correct": bool,
            "all_beams": List[str]  # decoded text for all beams
        }
    """
    question = example["question"]
    reference_answer = extract_gsm8k_answer(example["answer"])
    prompt = make_prompt(question)

    # Tokenize input
    enc = tokenizer(prompt, return_tensors="pt")
    input_ids = enc.input_ids.to(device)
    # Pass the mask explicitly (as generate_solution does): with pad == eos
    # the model cannot infer it from the input ids.
    attention_mask = enc.attention_mask.to(device)

    # Beam search generation
    outputs = solver.generate(
        input_ids,
        attention_mask=attention_mask,
        num_beams=num_beams,
        num_return_sequences=num_beams,
        stopping_criteria=stopping,
        max_new_tokens=1024,
        early_stopping=True,
        eos_token_id=tokenizer.eos_token_id,
        pad_token_id=tokenizer.eos_token_id,
        do_sample=False  # deterministic beam search
    )

    # Slice off prompt and decode
    prompt_len = input_ids.shape[1]
    beams = [
        tokenizer.decode(o[prompt_len:], skip_special_tokens=True)
        for o in outputs
    ]

    # Use the first returned beam as the top-scoring one.
    best_solution = beams[0]
    steps, predicted_answer = split_steps(best_solution)

    return {
        # Included so log_beam_search records the question instead of "".
        "question": question,
        "best_solution": best_solution,
        "predicted_answer": predicted_answer,
        "steps": steps,
        "reference_answer": reference_answer,
        # Answer extraction can yield None; count that as wrong instead of
        # handing None to is_correct.
        "is_correct": predicted_answer is not None
        and is_correct(predicted_answer, reference_answer),
        "all_beams": beams
    }


In [None]:
def baseline_pipeline(example, solver, tokenizer, device="cuda"):
    """
    Generate a solution with the raw model (no TTS, no PRM) and extract the predicted answer.

    Args:
        example (dict): GSM8K example with "question" and "answer".
        solver: LLM model.
        tokenizer: Corresponding tokenizer.
        device (str): Device for model generation.

    Returns:
        dict: {
            "question": str,
            "generated_solution": str,
            "steps": List[str],
            "predicted_answer": str or None,
            "reference_answer": str,
            "is_correct": bool
        }
    """
    question = example["question"]
    reference_answer = extract_gsm8k_answer(example["answer"])

    prompt = make_prompt(question)
    generated_solution = generate_solution(solver, tokenizer, prompt, device)

    steps, predicted_answer = split_steps(generated_solution)

    return {
        # Included so log_baseline records the question instead of "".
        "question": question,
        "generated_solution": generated_solution,
        "steps": steps,
        "predicted_answer": predicted_answer,
        "reference_answer": reference_answer,
        # Answer extraction can yield None; count that as wrong instead of
        # handing None to is_correct.
        "is_correct": predicted_answer is not None
        and is_correct(predicted_answer, reference_answer)
    }


In [None]:
import json
import os
from typing import List, Dict


# ------------------------------
# RESET BEFORE RUN
# ------------------------------
def reset_json_file(filepath: str):
    """
    Clears the file contents by rewriting an empty JSON list.
    Creates the file (and any missing parent directories) if it does not exist.

    Args:
        filepath: path of the JSON file to reset.
    """
    parent = os.path.dirname(filepath)
    # A bare filename has no directory part, and os.makedirs("") would raise.
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(filepath, "w") as f:
        json.dump([], f, indent=2)


# ------------------------------
# Helper: Append JSON object
# ------------------------------
def append_to_json_file(obj: dict, filepath: str):
    """
    Add one JSON object to the list stored in `filepath`.

    A missing file is created holding a one-element list. An existing file is
    read, extended and rewritten in place; if its content is not valid JSON the
    list is started over from empty.
    """
    if os.path.exists(filepath):
        with open(filepath, "r+") as handle:
            try:
                records = json.load(handle)
            except json.JSONDecodeError:
                records = []
            records.append(obj)
            # Rewind, rewrite from the start, then drop any leftover old bytes.
            handle.seek(0)
            json.dump(records, handle, indent=2)
            handle.truncate()
        return

    with open(filepath, "w") as handle:
        json.dump([obj], handle, indent=2)


# ------------------------------
# Baseline Logging
# ------------------------------
def log_baseline(example_result: dict, save_path: str):
    """Append one baseline result to the JSON log at `save_path`.

    The question defaults to "" when `example_result` has no "question" key;
    the remaining fields are required.
    """
    record = {"question": example_result.get("question", "")}
    record["generated_solution"] = example_result["generated_solution"]
    record["predicted_answer"] = example_result["predicted_answer"]
    record["actual_answer"] = example_result["reference_answer"]
    record["is_correct"] = example_result["is_correct"]
    append_to_json_file(record, save_path)


# ------------------------------
# Best-of-N Logging
# ------------------------------
def log_best_of_n(result_dict: dict, save_path: str):
    """Append one best-of-N result to the JSON log at `save_path`.

    The record holds the question, the reference answer, every candidate, the
    PRM-selected best candidate and the majority-vote outcome.
    """
    prm_pick = select_best_of_n(result_dict)
    candidates = result_dict["candidates"]
    reference = result_dict["reference_answer"]
    vote = vote_over_samples(candidates, reference)

    append_to_json_file(
        {
            "question": result_dict["question"],
            "actual_answer": reference,
            "candidates": candidates,
            "best_of_n": prm_pick,
            "majority_vote": vote,
        },
        save_path,
    )


# ------------------------------
# Beam Search Logging
# ------------------------------
def log_beam_search(example_result: dict, save_path: str):
    """Append one beam-search result to the JSON log at `save_path`.

    The question defaults to "" when `example_result` has no "question" key;
    all decoded beams are stored under "candidates".
    """
    record = {"question": example_result.get("question", "")}
    record["predicted_answer"] = example_result["predicted_answer"]
    record["actual_answer"] = example_result["reference_answer"]
    record["is_correct"] = example_result["is_correct"]
    record["candidates"] = example_result["all_beams"]
    append_to_json_file(record, save_path)


# ------------------------------
# FILE PATHS (Drive)
# ------------------------------
# NOTE(review): no drive.mount(...) call is visible in this notebook; unless
# Drive is mounted elsewhere these paths are plain local directories — confirm.
BASELINE_FILE = "/content/drive/MyDrive/gsm8k_eval/baseline.json"
BEST_OF_N_FILE = "/content/drive/MyDrive/gsm8k_eval/best_of_n.json"
BEAM_FILE = "/content/drive/MyDrive/gsm8k_eval/beam_search.json"


# ------------------------------
# RESET BEFORE RUN
# ------------------------------
# Start every run from empty logs, in the same order as declared above.
for _log_path in (BASELINE_FILE, BEST_OF_N_FILE, BEAM_FILE):
    reset_json_file(_log_path)


In [None]:
import os

# Show what sits directly under /content/drive
print(os.listdir("/content/drive"))

# Walk the whole tree, printing every folder with its subfolders and files
_rule = "-" * 40
for folder, subfolders, filenames in os.walk("/content/drive"):
    print("Folder:", folder)
    print("Subfolders:", subfolders)
    print("Files:", filenames)
    print(_rule)


['MyDrive']
Folder: /content/drive
Subfolders: ['MyDrive']
Files: []
----------------------------------------
Folder: /content/drive/MyDrive
Subfolders: ['gsm8k_eval']
Files: []
----------------------------------------
Folder: /content/drive/MyDrive/gsm8k_eval
Subfolders: []
Files: ['beam_search.json', 'baseline.json', 'best_of_n.json']
----------------------------------------


In [None]:
from datasets import load_dataset

# ------------------------------
# Load GSM8K test set
# ------------------------------
gsm8k = load_dataset("gsm8k", "main")
test_set = gsm8k["test"]

# Number of test examples to evaluate. Kept at 1 for a quick demo run;
# raise it (up to len(test_set)) for a real evaluation.
NUM_EXAMPLES = 1
test_subset = test_set.select(range(min(NUM_EXAMPLES, len(test_set))))
test_subset = [dict(example) for example in test_subset]


# ------------------------------
# Loop over the test subset
# ------------------------------
for i, example in enumerate(test_subset, 1):
    print(f"Processing Example {i} / {len(test_subset)}")

    # ---------------- Baseline ----------------
    baseline_res = baseline_pipeline(example, solver, solver_tokenizer)
    log_baseline(baseline_res, BASELINE_FILE)
    print(f"  Baseline Predicted Answer: {baseline_res['predicted_answer']} | Correct: {baseline_res['is_correct']}")

    # ---------------- Best-of-N ----------------
    best_of_n_res = best_of_n_pipeline(example, solver, solver_tokenizer, prm_pipe, n=5)
    log_best_of_n(best_of_n_res, BEST_OF_N_FILE)

    best_sol = select_best_of_n(best_of_n_res)
    majority_sol = vote_over_samples(best_of_n_res["candidates"], best_of_n_res["reference_answer"])
    # vote_over_samples returns None when no candidate produced an answer;
    # report that as an incorrect, empty prediction instead of crashing.
    if majority_sol is None:
        majority_sol = {"predicted_answer": None, "is_correct": False}

    print(f"  Best-of-N PRM Predicted Answer: {best_sol['best_predicted_answer']} | Correct: {best_sol['is_correct']}")
    print(f"  Majority Vote Predicted Answer: {majority_sol['predicted_answer']} | Correct: {majority_sol['is_correct']}")

    # ---------------- Beam Search ----------------
    beam_res = beam_search_pipeline(example, solver, solver_tokenizer, num_beams=5)
    log_beam_search(beam_res, BEAM_FILE)
    print(f"  Beam Search Predicted Answer: {beam_res['predicted_answer']} | Correct: {beam_res['is_correct']}")

    print("-" * 50)


Processing Example 1 / 1
  Baseline Predicted Answer: 18 | Correct: True
  Best-of-N PRM Predicted Answer: 18 | Correct: True
  Majority Vote Predicted Answer: 18 | Correct: True
  Beam Search Predicted Answer: 18 | Correct: True
--------------------------------------------------
