# Homework1

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.attention import SDPBackend, sdpa_kernel

# Core model components
class NanoGPT(nn.Module):
    """Small decoder-only Transformer language model.

    Token and learned position embeddings are summed, passed through
    ``n_layer`` pre-LayerNorm Transformer blocks (attention + MLP, each with a
    residual connection), normalised once more and projected to vocabulary
    logits.

    Args:
        vocab_size: Number of distinct token ids.
        n_embd: Embedding / hidden width.
        n_head: Number of attention heads per block.
        n_layer: Number of Transformer blocks.
        block_size: Maximum sequence length the model accepts.
        dropout: Dropout probability handed to the attention and MLP modules.
    """

    def __init__(self, vocab_size, n_embd=64, n_head=4, n_layer=4, block_size=128, dropout=0.1):
        super().__init__()
        self.block_size = block_size
        self.token_embedding = nn.Embedding(vocab_size, n_embd)
        self.position_embedding = nn.Embedding(block_size, n_embd)

        # Transformer blocks. Each block is a 4-slot Sequential
        # [ln1, attention, ln2, mlp] that is used purely as a container:
        # forward() applies the slots by hand so it can add the residuals.
        # The layout is left untouched so state_dict keys stay the same.
        self.blocks = nn.Sequential(*[
            nn.Sequential(
                nn.LayerNorm(n_embd),
                MultiHeadAttention(n_head, n_embd, dropout),
                nn.LayerNorm(n_embd),
                nn.Sequential(
                    nn.Linear(n_embd, 4 * n_embd),
                    nn.GELU(),
                    nn.Linear(4 * n_embd, n_embd),
                    nn.Dropout(dropout)
                )
            ) for _ in range(n_layer)
        ])

        self.ln_f = nn.LayerNorm(n_embd)
        self.lm_head = nn.Linear(n_embd, vocab_size)

    def forward(self, idx, targets=None):
        """Compute next-token logits and, optionally, the cross-entropy loss.

        Args:
            idx: Integer tensor of token ids with shape (B, T), T <= block_size.
            targets: Optional integer tensor with the same number of elements
                as ``idx``; when given, the mean cross-entropy is returned.

        Returns:
            ``(logits, loss)`` where ``logits`` has shape (B, T, vocab_size)
            and ``loss`` is ``None`` when ``targets`` is ``None``.

        Raises:
            AssertionError: If T exceeds ``block_size``.
        """
        _, T = idx.shape
        assert T <= self.block_size, "输入序列过长"

        tok_emb = self.token_embedding(idx)
        pos_emb = self.position_embedding(torch.arange(T, device=idx.device))
        x = tok_emb + pos_emb  # (T, C) position rows broadcast over the batch

        for ln1, attn, ln2, mlp in self.blocks:
            x = x + attn(ln1(x))  # residual around attention
            x = x + mlp(ln2(x))   # residual around the MLP

        x = self.ln_f(x)
        logits = self.lm_head(x)

        if targets is None:
            return logits, None

        # reshape (not view) so non-contiguous target tensors are accepted too.
        loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.reshape(-1))
        return logits, loss

    def generate(self, idx, max_new_tokens):
        """Autoregressively sample ``max_new_tokens`` tokens after ``idx``.

        Sampling runs in eval mode (dropout off) and without autograd, so no
        graph is built while the sequence grows; the module's previous
        train/eval mode is restored afterwards.

        Args:
            idx: Integer tensor of shape (B, T) holding the prompt token ids.
            max_new_tokens: Number of tokens to append.

        Returns:
            Integer tensor of shape (B, T + max_new_tokens).
        """
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                for _ in range(max_new_tokens):
                    # The position table only covers block_size steps, so
                    # condition on the most recent block_size tokens.
                    idx_cond = idx[:, -self.block_size:]
                    logits, _ = self(idx_cond)
                    probs = F.softmax(logits[:, -1, :], dim=-1)
                    idx_next = torch.multinomial(probs, num_samples=1)
                    idx = torch.cat((idx, idx_next), dim=1)
        finally:
            self.train(was_training)
        return idx

# FlashAttention优化的多头注意力
class MultiHeadAttention(nn.Module):
    def __init__(self, n_head, n_embd, dropout):
        super().__init__()
        self.n_head = n_head
        self.head_size = n_embd // n_head
        self.key = nn.Linear(n_embd, n_embd)
        self.query = nn.Linear(n_embd, n_embd)
        self.value = nn.Linear(n_embd, n_embd)
        self.proj = nn.Linear(n_embd, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        B, T, C = x.shape
        
        # 使用FlashAttention
        q = self.query(x).view(B, T, self.n_head, self.head_size).transpose(1, 2)
        k = self.key(x).view(B, T, self.n_head, self.head_size).transpose(1, 2)
        v = self.value(x).view(B, T, self.n_head, self.head_size).transpose(1, 2)
        
        with sdpa_kernel(backends=SDPBackend.FLASH_ATTENTION):
            attn_output = F.scaled_dot_product_attention(q, k, v, is_causal=True)
        
        attn_output = attn_output.transpose(1, 2).contiguous().view(B, T, C)
        return self.proj(attn_output)

# Training entry point
def train_model(text_path, model_path="nano_gpt.pt", max_iters=5000,
                batch_size=32, lr=1e-3, log_interval=500):
    """Train a character-level NanoGPT on a text file and print a sample.

    The file is read as UTF-8, every distinct character becomes one token, and
    the model is trained on random windows drawn from the first 90% of the
    text (the remaining 10% is held out and not used here). After training the
    model weights are saved and 100 characters are sampled and printed.

    Args:
        text_path: Path of the UTF-8 training text.
        model_path: Where the trained ``state_dict`` is written.
        max_iters: Number of optimisation steps.
        batch_size: Number of windows per step.
        lr: AdamW learning rate.
        log_interval: Print the loss every this many steps; a falsy value
            disables progress printing.

    Raises:
        ValueError: If the training split is not longer than the model's
            ``block_size`` (no full input/target window can be drawn).
    """
    # Load the raw text.
    with open(text_path, 'r', encoding='utf-8') as f:
        text = f.read()

    # Build the character vocabulary and both lookup tables.
    chars = sorted(set(text))
    vocab_size = len(chars)
    stoi = {ch: i for i, ch in enumerate(chars)}
    itos = dict(enumerate(chars))

    # Encode the text and keep the first 90% for training.
    data = torch.tensor([stoi[c] for c in text], dtype=torch.long)
    train_data = data[:int(0.9 * len(data))]

    # Build the model and optimiser.
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model = NanoGPT(vocab_size).to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr)

    # A window needs block_size inputs plus one extra token for the shifted
    # targets, so valid start offsets are 0 .. len(train_data) - block_size - 1.
    block_size = model.block_size
    max_start = len(train_data) - block_size
    if max_start <= 0:
        raise ValueError(
            f"training split has {len(train_data)} tokens; "
            f"more than block_size={block_size} are required"
        )

    # Training loop.
    model.train()
    for step in range(max_iters):
        # Draw random windows; targets are the inputs shifted by one token.
        starts = torch.randint(max_start, (batch_size,)).tolist()
        inputs = torch.stack([train_data[i:i + block_size] for i in starts]).to(device)
        targets = torch.stack([train_data[i + 1:i + block_size + 1] for i in starts]).to(device)

        # Forward pass and optimisation step.
        _, loss = model(inputs, targets)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if log_interval and step % log_interval == 0:
            print(f"Iter {step}/{max_iters} | Loss: {loss.item():.4f}")

    torch.save(model.state_dict(), model_path)

    # Sample with dropout disabled and without building an autograd graph.
    model.eval()
    context = torch.zeros((1, 1), dtype=torch.long, device=device)
    with torch.no_grad():
        sample_ids = model.generate(context, 100)[0].tolist()
    print("生成示例:", ''.join(itos[i] for i in sample_ids))


In [None]:
# Train on "test_fiction.txt"; only the corpus path is passed, so every other
# train_model argument keeps its default value.
train_model("test_fiction.txt")

Iter 0/5000 | Loss: 8.1735
Iter 500/5000 | Loss: 4.3824
Iter 1000/5000 | Loss: 4.0810
Iter 1500/5000 | Loss: 3.8805
Iter 2000/5000 | Loss: 3.6498
Iter 2500/5000 | Loss: 3.4384
Iter 3000/5000 | Loss: 3.2085
Iter 3500/5000 | Loss: 3.2162
Iter 4000/5000 | Loss: 3.0557
Iter 4500/5000 | Loss: 3.1253


# Homework2

In [1]:
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch

# Load the small model on the CPU
model = AutoModelForCausalLM.from_pretrained("bigscience/bloom-560m", device_map="cpu")

# Apply dynamic quantization (8-bit integers) to every nn.Linear layer.
# By default quantize_dynamic deep-copies the model before converting it, so
# two full-precision copies are held in memory at once. inplace=True converts
# `model` itself instead; as a result `model` and `quantized_model` refer to
# the same quantized module.
# NOTE(review): the recorded run of this cell ended with
# "MemoryError: bad allocation"; avoiding the copy lowers peak memory, but
# confirm on the target machine that this is sufficient.
quantized_model = torch.quantization.quantize_dynamic(
    model,
    {torch.nn.Linear},
    dtype=torch.qint8,
    inplace=True,
)


# Load the tokenizer
tokenizer = AutoTokenizer.from_pretrained("bigscience/bloom-560m")

# Generate text
input_data = tokenizer('用通俗易懂的语言介绍一下NLP算法领域中的量化概念：', return_tensors='pt')
predict = quantized_model.generate(**input_data, max_new_tokens=50)
print(tokenizer.decode(predict[0], skip_special_tokens=True))


  from .autonotebook import tqdm as notebook_tqdm
To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


MemoryError: bad allocation