In [1]:
import torch.nn as nn
import torch.nn.functional as F
import torch
class TransformerBlock(nn.Module):
    """Post-norm transformer block: causal self-attention followed by a GELU MLP.

    Each sub-layer is applied as ``LayerNorm(x + sublayer(x))``.

    Args:
        emb_dim: Size of the embedding (last) dimension of the input.
        n_heads: Number of heads passed to ``nn.MultiheadAttention``.
    """

    def __init__(self, emb_dim, n_heads):
        super().__init__()
        # Attribute names (and their creation order) are kept unchanged so that
        # state_dict keys of existing checkpoints still match.
        self.attn = nn.MultiheadAttention(emb_dim, n_heads, batch_first=True)
        self.norm1 = nn.LayerNorm(emb_dim)
        self.mlp = nn.Sequential(
            nn.Linear(emb_dim, 4 * emb_dim),
            nn.GELU(),
            nn.Linear(4 * emb_dim, emb_dim)
        )
        self.norm2 = nn.LayerNorm(emb_dim)

    def forward(self, x):
        """Apply causal self-attention and the MLP to ``x``.

        Args:
            x: Tensor with exactly three dimensions; the second one is used as
                the sequence length (the attention layer is ``batch_first``).

        Returns:
            Tensor of the same shape as ``x``.
        """
        _, T, _ = x.size()

        # Causal mask: position t may only attend to positions <= t.
        # nn.MultiheadAttention treats True in a bool mask as "blocked", so the
        # strictly upper triangle is set. The mask is identical for every batch
        # element, so a single (T, T) tensor is built instead of a (B, T, T) one.
        attn_mask = torch.ones(T, T, dtype=torch.bool, device=x.device).triu(diagonal=1)

        attn_out, _ = self.attn(x, x, x, attn_mask=attn_mask, need_weights=False)
        x = self.norm1(x + attn_out)
        x = self.norm2(x + self.mlp(x))
        return x

class TinyTransformer(nn.Module):
    """Small decoder-only language model built from ``TransformerBlock`` layers.

    Token embeddings are summed with a learned positional parameter, passed
    through ``n_layers`` blocks, normalised, and projected to vocabulary logits.

    Args:
        vocab_size: Number of tokens in the vocabulary (embedding rows and
            output logits).
        emb_dim: Embedding dimension.
        n_heads: Attention heads per block.
        n_layers: Number of stacked blocks.
        block_size: Maximum sequence length covered by the positional parameter.
    """

    def __init__(self, vocab_size, emb_dim=512, n_heads=16, n_layers=12, block_size=1024):
        super().__init__()
        # Attribute names are part of the checkpoint format (state_dict keys).
        self.token_embedding = nn.Embedding(vocab_size, emb_dim)
        self.pos_embedding = nn.Parameter(torch.randn(1, block_size, emb_dim))
        self.blocks = nn.Sequential(*[
            TransformerBlock(emb_dim, n_heads) for _ in range(n_layers)
        ])
        self.ln = nn.LayerNorm(emb_dim)
        self.fc = nn.Linear(emb_dim, vocab_size)

    def forward(self, x):
        """Compute next-token logits.

        Args:
            x: Integer token ids; dimension 1 is the sequence length.

        Returns:
            Logits with the vocabulary as the last dimension.

        Raises:
            ValueError: If the sequence is longer than ``block_size``.
        """
        seq_len = x.size(1)
        max_len = self.pos_embedding.size(1)
        if seq_len > max_len:
            # Without this check the positional slice is shorter than the input,
            # which either fails with an opaque broadcast error or (when
            # block_size == 1) silently reuses position 0 for every token.
            raise ValueError(
                f"Sequence length {seq_len} exceeds block_size {max_len}"
            )

        x = self.token_embedding(x) + self.pos_embedding[:, :seq_len, :]
        x = self.blocks(x)
        return self.fc(self.ln(x))

In [2]:
# fine_tune_alpaca.ipynb

#  1. 导入库
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
# from transformers import AutoTokenizer  # 可选，若你用 huggingface tokenizer
import json
from tqdm import tqdm
import os
save_dir = "finetune"
os.makedirs(save_dir, exist_ok=True)

# 2. Build the TinyTransformer (defined in an earlier cell) and load the
#    pre-trained weights.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = TinyTransformer(vocab_size=8192).to(device)
# NOTE(review): torch.load unpickles the file, so only load checkpoints that
# come from a trusted source.
checkpoint = torch.load("tiny_transformer_best.pth", map_location=device)
state_dict = checkpoint["model_state_dict"]

# A model saved through nn.DataParallel has every key prefixed with "module.".
# Strip only that leading prefix: str.replace would also rewrite any later
# occurrence of "module." inside a key.
dp_prefix = "module."
if any(k.startswith(dp_prefix) for k in state_dict.keys()):
    state_dict = {
        (k[len(dp_prefix):] if k.startswith(dp_prefix) else k): v
        for k, v in state_dict.items()
    }

# Load the weights into the model.
model.load_state_dict(state_dict)


# 3. Freeze everything except the last two blocks, the final LayerNorm and the
#    output projection. Prefix matching avoids accidental substring hits.
trainable_prefixes = ("blocks.10.", "blocks.11.", "ln.", "fc.")
for name, param in model.named_parameters():
    param.requires_grad = name.startswith(trainable_prefixes)

# 4. Load the tokenizer.
from tokenizers import Tokenizer
tokenizer = Tokenizer.from_file("./data/tiny_tokenizer.json")

# 5. Load the Alpaca dataset (one JSON object per line); blank lines are
#    skipped instead of crashing json.loads.
with open("./data/alpaca-1k.json", "r", encoding="utf-8") as f:
    alpaca_data = [json.loads(line) for line in f if line.strip()]

class AlpacaDataset(Dataset):
    """Fixed-length next-token-prediction samples built from Alpaca records.

    Every record is rendered with the template
    ``Instruction:\\n...\\nInput:\\n...\\nOutput:\\n...``, tokenized, and then
    truncated or right-padded to exactly ``block_size`` ids.

    Args:
        data: Iterable of dicts with ``instruction``, ``input`` and ``output`` keys.
        tokenizer: Object whose ``encode(text).ids`` is a list of ints and which
            provides ``token_to_id``.
        block_size: Length every stored sample is padded/truncated to.
        pad_token_id: Id used for padding. When ``None`` the tokenizer's
            ``"<pad>"`` id is used so padding matches the id the training loss
            ignores; if the tokenizer has no such token, 0 is used.

    NOTE(review): no end-of-text token is appended to the samples, while
    generation elsewhere in this notebook stops on ``<|endoftext|>`` -- confirm
    whether one should be added here.
    """

    def __init__(self, data, tokenizer, block_size=1024, pad_token_id=None):
        self.samples = []
        self.tokenizer = tokenizer

        if pad_token_id is None:
            pad_token_id = tokenizer.token_to_id("<pad>")
        if pad_token_id is None:
            pad_token_id = 0
        self.pad_token_id = pad_token_id

        for example in data:
            prompt = f"Instruction:\n{example['instruction']}\nInput:\n{example['input']}\nOutput:\n{example['output']}"
            ids = tokenizer.encode(prompt).ids
            assert (
                isinstance(ids, list) and len(ids) > 0 and isinstance(ids[0], int)
            ), "Tokenizer output format error"
            # Build a new list (the tokenizer's list is left untouched):
            # truncate first, then right-pad; a negative count pads nothing.
            ids = ids[:block_size] + [pad_token_id] * (block_size - len(ids))
            self.samples.append(torch.tensor(ids))

    def __len__(self):
        """Return the number of stored samples."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(inputs, targets)`` where targets are the inputs shifted by one."""
        sample = self.samples[idx]
        return sample[:-1], sample[1:]

dataset = AlpacaDataset(alpaca_data, tokenizer)
loader = DataLoader(dataset, batch_size=32, shuffle=True)

# 6. Training setup: optimise only the parameters left trainable above and
#    ignore padding positions in the loss.
optimizer = torch.optim.AdamW(
    [p for p in model.parameters() if p.requires_grad],
    lr=5e-5,
)
pad_token_id = tokenizer.token_to_id("<pad>")
loss_fn = nn.CrossEntropyLoss(ignore_index=pad_token_id)

# 7. Fine-tuning loop.
model.train()
for epoch in range(3):
    total_loss = 0
    for x, y in tqdm(loader):
        x = x.to(device)
        y = y.to(device)
        logits = model(x)
        # Flatten (batch, time) into one axis so every position is one
        # classification example over the vocabulary.
        flat_logits = logits.view(-1, logits.size(-1))
        flat_targets = y.view(-1)
        loss = loss_fn(flat_logits, flat_targets)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1} Loss: {total_loss / len(loader):.4f}")

# 8. Save the fine-tuned weights.
save_path = os.path.join(save_dir, "tiny_model_finetuned_alpaca.pt")
torch.save(model.state_dict(), save_path)


100%|██████████| 1618/1618 [10:27<00:00,  2.58it/s]


Epoch 1 Loss: 5.6479


100%|██████████| 1618/1618 [10:27<00:00,  2.58it/s]


Epoch 2 Loss: 4.9396


100%|██████████| 1618/1618 [10:27<00:00,  2.58it/s]


Epoch 3 Loss: 4.6004


In [3]:
def generate_text(prompt, model, tokenizer, max_new_tokens=100, device="cuda"):
    """Sample a continuation of ``prompt`` from ``model``, one token at a time.

    Each step feeds at most ``model.pos_embedding.size(1)`` trailing tokens to
    the model and samples the next token from the softmax of the last position.
    Sampling stops after ``max_new_tokens`` tokens or as soon as the tokenizer's
    ``<|endoftext|>`` id is produced (that token is kept in the output ids).

    Args:
        prompt: Text to continue.
        model: Module exposing ``pos_embedding`` and returning logits whose
            dimension 1 is the sequence position.
        tokenizer: Object providing ``encode(text).ids``, ``token_to_id`` and
            ``decode``.
        max_new_tokens: Upper bound on the number of sampled tokens.
        device: Device for the input ids. If a CUDA device is requested but
            CUDA is unavailable, the device of the model's first parameter is
            used instead (CPU when the model has no parameters).

    Returns:
        The decoded text of the prompt ids followed by all sampled ids.

    Raises:
        ValueError: If ``prompt`` encodes to zero tokens.

    Note:
        The model is switched to eval mode and left in that mode.
    """
    prompt_ids = tokenizer.encode(prompt).ids
    if not prompt_ids:
        raise ValueError("prompt must encode to at least one token")

    target_device = torch.device(device)
    if target_device.type == "cuda" and not torch.cuda.is_available():
        # The "cuda" default would crash on CPU-only machines; follow the model.
        first_param = next(model.parameters(), None)
        target_device = first_param.device if first_param is not None else torch.device("cpu")

    all_ids = torch.tensor([prompt_ids], dtype=torch.long, device=target_device)
    eos_token_id = tokenizer.token_to_id("<|endoftext|>")  # stop token (may be None)
    context_len = model.pos_embedding.size(1)

    model.eval()
    with torch.no_grad():
        for _ in range(max_new_tokens):
            # Only the trailing window is fed to the model; the full sequence
            # is kept separately so the returned text never loses its start.
            context = all_ids[:, -context_len:]
            logits = model(context)
            probs = torch.softmax(logits[:, -1, :], dim=-1)
            next_token = torch.multinomial(probs, num_samples=1)

            all_ids = torch.cat([all_ids, next_token], dim=1)

            if next_token.item() == eos_token_id:
                break

    return tokenizer.decode(all_ids[0].tolist())


In [4]:
import random
import csv

# alpaca_data is expected to be the full list of records loaded earlier.
# NOTE(review): these records are the same ones used for fine-tuning, so this
# is not a held-out evaluation.
EVAL_SAMPLE_SIZE = 50
EVAL_SEED = 0

# Seed both the sample selection and torch's sampler so the CSV is reproducible,
# and never ask for more records than are available.
sample_50 = random.Random(EVAL_SEED).sample(
    alpaca_data, min(EVAL_SAMPLE_SIZE, len(alpaca_data))
)
torch.manual_seed(EVAL_SEED)

# Rows that will be written to the CSV file.
csv_rows = []

for i, sample in enumerate(sample_50):
    # Use the same template the model was fine-tuned on (see AlpacaDataset),
    # ending right where the output is expected to start.
    prompt = f"Instruction:\n{sample['instruction']}\nInput:\n{sample['input']}\nOutput:\n"
    # Run on the device the model was moved to rather than the "cuda" default.
    response = generate_text(prompt, model, tokenizer, device=device)

    csv_rows.append({
        "index": i + 1,
        "instruction": sample["instruction"],
        "input": sample["input"],
        "reference_output": sample["output"],
        "model_response": response,
        "accurate": ""  # left blank; filled in manually with y/n
    })

# Write the CSV file.
csv_file = "model_eval_samples.csv"
with open(csv_file, mode="w", newline='', encoding="utf-8") as file:
    writer = csv.DictWriter(file, fieldnames=["index", "instruction", "input", "reference_output", "model_response", "accurate"])
    writer.writeheader()
    writer.writerows(csv_rows)

print(f"Saved {len(csv_rows)} evaluation samples to '{csv_file}'. You can now open and annotate it (accurate: y/n).")


  return torch._native_multi_head_attention(


Saved 50 evaluation samples to 'model_eval_samples.csv'. You can now open and annotate it (accurate: y/n).


In [5]:
import csv

accurate_count = 0
total = 0
unannotated = 0

# newline="" lets the csv module handle newlines embedded in quoted fields
# (the model responses contain them).
with open("model_eval_samples.csv", mode="r", newline="", encoding="utf-8") as file:
    reader = csv.DictReader(file)
    for row in reader:
        # A short row yields None for missing columns; treat it as blank.
        label = (row.get("accurate") or "").strip().lower()
        if label == "y":
            accurate_count += 1
        elif not label:
            unannotated += 1
        total += 1

# Guard against an empty file instead of dividing by zero.
accuracy = accurate_count / total * 100 if total else 0.0
print(f"Accurate responses: {accurate_count}/{total}")
print(f"Accuracy: {accuracy:.2f}%")

if total == 0:
    print("Warning: no rows found in model_eval_samples.csv")
elif unannotated:
    # Blank rows count as not accurate, so flag them rather than silently
    # reporting a misleadingly low score.
    print(f"Warning: {unannotated} of {total} rows have no 'accurate' annotation yet")

if accuracy > 60:
    print("Target met (accuracy > 60%)")
else:
    print("Target not met (accuracy ≤ 60%)")


Accurate responses: 0/50
Accuracy: 0.00%
Target not met (accuracy ≤ 60%)


In [None]:
# Free-form smoke test of the fine-tuned model. Generation runs on the device
# the model was moved to instead of relying on generate_text's "cuda" default.
prompt = "Hello, how are you?"
response = generate_text(prompt, model, tokenizer, device=device)
print(response)


 Hello, how are you? a customer interactions? an opportunity to store?comtes wear thank a number.
Input:

Output:
H diseases and I made someone who is it? capable ben purchg is a tim of the text in burization and email. for maintaining a diet and work, making it can do not your needs to minimize outdoed date to carefully or simply of which only detail feedback behind time. Shideingenanceizekes, prepand and is have a
