In [None]:
import os
import json
import re
import torch
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModelForCausalLM
from google.colab import drive

# Mount Google Drive at /content/drive (Colab only).
drive.mount('/content/drive')

# Only these two values need to be edited between runs.
# They select dataset lines with start_index <= line index < end_index
# (zero-based, end exclusive — see load_dataset).
start_index = 681
end_index = 690
range_tag = f"{start_index}-{end_index}"

# Paths are assembled automatically from BASE_PATH and the range tag.
BASE_PATH = "/content/drive/MyDrive/Cluster-proj"
INPUT_PATH = f"{BASE_PATH}/dataset/openai-gsm8k/train.jsonl"
OUTPUT_PATH = f"{BASE_PATH}/output/llm_steps/whole_logits/deepseek7b-math-{range_tag}.json"

def load_dataset(path, start, end):
    """Load the JSONL records whose zero-based line index lies in [start, end).

    Args:
        path: Path to a JSON Lines file (one JSON document per line).
        start: Index of the first line to include.
        end: Index one past the last line to include.

    Returns:
        A list holding the parsed object of every selected non-blank line, in
        file order. Empty when the range selects nothing.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If a selected non-blank line is not valid JSON.
    """
    subset = []
    # Decode explicitly as UTF-8 instead of relying on the platform default.
    with open(path, "r", encoding="utf-8") as f:
        for i, line in enumerate(f):
            if i >= end:
                # Nothing past `end` is wanted; stop instead of scanning on.
                break
            if i < start or not line.strip():
                # Blank lines still occupy an index but carry no record.
                continue
            subset.append(json.loads(line))
    return subset

def load_model_and_tokenizer(model_name):
    """Load the tokenizer and causal language model identified by `model_name`.

    Both are loaded with `trust_remote_code=True`. The model is requested in
    float16 with `device_map="cuda"` and is switched to evaluation mode
    before being returned.

    Args:
        model_name: Model identifier or path passed to `from_pretrained`.

    Returns:
        A `(model, tokenizer)` tuple.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)

    model_kwargs = {
        "device_map": "cuda",
        "trust_remote_code": True,
        "torch_dtype": torch.float16,
    }
    model = AutoModelForCausalLM.from_pretrained(model_name, **model_kwargs)
    # eval() returns the module itself, so the returned object is the same one.
    model.eval()
    return model, tokenizer

def generate_with_logits(model, tokenizer, question):
    """Sample one tagged answer for `question` and record per-token probabilities.

    The question is embedded in a one-shot prompt that asks for reasoning
    inside <steps>...</steps> and the answer inside
    <final_result>...</final_result>. One completion is sampled
    (temperature 0.7, top-k 50, top-p 0.95, at most 1000 new tokens).

    Args:
        model: Causal LM exposing `generate` and `device`.
        tokenizer: Tokenizer matching `model`.
        question: The question text to insert into the prompt.

    Returns:
        A tuple `(answer, token_level)`:
        - `answer`: the decoded completion (special tokens removed, stripped).
        - `token_level`: one dict per generated token with keys
          `"token"` (stripped token text), `"chosen_prob"` (probability of
          the sampled token) and `"softmax"` (the full-vocabulary
          distribution as a list of floats).
    """
    prompt = (
        f"You are a strict math tutor.\n"
        f"Answer the following math question with clear, step-by-step reasoning.\n\n"
        f"Your whole reasoning steps must be enclosed in : <steps> ... </steps>.\n\n"
        f"The final answer must be enclosed in <final_result> ... </final_result> tags.\n\n"
        f"Only use the tags: <steps>, </steps>, and <final_result>, </final_result>. Do not create or use any other tags.\n\n"
        f"Do not skip any step tags. Each tag must have a matching opening and closing pair.\n\n"
        f"If you break this rule, your answer will be considered invalid.\n\n"
        f"The final answer should be concise — a number or a few words only, not a full sentence.\n\n"
        f"Keep the entire response under 400 tokens.\n\n"
        f"Here is one example for your reference.\n\n"
        f"Example Question: Sarah has 3 packs of pencils. Each pack contains 12 pencils. "
        f"She gives away 15 pencils to her classmates. How many pencils does she have left?\n"
        f"Example Answer:\n"
        f"<steps>Sarah has 3 packs of pencils. Each pack contains 12 pencils, so 3 * 12 = 36 pencils.\n"
        f"She gives away 15 pencils to her classmates.\n"
        f"Subtract the pencils given away from the total: 36 - 15 = 21.</steps>\n"
        f"<final_result>21</final_result>\n\n"
        f"Your style should be similar to the above example style.\n"
        f"Here is your Question: {question}\nYour Answer with reasoning steps and final result:\n\n"
    )

    inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=4096).to(model.device)
    output = model.generate(
        **inputs,
        max_new_tokens=1000,
        return_dict_in_generate=True,
        output_scores=True,
        pad_token_id=tokenizer.eos_token_id,
        eos_token_id=tokenizer.eos_token_id,
        do_sample=True,
        top_k=50,
        top_p=0.95,
        temperature=0.7
    )

    # The returned sequence starts with the prompt; keep only the new tokens.
    input_len = inputs['input_ids'].shape[-1]
    generated_ids = output.sequences[0][input_len:]
    raw_tokens = tokenizer.convert_ids_to_tokens(generated_ids)

    # NOTE(review): transformers documents `scores` as the *processed* scores,
    # i.e. after the temperature / top-k / top-p warpers. The distribution
    # stored below is therefore the one actually sampled from (filtered tokens
    # get probability 0), not the raw model distribution — confirm that this
    # is what the downstream analysis expects.
    token_level = []
    for tok_id, raw_tok, step_scores in zip(generated_ids, raw_tokens, output.scores):
        # Softmax in float32: the model runs in half precision, and a
        # full-vocabulary softmax loses accuracy when computed in fp16.
        probs = F.softmax(step_scores[0].float(), dim=-1).detach().cpu()
        # Build each entry immediately so only one copy of every
        # distribution is kept alive.
        token_level.append({
            "token": tokenizer.convert_tokens_to_string([raw_tok]).strip(),
            "chosen_prob": float(probs[tok_id.item()]),
            "softmax": probs.tolist(),
        })

    answer = tokenizer.decode(generated_ids, skip_special_tokens=True).strip()
    return answer, token_level

def extract_final_result(text):
    """Return the stripped content of the first <final_result>...</final_result> pair.

    The content may span several lines. Returns None when `text` contains no
    complete tag pair.
    """
    found = re.search(r"<final_result>(.*?)</final_result>", text, flags=re.DOTALL)
    if found is None:
        return None
    inner = found.group(1)
    return inner.strip()
def extract_true_final_result(ans_str):
    """Return the first whitespace-free token that follows a '####' marker.

    Any whitespace between the marker and the token is skipped. Returns None
    when `ans_str` has no '####' marker followed by a token.
    """
    found = re.search(r'####\s*(\S+)', ans_str)
    return None if found is None else found.group(1).strip()

def run_generation_pipeline(start_index, end_index, base_path, model_name, num_samples=3):
    """Generate sampled answers for a slice of the dataset and save them as JSON.

    For every question in the half-open line range [start_index, end_index)
    the model is sampled `num_samples` times. Each question is stored under
    the key ``q_<line index>`` together with its reference answer; each
    successful sample is stored under ``sampling<i>``. A sample that raises
    is logged and skipped, so its key is simply absent from the output.

    Input and output paths are derived from the arguments, so the output file
    name always matches the range that was actually processed.

    Args:
        start_index: First dataset line index to process.
        end_index: One past the last dataset line index to process.
        base_path: Project root containing the `dataset/` and `output/` trees.
        model_name: Model identifier passed to `load_model_and_tokenizer`.
        num_samples: Number of completions to sample per question.

    Raises:
        OSError: If the dataset cannot be read or the output cannot be written.
        KeyError: If a dataset record has no "question" field.
    """
    range_tag = f"{start_index}-{end_index}"
    input_path = f"{base_path}/dataset/openai-gsm8k/train.jsonl"
    output_path = f"{base_path}/output/llm_steps/whole_logits/deepseek7b-math-{range_tag}.json"

    # Create the output folder up front: a missing directory should fail (or
    # be fixed) before the expensive generation, not after it.
    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    dataset = load_dataset(input_path, start_index, end_index)
    model, tokenizer = load_model_and_tokenizer(model_name)
    results = {}

    for idx, item in enumerate(dataset):
        # `idx` is relative to the slice; the key uses the dataset line index.
        global_id = start_index + idx
        qid = f"q_{global_id}"
        question = item["question"]
        true_whole_answer = item.get("answer", "")
        true_final_result = extract_true_final_result(true_whole_answer)
        results[qid] = {
            "question": question,
            "true_whole_answer": true_whole_answer,
            "true_final_result": true_final_result
        }

        for i in range(num_samples):
            try:
                full_answer, token_probs = generate_with_logits(model, tokenizer, question)
                final_result = extract_final_result(full_answer)
            except Exception as e:
                # Deliberate best effort: one failed sample must not discard
                # the results already collected for the other questions.
                print(f"[ERROR] Failed on {qid} sampling{i}: {e}")
                continue
            results[qid][f"sampling{i}"] = {
                "whole_answer": full_answer,
                "token_probs": token_probs,
                "final_result": final_result
            }

    # ensure_ascii=False emits non-ASCII characters verbatim, so the file
    # encoding must be pinned to UTF-8.
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, ensure_ascii=False)
    print(f"✅ Saved: {output_path}")

# Entry point: run the pipeline for the range configured at the top of the cell.
# NOTE(review): the output file name is tagged "deepseek7b-math" while the
# model id passed here is "deepseek-ai/deepseek-llm-7b-chat" — confirm the
# tag (or the model) is the intended one.
run_generation_pipeline(
    start_index=start_index,
    end_index=end_index,
    base_path=BASE_PATH,
    model_name="deepseek-ai/deepseek-llm-7b-chat"
)


Mounted at /content/drive


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/1.28k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/4.61M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/594 [00:00<?, ?B/s]

pytorch_model.bin.index.json:   0%|          | 0.00/22.5k [00:00<?, ?B/s]

Fetching 2 files:   0%|          | 0/2 [00:00<?, ?it/s]

pytorch_model-00002-of-00002.bin:   0%|          | 0.00/3.85G [00:00<?, ?B/s]

pytorch_model-00001-of-00002.bin:   0%|          | 0.00/9.97G [00:00<?, ?B/s]

model.safetensors.index.json:   0%|          | 0.00/23.6k [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/181 [00:00<?, ?B/s]

✅ Saved: /content/drive/MyDrive/Cluster-proj/output/llm_steps/whole_logits/deepseek7b-math-681-690.json
