# Self-Consistency Test 

## Load environment

In [3]:
import os
import re
import time
from collections import Counter

import dashscope
from dotenv import load_dotenv

###############################################################################
# 1) Load environment (DASHSCOPE_API_KEY)
###############################################################################

# Pull variables from the local env file into the process environment, then
# read the DashScope key; a missing or empty key only produces a warning.
load_dotenv("dashscope_api_key.env")
api_key = os.environ.get("DASHSCOPE_API_KEY")
if api_key in (None, ""):
    print("❌ DASHSCOPE_API_KEY not found!")

## Define call_model function

In [None]:
###############################################################################
# 2) Single call to Qwen via DashScope
###############################################################################
def call_model(
    prompt: str,
    model_name: str = "deepseek-r1-distill-qwen-1.5b",  # use a valid DashScope model name
    system_prompt: str = (
        "You are a helpful assistant. "
        "Please show your reasoning step by step (Chain of Thought). "
        "Then, on a new line at the end, write:\n"
        "'Final Answer: <the numeric result>'"
    ),
    temperature: float = 0.7,
    top_p: float = 0.9,
    max_retries: int = 3,
    retry_delay: float = 0.3
) -> tuple[str, dict]:
    """
    Send one chat request through ``dashscope.Generation.call`` and return
    the reply text together with the token-usage mapping.

    The system prompt asks the model to reason step by step and to finish
    with a line of the form 'Final Answer: <the numeric result>'.

    Args:
        prompt: User message content.
        model_name: Model identifier passed to DashScope.
        system_prompt: System message content.
        temperature: Sampling temperature forwarded to the API.
        top_p: Nucleus-sampling parameter forwarded to the API.
        max_retries: Maximum number of request attempts.
        retry_delay: Seconds to sleep after a rate-limited (429) or
            malformed response before the next attempt.

    Returns:
        ``(content, usage)``. When every attempt fails the result is
        ``("", {})`` so callers can always unpack two values.
    """
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user",   "content": prompt}
    ]

    for attempt in range(max_retries):
        response = dashscope.Generation.call(
            api_key=api_key,
            model=model_name,
            messages=messages,
            temperature=temperature,
            top_p=top_p,
            result_format="message"
        )

        status_code = response.get("status_code", None)
        if status_code == 429:
            print(f"⏳ [Attempt {attempt+1}/{max_retries}] Rate limit! Wait {retry_delay}s...")
            time.sleep(retry_delay)
            continue

        try:
            # Only the nested lookup can fail on a malformed response, so it
            # is the only statement guarded here.
            content = response["output"]["choices"][0]["message"]["content"]
        except (TypeError, KeyError, IndexError):
            print(f"⚠️ [Attempt {attempt+1}/{max_retries}] Unexpected structure:\n{response}")
            time.sleep(retry_delay)
            continue

        # Fall back to an empty dict so callers can safely call usage.get(...).
        usage = response.get("usage", {}) or {}
        return content, usage

    print(f"❌ All {max_retries} attempts failed, returning empty.")
    # Same (text, usage) shape as the success path; a bare "" would break
    # tuple unpacking in the caller.
    return "", {}


## Extract answer from model response

In [6]:
###############################################################################
# 3) Extract "Final Answer: x"
###############################################################################

# A decimal number with an optional leading minus sign and optional
# thousands separators. The minus is only treated as a sign when it does not
# directly follow a word character or ")" (so "10-3" yields 10 and 3, not -3).
_NUMBER_PATTERN = re.compile(
    r"(?:(?<![\w)])-)?(?:\d{1,3}(?:,\d{3})+(?!\d)|\d+)(?:\.\d+)?"
)


def extract_final_answer(response_text: str) -> str:
    """
    Extract the final answer from a model response.

    Strategies are tried in order:
      1. The first ``\\boxed{...}`` expression (content up to the first
         closing brace; nested braces are not balanced). The first number
         inside it is returned, or the stripped box content if it holds no
         number.
      2. The remainder of the line after "Final Answer:" (case-insensitive),
         returned verbatim apart from surrounding whitespace.
      3. The last number that appears anywhere in the text.
      4. The whole text, stripped.

    Numbers keep a leading minus sign and have thousands separators removed
    ("1,250" -> "1250").
    """
    # 1) \boxed{...}
    box_match = re.search(r"\\boxed\{([^}]*)\}", response_text, flags=re.DOTALL)
    if box_match:
        box_content = box_match.group(1)
        number_in_box = _NUMBER_PATTERN.search(box_content)
        if number_in_box:
            return number_in_box.group(0).replace(",", "")
        # No number in the box: return the raw content.
        return box_content.strip()

    # 2) "Final Answer: ..."
    fa_match = re.search(r"(?i)final answer\s*:\s*([^\n]*)", response_text)
    if fa_match:
        return fa_match.group(1).strip()

    # 3) Fallback: last number in the text
    numbers = _NUMBER_PATTERN.findall(response_text)
    if numbers:
        return numbers[-1].replace(",", "")

    # 4) Final fallback: return entire string
    return response_text.strip()


## Multiple calls and majority vote

In [None]:
def self_consistency_inference(
    question_text: str,
    model_name: str = "deepseek-r1-distill-qwen-1.5b",
    num_samples: int = 5,
    temperature: float = 0.7,
    top_p: float = 0.9
) -> tuple[str, str, float, dict]:
    """
    Sample the model several times and return the majority-vote answer.

    Each sample is obtained with ``call_model`` and reduced to a final answer
    with ``extract_final_answer``. Samples whose response text is empty take
    no part in the vote.

    Args:
        question_text: Question sent as the user prompt.
        model_name: Model identifier forwarded to ``call_model``.
        num_samples: Number of independent samples to draw (must be >= 1).
        temperature: Sampling temperature forwarded to ``call_model``.
        top_p: Nucleus-sampling parameter forwarded to ``call_model``.

    Returns:
        - best_answer (str): Most frequently voted final answer
        - best_response (str): Full response of the first sample that
          produced best_answer
        - confidence (float): vote_ratio = max_votes / num_samples
        - usage (dict): usage mapping reported for that same sample

        If no sample produced any text, returns ``("", "", 0.0, {})``.

    Raises:
        ValueError: If ``num_samples`` is smaller than 1.
    """
    from collections import Counter

    if num_samples < 1:
        raise ValueError(f"num_samples must be at least 1, got {num_samples}")

    sample_data = []

    for i in range(num_samples):
        response_text, usage = call_model(
            prompt=question_text,
            model_name=model_name,
            temperature=temperature,
            top_p=top_p
        )
        final_ans = extract_final_answer(response_text)

        sample_data.append({
            "answer": final_ans,
            "response": response_text,
            "usage": usage  # full usage dict from API
        })

        print(f"[Sample {i+1}] Raw Final Answer: {final_ans}")

    # An empty response carries no answer; letting it vote could make "" the
    # winner with a high confidence.
    voting_samples = [s for s in sample_data if s["response"].strip()]
    if not voting_samples:
        return "", "", 0.0, {}

    # Vote for the most common final answer. The denominator stays at
    # num_samples so that empty samples lower the confidence.
    counter = Counter(s["answer"] for s in voting_samples)
    best_answer, best_vote_count = counter.most_common(1)[0]
    confidence = best_vote_count / num_samples

    # First sample that produced the winning answer.
    winner = next(s for s in voting_samples if s["answer"] == best_answer)

    return best_answer, winner["response"], confidence, winner["usage"]

## Single Demo Question Test

In [None]:
###############################################################################
# 5) Demo: Single Question Test
###############################################################################
if __name__ == "__main__":
    # Demo problem and its reference answer.
    # Alternative example: "If x = 3, what is the value of (x + 2)^2?"
    question_text = "Janet’s ducks lay 16 eggs per day. She eats three for breakfast every morning and bakes muffins for her friends every day with four. She sells the remainder at the farmers' market daily for $2 per fresh duck egg. How much in dollars does she make every day at the farmers' market?"
    gold_answer = "18"

    # Draw five samples and keep the majority answer.
    predicted, final_response, confidence, usage = self_consistency_inference(
        question_text=question_text,
        model_name="deepseek-r1-distill-qwen-1.5b",  # or 'qwen2-math-1.5b-instruct' if valid
        num_samples=5,
        temperature=0.7,
        top_p=0.9,
    )

    # Exact string match against the reference, ignoring outer whitespace.
    is_correct = predicted.strip() == gold_answer.strip()
    response_length = len(final_response)

    print("=== Single Question Test ===")
    for label, value in (
        ("Question:", question_text),
        ("Predicted Final Answer:", predicted),
        ("Gold Answer:", gold_answer),
        ("Correct?", is_correct),
    ):
        print(label, value)
    print(f"Response Length (chars): {response_length}")

[Sample 1] Raw Final Answer: 18
[Sample 2] Raw Final Answer: 18
[Sample 3] Raw Final Answer: 18
[Sample 4] Raw Final Answer: 18
[Sample 5] Raw Final Answer: 18
=== Single Question Test ===
Question: Janet’s ducks lay 16 eggs per day. She eats three for breakfast every morning and bakes muffins for her friends every day with four. She sells the remainder at the farmers' market daily for $2 per fresh duck egg. How much in dollars does she make every day at the farmers' market?
Predicted Final Answer: 18
Gold Answer: 18
Correct? True
Response Length (chars): 1010


## GSM8K main_test 

In [None]:
import os
import re
import pandas as pd
import csv

# =============================
# Helper functions
# =============================
def extract_numeric(ans: str) -> str:
    """
    Reduce an extracted answer to its numeric characters.

    Currency symbols, units, thousands separators and any other character
    outside ``0-9``, ``.`` and ``-`` are removed. A period that is not
    followed by a digit (sentence punctuation such as "approx. 18" or
    "18 dollars.") and a hyphen that is not followed by a digit or a period
    are dropped first, so they cannot attach themselves to the number.

    Returns:
        The cleaned string; it may be empty when the input has no digits.
    """
    text = ans.strip()
    # Remove punctuation-like periods and hyphens before filtering, otherwise
    # "approx. 18" would become ".18" and "3.5." would not parse as a float.
    text = re.sub(r"\.(?!\d)|-(?![\d.])", "", text)
    return re.sub(r"[^0-9.\-]", "", text)

def extract_gold_answer(gold_text: str) -> str:
    """
    Extract the reference answer from a gold field.

    The value is first converted with ``str()``, so plain numeric cells are
    accepted too. Strategies, in order:
      1. The number after a GSM8K-style ``####`` marker, including a leading
         minus sign and thousands separators (returned without the commas).
      2. The last unsigned number found anywhere in the text.
      3. The stripped text itself.
    """
    text = str(gold_text)

    # Accept "-3" and "1,080" after the marker; the previous digit-only
    # pattern returned "1" for "#### 1,080" and lost the sign of "#### -3".
    match = re.search(r"####\s*(-?\d[\d,]*(?:\.\d+)?)", text)
    if match:
        return match.group(1).replace(",", "")

    numbers = re.findall(r"\d+(?:\.\d+)?", text)
    if numbers:
        return numbers[-1]
    return text.strip()

def is_correct_with_tolerance(pred: str, gold: str, epsilon: float = 1e-5) -> bool:
    """
    Decide whether a prediction matches the reference answer.

    When both strings parse as floats they match if their absolute
    difference is strictly below ``epsilon``. If either one fails to parse,
    the stripped strings are compared for exact equality instead.
    """
    try:
        pred_value, gold_value = float(pred), float(gold)
    except ValueError:
        # Non-numeric input: fall back to a textual comparison.
        return pred.strip() == gold.strip()
    return abs(pred_value - gold_value) < epsilon
    
def is_abnormal_answer(ans: str, max_repeat: int = 10, max_len: int = 100) -> bool:
    """
    Flag answers that look like degenerate model output.

    After stripping whitespace, an answer is abnormal when it is longer than
    ``max_len`` characters, or when it consists solely of one digit repeated
    more than ``max_repeat`` times (e.g. "77777777777"). An empty string is
    not considered abnormal.
    """
    candidate = ans.strip()

    if len(candidate) > max_len:
        return True

    # One digit followed by at least `max_repeat` further copies of itself.
    repeated_digit = r"^(\d)\1{" + str(max_repeat) + r",}$"
    return re.match(repeated_digit, candidate) is not None


# =============================
# Load GSM8K CSV Dataset
# =============================
# Evaluation data: the GSM8K CSV. `num_samples` limits the run to the first
# N rows; -1 keeps every row.
df = pd.read_csv("dataset/GSM8K/main_test.csv")
num_samples = -1
subset = df.head(num_samples) if num_samples != -1 else df

# Model identifiers to evaluate, one full pass over `subset` per entry.
models_to_test = ["deepseek-r1-distill-qwen-1.5b"]

# Sampling settings forwarded to self_consistency_inference for every question.
temperature, top_p = 0.7, 0.9
samples_per_question = 5

# =============================
# Main Evaluation Loop
# =============================
# Evaluate every model on `subset`, appending one CSV row per question so an
# interrupted run can resume from the rows already on disk.
for model_name in models_to_test:
    print(f"\n\n==================== Evaluating Model: {model_name} ====================")

    model_short_names = {
        "deepseek-r1-distill-qwen-1.5b": "deepseek"
    }

    dataset_name = "gsm8k"
    short_name = model_short_names.get(model_name, model_name.replace("/", "_"))
    save_path = f"results/zero_shot_{dataset_name}_{short_name}.csv"

    # Opening the file in append mode does not create missing directories.
    os.makedirs(os.path.dirname(save_path), exist_ok=True)

    # Resume: collect the indices already present in the result file.
    done_indices = set()
    if os.path.exists(save_path):
        try:
            existing_df = pd.read_csv(save_path)
            done_indices = set(existing_df["index"].tolist())
            print(f"🔁 Resuming from existing result file: {save_path}")
        except Exception as e:
            print(f"⚠️ Failed to read existing result file: {e}")

    for idx, row in subset.iterrows():
        if idx in done_indices:
            # Announce the skip only once, at the index equal to the number
            # of completed rows minus one, rather than for every skipped row.
            if idx == len(done_indices)-1:
                print(f"⏩ Skipping already completed index {idx}")
            continue

        question = row["question"]
        gold_answer = row["answer"]

        print(f"\n=== Question {idx} ===")
        print("Question:", question)

        # Any failure for one question is recorded as an empty prediction so
        # the remaining questions still run.
        try:
            pred, full_response, confidence, usage = self_consistency_inference(
                question_text=question,
                model_name=model_name,
                num_samples=samples_per_question,
                temperature=temperature,
                top_p=top_p
            )
        except Exception as e:
            print(f"[Error] {e}")
            pred = ""
            full_response = ""
            confidence = 0.0
            usage = {}

        gold_clean = extract_gold_answer(gold_answer)

        pred = extract_numeric(pred)
        # Detect abnormal numeric patterns; an invalid prediction is never
        # compared against the gold answer.
        if is_abnormal_answer(pred):
            print("⚠️ Abnormal pattern detected! Marking as invalid.")
            pred = "INVALID"  # Mark as invalid
            correct = False
        else:
            correct = is_correct_with_tolerance(pred, gold_clean)

        response_length = len(full_response)

        print("Gold Extracted:", repr(gold_clean))
        print("Predicted Final Answer:", repr(pred))
        print("Matched Correctly?", correct)

        # Token counts: try the "output_tokens"/"input_tokens" keys first,
        # then "completion_tokens"/"prompt_tokens", defaulting to 0.
        completion_tokens = usage.get("output_tokens", usage.get("completion_tokens", 0))
        prompt_tokens = usage.get("input_tokens", usage.get("prompt_tokens", 0))
        total_tokens = usage.get("total_tokens", 0)

        result_row = {
            "index": idx,
            "gold_clean": gold_clean,
            "predicted_answer": pred,
            "correct": correct,
            "response_length": response_length,
            "confidence": confidence,
            "completion_tokens": completion_tokens,
            "prompt_tokens": prompt_tokens,
            "total_tokens": total_tokens,
        }

        # Append to CSV immediately; the header is written only when the
        # file does not exist yet.
        write_header = not os.path.exists(save_path)
        with open(save_path, mode='a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=result_row.keys())
            if write_header:
                writer.writeheader()
            writer.writerow(result_row)

        print(f"✅ Saved result for index {idx}")




🔁 Resuming from existing result file: results/results_gsm8k_qwen.csv
⏩ Skipping already completed index 1318



=== Question 0 ===
Question: Janet’s ducks lay 16 eggs per day. She eats three for breakfast every morning and bakes muffins for her friends every day with four. She sells the remainder at the farmers' market daily for $2 per fresh duck egg. How much in dollars does she make every day at the farmers' market?


## AIME_1983_2024

In [None]:
# =============================
# Load AIME CSV Dataset
# =============================
# Evaluation data: the AIME CSV (replace the path with your own AIME dataset
# location). `num_samples` limits the run to the first N rows; -1 keeps all.
df = pd.read_csv("dataset/AIME_Dataset_1983_2024.csv")
num_samples = -1
subset = df.head(num_samples) if num_samples != -1 else df

# Model identifiers to evaluate, one full pass over `subset` per entry.
models_to_test = ["deepseek-r1-distill-qwen-1.5b"]

# Sampling settings forwarded to self_consistency_inference for every question.
temperature, top_p = 0.7, 0.9
samples_per_question = 5

# =============================
# Main Evaluation Loop
# =============================
# Evaluate every model on the AIME `subset`, appending one CSV row per
# question so an interrupted run can resume from the rows already on disk.
for model_name in models_to_test:
    print(f"\n\n==================== Evaluating Model: {model_name} ====================")

    model_short_names = {
        "deepseek-r1-distill-qwen-1.5b": "deepseek"
    }

    dataset_name = "aime"
    short_name = model_short_names.get(model_name, model_name.replace("/", "_"))
    save_path = f"results/zero_shot_{dataset_name}_{short_name}.csv"

    # Opening the file in append mode does not create missing directories.
    os.makedirs(os.path.dirname(save_path), exist_ok=True)

    # Resume: collect the indices already present in the result file.
    done_indices = set()
    if os.path.exists(save_path):
        try:
            existing_df = pd.read_csv(save_path)
            done_indices = set(existing_df["index"].tolist())
            print(f"🔁 Resuming from existing result file: {save_path}")
        except Exception as e:
            print(f"⚠️ Failed to read existing result file: {e}")

    for idx, row in subset.iterrows():
        if idx in done_indices:
            # Announce the skip only once, at the index equal to the number
            # of completed rows minus one, rather than for every skipped row.
            if idx == len(done_indices)-1:
                print(f"⏩ Skipping already completed index {idx}")
            continue

        question = row["Question"]
        gold_answer = row["Answer"]

        print(f"\n=== Question {idx} ===")
        print("Question:", question)

        # Any failure for one question is recorded as an empty prediction so
        # the remaining questions still run.
        try:
            pred, full_response, confidence, usage = self_consistency_inference(
                question_text=question,
                model_name=model_name,
                num_samples=samples_per_question,
                temperature=temperature,
                top_p=top_p
            )
        except Exception as e:
            print(f"[Error] {e}")
            pred = ""
            full_response = ""
            confidence = 0.0
            usage = {}

        gold_clean = extract_gold_answer(gold_answer)
        pred = extract_numeric(pred)

        # An abnormal prediction is marked invalid and never compared.
        if is_abnormal_answer(pred):
            print("⚠️ Abnormal pattern detected! Marking as invalid.")
            pred = "INVALID"
            correct = False
        else:
            correct = is_correct_with_tolerance(pred, gold_clean)

        response_length = len(full_response)

        # Token counts: try the "output_tokens"/"input_tokens" keys first,
        # then "completion_tokens"/"prompt_tokens", defaulting to 0.
        completion_tokens = usage.get("output_tokens", usage.get("completion_tokens", 0))
        prompt_tokens = usage.get("input_tokens", usage.get("prompt_tokens", 0))
        total_tokens = usage.get("total_tokens", 0)

        print("Gold Extracted:", repr(gold_clean))
        print("Predicted Final Answer:", repr(pred))
        print("Matched Correctly?", correct)

        result_row = {
            "index": idx,
            "id": row["ID"],
            "year": row["Year"],
            "problem_number": row["Problem Number"],
            "gold_clean": gold_clean,
            "predicted_answer": pred,
            "correct": correct,
            "response_length": response_length,
            "confidence": confidence,
            "completion_tokens": completion_tokens,
            "prompt_tokens": prompt_tokens,
            "total_tokens": total_tokens,
        }

        # Save the row immediately; the header is written only when the file
        # does not exist yet.
        write_header = not os.path.exists(save_path)
        with open(save_path, mode='a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=result_row.keys())
            if write_header:
                writer.writeheader()
            writer.writerow(result_row)

        print(f"✅ Saved result for index {idx}")


## MATH-500

## Single Index Test

In [None]:
import pandas as pd

###############################################################################
# 1) Load dataset
###############################################################################
# Run self-consistency on a single GSM8K row and print a detailed report.
df = pd.read_csv("dataset/GSM8K/main_test.csv")

target_index = 154  # positional row to test

sample = df.iloc[target_index]
question_text = sample["question"]
gold_answer_raw = sample["answer"]

###############################################################################
# 2) Run self-consistency on this question
###############################################################################
predicted, final_response, confidence, usage = self_consistency_inference(
    question_text=question_text,
    model_name="deepseek-r1-distill-qwen-1.5b",
    num_samples=5,
    temperature=0.7,
    top_p=0.9
)

###############################################################################
# 3) Evaluate
###############################################################################
gold_clean = extract_gold_answer(gold_answer_raw)
pred_clean = extract_numeric(predicted)
# Use the same tolerant numeric comparison as the batch evaluation; a plain
# string comparison would reject equal values such as "18.0" vs "18".
is_correct = is_correct_with_tolerance(pred_clean, gold_clean)
response_length = len(final_response)

###############################################################################
# 4) Print results
###############################################################################
print("\n=== Single Question Test ===")
print("Index:", target_index)
print("Gold Raw:", repr(gold_answer_raw))
print("Question:", question_text)
print("Predicted Final Answer:", repr(pred_clean))
print("Gold Answer:", repr(gold_clean))
print("Correct?:", is_correct)
print(f"Response Text: {final_response}")
print(f"Response Length: {response_length} chars")
print(f"Confidence: {confidence:.2f}")
print("Token Usage:", usage)


In [None]:
import pandas as pd

###############################################################################
# 1) Load AIME Dataset
###############################################################################
# Run self-consistency on a single AIME row and print a detailed report.
df = pd.read_csv("dataset/AIME_Dataset_1983_2024.csv")

target_index = 154  # change this to pick which problem to test

sample = df.iloc[target_index]
question_text, gold_answer_raw = sample["Question"], sample["Answer"]

# Draw five samples and keep the majority answer.
predicted, full_response, confidence, usage = self_consistency_inference(
    question_text=question_text,
    model_name="deepseek-r1-distill-qwen-1.5b",
    num_samples=5,
    temperature=0.7,
    top_p=0.9,
)

# Normalise both sides, then compare numerically with a small tolerance.
gold_clean = extract_gold_answer(gold_answer_raw)
pred_clean = extract_numeric(predicted)
is_correct = is_correct_with_tolerance(pred_clean, gold_clean)
response_length = len(full_response)

# Report: one "label value" line per field, then the full response text.
print("\n=== AIME Single Question Test ===")
report_fields = [
    ("Index:", target_index),
    ("Year:", sample.get("Year", "N/A")),
    ("Problem Number:", sample.get("Problem Number", "N/A")),
    ("Question:", question_text),
    ("Gold Raw:", repr(gold_answer_raw)),
    ("Gold Cleaned:", repr(gold_clean)),
    ("Predicted Answer:", repr(pred_clean)),
    ("Correct?:", is_correct),
    ("Confidence:", f"{confidence:.2f}"),
    ("Response Length:", response_length),
    ("Token Usage:", usage),
]
for label, value in report_fields:
    print(label, value)
print("\n=== Full Model Response ===\n")
print(full_response)


[Sample 1] Raw Final Answer: 2051
[Sample 2] Raw Final Answer: 2051
[Sample 3] Raw Final Answer: 2051
[Sample 4] Raw Final Answer: 2051
[Sample 5] Raw Final Answer: 2051

=== AIME Single Question Test ===
Index: 154
Year: 1995
Problem Number: 3
Question: Starting at $(0,0),$ an object moves in the coordinate plane via a sequence of steps, each of length one.  Each step is left, right, up, or down, all four equally likely.  Let $p$ be the probability that the object reaches $(2,2)$ in six or fewer steps.  Given that $p$ can be written in the form $m/n,$ where $m$ and $n$ are relatively prime positive integers, find $m+n.$
Gold Raw: '67'
Gold Cleaned: '67'
Predicted Answer: '2051'
Correct?: False
Confidence: 1.00
Response Length: 2211
Token Usage: {"input_tokens": 153, "output_tokens": 672, "total_tokens": 825, "cached_tokens": 0}

=== Full Model Response ===

To determine the probability \( p \) that an object starting at \((0,0)\) reaches \((2,2)\) in six or fewer steps, we need to ana