In [1]:
import torch
import sys
import os
from transformers import AutoTokenizer, AutoModelForSequenceClassification

repo_path = "/mnt/e/untitled folder/codebase/LoRO/LoRO"  
if os.path.exists(repo_path) and repo_path not in sys.path:
    sys.path.append(repo_path)

try:
    from utils import model_obfuscation
except ImportError as e:
    print('wrong repo_path')
    sys.exit(1)

# ==========================================
# 1. 加载目标模型 (Private Model)
# ==========================================
model_id = "facebook/bart-large-mnli"
device = "cpu"
save_path = "/mnt/e/untitled folder/codebase/LoRO_attack/loro_bart_obfuscated.pt"

print(f"正在加载模型: {model_id} ...")
# bart-large-mnli 是一个分类模型 (SequenceClassification)
tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForSequenceClassification.from_pretrained(model_id).to(device)

print("模型加载完成。准备进行 LoRO 混淆...")

# ==========================================
# 2. 执行混淆 (调用仓库代码)
# ==========================================
noise_magnitude = 1

print(f"开始混淆 (Noise Magnitude: {noise_magnitude})...")
obfuscated_model = model_obfuscation(model, device=device, noise_mag=noise_magnitude, r=16)

# ==========================================
# 4. 保存混淆后的 Checkpoint
# ==========================================
print(f"正在保存混淆后的模型至: {save_path} ...")
torch.save(obfuscated_model.state_dict(), save_path)

print(f"Checkpoint Path: {os.path.abspath(save_path)}")


正在加载模型: facebook/bart-large-mnli ...
模型加载完成。准备进行 LoRO 混淆...
开始混淆 (Noise Magnitude: 1)...
Obfuscating: model.encoder.layers.0.self_attn.k_proj
Obfuscating: model.encoder.layers.0.self_attn.v_proj
Obfuscating: model.encoder.layers.0.self_attn.q_proj
Obfuscating: model.encoder.layers.0.self_attn.out_proj
Obfuscating: model.encoder.layers.0.fc1
Obfuscating: model.encoder.layers.0.fc2
Obfuscating: model.encoder.layers.1.self_attn.k_proj
Obfuscating: model.encoder.layers.1.self_attn.v_proj
Obfuscating: model.encoder.layers.1.self_attn.q_proj
Obfuscating: model.encoder.layers.1.self_attn.out_proj
Obfuscating: model.encoder.layers.1.fc1
Obfuscating: model.encoder.layers.1.fc2
Obfuscating: model.encoder.layers.2.self_attn.k_proj
Obfuscating: model.encoder.layers.2.self_attn.v_proj
Obfuscating: model.encoder.layers.2.self_attn.q_proj
Obfuscating: model.encoder.layers.2.self_attn.out_proj
Obfuscating: model.encoder.layers.2.fc1
Obfuscating: model.encoder.layers.2.fc2
Obfuscating: model.encoder.la

In [7]:
import torch
from transformers import AutoModelForSequenceClassification
import numpy as np
import pandas as pd

# ==========================================
# 配置
# ==========================================
model_id_ft = "facebook/bart-large-mnli"   # Target (Private/Fine-tuned)
model_id_base = "facebook/bart-large"      # Prior (Public/Base)
device = "cpu"

print(f"Loading Fine-Tuned Model: {model_id_ft}...")
model_ft = AutoModelForSequenceClassification.from_pretrained(model_id_ft).to(device)

print(f"Loading Base Model: {model_id_base}...")
# 注意：bart-large 是基础模型，结构需要与 mnli 版本一致
# AutoModelForSequenceClassification 会自动初始化分类头，
# 但 backbone (encoder/decoder) 的参数应该能对应上。
model_base = AutoModelForSequenceClassification.from_pretrained(model_id_base).to(device)

print("\nStarting Comparison (FT vs. Base)...")
print("-" * 80)
print(f"{'Layer Name':<50} | {'Cos Sim':<10} | {'Delta Norm':<12} | {'Rel Diff (%)':<12}")
print("-" * 80)

results = []

# 获取所有模块的字典
modules_ft = dict(model_ft.named_modules())
modules_base = dict(model_base.named_modules())

# 遍历 FT 模型的层
for name, module_ft in model_ft.named_modules():
    if isinstance(module_ft, torch.nn.Linear):
        # 确保 Base 模型中有同名层
        if name in modules_base:
            module_base = modules_base[name]
            
            # 获取权重 (Clone detached to avoid grad issues)
            w_ft = module_ft.weight.detach()
            w_base = module_base.weight.detach()
            
            # 检查形状是否一致 (分类头可能不一致)
            if w_ft.shape != w_base.shape:
                print(f"[Skipping] {name}: Shapes mismatch {w_ft.shape} vs {w_base.shape} (Likely Classification Head)")
                continue
                
            # 1. 计算 Cosine Similarity
            # Flatten 之后计算向量夹角
            cos_sim = torch.nn.functional.cosine_similarity(
                w_ft.flatten(), 
                w_base.flatten(), 
                dim=0
            ).item()
            
            # 2. 计算 Delta (FT - Base)
            delta = w_ft - w_base
            norm_delta = torch.norm(delta).item()
            
            # 3. 计算 Base Norm
            norm_base = torch.norm(w_base).item()
            
            # 4. 计算相对差异 (Relative Difference)
            # diff / norm_base
            rel_diff = norm_delta / norm_base if norm_base > 0 else 0.0
            
            # 打印部分层的结果 (为了展示整洁，可以每隔几层打印一次，或者打印所有)
            # 这里打印所有 Encoder/Decoder 的投影层
            if "proj" in name or "fc" in name:
                print(f"{name:<50} | {cos_sim:.6f}   | {norm_delta:.4f}       | {rel_diff*100:.4f}%")
            
            results.append({
                "Layer": name,
                "Cos_Sim": cos_sim,
                "Delta_Norm": norm_delta,
                "Base_Norm": norm_base,
                "Rel_Diff": rel_diff
            })
        else:
            print(f"[Missing] {name} not found in Base Model.")

# ==========================================
# 统计摘要
# ==========================================
df = pd.DataFrame(results)
print("-" * 80)
print("Summary Statistics:")
print(f"Average Cosine Similarity: {df['Cos_Sim'].mean():.6f}")
print(f"Average Relative Diff:     {df['Rel_Diff'].mean()*100:.4f}%")
print(f"Min Cosine Similarity:     {df['Cos_Sim'].min():.6f}")
print("-" * 80)


Loading Fine-Tuned Model: facebook/bart-large-mnli...
Loading Base Model: facebook/bart-large...


Some weights of BartForSequenceClassification were not initialized from the model checkpoint at facebook/bart-large and are newly initialized: ['classification_head.dense.bias', 'classification_head.dense.weight', 'classification_head.out_proj.bias', 'classification_head.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.



Starting Comparison (FT vs. Base)...
--------------------------------------------------------------------------------
Layer Name                                         | Cos Sim    | Delta Norm   | Rel Diff (%)
--------------------------------------------------------------------------------
model.encoder.layers.0.self_attn.k_proj            | 0.999108   | 3.4783       | 4.2349%
model.encoder.layers.0.self_attn.v_proj            | 0.996787   | 2.9122       | 8.0912%
model.encoder.layers.0.self_attn.q_proj            | 0.999066   | 3.5483       | 4.3220%
model.encoder.layers.0.self_attn.out_proj          | 0.996372   | 3.1320       | 8.5911%
model.encoder.layers.0.fc1                         | 0.998407   | 6.7873       | 5.9929%
model.encoder.layers.0.fc2                         | 0.998149   | 6.6759       | 6.4737%
model.encoder.layers.1.self_attn.k_proj            | 0.999255   | 3.2776       | 3.8583%
model.encoder.layers.1.self_attn.v_proj            | 0.997652   | 2.8943       | 6.

In [13]:
import torch
import torch.nn as nn
from transformers import AutoModelForSequenceClassification, AutoTokenizer
import os
import copy
from tqdm import tqdm

# ==========================================
# 1. 配置
# ==========================================
# 攻击者的目标：从 Obfuscated Checkpoint + Base Model 恢复出 Private Model
base_model_id = "facebook/bart-large"       # 攻击者拥有的先验
target_model_id = "facebook/bart-large-mnli"  # 仅用于验证攻击成功率 (GT)
obfuscated_checkpoint = "/mnt/e/untitled folder/codebase/LoRO_attack/loro_bart_obfuscated.pt" # 您的混淆文件路径
save_path_recovered = "/mnt/e/untitled folder/codebase/LoRO_attack/recovered_bart_model"    # 还原后的模型保存路径
device = "cuda" if torch.cuda.is_available() else "cpu"

# SVD 攻击参数
# 根据您的实验结果，Rank=16 效果很好。LoRO 默认 Rank=8，通常剔除稍多一点(比如 16 或 32)能更干净地去噪。
REMOVE_RANK = 16 

# ==========================================
# 2. 模型加载
# ==========================================
print(f"1. Loading Base Model (Prior): {base_model_id}...")
# 攻击者初始只有 Base 模型
recovered_model = AutoModelForSequenceClassification.from_pretrained(base_model_id).to(device)

print(f"2. Loading Obfuscated Checkpoint: {obfuscated_checkpoint}...")
if not os.path.exists(obfuscated_checkpoint):
    raise FileNotFoundError("混淆 Checkpoint 未找到，请检查路径。")
obfus_state_dict = torch.load(obfuscated_checkpoint, map_location=device)

print(f"3. Loading Ground Truth (for validation): {target_model_id}...")
gt_model = AutoModelForSequenceClassification.from_pretrained(target_model_id).to(device)

# ==========================================
# 3. 执行全模型攻击
# ==========================================
print("\n" + "="*50)
print(f"STARTING FULL MODEL RECOVERY (Removing Top-{REMOVE_RANK} Singular Components)")
print("="*50)

# 用于统计恢复效果
similarities = []
relative_errors = []

# 获取所有线性层
# 我们遍历 recovered_model (即 base_model) 的模块，然后去 check state_dict 里有没有对应的混淆权重
all_modules = list(recovered_model.named_modules())
linear_layers = [(n, m) for n, m in all_modules if isinstance(m, nn.Linear)]

progress_bar = tqdm(linear_layers, desc="Recovering Layers")

for name, module in progress_bar:
    # 1. 获取 Base 权重 (Prior)
    W_base = module.weight.detach()
    
    # 2. 获取 Obfuscated 权重 (Observation)
    # LoRO 的 state_dict key 通常是 "layer_name.obfus_linear.weight"
    obfus_key = f"{name}.obfus_linear.weight"
    
    # 如果找不到对应的 key，说明这一层可能没有被混淆（或者是分类头等特殊层）
    # 但根据 LoRO 逻辑，Linear 层应该都被混淆了
    if obfus_key not in obfus_state_dict:
        # 尝试直接找 name.weight (有些层可能未被 LoRO 包装)
        if f"{name}.weight" in obfus_state_dict:
            # 如果没混淆，直接加载（或者攻击者认为这就是原样）
            # 但这里我们假设攻击者不知道，只看混淆文件
            continue
        else:
            # 可能是分类头，LoRO 有时也会混淆它。
            # 如果 key 不匹配，跳过
            continue
            
    W_obfus = obfus_state_dict[obfus_key].detach()
    
    # 3. 计算 Diff
    # Diff = W_obfus - W_base
    Diff = W_obfus - W_base
    
    # 4. SVD 攻击 (去噪)
    # 使用 float32 进行 SVD 以保证精度
    U, S, Vh = torch.linalg.svd(Diff.float(), full_matrices=False)
    
    # 剔除前 K 个奇异值 (认为它们是 LoRO 注入的低秩噪声)
    S_clean = S.clone()
    S_clean[:REMOVE_RANK] = 0.0
    
    # 重构 Delta
    Delta_Recovered = (U @ torch.diag(S_clean) @ Vh).to(W_base.dtype)
    
    # 5. 恢复权重
    # W_rec = W_base + Delta_rec
    W_recovered = W_base + Delta_Recovered
    
    # 更新 recovered_model 的权重
    module.weight.data = W_recovered
    
    # 处理 Bias (LoRO 代码中 Bias 是直接存储的，通常没有加噪声，或者加了也可以直接减)
    # 检查 utils.py/loro.py: "self.obfus_linear.bias = torch.nn.Parameter(original_linear.bias)"
    # Bias 没有加噪声！所以直接从 obfus_state_dict 读取即可（攻击者可以直接拿）
    obfus_bias_key = f"{name}.obfus_linear.bias"
    if obfus_bias_key in obfus_state_dict and module.bias is not None:
        module.bias.data = obfus_state_dict[obfus_bias_key].detach()
    
    # 6. 验证 (与 GT 对比)
    # 获取 GT 对应层的权重
    # 注意：需通过 name 从 gt_model 索引
    gt_module = dict(gt_model.named_modules())[name]
    W_gt = gt_module.weight.detach()
    
    # 计算指标
    # 真实的 Delta = W_gt - W_base
    Delta_True = W_gt - W_base
    
    # 计算恢复出的 Delta 和 真实 Delta 的相似度
    sim = torch.nn.functional.cosine_similarity(Delta_True.flatten(), Delta_Recovered.flatten(), dim=0).item()
    rel_err = torch.norm(Delta_Recovered - Delta_True) / torch.norm(Delta_True)
    
    similarities.append(sim)
    relative_errors.append(rel_err.item())
    
    # 更新进度条显示当前层的相似度
    progress_bar.set_postfix({"Sim": f"{sim:.4f}", "Err": f"{rel_err:.4f}"})

# ==========================================
# 4. 结果汇总与保存
# ==========================================
print("\n" + "="*50)
print("RECOVERY COMPLETE")
print("="*50)

avg_sim = sum(similarities) / len(similarities)
avg_err = sum(relative_errors) / len(relative_errors)
min_sim = min(similarities)

print(f"Total Layers Recovered: {len(similarities)}")
print(f"Average Cosine Similarity: {avg_sim:.4f}")
print(f"Average Relative Error:    {avg_err:.4f}")
print(f"Worst Layer Similarity:    {min_sim:.4f}")

if avg_sim > 0.95:
    print("\n[SUCCESS] 模型还原极其成功！基本等同于原始私有模型。")
else:
    print("\n[WARNING] 模型还原效果一般，可能需要调整 REMOVE_RANK 参数。")

# 保存模型
print(f"\nSaving recovered model to {save_path_recovered}...")
recovered_model.save_pretrained(save_path_recovered)
tokenizer = AutoTokenizer.from_pretrained(base_model_id)
tokenizer.save_pretrained(save_path_recovered)
print("Saved. You can now load this model with 'AutoModelForSequenceClassification.from_pretrained'.")

1. Loading Base Model (Prior): facebook/bart-large...


Some weights of BartForSequenceClassification were not initialized from the model checkpoint at facebook/bart-large and are newly initialized: ['classification_head.dense.bias', 'classification_head.dense.weight', 'classification_head.out_proj.bias', 'classification_head.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


2. Loading Obfuscated Checkpoint: /mnt/e/untitled folder/codebase/LoRO_attack/loro_bart_obfuscated.pt...
3. Loading Ground Truth (for validation): facebook/bart-large-mnli...

STARTING FULL MODEL RECOVERY (Removing Top-16 Singular Components)


Recovering Layers: 100%|████████████████████████████| 194/194 [01:12<00:00,  2.68it/s, Sim=0.0000, Err=1.0000]



RECOVERY COMPLETE
Total Layers Recovered: 194
Average Cosine Similarity: 0.9796
Average Relative Error:    0.1778
Worst Layer Similarity:    0.0000

[SUCCESS] 模型还原极其成功！基本等同于原始私有模型。

Saving recovered model to /mnt/e/untitled folder/codebase/LoRO_attack/recovered_bart_model...
Saved. You can now load this model with 'AutoModelForSequenceClassification.from_pretrained'.
