In [None]:
# fingerprint_tools.py
# ------------------------------------------------------------
# Minimal, tidy utilities for RoFL-style x' init + greedy y,
# and black-box verification with simple metrics.
# NO system prompt anywhere in this file.
# You provide:
#   - (base) model, tokenizer  —— 用于生成指纹 (x', y)
#   - suspect_generate()        —— 被测模型的生成回调（HF 或 GGUF）
# ------------------------------------------------------------
from __future__ import annotations
import os, time, json, random, difflib
from typing import Callable, List, Dict, Any, Tuple, Optional
from tqdm import tqdm
import numpy as np
import torch
import torch.nn.functional as F

# =============== basics ===============

def set_seed(seed: int = 42):
    """Seed Python's ``random``, NumPy and PyTorch with the same value.

    When CUDA is available, every CUDA device is seeded as well.
    """
    import numpy as _np

    for seeder in (random.seed, _np.random.seed, torch.manual_seed):
        seeder(seed)
    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed_all(seed)

def ts_path(path: str, model_name) -> str:
    """Return *path* with ``_<model_name>`` inserted before its extension.

    Despite the name, no timestamp is added: the result depends only on the
    two arguments.

    Args:
        path: Target file path; ``.json`` is used when it has no extension.
        model_name: Tag appended to the file stem. Path separators in it
            (e.g. a Hugging Face ``org/name`` id) are replaced by ``_`` so the
            tag cannot redirect the file into another directory.

    Returns:
        The tagged path, e.g. ``"out.json"`` + ``"m7b"`` -> ``"out_m7b.json"``.
    """
    base, ext = os.path.splitext(path)
    safe_name = str(model_name).replace("/", "_").replace("\\", "_")
    return f"{base}_{safe_name}{ext or '.json'}"

# =============== formatting (NO system) ===============

def format_full_prompt(
    x_prime_text: str,
    prompt_style: str = "oneshot",   # 'oneshot' | 'chatml' | 'raw'
) -> str:
    """
    Wrap the fingerprint text x' in a prompt template (never a system block).

    - raw:     x' is returned unchanged.
    - chatml:  a single user turn followed by an opened assistant turn,
               using <|im_start|> / <|im_end|> markers.
    - oneshot: "user: " + x' + newline + "assistant:".
               Any other style value also falls back to this template.
    """
    if prompt_style == "raw":
        return x_prime_text
    if prompt_style != "chatml":
        # default: oneshot
        return f"user: {x_prime_text}\nassistant:"
    user_turn = "<|im_start|>user\n" + x_prime_text + "\n<|im_end|>\n"
    return user_turn + "<|im_start|>assistant\n"

# =============== x' initialization (RoFL Step 1 spirit) ===============

def _build_allowed_token_set(tokenizer) -> List[int]:
    """Return the ids in ``range(tokenizer.vocab_size)`` that are not special.

    An id is excluded when it is the tokenizer's bos/eos/pad/unk id or is
    listed in ``tokenizer.all_special_ids`` (when that attribute exists).
    """
    named_ids = (
        getattr(tokenizer, attr, None)
        for attr in ("bos_token_id", "eos_token_id", "pad_token_id", "unk_token_id")
    )
    blocked = {tid for tid in named_ids if tid is not None}
    if hasattr(tokenizer, "all_special_ids"):
        blocked |= set(tokenizer.all_special_ids)

    allowed: List[int] = []
    for tid in range(tokenizer.vocab_size):
        if tid in blocked:
            continue
        allowed.append(tid)
    return allowed

@torch.no_grad()
def sample_fingerprint_prompt(
    model,
    tokenizer,
    device: Optional[str] = None,
    l_random_prefix: int = 8,
    total_len: int = 64,
    k_bottom: int = 50,
) -> str:
    """
    Sample a fingerprint prompt x' (simplified RoFL step 1).

    (1) The first ``l_random_prefix`` tokens are drawn uniformly from the
        non-special vocabulary.
    (2) Each further token is drawn uniformly from the ``k_bottom`` allowed
        tokens the model currently scores lowest, until the sequence holds
        ``total_len`` tokens.

    Args:
        model: Causal LM; called as ``model(ids).logits``.
        tokenizer: Tokenizer matching ``model``.
        device: Device for the token tensors; defaults to the device of the
            model's first parameter.
        l_random_prefix: Number of uniformly random prefix tokens.
        total_len: Target token length of the prompt.
        k_bottom: Size of the lowest-score candidate pool.

    Returns:
        The decoded prompt text (special tokens skipped).

    Randomness comes from the ``random`` module, so seed it for reproducibility.
    """
    model.eval()
    if device is None:
        device = next(model.parameters()).device

    allowed = _build_allowed_token_set(tokenizer)
    allowed_t = torch.tensor(allowed, dtype=torch.long, device=device)
    # Never draw more candidates than there are allowed tokens; otherwise the
    # masked (disallowed) ids would leak into the candidate pool.
    k_eff = min(k_bottom, len(allowed))

    # (1) uniformly random prefix
    prefix_ids = random.choices(allowed, k=l_random_prefix)
    prompt_ids = torch.tensor(prefix_ids, dtype=torch.long, device=device).unsqueeze(0)

    # (2) bottom-k extension
    while prompt_ids.shape[1] < total_len:
        # Rank on logits rather than softmax probabilities: softmax is
        # monotonic, so the order is the same, but probabilities of unlikely
        # tokens can underflow to identical zeros in half precision.
        logits = model(prompt_ids).logits[:, -1, :].float()

        # Start from +inf everywhere and copy in only the allowed columns.
        # This always masks special ids and any logit columns beyond
        # tokenizer.vocab_size (e.g. a padded output layer).
        scores = torch.full_like(logits, float("inf"))
        scores[:, allowed_t] = logits[:, allowed_t]

        bottomk_idx = torch.topk(scores, k_eff, dim=-1, largest=False).indices
        next_id = bottomk_idx[0, random.randrange(k_eff)].view(1, 1)
        prompt_ids = torch.cat([prompt_ids, next_id.to(device)], dim=1)

    return tokenizer.decode(prompt_ids.squeeze(0).cpu(), skip_special_tokens=True)

# =============== greedy y (RoFL Step 2) ===============

@torch.no_grad()
def greedy_response_hf(
    model,
    tokenizer,
    full_prompt_text: str,
    device: Optional[str] = None,
    max_new_tokens: int = 64,
) -> str:
    """
    Deterministic generation (do_sample=False) for RoFL step 2.

    Returns only the continuation: the prompt tokens are sliced off before
    decoding. The tokenizer's eos id is used both as eos and as pad id.
    """
    model.eval()
    target = device if device is not None else next(model.parameters()).device

    encoded = tokenizer(full_prompt_text, return_tensors="pt").to(target)
    prompt_len = encoded["input_ids"].shape[1]

    eos_id = tokenizer.eos_token_id
    sequences = model.generate(
        **encoded,
        do_sample=False,
        max_new_tokens=max_new_tokens,
        pad_token_id=eos_id,
        eos_token_id=eos_id,
    )
    continuation = sequences[0][prompt_len:]
    return tokenizer.decode(
        continuation,
        skip_special_tokens=True,
        clean_up_tokenization_spaces=False,
    )

# =============== batch: (x', y) generation ===============

def generate_fingerprints_batch(
    model,                       # already-loaded HF model used to produce the fingerprints
    model_name,                  # tag inserted into the saved file name
    tokenizer,                   # tokenizer matching `model`
    num_pairs: int = 3,
    prompt_style: str = "oneshot",      # 'oneshot' | 'chatml' | 'raw'
    l_random_prefix: int = 8,
    total_len: int = 64,
    k_bottom: int = 50,
    max_new_tokens: int = 64,
    save_json_path: Optional[str] = "fingerprints_init.json",
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
    """
    Generate ``num_pairs`` fingerprint pairs (x', y) and optionally save them.

    For each pair, x' is sampled with ``sample_fingerprint_prompt``, wrapped by
    ``format_full_prompt`` and answered greedily by ``greedy_response_hf``.
    No system text is involved.

    Returns ``(pairs, out_path)``. Each pair is a dict with the keys
    ``prompt_style``, ``x_prime``, ``y_response`` and ``full_prompt_used``.
    ``out_path`` is the JSON file written, or ``None`` when ``save_json_path``
    is empty.
    """
    device = next(model.parameters()).device
    sampling_kwargs = dict(
        l_random_prefix=l_random_prefix,
        total_len=total_len,
        k_bottom=k_bottom,
    )
    pairs: List[Dict[str, Any]] = []

    for i in range(num_pairs):
        print(f"[gen] [{i+1}/{num_pairs}]")
        x_prime = sample_fingerprint_prompt(model, tokenizer, device=device, **sampling_kwargs)
        full_prompt = format_full_prompt(x_prime, prompt_style=prompt_style)
        y_resp = greedy_response_hf(
            model, tokenizer, full_prompt, device=device, max_new_tokens=max_new_tokens
        )

        # Short single-line preview of both halves of the pair.
        for tag, text in (("x':", x_prime), ("y :", y_resp)):
            print(tag, text[:160].replace("\n", "\\n"))

        record: Dict[str, Any] = {}
        record["prompt_style"] = prompt_style
        record["x_prime"] = x_prime
        record["y_response"] = y_resp
        record["full_prompt_used"] = full_prompt
        pairs.append(record)

    if not save_json_path:
        return pairs, None

    out_path = ts_path(save_json_path, model_name)
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(pairs, f, ensure_ascii=False, indent=2)
    print(f"[save] fingerprints -> {out_path}")
    return pairs, out_path

# =============== metrics ===============

def _normalize_text(s: str) -> str:
    """Lower-case *s* and collapse every whitespace run into one space."""
    return " ".join(s.lower().split())

def metric_prefix_match(a: str, b: str, min_len: int = 30) -> int:
    """Return 1 if the first ``min_len`` normalized characters of *a* and *b* are equal, else 0.

    Strings shorter than ``min_len`` are compared in full.
    """
    return int(_normalize_text(a)[:min_len] == _normalize_text(b)[:min_len])

def metric_lcs_ratio(a: str, b: str) -> float:
    """Return the ``difflib.SequenceMatcher`` similarity ratio (0..1) of the normalized texts.

    ``autojunk`` is disabled: the default heuristic ignores characters that are
    frequent in the second string once it reaches 200 characters, which can
    drive the ratio of near-identical long texts towards 0.
    """
    matcher = difflib.SequenceMatcher(
        None, _normalize_text(a), _normalize_text(b), autojunk=False
    )
    return matcher.ratio()

def metric_signature_overlap(a: str, b: str, min_tok_len: int = 6) -> Tuple[int, int]:
    """Count how many long tokens of *a* occur in *b*.

    Tokens are the whitespace-separated words of normalized *a* that have at
    least ``min_tok_len`` characters. A token counts as a hit when it appears
    anywhere in normalized *b* as a substring.

    Returns:
        ``(hits, total)``, where ``total`` is the number of long tokens in *a*.
    """
    a_norm, b_norm = _normalize_text(a), _normalize_text(b)
    toks = [t for t in a_norm.split() if len(t) >= min_tok_len]
    hits = sum(1 for t in toks if t in b_norm)
    return hits, len(toks)


# ================== suspect wrappers (HF / GGUF) ==================

class SuspectModelHF:
    """
    Suspect-model wrapper that loads a Hugging Face causal LM by name and
    greedily generates a continuation for a fully formatted prompt.
    """
    def __init__(self, model_name, device=None, torch_dtype=torch.float16):
        from transformers import AutoTokenizer, AutoModelForCausalLM

        # Default to CUDA when available, otherwise CPU.
        self.device = device if device is not None else (
            "cuda" if torch.cuda.is_available() else "cpu"
        )

        self.tok = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
        if self.tok.pad_token is None:
            # Reuse eos as the pad token when the tokenizer defines none.
            self.tok.pad_token = self.tok.eos_token

        load_kwargs = dict(
            torch_dtype=torch_dtype,
            low_cpu_mem_usage=True,
            trust_remote_code=True,
            device_map={"": self.device},   # put whole model on single device
        )
        self.model = AutoModelForCausalLM.from_pretrained(model_name, **load_kwargs)
        self.model.eval()

    @torch.no_grad()
    def generate_answer(self, full_prompt, max_new_tokens=128, stop_tokens=None):
        """
        Greedily (do_sample=False) generate a continuation for ``full_prompt``.

        full_prompt: the prompt exactly as it is fed to the model; any role
            markers must already be included (no system prompt is added here).
        stop_tokens: optional strings; the decoded text is cut, one string
            after another in the given order, at the first occurrence of each.
        returns: the continuation only, without the prompt.
        """
        encoded = self.tok(full_prompt, return_tensors="pt").to(self.device)
        prompt_len = encoded["input_ids"].shape[1]

        eos_id = self.tok.eos_token_id
        sequences = self.model.generate(
            **encoded,
            do_sample=False,  # greedy == temperature 0
            max_new_tokens=max_new_tokens,
            pad_token_id=eos_id,
            eos_token_id=eos_id,
        )

        text = self.tok.decode(
            sequences[0][prompt_len:],
            skip_special_tokens=True,
            clean_up_tokenization_spaces=False,
        )

        # optional manual stopping on phrases
        for marker in stop_tokens or ():
            pos = text.find(marker)
            if pos >= 0:
                text = text[:pos]
        return text


class SuspectModelLlamaCpp:
    """
    Suspect-model wrapper around an already constructed llama_cpp.Llama
    (GGUF) instance, i.e. the caller has done ``llm = Llama(...)``.
    """
    def __init__(self, llm):
        self.llm = llm

    def generate_answer(self, full_prompt, max_new_tokens=128, stop_tokens=None):
        """Return the text of the first completion choice for ``full_prompt``.

        The wrapped object is called with temperature 0.0. When ``stop_tokens``
        is None, a default stop list covering the oneshot/chatml/raw prompt
        styles is passed instead.
        """
        stops = stop_tokens
        if stops is None:
            # defaults that work for oneshot/chatml/raw
            stops = ["</s>", "user:", "assistant:", "<|im_end|>", "<|im_start|>user"]
        completion = self.llm(
            full_prompt,
            max_tokens=max_new_tokens,
            temperature=0.0,  # greedy-ish
            stop=stops,
        )
        first_choice = completion["choices"][0]
        return first_choice["text"]


# ================== evaluation (no system prompt anywhere) ==================
import numpy as np

def evaluate_fingerprints(
    pairs_json_path: str,
    model_name: str,
    suspect_base_name,                     # recorded in the report as the suspect model name
    suspect_model,                         # has .generate_answer(full_prompt, max_new_tokens, stop_tokens)
    suspect_label: str = "suspect",
    save_report_path: str | None = None,   # e.g. "eval_report.json"
    min_prefix_len: int = 30,
    sig_min_tok_len: int = 6,
    use_timestamp_in_name: bool = False,   # add a timestamp to the report file name
    max_new_tokens: int = 128,
):
    """
    Replay saved fingerprint prompts on a suspect model and compare outputs.

    Args:
        pairs_json_path: JSON file holding a list of pairs with the keys
            ``x_prime`` and ``y_response`` (and optionally ``prompt_style``
            and ``full_prompt_used``).
        model_name: Name of the fingerprinted model; stored in the report and
            inserted into the report file name.
        suspect_base_name: Name of the suspect model; stored in the report.
        suspect_model: Object exposing
            ``generate_answer(full_prompt, max_new_tokens=..., stop_tokens=...)``.
        suspect_label: Label used in the progress bar and the printed report.
        save_report_path: Where to write the report; nothing is written when
            it is empty or None.
        min_prefix_len: Prefix length (characters) for the prefix-match metric.
        sig_min_tok_len: Minimum token length for the signature-overlap metric.
        use_timestamp_in_name: When True, a ``YYYYmmdd-HHMMSS`` timestamp is
            added to the report file name.
        max_new_tokens: Generation budget passed to the suspect model.

    Returns:
        ``(minimal_records, out_path)``. Each record holds only
        ``fingerprint``, ``base_y`` and ``suspect_y``. The aggregate metrics
        are printed but not saved. ``out_path`` is None when nothing was
        written.
    """
    # 1) load pairs
    with open(pairs_json_path, "r", encoding="utf-8") as f:
        pairs = json.load(f)
    print(f"[info] Loaded {len(pairs)} fingerprint pairs from {pairs_json_path}")

    # The stop strings follow the prompt style of the first pair; an empty
    # file falls back to "raw" instead of raising IndexError.
    style = pairs[0].get("prompt_style", "raw") if pairs else "raw"

    if style == "raw":
        stops = ["</s>", "<|im_end|>"]           # only genuine end markers
    elif style == "chatml":
        stops = ["</s>", "<|im_end|>", "<|im_start|>user"]
    else:  # oneshot
        stops = ["</s>", "user:", "assistant:"]

    # Metrics that are printed but never saved.
    prefix_hits, sim_scores, sig_hits, sig_totals = [], [], [], []
    # Minimal per-sample records that end up in the report file.
    minimal_records = []

    # 2) eval loop
    # NOTE(review): stop strings are only handed to the suspect model; the
    # stored base response is compared as-is -- confirm that is intended.
    for pair in tqdm(pairs, desc=f"Evaluating on {suspect_label}"):
        x_prime = pair["x_prime"]              # fingerprint
        base_y = pair["y_response"]            # response stored with the fingerprint
        # Prefer the exact prompt used at generation time; otherwise fall back
        # to the minimal one-shot format (no system text).
        full_prompt_for_suspect = pair.get(
            "full_prompt_used", f"user: {x_prime}\nassistant:"
        )

        suspect_y = suspect_model.generate_answer(
            full_prompt_for_suspect,
            max_new_tokens=max_new_tokens,
            stop_tokens=stops,
        )

        pm = metric_prefix_match(base_y, suspect_y, min_len=min_prefix_len)
        sim = metric_lcs_ratio(base_y, suspect_y)
        h, tot = metric_signature_overlap(base_y, suspect_y, min_tok_len=sig_min_tok_len)
        prefix_hits.append(pm)
        sim_scores.append(sim)
        sig_hits.append(h)
        sig_totals.append(tot)

        minimal_records.append({
            "fingerprint": x_prime,
            "base_y": base_y,
            "suspect_y": suspect_y,
        })

    # 3) print aggregate metrics (not written to file)
    prefix_match_rate = float(np.mean(prefix_hits)) if prefix_hits else 0.0
    avg_edit_sim = float(np.mean(sim_scores)) if sim_scores else 0.0
    sig_total = float(np.sum(sig_totals))
    sig_overlap_rate = float(np.sum(sig_hits)) / sig_total if sig_total > 0 else 0.0

    print("========== FINGERPRINT VERIFICATION REPORT ==========")
    print(f"Suspect model label: {suspect_label}")
    print(f"#pairs evaluated: {len(pairs)}")
    print(f"Prefix match rate (first {min_prefix_len} chars): {prefix_match_rate:.3f}")
    print(f"Avg edit/sequence similarity (0~1):           {avg_edit_sim:.3f}")
    print(f"Signature phrase overlap (>= {sig_min_tok_len} chars): {sig_overlap_rate:.3f}")
    print("Preview (first 2 minimal records):")
    for rec in minimal_records[:2]:
        print("----")
        print("fingerprint:", rec["fingerprint"][:160].replace("\n","\\n"))
        print("base_y     :", rec["base_y"][:160].replace("\n","\\n"))
        print("suspect_y  :", rec["suspect_y"][:160].replace("\n","\\n"))

    out_obj = {
        "base_model_name": model_name,
        "suspect_model_name": suspect_base_name,
        "prompt_style": style,
        "num_pairs": len(pairs),
        "records": minimal_records,
    }

    # 4) save: minimal records only
    out_path = None
    if save_report_path:
        report_path = save_report_path
        if use_timestamp_in_name:
            # Insert the timestamp before the extension; ts_path then appends
            # the model name after it.
            stem, ext = os.path.splitext(report_path)
            report_path = f"{stem}_{time.strftime('%Y%m%d-%H%M%S')}{ext}"
        out_path = ts_path(report_path, model_name=model_name)
        with open(out_path, "w", encoding="utf-8") as f:
            json.dump(out_obj, f, ensure_ascii=False, indent=2)
        print(f"[save] minimal summary -> {out_path}")

    return minimal_records, out_path

In [None]:
import torch

class SuspectFromLoadedHF:
    """Suspect-model wrapper around an already loaded HF model and tokenizer."""

    def __init__(self, model, tok):
        self.tok = tok
        self.model = model.eval()
        # Inputs are moved to the device of the model's first parameter.
        self.device = next(model.parameters()).device

    @torch.no_grad()
    def generate_answer(self, full_prompt, max_new_tokens=128, stop_tokens=None):
        """Greedily generate a continuation for ``full_prompt`` (prompt excluded).

        When ``stop_tokens`` is given, the decoded text is cut, one string
        after another in the given order, at the first occurrence of each.
        """
        encoded = self.tok(full_prompt, return_tensors="pt").to(self.device)
        prompt_len = encoded["input_ids"].shape[1]

        eos_id = self.tok.eos_token_id
        sequences = self.model.generate(
            **encoded,
            do_sample=False,  # greedy (= temp 0)
            max_new_tokens=max_new_tokens,
            pad_token_id=eos_id,
            eos_token_id=eos_id,
        )
        text = self.tok.decode(
            sequences[0][prompt_len:],
            skip_special_tokens=True,
            clean_up_tokenization_spaces=False,
        )
        for marker in stop_tokens or ():
            pos = text.find(marker)
            if pos >= 0:
                text = text[:pos]
        return text

In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
# def load_hf_model(model_name: str, suspect_model: str, device: str | None = None, dtype: torch.dtype | None = None):
#     """
#     Universal Hugging Face model loader.
#     Returns (model, tokenizer, device).
#     - Handles pad_token and left-padding consistently.
#     - Defaults to float16 (for T4 / Colab / GPU efficiency).
#     """
#     if device is None:
#         device = "cuda" if torch.cuda.is_available() else "cpu"
#     if dtype is None:
#         dtype = torch.float16

#     # --- Tokenizer Loading ---
#     tok = None
#     try:
#         tok = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True, use_fast=False)
#     except Exception as e:
#         print(f"[warn] tokenizer for {model_name} failed: {e}")
#         try:
#             print(f"[fallback] trying {suspect_model} instead...")
#             tok = AutoTokenizer.from_pretrained(suspect_model, trust_remote_code=True, use_fast=False)
#         except Exception as e2:
#             raise RuntimeError(f"Failed to load both tokenizers ({model_name}, {suspect_model}): {e2}")

#     if not hasattr(tok, "pad_token"):   # <-- safety guard
#         raise TypeError(f"Tokenizer load failed. Got type: {type(tok)}")

#     # Ensure pad_token exists
#     if tok.pad_token is None:
#         tok.pad_token = tok.eos_token or tok.unk_token
#     tok.padding_side = "left"

#     # --- Model Loading ---
#     model = AutoModelForCausalLM.from_pretrained(
#         model_name,
#         torch_dtype=dtype,
#         low_cpu_mem_usage=True,
#         trust_remote_code=True,
#         device_map={"": device},
#     ).eval()

#     return model, tok, device

def load_hf_model(model_id, fourbit=False, torch_dtype=torch.float16, device_map="auto"):
    """Load a Hugging Face causal LM and its tokenizer in eval mode.

    Args:
        model_id: Model name or path passed to ``from_pretrained``.
        fourbit: When True, load with 4-bit NF4 quantization (double
            quantization enabled) through ``BitsAndBytesConfig``.
        torch_dtype: Weight dtype for the non-quantized load. For the 4-bit
            load it is used as the compute dtype when it is a ``torch.dtype``;
            any other value falls back to float16.
        device_map: Forwarded to ``from_pretrained``.

    Returns:
        ``(model, tokenizer, device)``, where ``device`` is the device of the
        model's first parameter.
    """
    tok = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
    if tok.pad_token is None:
        # Reuse eos as the pad token when the tokenizer defines none.
        tok.pad_token = tok.eos_token

    kwargs = dict(trust_remote_code=True, device_map=device_map)
    if fourbit:
        # Honour the caller's dtype instead of silently ignoring it.
        compute_dtype = torch_dtype if isinstance(torch_dtype, torch.dtype) else torch.float16
        kwargs["quantization_config"] = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_use_double_quant=True,
            bnb_4bit_compute_dtype=compute_dtype,
        )
    else:
        kwargs["torch_dtype"] = torch_dtype

    model = AutoModelForCausalLM.from_pretrained(model_id, **kwargs)
    model.eval()
    return model, tok, next(model.parameters()).device

In [None]:
# === memory_cleanup.py（或直接放到一个cell里）===
import gc, torch

def unload_hf_model(model=None, tokenizer=None):
    """Best-effort release of an HF model and its GPU/CPU memory.

    The model is first moved to CPU (failures are ignored), then this
    function's own references are dropped, the garbage collector is run and,
    when CUDA is available, the CUDA cache is emptied. The caller's variables
    still reference the objects; delete those as well for memory to be freed.
    """
    if model is not None:
        try:
            model.to('cpu')  # move off the GPU first
        except Exception:
            pass

    # Drop the local references before collecting.
    del model, tokenizer
    gc.collect()

    if not torch.cuda.is_available():
        return
    try:
        torch.cuda.empty_cache()
        torch.cuda.ipc_collect()
    except Exception:
        pass

def unload_llama_cpp(llm=None):
    """Best-effort release of a llama.cpp GGUF instance (if one is used).

    Calls ``llm.close()`` when the object has such an attribute (errors are
    ignored), drops the local reference, runs the garbage collector and, when
    CUDA is available, empties the CUDA cache.
    """
    if llm is not None:
        try:
            if hasattr(llm, "close"):
                llm.close()
        except Exception:
            pass

    del llm
    gc.collect()

    if not torch.cuda.is_available():
        return
    try:
        torch.cuda.empty_cache()
        torch.cuda.ipc_collect()
    except Exception:
        pass

## check

In [None]:
# Hugging Face id loaded as model1 (the model the fingerprints are generated on).
model_name = "cognitivecomputations/dolphin-2.9.3-mistral-7B-32k"
# Slash-free short name; passed as `model_name` to the helpers that build output file names.
model_orgin = "dolphin-2.9.3-mistral-7B-32k"
# Hugging Face id loaded as model2 (the suspect evaluated on model1's fingerprints).
model_base_name = "mistralai/Mistral-7B-v0.3"

In [None]:

# -------- model1: model used to generate the fingerprints --------
model1, tok1, dev1 = load_hf_model(model_name, fourbit=False)

# -------- model2: suspect model (verified against the fingerprints) --------
model2, tok2, dev2 = load_hf_model(model_base_name, fourbit=False)
print("Loaded:\n - model1 on", dev1, "\n - model2 on", dev2)

# Optional: quick greedy sanity check (no system prompt, deterministic decoding)
@torch.no_grad()
def greedy_once(model, tok, prompt, max_new_tokens=50):
    """Greedily decode up to `max_new_tokens` tokens for `prompt` and print the continuation only."""
    device = next(model.parameters()).device
    inp = tok(prompt, return_tensors="pt").to(device)
    out = model.generate(
        **inp,
        do_sample=False,               # = temperature 0
        max_new_tokens=max_new_tokens,
        pad_token_id=tok.pad_token_id,
        eos_token_id=tok.eos_token_id,
    )[0]
    # Slice off the prompt tokens so that only the newly generated text is printed.
    print(tok.decode(out[inp["input_ids"].shape[1]:], skip_special_tokens=True))

# Example (safe to delete): no system prompt, one-shot structure only
test_prompt = "user: Hi, introduce your self \nassistant:"
greedy_once(model1, tok1, test_prompt, 40)
greedy_once(model2, tok2, test_prompt, 40)

In [None]:
# from fingerprint_tools import set_seed, generate_fingerprints_batch

# Seed the RNGs before sampling the fingerprints.
set_seed(42)

pairs, pairs_path = generate_fingerprints_batch(
    model=model1,
    model_name=model_orgin,
    tokenizer=tok1,
    num_pairs=3,                 # generate 3 pairs
    prompt_style="raw",      # "raw": x' is used verbatim as the prompt, without role markers
    # system_prompt="",            # no system prompt (would pass an empty string)
    l_random_prefix=8,
    total_len=64,
    k_bottom=50,
    max_new_tokens=64,
    save_json_path="fingerprints_init.json",  # model_name is inserted before the extension on save (no timestamp)
)

print("Saved to:", pairs_path)
print("x'[0]:", pairs[0]["x_prime"][:120])
print("y [0]:", pairs[0]["y_response"][:120])

In [None]:
# from fingerprint_tools import evaluate_fingerprints

# Wrap the already loaded model2 so it exposes generate_answer().
suspect = SuspectFromLoadedHF(model2, tok2)

# evaluate_fingerprints returns (minimal_records, out_path), so `report` is that tuple.
report = evaluate_fingerprints(
    pairs_json_path=pairs_path,
    model_name = model_orgin,
    suspect_base_name=model_base_name,
    suspect_model=suspect,
    suspect_label="model2-on-model1-fp",
    save_report_path="eval_report.json",
)