In [19]:
import torch
import torch.nn as nn
from torch.nn import functional as F
# torch.cuda.empty_cache()
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(device)
block_size = 64
batch_size = 128
max_iters = 3000
# eval_interval = 2500
learning_rate = 3e-4
eval_iters=100
n_embd = 384 # 嵌入维度
n_head = 8
n_layer = 8 # transformer block 的层数
dropout = 0.2


cuda


In [20]:
chars = ""
with open('wizard_of_oz.txt','r',encoding='utf-8') as f:
    text = f.read()
    chars = sorted(set(text))
# print(len(text))
# print(text[:200])
# print(chars)
# print(len(chars))
vocab_size = len(chars)

In [21]:
string_to_int = { ch:i for i,ch in enumerate(chars) }
int_to_string = { i:ch for i,ch in enumerate(chars) }
encode = lambda s: [string_to_int[c] for c in s]
decode = lambda l: ''.join([int_to_string[i] for i in l])

# print(encode('hello'))
# encodeed_hello = encode('hello')
# decodeed_hello = decode(encodeed_hello)
# print(decodeed_hello)
# 此处使用的是单个字符的token词表，这会使得vocabulary的size较小，当然也会导致整个文本需要划分的encode和decode变得很大。如果使用子词token词表那么恰好相反。

data = torch.tensor(encode(text),dtype=torch.long)
print(data[:100])

tensor([80,  1,  1, 28, 39, 42, 39, 44, 32, 49,  1, 25, 38, 28,  1, 44, 32, 29,
         1, 47, 33, 50, 25, 42, 28,  1, 33, 38,  1, 39, 50,  0,  0,  1,  1, 26,
        49,  0,  0,  1,  1, 36, 11,  1, 30, 42, 25, 38, 35,  1, 26, 25, 45, 37,
         0,  0,  1,  1, 25, 45, 44, 32, 39, 42,  1, 39, 30,  1, 44, 32, 29,  1,
        47, 33, 50, 25, 42, 28,  1, 39, 30,  1, 39, 50,  9,  1, 44, 32, 29,  1,
        36, 25, 38, 28,  1, 39, 30,  1, 39, 50])


In [22]:
n = int(0.8*len(data))
train_data = data[:n]
val_data = data[n:]

def get_batch(split):
    data = train_data if split == 'train' else val_data
    ix = torch.randint(len(data) - block_size,(batch_size,)) #randint(low,high,(size,size,...)),注意当只有一个整数时默认为上界。
    # print(ix)
    x = torch.stack([data[i:i+block_size] for i in ix])
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])
    x,y = x.to(device), y.to(device)
    return x,y
# x,y = get_batch('train')
# print('inputs:')
# print(x)
# print('targets:')
# print(y)

In [23]:
# block_size = 8

x = train_data[:block_size]
y = train_data[1:block_size+1]

for t in range(block_size):
    context = x[:t+1]   #x[0]~x[t]不包括x[t+1]!
    target = y[t]
    print('when input is',context,'target is',target)
    

when input is tensor([80]) target is tensor(1)
when input is tensor([80,  1]) target is tensor(1)
when input is tensor([80,  1,  1]) target is tensor(28)
when input is tensor([80,  1,  1, 28]) target is tensor(39)
when input is tensor([80,  1,  1, 28, 39]) target is tensor(42)
when input is tensor([80,  1,  1, 28, 39, 42]) target is tensor(39)
when input is tensor([80,  1,  1, 28, 39, 42, 39]) target is tensor(44)
when input is tensor([80,  1,  1, 28, 39, 42, 39, 44]) target is tensor(32)
when input is tensor([80,  1,  1, 28, 39, 42, 39, 44, 32]) target is tensor(49)
when input is tensor([80,  1,  1, 28, 39, 42, 39, 44, 32, 49]) target is tensor(1)
when input is tensor([80,  1,  1, 28, 39, 42, 39, 44, 32, 49,  1]) target is tensor(25)
when input is tensor([80,  1,  1, 28, 39, 42, 39, 44, 32, 49,  1, 25]) target is tensor(38)
when input is tensor([80,  1,  1, 28, 39, 42, 39, 44, 32, 49,  1, 25, 38]) target is tensor(28)
when input is tensor([80,  1,  1, 28, 39, 42, 39, 44, 32, 49,  1, 2

In [24]:
@torch.no_grad()
def estimate_loss():
    out = {}  # 用于存储训练集和验证集的平均损失
    model.eval()  # 将模型设置为评估模式
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters)  # 用于存储每次评估迭代的损失
        for k in range(eval_iters):
            X, Y = get_batch(split)  # 获取当前批次的数据和标签
            logits, loss = model(X, Y)  # 前向传播计算输出和损失
            losses[k] = loss.item()  # 记录损失值，item()将张量tensor转化为一个python数值。
        out[split] = losses.mean()  # 计算平均损失
    model.train()  # 恢复模型为训练模式
    return out  # 返回训练集和验证集的平均损失

In [25]:
class Head(nn.Module):

    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))
    
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        B, T, C = x.shape  # (B, T, C)
        k = self.key(x)  # (B, T, head_size)
        q = self.query(x)  # (B, T, head_size)

        # 注意力权重 (B, T, head_size) @ (B, head_size, T) -> (B, T, T)
        wei = q @ k.transpose(-2, -1) * k.shape[-1]**-0.5
        
        # 屏蔽上三角部分
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf'))
        
        # 归一化权重
        wei = F.softmax(wei, dim=-1)
        wei = self.dropout(wei)

        v = self.value(x)  # (B, T, head_size)
        
        # 应用权重到值上 (B, T, T) @ (B, T, head_size) -> (B, T, head_size)
        out = wei @ v
        return out
    

class MultiHeadAttention(nn.Module):

    def __init__(self, n_head, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(n_head)])
        self.proj = nn.Linear(head_size * n_head, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # 并行计算多头注意力并拼接 (B, T, head_size * n_head)
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        # 线性变换和 dropout
        out = self.dropout(self.proj(out))
        return out

class FeedFoward(nn.Module):
    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.ReLU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        return self.net(x)


class Block(nn.Module):

    def __init__(self, n_embd, n_head):

        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedFoward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        # 应用多头注意力和层归一化
        y = self.sa(x)
        x = self.ln1(x + y) #REs有个残差相加
        # 应用前馈网络和层归一化
        y = self.ffwd(x)
        x = self.ln2(x + y) #REs有个残差相加
        return x
class GPTLanguageModel(nn.Module):
    def __init__(self, vocab_size):
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])
    
        self.ln_f = nn.LayerNorm(n_embd) #层规范化，有点意思，似乎是由于size不统一造成只能使用层归一化？
        self.lm_head = nn.Linear(n_embd, vocab_size)#Transformer 模型中的输入序列长度和批量大小可能变化较大。LayerNorm 不依赖于批量大小，这使得模型在不同批量大小下都能表现稳定。

        self.apply(self._init_weights)
    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)


    
    def forward(self, index, targets=None):
        # """
        # 前向传播函数

        # 参数:
        # - index: 输入的索引张量，形状为 (B, T)
        # - targets: 目标张量，形状为 (B, T)

        # 返回:
        # - logits: 预测的logits，形状为 (B*T, C)
        # - loss: 计算的交叉熵损失
        # """
        B, T = index.shape
        # logits = self.token_embedding_table(index)
        tok_emb = self.token_embedding_table(index)# (B, T, n_embd)
        pos_emb = self.position_embedding_table(torch.arange(T, device=device)) # (T, n_embd)
        x = tok_emb + pos_emb# (B, T, n_embd)
        x = self.blocks(x)# (B, T, n_embd)
        x = self.ln_f(x)# (B, T, n_embd)
        
        logits = self.lm_head(x)
        
        if targets is None:
            loss=None
        else:    
        
            B, T, C = logits.shape
            logits = logits.view(B*T, C) # (B*T, C)
            targets = targets.view(B*T)# (B*T)
            loss = F.cross_entropy(logits, targets)
        
        return logits, loss

  
    def generate(self, index, max_new_tokens):
        """
        生成新的文本序列，基于当前输入的索引序列。
    
        参数:
        - index: 张量，形状为 (B, T)，表示输入序列的索引。
          - B: 批次大小
          - T: 当前序列长度
        - max_new_tokens: 整数，要生成的新标记的数量。
    
        返回:
        - 张量，形状为 (B, T + max_new_tokens)，生成的新序列的索引。
        """
        for _ in range(max_new_tokens):
            # 确保输入序列长度不超过 block_size，只取最后 block_size 个标记
            index_cond = index[:, -block_size:]  # (B, T) -> (B, min(T, block_size))
    
            # 前向传播，通过模型获取 logits 和 loss
            logits, loss = self.forward(index_cond)  # logits 形状为 (B, min(T, block_size), vocab_size)
    
            # 只关注最后一个时间步的 logits，用于预测下一个标记
            logits = logits[:, -1, :]  # 从 (B, min(T, block_size), vocab_size) -> (B, vocab_size)
    
            # 计算概率分布
            probs = F.softmax(logits, dim=-1)  # 应用 softmax 函数后形状保持为 (B, vocab_size)
    
            # 从概率分布中采样，获取下一个标记的索引
            index_next = torch.multinomial(probs, num_samples=1)  # 从 (B, vocab_size) -> (B, 1)
    
            # 将新生成的标记连接到输入序列的末尾
            index = torch.cat((index, index_next), dim=1)  # 从 (B, T) + (B, 1) -> (B, T+1)
    
        return index  # 返回形状为 (B, T + max_new_tokens) 的生成序列


model = GPTLanguageModel(vocab_size)
m = model.to(device)

context = torch.zeros((1,1), dtype=torch.long, device=device)
generated_chars = decode(m.generate(context, max_new_tokens=500)[0].tolist())
print(generated_chars)


3Z-Ck89gwF.m3 02B
;1XDahWb]kBkhciMne:i7lQ'.[:9;uy]C-]l8﻿*1
WcMnlW8Icmg'JJVVv5lDzcw4sl4kE,Ku5wP]znzbilf
[t&ZkS﻿-Pvb.fH&PYi]-E;e_'1:R4ack_zk4?x!T__l*5' 9u'-5?YuYti' bp[l6XR4z.,LGWc,ZcVRVgda08wB:"_THTi(4rxUp(YXX*;r0Ye(6Gdc1mHz*ptpO4P0F46&M'yaiV
-kIZzN'UI4z1g"AI15uZjJXI
]?wU5!GC4BV-GkTnDrlk[j3_2v FG﻿4Es?4-k ZSFC"1L&t3DjZ.aBTVNTRhacy[i7,h1c'&cm)KRB?R?mTa!u﻿TUk5w)VeB?2"KRR?Xo!0-2js!x3]ARXLh*&HpuNmbJWYyGLd41?3Z8p-1'1RF?YKGFTo6?ZJ0jllp!;Ash?8aooMaT&,G*1Lii8gdIpL"yO0nqk8z2-fO2p0joFh;1mzB2TL7
wX(V-
Hh47)D


In [26]:
#创建一个 Pytorch optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

for iter in range(max_iters):
    if iter % eval_iters == 0:
        losses = estimate_loss()
        print(f"step: {iter}, train loss: {losses['train']:.4f}, val loss:{losses['val']:.4f}")
    
    xb, yb = get_batch('train')

    logits, loss = model.forward(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()
print(loss.item())

step: 0, train loss: 4.4644, val loss:4.4594
step: 100, train loss: 2.3119, val loss:2.3815
step: 200, train loss: 1.8331, val loss:1.9606
step: 300, train loss: 1.5989, val loss:1.7571
step: 400, train loss: 1.4594, val loss:1.6573
step: 500, train loss: 1.3607, val loss:1.5988


KeyboardInterrupt: 

In [27]:
context = torch.zeros((1,1), dtype=torch.long, device=device)
generated_chars = decode(m.generate(context, max_new_tokens=500)[0].tolist())
print(generated_chars)


"Oh, but much we cave?"

"What wond the boy his piglet. Oh, a spreaded some each is easure
much. The a hure
gain the piriins catures so their way would buOd boding driss.

"It all saw there as of the longers our great at, how to uil sure is in
years inton of fir that of the uroom moust did to be two the parple again
as mouc, and and and dears with could be lind the Valley Pract hiles
hem upon the puble strong the lew pormpasons."

"Lal!" anther
Doroishy. The pous Jam loome my theird wings, as a 
