In [None]:
!pip install rouge_score


Collecting rouge_score
  Downloading rouge_score-0.1.2.tar.gz (17 kB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: rouge_score
  Building wheel for rouge_score (setup.py) ... done
  Created wheel for rouge_score: filename=rouge_score-0.1.2-py3-none-any.whl size=24935 sha256=85055ba17e9a1b025252bf3b6c58aeccf56c10897e8476faa019a9f0ac0da274
  Stored in directory: /root/.cache/pip/wheels/1e/19/43/8a442dc83660ca25e163e1bd1f89919284ab0d0c1475475148
Successfully built rouge_score
Installing collected packages: rouge_score
Successfully installed rouge_score-0.1.2
正在加载模型与分词器...
模型加载完成。
初始化增强模块和消融版本...
模块初始化完成。
开始运行前向传播 benchmark...
前向传播耗时 (平均每次):
  基础模型: 0.398281 s
  增强模型: 0.417685 s
  消融 - 无层归一化: 0.435088 s
  消融 - 固定门控: 0.433241 s
生成文本耗时 (基础模型): 1.917742 s

生成文本示例:
Once upon a time, in a distant land, Once upon a time, in a distant land, Once upon a time, in a distant land, Once upon a time, in a distant land, Once upon a time, in a d

In [None]:
import time, math, torch, torch.nn as nn
from transformers import AutoTokenizer, AutoModelForCausalLM
import numpy as np

# 尝试导入 BLEU 和 ROUGE 计算工具
try:
    from nltk.translate.bleu_score import sentence_bleu
except ImportError:
    print("请安装 nltk: pip install nltk")

try:
    from rouge_score import rouge_scorer
except ImportError:
    print("请安装 rouge_score: pip install rouge_score")

# ---------------------------
# 1. Load the model and tokenizer
# ---------------------------
# The RWKV/v6-Finch-1B6-HF checkpoint is loaded with trust_remote_code=True
# so that its custom code is allowed to run.
model_name = "RWKV/v6-Finch-1B6-HF"
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
base_model = AutoModelForCausalLM.from_pretrained(model_name, trust_remote_code=True)

# Switch to evaluation mode, then move the weights to the selected device.
base_model.eval()
base_model.to(device)

# ---------------------------
# 2. 定义增强模块及消融变体（支持 loss 计算，并添加 generate 方法）
# ---------------------------
class EnhancedAdaptiveTokenShiftRWKV(nn.Module):
    """Full enhanced variant: learned per-channel token-shift gate plus layer norm.

    The last hidden state of the wrapped model is blended with the hidden
    state of the previous position using one gate value per hidden channel,
    ``(1 - g) * h[t] + g * h[t - 1]``, and the result is layer-normalised.

    When ``labels`` is among the keyword arguments, the call is forwarded
    unchanged to the wrapped model and its output is returned as-is (so the
    loss computed by the wrapped model is available). ``generate`` also
    delegates directly to the wrapped model.
    """

    def __init__(self, base_model):
        """Wrap ``base_model``; it must expose ``config.hidden_size``."""
        super().__init__()
        self.base_model = base_model
        hidden_size = base_model.config.hidden_size
        # The two parameters are summed before the sigmoid, so the initial
        # gate value is sigmoid(1 + 0) for every channel.
        self.gate_weight = nn.Parameter(torch.ones(hidden_size))
        self.gate_bias = nn.Parameter(torch.zeros(hidden_size))

    def forward(self, input_ids, **kwargs):
        """Return the gated, layer-normalised last hidden state.

        Args:
            input_ids: Token ids passed positionally to the wrapped model.
            **kwargs: Extra keyword arguments forwarded to the wrapped model.
                ``output_hidden_states`` and ``return_dict`` are always
                forced to ``True`` on the gated path; caller-supplied values
                for them are discarded.

        Returns:
            The wrapped model's own output when ``labels`` is supplied,
            otherwise a tensor shaped like the last hidden state.
        """
        if "labels" in kwargs:
            return self.base_model(input_ids, **kwargs)

        # Both flags are set explicitly below; leaving a caller-supplied copy
        # in kwargs would raise "got multiple values for keyword argument".
        kwargs.pop("output_hidden_states", None)
        kwargs.pop("return_dict", None)
        outputs = self.base_model(
            input_ids,
            output_hidden_states=True,
            return_dict=True,
            **kwargs
        )
        # Indexed below as (batch, seq, hidden).
        hidden_states = outputs.hidden_states[-1]

        # Per-channel gate; shape (hidden,) broadcasts over batch and sequence.
        # Cast to the hidden-state dtype so a half-precision base model does
        # not get silently promoted to float32.
        gate = torch.sigmoid(self.gate_weight + self.gate_bias).to(dtype=hidden_states.dtype)

        # Hidden state of the previous position; position 0 has no
        # predecessor and keeps zeros.
        previous_hidden = torch.zeros_like(hidden_states)
        previous_hidden[:, 1:, :] = hidden_states[:, :-1, :]

        # Blend with the gate applied exactly once to the shifted state.
        adaptive_hidden = (1 - gate) * hidden_states + gate * previous_hidden

        return nn.functional.layer_norm(
            adaptive_hidden, normalized_shape=(hidden_states.size(-1),)
        )

    def generate(self, *args, **kwargs):
        """Delegate text generation to the wrapped model unchanged."""
        return self.base_model.generate(*args, **kwargs)

class AblationNoLayerNorm(nn.Module):
    """Ablation 1: learned per-channel token-shift gate without layer norm.

    The last hidden state of the wrapped model is blended with the hidden
    state of the previous position, ``(1 - g) * h[t] + g * h[t - 1]``, and
    returned without any normalisation.

    When ``labels`` is among the keyword arguments, the call is forwarded
    unchanged to the wrapped model. ``generate`` also delegates directly to
    the wrapped model.
    """

    def __init__(self, base_model):
        """Wrap ``base_model``; it must expose ``config.hidden_size``."""
        super().__init__()
        self.base_model = base_model
        hidden_size = base_model.config.hidden_size
        # The two parameters are summed before the sigmoid, so the initial
        # gate value is sigmoid(1 + 0) for every channel.
        self.gate_weight = nn.Parameter(torch.ones(hidden_size))
        self.gate_bias = nn.Parameter(torch.zeros(hidden_size))

    def forward(self, input_ids, **kwargs):
        """Return the gated last hidden state (no layer norm).

        Args:
            input_ids: Token ids passed positionally to the wrapped model.
            **kwargs: Extra keyword arguments forwarded to the wrapped model.
                ``output_hidden_states`` and ``return_dict`` are always
                forced to ``True`` on the gated path; caller-supplied values
                for them are discarded.

        Returns:
            The wrapped model's own output when ``labels`` is supplied,
            otherwise a tensor shaped like the last hidden state.
        """
        if "labels" in kwargs:
            return self.base_model(input_ids, **kwargs)

        # Both flags are set explicitly below; leaving a caller-supplied copy
        # in kwargs would raise "got multiple values for keyword argument".
        kwargs.pop("output_hidden_states", None)
        kwargs.pop("return_dict", None)
        outputs = self.base_model(
            input_ids,
            output_hidden_states=True,
            return_dict=True,
            **kwargs
        )
        # Indexed below as (batch, seq, hidden).
        hidden_states = outputs.hidden_states[-1]

        # Per-channel gate; shape (hidden,) broadcasts over batch and sequence.
        # Cast to the hidden-state dtype to avoid silent promotion to float32.
        gate = torch.sigmoid(self.gate_weight + self.gate_bias).to(dtype=hidden_states.dtype)

        # Hidden state of the previous position; position 0 keeps zeros.
        previous_hidden = torch.zeros_like(hidden_states)
        previous_hidden[:, 1:, :] = hidden_states[:, :-1, :]

        # Blend with the gate applied exactly once to the shifted state.
        return (1 - gate) * hidden_states + gate * previous_hidden

    def generate(self, *args, **kwargs):
        """Delegate text generation to the wrapped model unchanged."""
        return self.base_model.generate(*args, **kwargs)

class AblationFixedGate(nn.Module):
    """Ablation 2: constant (non-learned) token-shift gate plus layer norm.

    The last hidden state of the wrapped model is blended with the hidden
    state of the previous position using the scalar ``fixed_gate``,
    ``(1 - g) * h[t] + g * h[t - 1]``, and the result is layer-normalised.
    With the default ``fixed_gate = 1.0`` the blend is just the shifted state.

    When ``labels`` is among the keyword arguments, the call is forwarded
    unchanged to the wrapped model. ``generate`` also delegates directly to
    the wrapped model.
    """

    def __init__(self, base_model):
        """Wrap ``base_model`` and set the constant gate value."""
        super().__init__()
        self.base_model = base_model
        self.fixed_gate = 1.0  # constant gate value; not a trainable parameter

    def forward(self, input_ids, **kwargs):
        """Return the fixed-gate blended, layer-normalised last hidden state.

        Args:
            input_ids: Token ids passed positionally to the wrapped model.
            **kwargs: Extra keyword arguments forwarded to the wrapped model.
                ``output_hidden_states`` and ``return_dict`` are always
                forced to ``True`` on the gated path; caller-supplied values
                for them are discarded.

        Returns:
            The wrapped model's own output when ``labels`` is supplied,
            otherwise a tensor shaped like the last hidden state.
        """
        if "labels" in kwargs:
            return self.base_model(input_ids, **kwargs)

        # Both flags are set explicitly below; leaving a caller-supplied copy
        # in kwargs would raise "got multiple values for keyword argument".
        kwargs.pop("output_hidden_states", None)
        kwargs.pop("return_dict", None)
        outputs = self.base_model(
            input_ids,
            output_hidden_states=True,
            return_dict=True,
            **kwargs
        )
        # Indexed below as (batch, seq, hidden).
        hidden_states = outputs.hidden_states[-1]

        # A plain Python scalar keeps the hidden-state dtype and device; a
        # float32 tensor here would up-cast half-precision activations.
        gate = float(self.fixed_gate)

        # Hidden state of the previous position; position 0 keeps zeros.
        previous_hidden = torch.zeros_like(hidden_states)
        previous_hidden[:, 1:, :] = hidden_states[:, :-1, :]

        # Blend with the gate applied exactly once to the shifted state.
        adaptive_hidden = (1 - gate) * hidden_states + gate * previous_hidden

        return nn.functional.layer_norm(
            adaptive_hidden, normalized_shape=(hidden_states.size(-1),)
        )

    def generate(self, *args, **kwargs):
        """Delegate text generation to the wrapped model unchanged."""
        return self.base_model.generate(*args, **kwargs)

# Instantiate every variant around the same base model, move each wrapper to
# the target device, and put it in evaluation mode.
enhanced_model = EnhancedAdaptiveTokenShiftRWKV(base_model)
enhanced_model = enhanced_model.to(device)
enhanced_model = enhanced_model.eval()

ablation_no_ln = AblationNoLayerNorm(base_model)
ablation_no_ln = ablation_no_ln.to(device)
ablation_no_ln = ablation_no_ln.eval()

ablation_fixed_gate = AblationFixedGate(base_model)
ablation_fixed_gate = ablation_fixed_gate.to(device)
ablation_fixed_gate = ablation_fixed_gate.eval()

# ---------------------------
# 3. 定义评估函数
# ---------------------------
def benchmark_forward(model, inputs, num_runs=20, warmup=5):
    """Return the mean wall-clock seconds of one forward pass of ``model``.

    The model is called as ``model(inputs["input_ids"],
    output_hidden_states=True)``; ``warmup`` untimed calls are made first,
    followed by ``num_runs`` timed calls. When CUDA is available the device
    is synchronised after every call so queued kernels are included in the
    measurement.

    Args:
        model: Callable accepting ``(input_ids, output_hidden_states=...)``.
        inputs: Mapping that contains an ``"input_ids"`` entry.
        num_runs: Number of timed calls; must be positive.
        warmup: Number of untimed calls made before timing starts.

    Returns:
        Average duration of a timed call, in seconds.

    Raises:
        ValueError: If ``num_runs`` is not positive.
    """
    if num_runs <= 0:
        raise ValueError("num_runs must be a positive integer")

    input_ids = inputs["input_ids"]
    use_cuda = torch.cuda.is_available()

    def _run_once():
        """Run one forward pass and wait for the GPU to finish, if any."""
        model(input_ids, output_hidden_states=True)
        if use_cuda:
            torch.cuda.synchronize()

    # The outputs are discarded, so building an autograd graph only costs
    # memory and time; disable gradient tracking for the whole benchmark.
    with torch.no_grad():
        for _ in range(warmup):
            _run_once()
        if use_cuda:
            # Guarantees a quiet device at the start even when warmup == 0.
            torch.cuda.synchronize()

        # perf_counter is monotonic and high-resolution, unlike time.time().
        start = time.perf_counter()
        for _ in range(num_runs):
            _run_once()
        elapsed = time.perf_counter() - start

    return elapsed / num_runs

def benchmark_generate(model, inputs, max_length=50, num_runs=10, warmup=3):
    """Return the mean wall-clock seconds of one ``model.generate`` call.

    ``warmup`` untimed calls are made first, then ``num_runs`` timed calls.
    Each call samples with temperature 0.8, top-p 0.95 and repetition penalty
    1.1; ``max_length`` is forwarded as ``max_new_tokens`` and the optional
    ``"attention_mask"`` entry of ``inputs`` is passed along. When CUDA is
    available the device is synchronised after every call.
    """

    def _generate_once():
        # One sampled generation followed by an optional GPU sync.
        model.generate(
            inputs["input_ids"],
            attention_mask=inputs.get("attention_mask"),
            max_new_tokens=max_length,
            do_sample=True,
            temperature=0.8,
            top_p=0.95,
            repetition_penalty=1.1,
        )
        if torch.cuda.is_available():
            torch.cuda.synchronize()

    # Untimed warm-up phase.
    for _ in range(warmup):
        _generate_once()

    # Timed phase.
    t_begin = time.time()
    for _ in range(num_runs):
        _generate_once()
    t_finish = time.time()

    total_seconds = t_finish - t_begin
    return total_seconds / num_runs

def compute_perplexity(model, text):
    """Return ``exp(loss)`` of ``model`` on ``text``.

    ``text`` is tokenized with the module-level ``tokenizer``, moved to the
    module-level ``device``, and fed to ``model`` with the input ids reused
    as ``labels``. If the model output carries a non-``None`` ``loss`` it is
    used directly; otherwise a next-token cross-entropy is computed from the
    output's ``logits`` attribute (or from the output itself when it has no
    such attribute).

    Args:
        model: Callable accepting the tokenizer's fields plus ``labels``.
        text: String to score.

    Returns:
        The perplexity as a Python float.
    """
    encoded = tokenizer(text, return_tensors="pt")
    encoded = {name: tensor.to(device) for name, tensor in encoded.items()}
    input_ids = encoded["input_ids"]

    with torch.no_grad():
        outputs = model(**encoded, labels=input_ids)

    # An output object can expose a ``loss`` attribute that is None, so a
    # hasattr() check alone is not enough to decide the loss is usable.
    loss = getattr(outputs, "loss", None)
    if loss is None:
        # Fall back to a manual shifted cross-entropy: position t predicts
        # token t + 1. Prefer the ``logits`` field; otherwise treat the output
        # itself as the logits tensor.
        logits = getattr(outputs, "logits", outputs)
        shift_logits = logits[..., :-1, :].contiguous()
        shift_labels = input_ids[..., 1:].contiguous()
        loss = nn.functional.cross_entropy(
            shift_logits.view(-1, shift_logits.size(-1)),
            shift_labels.view(-1),
        )

    return math.exp(loss.item())

def compute_bleu(reference, hypothesis):
    """Return the sentence-level BLEU of ``hypothesis`` against ``reference``.

    Both strings are tokenized by splitting on whitespace; the reference is
    passed to ``sentence_bleu`` as the only entry of the reference list.
    """
    references = [reference.split()]
    candidate = hypothesis.split()
    return sentence_bleu(references, candidate)

def compute_rouge(reference, hypothesis):
    """Return the ROUGE-1 and ROUGE-L scores of ``hypothesis`` vs ``reference``.

    A stemming ``RougeScorer`` is built on every call. Any exception raised
    while building or scoring is printed and ``None`` is returned instead.
    """
    try:
        rouge = rouge_scorer.RougeScorer(['rouge1', 'rougeL'], use_stemmer=True)
        return rouge.score(reference, hypothesis)
    except Exception as e:
        print("ROUGE计算错误:", e)
    return None

# ---------------------------
# 4. Run the evaluation experiments
# ---------------------------
prompt = "Once upon a time, in a distant land, " * 5  # medium-length prompt
long_prompt = "In the realm of artificial intelligence, " * 50  # longer prompt

# Sampling settings shared by every generate() call in this section.
_sampling_kwargs = dict(do_sample=True, temperature=0.8, top_p=0.95, repetition_penalty=1.1)


def _encode(text):
    """Tokenize ``text`` and move every returned tensor to ``device``."""
    encoded = tokenizer(text, return_tensors="pt")
    return {key: value.to(device) for key, value in encoded.items()}


def _sample(model, encoded, max_new_tokens):
    """Sample a continuation of ``encoded`` from ``model`` and return the ids."""
    return model.generate(
        encoded["input_ids"],
        attention_mask=encoded.get("attention_mask"),
        max_new_tokens=max_new_tokens,
        **_sampling_kwargs
    )


def _decode(sequence):
    """Decode one generated id sequence back into text."""
    return tokenizer.decode(sequence, skip_special_tokens=True, errors='replace')


inputs = _encode(prompt)

# Forward-pass timing for the base model and each wrapper, in that order.
(
    base_fwd_time,
    enhanced_fwd_time,
    ablation_no_ln_time,
    ablation_fixed_gate_time,
) = (
    benchmark_forward(candidate, inputs)
    for candidate in (base_model, enhanced_model, ablation_no_ln, ablation_fixed_gate)
)

# Generation timing (base model only).
gen_time = benchmark_generate(base_model, inputs, max_length=50)

print("前向传播耗时 (平均每次):")
for _template, _seconds in (
    ("  基础模型: {:.6f} s", base_fwd_time),
    ("  增强模型: {:.6f} s", enhanced_fwd_time),
    ("  消融 - 无层归一化: {:.6f} s", ablation_no_ln_time),
    ("  消融 - 固定门控: {:.6f} s", ablation_fixed_gate_time),
    ("生成文本耗时 (基础模型): {:.6f} s", gen_time),
):
    print(_template.format(_seconds))

# Sample generation from the medium-length prompt.
generated_ids = _sample(base_model, inputs, 50)
generated_text = _decode(generated_ids[0])
print("\n生成文本示例:")
print(generated_text)

# Sample generation from the long prompt.
long_inputs = _encode(long_prompt)
long_generated_ids = _sample(base_model, long_inputs, 200)
long_generated_text = _decode(long_generated_ids[0])
print("\n长文本生成示例:")
print(long_generated_text)

# Generation quality: perplexity, BLEU and ROUGE against a fixed reference.
reference_text = "Once upon a time, in a distant land, there was a wise king who ruled with kindness and justice."
baseline_perplexity = compute_perplexity(base_model, reference_text)
enhanced_perplexity = compute_perplexity(enhanced_model, reference_text)
baseline_generated = _decode(_sample(base_model, inputs, 50)[0])
enhanced_generated = _decode(_sample(enhanced_model, inputs, 50)[0])
bleu_baseline = compute_bleu(reference_text, baseline_generated)
bleu_enhanced = compute_bleu(reference_text, enhanced_generated)
rouge_baseline = compute_rouge(reference_text, baseline_generated)
rouge_enhanced = compute_rouge(reference_text, enhanced_generated)

print("\n生成质量评估:")
for _template, _value in (
    ("  基础模型困惑度: {:.3f}", baseline_perplexity),
    ("  增强模型困惑度: {:.3f}", enhanced_perplexity),
    ("  基础模型 BLEU: {:.3f}", bleu_baseline),
    ("  增强模型 BLEU: {:.3f}", bleu_enhanced),
    ("  基础模型 ROUGE: {}", rouge_baseline),
    ("  增强模型 ROUGE: {}", rouge_enhanced),
):
    print(_template.format(_value))


OutOfMemoryError: CUDA out of memory. Tried to allocate 20.00 MiB. GPU 0 has a total capacity of 14.74 GiB of which 20.12 MiB is free. Process 4582 has 14.72 GiB memory in use. Of the allocated memory 14.59 GiB is allocated by PyTorch, and 1.30 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)