In [33]:
# %% [Cell 1] 环境安装与模型加载
# ==============================================================================
# 目的: 安装依赖，加载 Qwen2.5-1.5B 和 Embedding 模型。
# ==============================================================================

# !pip install transformers accelerate bitsandbytes sentence-transformers wandb termcolor tqdm -q

import torch
import numpy as np
import random
import time
import difflib
import wandb
from statistics import mean
from collections import deque
from termcolor import colored
from transformers import AutoModelForCausalLM, AutoTokenizer
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

def set_seed(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

set_seed(42)

print(colored("正在加载主模型 (Qwen2.5-1.5B)...", "cyan"))
MODEL_ID = "Qwen/Qwen2.5-1.5B-Instruct"
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
model = AutoModelForCausalLM.from_pretrained(MODEL_ID, torch_dtype="auto", device_map="auto")

print(colored("正在加载向量模型 (all-MiniLM-L6-v2)...", "cyan"))
embedder = SentenceTransformer('all-MiniLM-L6-v2')

def generate(prompt, max_tokens=200):
    messages = [{"role": "user", "content": prompt}]
    text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    inputs = tokenizer([text], return_tensors="pt").to(model.device)
    with torch.no_grad():
        outputs = model.generate(**inputs, max_new_tokens=max_tokens, temperature=0.01)
    response = tokenizer.batch_decode(outputs[:, inputs.input_ids.shape[1]:], skip_special_tokens=True)[0]
    return response.strip()

print(colored(">>> 环境加载完成。", "green"))

正在加载主模型 (Qwen2.5-1.5B)...
正在加载向量模型 (all-MiniLM-L6-v2)...
>>> 环境加载完成。


In [34]:
# %% [Cell 2] 定义五种记忆机制 (含真实 LLM 调用)
# ==============================================================================
# 目的: 定义 Baseline, RAG, Cheatsheet, Titans, TAC。
# 修改: 全员增加 add_special 接口，确保公平性。
# ==============================================================================

class BaseMemory:
    def add(self, query, response): pass
    def add_special(self, query, thought, response): 
        # 默认回退到普通 add，子类可覆盖
        self.add(query, response)
    def get_context(self, query): return ""
    def reset(self): pass

# 1. Baseline
class ConcatMemory(BaseMemory):
    def __init__(self, limit=3):
        self.limit = limit
        self.history = deque(maxlen=limit)
    def add(self, query, response):
        self.history.append(f"User: {query}\nAI: {response}")
    def add_special(self, query, thought, response):
        # Baseline 忽略 thought，行为与 add 相同
        self.add(query, response)
    def get_context(self, query):
        return "\n".join(self.history)
    def reset(self): self.history.clear()

# 2. RAG
class RAGMemory(BaseMemory):
    def __init__(self, top_k=2):
        self.top_k = top_k
        self.corpus = [] 
        self.embeddings = None
    def add(self, query, response):
        text = f"User: {query}\nAI: {response}"
        self._embed_and_store(text)
    def add_special(self, query, thought, response):
        # RAG 将 thought 也加入索引，增加语义丰富度
        text = f"User: {query}\nThought: {thought}\nAI: {response}"
        self._embed_and_store(text)
    def _embed_and_store(self, text):
        vec = embedder.encode([text])
        self.corpus.append(text)
        if self.embeddings is None: self.embeddings = vec
        else: self.embeddings = np.vstack([self.embeddings, vec])
    def get_context(self, query):
        if self.embeddings is None: return ""
        query_vec = embedder.encode([query])
        scores = cosine_similarity(query_vec, self.embeddings)[0]
        top_indices = np.argsort(scores)[-self.top_k:]
        top_indices = sorted(top_indices)
        return "[RAG Context]:\n" + "\n".join([self.corpus[i] for i in top_indices])
    def reset(self):
        self.corpus = []
        self.embeddings = None

# 3. Cheatsheet (High Latency)
class DynamicCheatsheetMemory(BaseMemory):
    def __init__(self):
        self.cheatsheet = "No facts yet."
    def add(self, query, response):
        self._update_knowledge(query, "", response)
    def add_special(self, query, thought, response):
        # 给 Curator 提供 Thought 上下文
        self._update_knowledge(query, thought, response)
    def _update_knowledge(self, query, thought, response):
        thought_part = f"\nThought: {thought}" if thought else ""
        prompt = (f"Current Knowledge:\n{self.cheatsheet}\nNew Interaction:\nUser: {query}{thought_part}\nAI: {response}\n"
                  "Task: Update Knowledge with new facts. Keep it concise.\nUpdated Knowledge:")
        self.cheatsheet = generate(prompt, max_tokens=100)
    def get_context(self, query):
        return f"[Cheatsheet]:\n{self.cheatsheet}"
    def reset(self): self.cheatsheet = "No facts yet."

# 4. Titans (High Latency)
class TitansMemory(BaseMemory):
    def __init__(self):
        self.state = "Empty state."
    def add(self, query, response):
        self._compress_state(query, "", response)
    def add_special(self, query, thought, response):
        # 给 Neural Memory 提供 Thought 上下文
        self._compress_state(query, thought, response)
    def _compress_state(self, query, thought, response):
        thought_part = f"\nThought: {thought}" if thought else ""
        prompt = (f"Memory State:\n{self.state}\nInput:\nUser: {query}{thought_part}\nAI: {response}\n"
                  "Task: Compress Input into Memory State. Retain critical info.\nNew State:")
        self.state = generate(prompt, max_tokens=100)
    def get_context(self, query):
        return f"[Titans State]:\n{self.state}"
    def reset(self): self.state = "Empty state."

# 5. TAC (Ours - Low Latency)
class TACMemory(BaseMemory):
    def __init__(self, limit=3):
        self.limit = limit
        self.anchor = "N/A"
        self.window = deque(maxlen=limit)
    def add_special(self, query, thought, response):
        if len(thought) > 10: self.anchor = thought
        self.window.append(f"User: {query}\nAI: {response}")
    def add(self, query, response):
        self.window.append(f"User: {query}\nAI: {response}")
    def get_context(self, query):
        hist = "\n".join(self.window)
        return f"[Anchor Thought]: {self.anchor}\n[Window]:\n{hist}"
    def reset(self):
        self.anchor = "N/A"
        self.window.clear()

print(colored(">>> 记忆机制定义完毕。", "green"))

>>> 记忆机制定义完毕。


In [35]:
# %% [Cell 3] 真实干扰数据集与任务逻辑
# ==============================================================================
# 目的: 准备真实问答数据，定义带评分的任务函数。
# 修改: 移除 isinstance 判断，统一使用 add_special 接口。
# ==============================================================================

REAL_DISTRACTORS = [
    ("What is the capital of Australia?", "Canberra."),
    ("Who wrote '1984'?", "George Orwell."),
    ("Define photosynthesis.", "Plants use sunlight to make food."),
    ("Speed of light?", "299,792,458 m/s."),
    ("Pythagorean theorem?", "a^2 + b^2 = c^2."),
    ("Boiling point of water?", "100 C."),
    ("Who painted Mona Lisa?", "Da Vinci."),
    ("What is a black hole?", "Strong gravity region."),
    ("What is GPU?", "Graphics Processing Unit."),
    ("Who is Elon Musk?", "Tesla CEO."),
    ("What is Python?", "Programming language."),
    ("Quantum entanglement?", "Connected particles."),
    ("Tallest mountain?", "Everest."),
    ("Discovered penicillin?", "Fleming."),
    ("Currency of Japan?", "Yen."),
    ("HTTP stands for?", "Hypertext Transfer Protocol."),
    ("First on moon?", "Armstrong."),
    ("What is DNA?", "Genetic instructions."),
    ("Largest ocean?", "Pacific."),
    ("Romeo and Juliet author?", "Shakespeare.")
]

def get_distractors(count):
    pool = REAL_DISTRACTORS * (count // len(REAL_DISTRACTORS) + 1)
    return random.sample(pool, count)

def calculate_soft_score(ground_truth, prediction):
    t = ground_truth.lower().strip()
    p = prediction.lower().strip()
    if t in p: return 1.0
    similarity = difflib.SequenceMatcher(None, t, p).ratio()
    return 0.0 if similarity < 0.25 else similarity

# Task A: NIAH - 公平版 (接收固定的 code 和 distractors)
def task_niah(mem, fixed_code, fixed_distractors):
    mem.reset()
    q_n, a_n = "Set protocol code.", f"Code is {fixed_code}."
    
    # 统一生成 Thought，所有方法都有权看到
    thought = f"The activation code is {fixed_code}. I must keep this in mind."
    
    # 统一调用 add_special
    mem.add_special(q_n, thought, a_n)
    
    # 注入干扰项 (无 Thought)
    for q, a in fixed_distractors:
        mem.add(q, a)
        
    resp = generate(f"Context:\n{mem.get_context('Code?')}\n\nUser: Code?\nAnswer:")
    return calculate_soft_score(fixed_code, resp)

# Task B: Multi-hop - 公平版 (接收固定的干扰项列表)
def task_multihop(mem, distractors_hop1, distractors_hop2):
    mem.reset()
    city, country = "Kyoto", "Japan"
    
    # Hop 1
    q1, a1 = "Where is X?", f"X is in {city}."
    thought1 = f"X is located in {city}."
    mem.add_special(q1, thought1, a1)
    
    for q, a in distractors_hop1:
        mem.add(q, a)
        
    # Hop 2
    q2, a2 = f"Where is {city}?", f"{city} is in {country}."
    thought2 = f"{city} is located in {country}."
    mem.add_special(q2, thought2, a2)
    
    for q, a in distractors_hop2:
        mem.add(q, a)
        
    resp = generate(f"Context:\n{mem.get_context('Country?')}\n\nUser: Country?\nAnswer:")
    return 1.0 if country in resp else (0.5 if city in resp else 0.0)

print(colored(">>> 任务逻辑优化完毕 (公平接口 + 固定测试集)。", "green"))



>>> 任务逻辑优化完毕 (公平接口 + 固定测试集)。


In [36]:
# %% [Cell 4] 执行全维度对比实验 (The Main Engine)
# ==============================================================================
# 目的: 运行实验，记录 Score (效能) 和 Latency (效率)。
# 优化: 在每种强度下预生成固定数据集，确保所有方法“做同一套卷子”。
# ==============================================================================

def run_benchmark_optimized():
    # 初始化 WandB
    wandb.init(project="Memory-Comparison-Final", name="Fair-Comparison-Fixed-Seed")
    
    NUM_REPEATS = 3  # 每个任务重复3次
    
    methods = {
        "1. Baseline": ConcatMemory(limit=5),
        "2. RAG": RAGMemory(top_k=3),
        "3. Cheatsheet": DynamicCheatsheetMemory(),
        "4. Titans": TitansMemory(),
        "5. TAC (Ours)": TACMemory(limit=3)
    }
    
    # 强度梯度
    intensity_levels = [5, 10, 20, 30]
    
    all_results_table = wandb.Table(columns=["Method", "Intensity", "Score", "Latency"])
    
    print(colored("=== 开始双维度评估 (效能 vs 效率 | 公平对比模式) ===", "yellow"))
    
    for intensity in intensity_levels:
        print(colored(f"\n>>> Current Intensity: {intensity}", "white", attrs=["bold"]))
        
        # --- 关键优化：预生成该强度的标准考卷 (Standard Test Suite) ---
        # 1. 为 NIAH 生成固定密码和固定干扰项
        fixed_niah_code = f"Alpha-{random.randint(100,999)}"
        fixed_niah_distractors = get_distractors(intensity)
        
        # 2. 为 Multi-hop 生成两组固定干扰项
        fixed_multihop_d1 = get_distractors(intensity // 2)
        fixed_multihop_d2 = get_distractors(intensity // 2)
        
        print(f"    [Setup] Generated fixed test set for intensity {intensity}.")

        for name, mem in methods.items():
            print(f"   Testing {name}...", end=" ", flush=True)
    
            start_t = time.time()
            scores = []
            
            for rep in range(NUM_REPEATS):
                set_seed(42 + intensity + rep)  # 不同重复用不同种子
                s1 = task_niah(mem, fixed_niah_code, fixed_niah_distractors)
                s2 = task_multihop(mem, fixed_multihop_d1, fixed_multihop_d2)
                scores.extend([s1, s2])
            
            avg_score = mean(scores)
            duration = time.time() - start_t
            
            print(f"-> Score: {avg_score:.2f} | Time: {duration:.1f}s")
            
            wandb.log({
                "intensity": intensity,          
                f"score_{name}": avg_score,      
                f"latency_{name}": duration,     
                "method_name": name              
            })
            
            all_results_table.add_data(name, intensity, avg_score, duration)
            
    wandb.log({"All_Results": all_results_table})
    wandb.finish()
    print(colored("\n>>> 实验结束！请前往 WandB 查看两张核心图表。", "green"))

In [37]:
# %% [Cell 5] 启动
if __name__ == "__main__":
    run_benchmark_optimized()

=== 开始双维度评估 (效能 vs 效率 | 公平对比模式) ===

>>> Current Intensity: 5
    [Setup] Generated fixed test set for intensity 5.
   Testing 1. Baseline... -> Score: 0.00 | Time: 25.5s
   Testing 2. RAG... -> Score: 0.50 | Time: 7.7s
   Testing 3. Cheatsheet... -> Score: 0.00 | Time: 68.4s
   Testing 4. Titans... -> Score: 0.00 | Time: 37.8s
   Testing 5. TAC (Ours)... -> Score: 1.00 | Time: 2.8s

>>> Current Intensity: 10
    [Setup] Generated fixed test set for intensity 10.
   Testing 1. Baseline... -> Score: 0.00 | Time: 13.7s
   Testing 2. RAG... -> Score: 0.33 | Time: 14.6s
   Testing 3. Cheatsheet... -> Score: 0.00 | Time: 112.0s
   Testing 4. Titans... -> Score: 0.00 | Time: 50.0s
   Testing 5. TAC (Ours)... -> Score: 1.00 | Time: 2.4s

>>> Current Intensity: 20
    [Setup] Generated fixed test set for intensity 20.
   Testing 1. Baseline... -> Score: 0.50 | Time: 24.7s
   Testing 2. RAG... -> Score: 0.50 | Time: 21.3s
   Testing 3. Cheatsheet... -> Score: 0.00 | Time: 193.8s
   Testing 4. T

0,1
intensity,▁▁▁▁▁▂▂▂▂▂▅▅▅▅▅█████
latency_1. Baseline,█▃█▁
latency_2. RAG,▁▅█▆
latency_3. Cheatsheet,▁▃▆█
latency_4. Titans,▁▂▅█
latency_5. TAC (Ours),█▁▄▃
score_1. Baseline,▁▁█▁
score_2. RAG,█▁██
score_3. Cheatsheet,▁▁▁▁
score_4. Titans,▁▁▁▁

0,1
intensity,30
latency_1. Baseline,9.33646
latency_2. RAG,16.64786
latency_3. Cheatsheet,244.97898
latency_4. Titans,129.85149
latency_5. TAC (Ours),2.5562
method_name,5. TAC (Ours)
score_1. Baseline,0
score_2. RAG,0.5
score_3. Cheatsheet,0



>>> 实验结束！请前往 WandB 查看两张核心图表。
