In [1]:
import sys
# NOTE(review): this makes `mingpt` importable as a top-level package, while the
# imports below go through the `minGPT.mingpt` path — presumably the append is
# needed for minGPT's own internal `from mingpt...` imports; confirm.
sys.path.append("minGPT")   # make the `mingpt` package importable

import torch
from transformers import GPT2Tokenizer, GPT2LMHeadModel
from minGPT.mingpt.model import GPT
from minGPT.mingpt.utils import set_seed
# Fix the seed through minGPT's helper so repeated runs are comparable.
set_seed(3407)

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Checkpoint name; used for the Hugging Face tokenizer here and for the
# GPT.from_pretrained call further down.
model_type = 'gpt2'

# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

# Hugging Face GPT-2 tokenizer matching the checkpoint above.
tokenizer_hf = GPT2Tokenizer.from_pretrained(model_type)

In [3]:
# Load the model through minGPT's GPT class.


# Alternative (disabled): build the model from a config instead of loading
# pretrained weights.
# model_config.model_type = model_type
# model_config.vocab_size = train_dataset.get_vocab_size()
# model_config.block_size = train_dataset.get_block_size()
# model = GPT(model_config, types="nm")
# To switch the memory module off, pass types=None.
# from_pretrained returns a pair here; only the first element (the model) is kept.
model, _ = GPT.from_pretrained(model_type)
# Last expression of the cell: moves the model to `device` and displays its repr.
model.to(device)

number of parameters: 124.44M


GPT(
  (neural_memory): NeuralMemory(
    (layers): ModuleList(
      (0): Sequential(
        (0): Linear(in_features=768, out_features=3072, bias=True)
        (1): SiLU()
      )
      (1): Sequential(
        (0): Linear(in_features=3072, out_features=768, bias=True)
        (1): SiLU()
      )
    )
    (K): Linear(in_features=768, out_features=768, bias=False)
    (V): Linear(in_features=768, out_features=768, bias=False)
    (Q): Linear(in_features=768, out_features=768, bias=False)
    (silu): SiLU()
  )
  (dc_gate): Sequential(
    (0): Linear(in_features=768, out_features=3072, bias=True)
    (1): SiLU()
    (2): Linear(in_features=3072, out_features=768, bias=True)
    (3): Sigmoid()
  )
  (nm_gate): Sequential(
    (0): Linear(in_features=768, out_features=3072, bias=True)
    (1): SiLU()
    (2): Linear(in_features=3072, out_features=768, bias=True)
    (3): Sigmoid()
  )
  (transformer): ModuleDict(
    (wte): Embedding(50257, 768)
    (wpe): Embedding(1024, 768)
    (dro

In [5]:
from dataclasses import asdict
from dynamic_cheatsheet import DynamicCheatsheetMemory
from dynamic_cheatsheet.config_loader import load_config

# Load the cheatsheet configuration and convert its `dc` section (a dataclass)
# into a plain dict for the memory constructor.
cfg = load_config("dynamic_cheatsheet/config.yaml")
dc_cfg = asdict(cfg.dc)

# hidden_dim must equal GPT's n_embd (768 for gpt2); it is read here from the
# width of the positional-embedding table.
hidden_dim = model.transformer.wpe.embedding_dim
dynamic_cheatsheet = DynamicCheatsheetMemory(dc_cfg, hidden_dim)
# Sanity check: model width, cheatsheet length, and the cheatsheet's own embedding size.
print(hidden_dim, dynamic_cheatsheet.dc_len, dynamic_cheatsheet.embed_dim)

768 8 1024


In [None]:
# Single-prompt variant (batch_size = 1), kept for reference:
# prompt = "Game of 24: numbers are 3, 3, 8, 8. Output ONE expression equals 24."

prompts = [
    "Game of 24: numbers are 3, 3, 8, 8. Output ONE expression equals 24.",
    "Game of 24: numbers are 1, 5, 5, 5. Output ONE expression equals 24.",
    "Game of 24: numbers are 4, 4, 10, 10. Output ONE expression equals 24.",
]

# GPT-2 ships without a pad token, so batching prompts of different token
# lengths fails unless one is set. Reuse EOS and pad to the longest prompt;
# when all prompts already have the same length this is a no-op.
if tokenizer_hf.pad_token is None:
    tokenizer_hf.pad_token = tokenizer_hf.eos_token
encoded = tokenizer_hf(prompts, return_tensors='pt', padding=True)
idx = encoded['input_ids'].to(device)  # (B, T) LongTensor
attention_mask = encoded['attention_mask'].to(device)  # 1 = real token, 0 = padding
B, T = idx.shape

# One query string per batch row. The previous `[prompts]` wrapped the whole
# list in another list (a leftover from the single-prompt version), which does
# not match batch_size=B.
query_texts = prompts
dc_memory, dbg = dynamic_cheatsheet.retrieve(query_texts, batch_size=B, device=device)

# targets: autoregressive LM loss (variant A: no manual shift, the model is
# expected to shift internally).
targets = idx.clone()
# Optional: skip the loss on the first token. This and the padding mask below
# assume the model's loss uses ignore_index=-1 — TODO confirm in GPT.forward.
targets[:, 0] = -1
targets[attention_mask == 0] = -1  # never score padding positions
# When targets come from a dataset instead, follow the dataset's definition to
# decide whether they need the same treatment as idx.

# logits, loss = model(idx, targets=targets, dc_memory=dc_memory)
# # Ablation: check that the cheatsheet memory actually affects the loss
# dc_zero = torch.zeros_like(dc_memory)
# logits0, loss0 = model(idx, targets=targets, dc_memory=dc_zero)
# print("loss with dc:", loss.item())
# print("loss w/ zero:", loss0.item())

# Forward pass (requires a model.forward that accepts dc_memory).
logits, loss = model(idx, targets=targets, dc_memory=dc_memory)
print("logits:", logits.shape, "loss:", loss.item() if loss is not None else None)

loss with dc: 7.205523490905762
loss w/ zero: 7.8425421714782715


In [None]:
# Start training
from minGPT.mingpt.trainer import Trainer
train_config = Trainer.get_default_config()
# NOTE(review): the trailing comment on the next line looks inherited from a
# small-model demo; the model loaded above reports 124.44M parameters, so
# confirm that 5e-4 is the intended learning rate.
train_config.learning_rate = 5e-4 # the model we're using is so small that we can go a bit faster
train_config.max_iters = 2000
train_config.num_workers = 0
# NOTE(review): `train_dataset` is not defined in any cell visible in this
# notebook; it must be created before this cell can run.
trainer = Trainer(train_config, model, train_dataset)
# For how train_dataset must be defined, see the input expected by minGPT/mingpt/train.py

In [None]:
def batch_end_callback(trainer):
    """Print a one-line progress report on every 100th training iteration.

    Args:
        trainer: object exposing ``iter_num``, ``iter_dt`` (multiplied by 1000
            and labelled "ms" in the report) and ``loss`` (anything with an
            ``.item()`` method).

    Returns:
        None. Iterations that are not a multiple of 100 produce no output.
    """
    # Guard clause: stay silent between reporting steps.
    if trainer.iter_num % 100 != 0:
        return
    print(f"iter_dt {trainer.iter_dt * 1000:.2f}ms; iter {trainer.iter_num}: train loss {trainer.loss.item():.5f}")
# Register the progress printer for the trainer's 'on_batch_end' event
# (presumably fired once per training batch — confirm in Trainer.run).
trainer.set_callback('on_batch_end', batch_end_callback)

In [None]:
# Freeze the pretrained backbone and train only the memory-related add-ons.
for param in model.parameters():
    param.requires_grad = False

trainable_modules = []

# Add-ons attached directly to the GPT module. The model repr lists
# `neural_memory`, `dc_gate` and `nm_gate` as top-level children, so they have
# to be looked up on `model` itself; checking only the transformer blocks
# would leave the two gates frozen.
for attr in ("neural_memory", "dc_gate", "nm_gate"):
    module = getattr(model, attr, None)
    if module is not None:
        trainable_modules.append(module)

# Per-block gates, for model variants that place them inside each block.
for block in model.transformer.h:
    for attr in ("dc_gate", "gate"):
        module = getattr(block, attr, None)
        if module is not None:
            trainable_modules.append(module)

for module in trainable_modules:
    for p in module.parameters():
        p.requires_grad = True

# Fail early instead of running a training loop that cannot update anything.
assert any(p.requires_grad for p in model.parameters()), \
    "no trainable parameters found: neural_memory / gate modules are missing"

trainer.run(dc_memory=None)
# With dc_memory=None the GPT model substitutes an all-zero tensor.

In [None]:
# Switch the model to evaluation mode before generating (affects layers such
# as dropout). As the cell's last expression it also displays the model repr.
model.eval()

In [None]:
# Generate with the cheatsheet memory, then store the output back into it.

prompt = "Once upon a time"
# Number of tokens to generate after the prompt. The original call passed an
# undefined name `n` here, which raised NameError.
max_new_tokens = 20

# Same calling convention as the batched retrieve call used for the forward
# pass earlier in this notebook: a list with one query string per batch row,
# the `batch_size` keyword, and a (memory, debug_info) return pair.
# NOTE(review): the original comment said the batch size has to be adjusted in
# the library source so the dimensions line up — confirm retrieve() honours
# batch_size=1.
dc_memory, dc_debug = dynamic_cheatsheet.retrieve([prompt], batch_size=1, device=device)
encoded = tokenizer_hf(prompt, return_tensors='pt')
idx2 = encoded['input_ids'].to(device)  # (1, T) LongTensor
target = ()
# Placeholder: targets come from the dataset; follow the dataset's definition
# to decide whether they need the same treatment as idx.

with torch.no_grad():
    # generate() returns the prompt plus the new tokens; [0] selects the single batch row.
    cat = model.generate(idx2, max_new_tokens, do_sample=False, dc_memory=dc_memory)[0]
    out = tokenizer_hf.decode(cat.cpu().squeeze())
    # Keep the arguments aligned with the generate() signature of the modified model.
dynamic_cheatsheet.update(out, device=device, max_entries=100)
# TODO: decide which setting max_entries should be kept in sync with.


In [None]:
# 可以生成评估函数
def evaluate_model(model, eval_dataset, dc_memory=None):
    