<a href="https://colab.research.google.com/github/EdwinZhou/LLMs-from-scratch/blob/main/LLMFromScratch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 加载 the-verdict

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
file_path = "the-verdict.txt"
with open(file_path, "r", encoding="utf-8") as file:
  text_data = file.read()

FileNotFoundError: [Errno 2] No such file or directory: 'the-verdict.txt'

In [None]:
print(text_data)

# 完整的大模型

## 多头注意力

In [None]:
class MultiHeadAttention(nn.Module):
  def __init__(self, d_in, d_out,
    context_length, dropout, num_heads, qkv_bias=False):
    super().__init__()
    assert d_out % num_heads == 0, "d_out must be divisible by num_heads"
    self.d_out = d_out
    self.num_heads = num_heads
    self.head_dim = d_out // num_heads #A
    self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
    self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)
    self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)
    self.out_proj = nn.Linear(d_out, d_out) #B
    self.dropout = nn.Dropout(dropout)
    self.register_buffer(
    'mask',
    torch.triu(torch.ones(context_length, context_length), diagonal=1)
    )

  def forward(self, x):
    b, num_tokens, d_in = x.shape
    keys = self.W_key(x) #C
    queries = self.W_query(x) #C
    values = self.W_value(x) #C
    keys = keys.view(b, num_tokens, self.num_heads, self.head_dim) #D
    values = values.view(b, num_tokens, self.num_heads, self.head_dim) #D
    queries = queries.view(b, num_tokens, self.num_heads, self.head_dim)#D
    keys = keys.transpose(1, 2) #E
    queries = queries.transpose(1, 2) #E
    values = values.transpose(1, 2) #E
    attn_scores = queries @ keys.transpose(2, 3) #F
    mask_bool = self.mask.bool()[:num_tokens, :num_tokens] #G
    attn_scores.masked_fill_(mask_bool, -torch.inf) #H
    attn_weights = torch.softmax(
    attn_scores / keys.shape[-1]**0.5, dim=-1)
    attn_weights = self.dropout(attn_weights)
    context_vec = (attn_weights @ values).transpose(1, 2) #I
    #J
    context_vec = context_vec.contiguous().view(b, num_tokens, self.d_out)
    context_vec = self.out_proj(context_vec) #K
    return context_vec

## transformer 块

In [None]:
class TransformerBlock(nn.Module):
  def __init__(self, cfg):
    super().__init__()
    self.att = MultiHeadAttention(
      d_in=cfg["emb_dim"],
      d_out=cfg["emb_dim"],
      context_length=cfg["context_length"],
      num_heads=cfg["n_heads"],
      dropout=cfg["drop_rate"],
      qkv_bias=cfg["qkv_bias"])
    self.ff = FeedForward(cfg)
    self.norm1 = LayerNorm(cfg["emb_dim"])
    self.norm2 = LayerNorm(cfg["emb_dim"])
    self.drop_shortcut = nn.Dropout(cfg["drop_rate"])

  def forward(self, x): #A
    shortcut = x
    x = self.norm1(x)
    x = self.att(x)
    x = self.drop_shortcut(x)
    x = x + shortcut # Add the original input back
    shortcut = x #B
    x = self.norm2(x)
    x = self.ff(x)
    x = self.drop_shortcut(x)
    x = x + shortcut #C
    return x

In [None]:
class GPTModel(nn.Module):
  def __init__(self, cfg):
    super().__init__()

    # 定义词嵌入层（token embedding）
    self.tok_emb = nn.Embedding(cfg["vocab_size"], cfg["emb_dim"])
    # 定义位置嵌入层（positional embedding）：
    self.pos_emb = nn.Embedding(cfg["context_length"], cfg["emb_dim"])
    # 定义嵌入层 dropout，在嵌入层后添加 dropout，防止模型过度依赖某些嵌入向量，减少过拟合。
    self.drop_emb = nn.Dropout(cfg["drop_rate"])

    self.trf_blocks = nn.Sequential(
        *[TransformerBlock(cfg) for _ in range(cfg["n_layers"])]
        )

    self.final_norm = LayerNorm(cfg["emb_dim"])

    self.out_head = nn.Linear(
      cfg["emb_dim"],
      cfg["vocab_size"],
      bias=False
    )

  def forward(self, in_idx):
    batch_size, seq_len = in_idx.shape
    tok_embeds = self.tok_emb(in_idx)
    #A
    pos_embeds = self.pos_emb(torch.arange(seq_len, device=in_idx.device))

    x = tok_embeds + pos_embeds
    x = self.drop_emb(x)
    x = self.trf_blocks(x)
    x = self.final_norm(x)
    logits = self.out_head(x)
    return logits


In [None]:
GPT_CONFIG_124M = {
  "vocab_size": 50257,
  "context_length": 256, #A
  "emb_dim": 768,
  "n_heads": 12,
  "n_layers": 12,
  "drop_rate": 0.1, #B
  "qkv_bias": False
}
torch.manual_seed(123)
model = GPTModel(GPT_CONFIG_124M)
model.eval()

NameError: name 'GPTModel' is not defined

In [None]:
import matplotlib.pyplot as plt
import torch
import torch.nn as nn

class GELU(nn.Module):
  def __init__(self):
    super().__init__()

  def forward(self, x):
    return 0.5 * x * (1 + torch.tanh(
      torch.sqrt(torch.tensor(2.0 / torch.pi)) *
      (x + 0.044715 * torch.pow(x, 3))
    ))

gelu, relu = GELU(), nn.ReLU()
x = torch.linspace(-3, 3, 100) #A
y_gelu, y_relu = gelu(x), relu(x)


class FeedForward(nn.Module):
  def __init__(self, cfg):
    super().__init__()
    self.layers = nn.Sequential(
      nn.Linear(cfg["emb_dim"], 4 * cfg["emb_dim"]),
      GELU(),
      nn.Linear(4 * cfg["emb_dim"], cfg["emb_dim"]),
    )

  def forward(self, x):
    return self.layers(x)

ffn = FeedForward(GPT_CONFIG_124M)
x = torch.rand(2, 3, 768) #A
out = ffn(x)
print(out.shape)


def print_gradients(model, x):
  # Forward pass
  output = model(x)
  target = torch.tensor([[0.]])
  # Calculate loss based on how close the target
  # and output are
  loss = nn.MSELoss()
  loss = loss(output, target)
  # Backward pass to calculate the gradients
  loss.backward()
  for name, param in model.named_parameters():
    if 'weight' in name:
      # Print the mean absolute gradient of the weights
      print(f"{name} has gradient mean of {param.grad.abs().mean().item()}")

class ExampleDeepNeuralNetwork(nn.Module):
  def __init__(self, layer_sizes, use_shortcut):
    super().__init__()
    self.use_shortcut = use_shortcut
    self.layers = nn.ModuleList([
      # Implement 5 layers
      nn.Sequential(nn.Linear(layer_sizes[0], layer_sizes[1]), GELU()),
      nn.Sequential(nn.Linear(layer_sizes[1], layer_sizes[2]), GELU()),
      nn.Sequential(nn.Linear(layer_sizes[2], layer_sizes[3]), GELU()),
      nn.Sequential(nn.Linear(layer_sizes[3], layer_sizes[4]), GELU()),
      nn.Sequential(nn.Linear(layer_sizes[4], layer_sizes[5]), GELU())
    ])

  def forward(self, x):
    for layer in self.layers:
    # Compute the output of the current layer
      layer_output = layer(x)
    # Check if shortcut can be applied
      if self.use_shortcut and x.shape == layer_output.shape:
        x = x + layer_output
      else:
        x = layer_output
    return x


layer_sizes = [3, 3, 3, 3, 3, 1]
sample_input = torch.tensor([[1., 0., -1.]])
torch.manual_seed(123) # specify random seed for the initial weights for reproducibility
model_without_shortcut = ExampleDeepNeuralNetwork(
layer_sizes, use_shortcut=False
)

print_gradients(model_without_shortcut, sample_input)

torch.manual_seed(123)
model_with_shortcut = ExampleDeepNeuralNetwork(
layer_sizes, use_shortcut=True
)
print_gradients(model_with_shortcut, sample_input)

# plt.figure(figsize=(8, 3))
# for i, (y, label) in enumerate(zip([y_gelu, y_relu], ["GELU", "ReLU"]), 1):
#   plt.subplot(1, 2, i)
#   plt.plot(x, y)
#   plt.title(f"{label} activation function")
#   plt.xlabel("x")
#   plt.ylabel(f"{label}(x)")
#   plt.grid(True)
#   plt.tight_layout()
#   plt.show()

torch.Size([2, 3, 768])
layers.0.0.weight has gradient mean of 0.00020173584925942123
layers.1.0.weight has gradient mean of 0.00012011159560643137
layers.2.0.weight has gradient mean of 0.0007152040489017963
layers.3.0.weight has gradient mean of 0.0013988736318424344
layers.4.0.weight has gradient mean of 0.005049645435065031
layers.0.0.weight has gradient mean of 0.22169791162014008
layers.1.0.weight has gradient mean of 0.20694105327129364
layers.2.0.weight has gradient mean of 0.32896995544433594
layers.3.0.weight has gradient mean of 0.2665732204914093
layers.4.0.weight has gradient mean of 1.3258540630340576


In [None]:
import torch
import torch.nn as nn

class MultiHeadAttention(nn.Module):
  def __init__(self, d_in, d_out,
    context_length, dropout, num_heads, qkv_bias=False):
    super().__init__()
    assert d_out % num_heads == 0, "d_out must be divisible by num_heads"
    self.d_out = d_out
    self.num_heads = num_heads
    self.head_dim = d_out // num_heads #A
    self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
    self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)
    self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)
    self.out_proj = nn.Linear(d_out, d_out) #B
    self.dropout = nn.Dropout(dropout)
    self.register_buffer(
    'mask',
    torch.triu(torch.ones(context_length, context_length), diagonal=1)
    )

  def forward(self, x):
    b, num_tokens, d_in = x.shape
    keys = self.W_key(x) #C
    queries = self.W_query(x) #C
    values = self.W_value(x) #C
    keys = keys.view(b, num_tokens, self.num_heads, self.head_dim) #D
    values = values.view(b, num_tokens, self.num_heads, self.head_dim) #D
    queries = queries.view(b, num_tokens, self.num_heads, self.head_dim)#D
    keys = keys.transpose(1, 2) #E
    queries = queries.transpose(1, 2) #E
    values = values.transpose(1, 2) #E
    attn_scores = queries @ keys.transpose(2, 3) #F
    mask_bool = self.mask.bool()[:num_tokens, :num_tokens] #G
    attn_scores.masked_fill_(mask_bool, -torch.inf) #H
    attn_weights = torch.softmax(
    attn_scores / keys.shape[-1]**0.5, dim=-1)
    attn_weights = self.dropout(attn_weights)
    context_vec = (attn_weights @ values).transpose(1, 2) #I
    #J
    context_vec = context_vec.contiguous().view(b, num_tokens, self.d_out)
    context_vec = self.out_proj(context_vec) #K
    return context_vec

torch.manual_seed(123)
batch_size, context_length, d_in = batch.shape
d_out = 2
mha = MultiHeadAttention(d_in, d_out, context_length, 0.0, num_heads=2)
context_vecs = mha(batch)
print(context_vecs)
print("context_vecs.shape:", context_vecs.shape)

NameError: name 'batch' is not defined

第四章


In [None]:
import torch
import torch.nn as nn
import tiktoken

GPT_CONFIG_124M = {
  "vocab_size": 50257, # Vocabulary size
  "context_length": 1024, # Context length
  "emb_dim": 768, # Embedding dimension
  "n_heads": 12, # Number of attention heads
  "n_layers": 12, # Number of layers
  "drop_rate": 0.1, # Dropout rate
  "qkv_bias": False # Query-Key-Value bias
}


class DummyGPTModel(nn.Module):
  def __init__(self, cfg):
    super().__init__()
    self.tok_emb = nn.Embedding(cfg["vocab_size"], cfg["emb_dim"])
    self.pos_emb = nn.Embedding(cfg["context_length"], cfg["emb_dim"])
    self.drop_emb = nn.Dropout(cfg["drop_rate"])
    self.trf_blocks = nn.Sequential(
      *[DummyTransformerBlock(cfg) for _ in range(cfg["n_layers"])]) #A
    self.final_norm = DummyLayerNorm(cfg["emb_dim"]) #B
    self.out_head = nn.Linear(
      cfg["emb_dim"], cfg["vocab_size"], bias=False
    )

  def forward(self, in_idx):
    batch_size, seq_len = in_idx.shape
    tok_embeds = self.tok_emb(in_idx)
    pos_embeds = self.pos_emb(torch.arange(seq_len, device=in_idx.device))
    x = tok_embeds + pos_embeds
    x = self.drop_emb(x)
    x = self.trf_blocks(x)
    x = self.final_norm(x)
    logits = self.out_head(x)
    return logits


class DummyTransformerBlock(nn.Module): #C
  def __init__(self, cfg):
    super().__init__()

  def forward(self, x): #D
    return x

class DummyLayerNorm(nn.Module): #E
  def __init__(self, normalized_shape, eps=1e-5): #F
    super().__init__()

  def forward(self, x):
    return x

class LayerNorm(nn.Module):
  def __init__(self, emb_dim):
    super().__init__()
    self.eps = 1e-5
    self.scale = nn.Parameter(torch.ones(emb_dim))
    self.shift = nn.Parameter(torch.zeros(emb_dim))

  def forward(self, x):
    mean = x.mean(dim=-1, keepdim=True)
    var = x.var(dim=-1, keepdim=True, unbiased=False)
    norm_x = (x - mean) / torch.sqrt(var + self.eps)
    return self.scale * norm_x + self.shift

tokenizer = tiktoken.get_encoding("gpt2")
batch = []
txt1 = "Every effort moves you"
txt2 = "Every day holds a"
batch.append(torch.tensor(tokenizer.encode(txt1)))
batch.append(torch.tensor(tokenizer.encode(txt2)))
batch = torch.stack(batch, dim=0)
# print(batch)

torch.manual_seed(123)
model = DummyGPTModel(GPT_CONFIG_124M)
logits = model(batch)
# print("Output shape:", logits.shape)
# print(logits)

# 模拟一层传播
torch.manual_seed(123)
batch_example = torch.randn(2, 5) #A

# layer = nn.Sequential(nn.Linear(5, 6), nn.ReLU())
# out = layer(batch_example)
# print(out)

# 计算均值和方差
# mean = out.mean(dim=-1, keepdim=False)
# var = out.var(dim=-1, keepdim=False)
# print("Mean:\n", mean)
# print("Variance:\n", var)

# 计算均值和方差，保留维度
# mean = out.mean(dim=-1, keepdim=True)
# var = out.var(dim=-1, keepdim=True)
# torch.set_printoptions(sci_mode=False)
# print("Mean:\n", mean)
# print("Variance:\n", var)

# 计算归一化之后的均值和方差
# out_norm = (out - mean) / torch.sqrt(var)
# mean = out_norm.mean(dim=-1, keepdim=True)
# var = out_norm.var(dim=-1, keepdim=True)
# print("Normalized layer outputs:\n", out_norm)
# print("Mean:\n", mean)
# print("Variance:\n", var)

# ln = LayerNorm(emb_dim=5)
# out_ln = ln(batch_example)
# mean = out_ln.mean(dim=-1, keepdim=True)
# var = out_ln.var(dim=-1, unbiased=False, keepdim=True)
# print("Mean:\n", mean)
# print("Variance:\n", var)

class GELU(nn.Module):
  def __init__(self):
    super().__init__()

  def forward(self, x):
    return 0.5 * x * (1 + torch.tanh(
          torch.sqrt(torch.tensor(2.0 / torch.pi)) *
          (x + 0.044715 * torch.pow(x, 3))
          ))

class FeedForward(nn.Module):
  def __init__(self, cfg):
    super().__init__()
    self.layers = nn.Sequential(
      nn.Linear(cfg["emb_dim"], 4 * cfg["emb_dim"]),
      GELU(),
      nn.Linear(4 * cfg["emb_dim"], cfg["emb_dim"]),
    )

  def forward(self, x):
    return self.layers(x)

# ffn = FeedForward(GPT_CONFIG_124M)
# x = torch.rand(2, 3, 768) #A
# out = ffn(x)
# print(out.shape)

# import matplotlib.pyplot as plt

# gelu, relu = GELU(), nn.ReLU()
# x = torch.linspace(-3, 3, 100) #A
# y_gelu, y_relu = gelu(x), relu(x)
# plt.figure(figsize=(8, 3))
# for i, (y, label) in enumerate(zip([y_gelu, y_relu], ["GELU", "ReLU"]), 1):
#   plt.subplot(1, 2, i)
#   plt.plot(x, y)
#   plt.title(f"{label} activation function")
#   plt.xlabel("x")
#   plt.ylabel(f"{label}(x)")
#   plt.grid(True)
#   plt.tight_layout()
#   plt.show()

class ExampleDeepNeuralNetwork(nn.Module):
  def __init__(self, layer_sizes, use_shortcut):
    super().__init__()
    self.use_shortcut = use_shortcut
    self.layers = nn.ModuleList([
      # Implement 5 layers
      nn.Sequential(nn.Linear(layer_sizes[0], layer_sizes[1]), GELU()),
      nn.Sequential(nn.Linear(layer_sizes[1], layer_sizes[2]), GELU()),
      nn.Sequential(nn.Linear(layer_sizes[2], layer_sizes[3]), GELU()),
      nn.Sequential(nn.Linear(layer_sizes[3], layer_sizes[4]), GELU()),
      nn.Sequential(nn.Linear(layer_sizes[4], layer_sizes[5]), GELU())
    ])

  def forward(self, x):
    for layer in self.layers:
      # Compute the output of the current layer
      layer_output = layer(x)
      # Check if shortcut can be applied
      if self.use_shortcut and x.shape == layer_output.shape:
        x = x + layer_output
      else:
        x = layer_output

    return x

layer_sizes = [3, 3, 3, 3, 3, 1]
sample_input = torch.tensor([[1., 0., -1.]])
torch.manual_seed(123) # specify random seed for the initial weights for reproducibility
model_without_shortcut = ExampleDeepNeuralNetwork(
  layer_sizes, use_shortcut=True
)


def print_gradients(model, x):
  # Forward pass
  output = model(x)
  target = torch.tensor([[0.]])

  # Calculate loss based on how close the target
  # and output are
  loss = nn.MSELoss()
  loss = loss(output, target)

  # Backward pass to calculate the gradients
  loss.backward()

  for name, param in model.named_parameters():
    if 'weight' in name:
      # Print the mean absolute gradient of the weights
      print(f"{name} has gradient mean of {param.grad.abs().mean().item()}")

# print_gradients(model_without_shortcut, sample_input)

class MultiHeadAttention(nn.Module):
  def __init__(self, d_in, d_out,
    context_length, dropout, num_heads, qkv_bias=False):
    super().__init__()
    assert d_out % num_heads == 0, "d_out must be divisible by num_heads"
    self.d_out = d_out
    self.num_heads = num_heads
    self.head_dim = d_out // num_heads #A
    self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
    self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)
    self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)
    self.out_proj = nn.Linear(d_out, d_out) #B
    self.dropout = nn.Dropout(dropout)
    self.register_buffer(
    'mask',
    torch.triu(torch.ones(context_length, context_length), diagonal=1)
    )

  def forward(self, x):
    b, num_tokens, d_in = x.shape
    keys = self.W_key(x) #C
    queries = self.W_query(x) #C
    values = self.W_value(x) #C
    keys = keys.view(b, num_tokens, self.num_heads, self.head_dim) #D
    values = values.view(b, num_tokens, self.num_heads, self.head_dim) #D
    queries = queries.view(b, num_tokens, self.num_heads, self.head_dim)#D
    keys = keys.transpose(1, 2) #E
    queries = queries.transpose(1, 2) #E
    values = values.transpose(1, 2) #E
    attn_scores = queries @ keys.transpose(2, 3) #F
    mask_bool = self.mask.bool()[:num_tokens, :num_tokens] #G
    attn_scores.masked_fill_(mask_bool, -torch.inf) #H
    attn_weights = torch.softmax(
    attn_scores / keys.shape[-1]**0.5, dim=-1)
    attn_weights = self.dropout(attn_weights)
    context_vec = (attn_weights @ values).transpose(1, 2) #I
    #J
    context_vec = context_vec.contiguous().view(b, num_tokens, self.d_out)
    context_vec = self.out_proj(context_vec) #K
    return context_vec

class TransformerBlock(nn.Module):
  def __init__(self, cfg):
    super().__init__()
    self.att = MultiHeadAttention(
      d_in=cfg["emb_dim"],
      d_out=cfg["emb_dim"],
      context_length=cfg["context_length"],
      num_heads=cfg["n_heads"],
      dropout=cfg["drop_rate"],
      qkv_bias=cfg["qkv_bias"])
    self.ff = FeedForward(cfg)
    self.norm1 = LayerNorm(cfg["emb_dim"])
    self.norm2 = LayerNorm(cfg["emb_dim"])
    self.drop_shortcut = nn.Dropout(cfg["drop_rate"])

  def forward(self, x): #A
    shortcut = x
    x = self.norm1(x)
    x = self.att(x)
    x = self.drop_shortcut(x)
    x = x + shortcut # Add the original input back
    shortcut = x #B
    x = self.norm2(x)
    x = self.ff(x)
    x = self.drop_shortcut(x)
    x = x + shortcut #C
    return x


torch.manual_seed(123)
x = torch.rand(2, 4, 768) #A
block = TransformerBlock(GPT_CONFIG_124M)
output = block(x)
print("Input shape:", x.shape)
print("Output shape:", output.shape)

Input shape: torch.Size([2, 4, 768])
Output shape: torch.Size([2, 4, 768])


实现一个完整 GPT 模型

In [None]:


torch.manual_seed(123)
model = GPTModel(GPT_CONFIG_124M)
out = model(batch)
# print("Input batch:\n", batch)
# print("\nOutput shape:", out.shape)
# print(out)

total_params = sum(p.numel() for p in model.parameters())
print(f"Total number of parameters: {total_params:,}")

# print("Token embedding layer shape:", model.tok_emb.weight.shape)
# print("Output layer shape:", model.out_head.weight.shape)

total_params_gpt2 = total_params - sum(p.numel() for p in model.out_head.parameters())
print(f"Number of trainable parameters considering weight tying: {total_params_gpt2:,}")

total_size_bytes = total_params * 4 #A
total_size_mb = total_size_bytes / (1024 * 1024) #B
print(f"Total size of the model: {total_size_mb:.2f} MB")

def generate_text_simple(model, idx, max_new_tokens, context_size): #A
  for _ in range(max_new_tokens):
    idx_cond = idx[:, -context_size:] #B
    with torch.no_grad():
      logits = model(idx_cond)

    logits = logits[:, -1, :] #C
    probas = torch.softmax(logits, dim=-1) #D
    idx_next = torch.argmax(probas, dim=-1, keepdim=True) #E
    idx = torch.cat((idx, idx_next), dim=1) #F

  return idx

start_context = "Hello, I am"
encoded = tokenizer.encode(start_context)
print("encoded:", encoded)
encoded_tensor = torch.tensor(encoded).unsqueeze(0) #A
print("encoded_tensor.shape:", encoded_tensor.shape)

model.eval() #A
out = generate_text_simple(
  model=model,
  idx=encoded_tensor,
  max_new_tokens=6,
  context_size=GPT_CONFIG_124M["context_length"]
)
print("Output:", out)
print("Output length:", len(out[0]))

decoded_text = tokenizer.decode(out.squeeze(0).tolist())
print(decoded_text)

Total number of parameters: 163,009,536
Token embedding layer shape: torch.Size([50257, 768])
Output layer shape: torch.Size([50257, 768])
Number of trainable parameters considering weight tying: 124,412,160
Total size of the model: 621.83 MB
encoded: [15496, 11, 314, 716]
encoded_tensor.shape: torch.Size([1, 4])
Output: tensor([[15496,    11,   314,   716, 27018, 24086, 47843, 30961, 42348,  7267]])
Output length: 10
Hello, I am Featureiman Byeswickattribute argue


第五章开始



In [None]:
import torch
import torch.nn as nn
import tiktoken

# from chapter04 import GPTModel
GPT_CONFIG_124M = {
"vocab_size": 50257,
"context_length": 256, #A
"emb_dim": 768,
"n_heads": 12,
"n_layers": 12,
"drop_rate": 0.1, #B
"qkv_bias": False
}

class LayerNorm(nn.Module):
  def __init__(self, emb_dim):
    super().__init__()
    self.eps = 1e-5
    self.scale = nn.Parameter(torch.ones(emb_dim))
    self.shift = nn.Parameter(torch.zeros(emb_dim))

  def forward(self, x):
    mean = x.mean(dim=-1, keepdim=True)
    var = x.var(dim=-1, keepdim=True, unbiased=False)
    norm_x = (x - mean) / torch.sqrt(var + self.eps)
    return self.scale * norm_x + self.shift

class GELU(nn.Module):
  def __init__(self):
    super().__init__()

  def forward(self, x):
    return 0.5 * x * (1 + torch.tanh(
      torch.sqrt(torch.tensor(2.0 / torch.pi)) *
      (x + 0.044715 * torch.pow(x, 3))
    ))

class MultiHeadAttention(nn.Module):
  def __init__(self, d_in, d_out,
    context_length, dropout, num_heads, qkv_bias=False):
    super().__init__()
    assert d_out % num_heads == 0, "d_out must be divisible by num_heads"
    self.d_out = d_out
    self.num_heads = num_heads
    self.head_dim = d_out // num_heads #A
    self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
    self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)
    self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)
    self.out_proj = nn.Linear(d_out, d_out) #B
    self.dropout = nn.Dropout(dropout)
    self.register_buffer(
    'mask',
    torch.triu(torch.ones(context_length, context_length), diagonal=1)
    )

  def forward(self, x):
    b, num_tokens, d_in = x.shape
    keys = self.W_key(x) #C
    queries = self.W_query(x) #C
    values = self.W_value(x) #C
    keys = keys.view(b, num_tokens, self.num_heads, self.head_dim) #D
    values = values.view(b, num_tokens, self.num_heads, self.head_dim) #D
    queries = queries.view(b, num_tokens, self.num_heads, self.head_dim)#D
    keys = keys.transpose(1, 2) #E
    queries = queries.transpose(1, 2) #E
    values = values.transpose(1, 2) #E
    attn_scores = queries @ keys.transpose(2, 3) #F
    mask_bool = self.mask.bool()[:num_tokens, :num_tokens] #G
    attn_scores.masked_fill_(mask_bool, -torch.inf) #H
    attn_weights = torch.softmax(
    attn_scores / keys.shape[-1]**0.5, dim=-1)
    attn_weights = self.dropout(attn_weights)
    context_vec = (attn_weights @ values).transpose(1, 2) #I
    #J
    context_vec = context_vec.contiguous().view(b, num_tokens, self.d_out)
    context_vec = self.out_proj(context_vec) #K
    return context_vec

class TransformerBlock(nn.Module):
  def __init__(self, cfg):
    super().__init__()
    self.att = MultiHeadAttention(
      d_in=cfg["emb_dim"],
      d_out=cfg["emb_dim"],
      context_length=cfg["context_length"],
      num_heads=cfg["n_heads"],
      dropout=cfg["drop_rate"],
      qkv_bias=cfg["qkv_bias"])
    self.ff = FeedForward(cfg)
    self.norm1 = LayerNorm(cfg["emb_dim"])
    self.norm2 = LayerNorm(cfg["emb_dim"])
    self.drop_shortcut = nn.Dropout(cfg["drop_rate"])

  def forward(self, x): #A
    shortcut = x
    x = self.norm1(x)
    x = self.att(x)
    x = self.drop_shortcut(x)
    x = x + shortcut # Add the original input back
    shortcut = x #B
    x = self.norm2(x)
    x = self.ff(x)
    x = self.drop_shortcut(x)
    x = x + shortcut #C
    return x

class FeedForward(nn.Module):
  def __init__(self, cfg):
    super().__init__()
    self.layers = nn.Sequential(
      nn.Linear(cfg["emb_dim"], 4 * cfg["emb_dim"]),
      GELU(),
      nn.Linear(4 * cfg["emb_dim"], cfg["emb_dim"]),
    )

  def forward(self, x):
    return self.layers(x)

class GPTModel(nn.Module):
  def __init__(self, cfg):
    super().__init__()

    # 定义词嵌入层（token embedding）
    self.tok_emb = nn.Embedding(cfg["vocab_size"], cfg["emb_dim"])
    # 定义位置嵌入层（positional embedding）：
    self.pos_emb = nn.Embedding(cfg["context_length"], cfg["emb_dim"])
    # 定义嵌入层 dropout，在嵌入层后添加 dropout，防止模型过度依赖某些嵌入向量，减少过拟合。
    self.drop_emb = nn.Dropout(cfg["drop_rate"])

    self.trf_blocks = nn.Sequential(
        *[TransformerBlock(cfg) for _ in range(cfg["n_layers"])]
        )

    self.final_norm = LayerNorm(cfg["emb_dim"])

    self.out_head = nn.Linear(
      cfg["emb_dim"],
      cfg["vocab_size"],
      bias=False
    )

  def forward(self, in_idx):
    batch_size, seq_len = in_idx.shape
    tok_embeds = self.tok_emb(in_idx)
    #A
    pos_embeds = self.pos_emb(torch.arange(seq_len, device=in_idx.device))

    x = tok_embeds + pos_embeds
    x = self.drop_emb(x)
    x = self.trf_blocks(x)
    x = self.final_norm(x)
    logits = self.out_head(x)
    return logits

torch.manual_seed(123)
model = GPTModel(GPT_CONFIG_124M)

# 切换到评估模式
model.eval()


def generate_text_simple(model, idx, max_new_tokens, context_size): #A

  for _ in range(max_new_tokens):
    # 这里是一个自循环机制，把上一轮生成的结果加到最开始的输出里面，作为下一轮的输入
    idx_cond = idx[:, -context_size:] #B
    with torch.no_grad():
      logits = model(idx_cond)

    # 此处 logits 的形状是 [batch_size, seq_len, vocab_size]
    # 保留最后一个 token
    logits = logits[:, -1, :] #C

    # 这里的 dim 是指 logits 的最后一个维度，logits 的最后一个维度是词汇表
    probas = torch.softmax(logits, dim=-1) #D
    # probas 形状仍为 [batch_size, vocab_size]
    # keepdim=True：保持维度不变，将输出形状从 [batch_size] 变为 [batch_size, 1]（比如 [1,1]），方便后续和原始 idx 拼接
    idx_next = torch.argmax(probas, dim=-1, keepdim=True) #E

    # 合并
    idx = torch.cat((idx, idx_next), dim=1) #F

  return idx

def text_to_token_ids(text, tokenizer):
  encoded = tokenizer.encode(text, allowed_special={'<|endoftext|>'})
  encoded_tensor = torch.tensor(encoded).unsqueeze(0) # add batch dimension
  return encoded_tensor

def token_ids_to_text(token_ids, tokenizer):
  flat = token_ids.squeeze(0) # remove batch dimension
  return tokenizer.decode(flat.tolist())

start_context = "Every effort moves you"
tokenizer = tiktoken.get_encoding("gpt2")

# 调用模型来生成文本
token_ids = generate_text_simple(
  model=model,
  idx=text_to_token_ids(start_context, tokenizer),
  max_new_tokens=10,
  context_size=GPT_CONFIG_124M["context_length"]
)
print("Output text:\n", token_ids_to_text(token_ids, tokenizer))


inputs = torch.tensor([[16833, 3626, 6100], # ["every effort moves",
[40, 1107, 588]]) # "I really like"]

targets = torch.tensor([[3626, 6100, 345 ], # [" effort moves you",
[107, 588, 11311]]) # " really like chocolate"]

with torch.no_grad(): #A
  logits = model(inputs)
probas = torch.softmax(logits, dim=-1) # Probability of each token in vocabulary
# print(probas.shape)

token_ids = torch.argmax(probas, dim=-1, keepdim=True)
# print("Token IDs:\n", token_ids)

# print(f"Targets batch 1: {token_ids_to_text(targets[0], tokenizer)}")
# print(f"Outputs batch 1: {token_ids_to_text(token_ids[0].flatten(), tokenizer)}")

text_idx = 0
target_probas_1 = probas[text_idx, [0, 1, 2], targets[text_idx]]
# print("Text 1:", target_probas_1)
text_idx = 1
target_probas_2 = probas[text_idx, [0, 1, 2], targets[text_idx]]
# print("Text 2:", target_probas_2)

log_probas = torch.log(torch.cat((target_probas_1, target_probas_2)))
print(log_probas)

avg_log_probas = torch.mean(log_probas)
print(avg_log_probas)

logits_flat = logits.flatten(0, 1)
targets_flat = targets.flatten()
print("Flattened logits:", logits_flat.shape)
print("Flattened targets:", targets_flat.shape)

loss = torch.nn.functional.cross_entropy(logits_flat, targets_flat)
print(loss)

Output text:
 Every effort moves you rentingetic wasnم refres RexMeCHicular stren
tensor([ -9.5042, -10.3796, -11.3677, -10.1492,  -9.7764, -12.2561])
tensor(-10.5722)
Flattened logits: torch.Size([6, 50257])
Flattened targets: torch.Size([6])
tensor(10.5722)


In [None]:
GPT_CONFIG_124M = {
"vocab_size": 50257, # Vocabulary size
"context_length": 1024, # Context length
"emb_dim": 768, # Embedding dimension
"n_heads": 12, # Number of attention heads
"n_layers": 12, # Number of layers
"drop_rate": 0.1, # Dropout rate
"qkv_bias": False # Query-Key-Value bias
}
GPT_CONFIG_124M

{'vocab_size': 50257,
 'context_length': 1024,
 'emb_dim': 768,
 'n_heads': 12,
 'n_layers': 12,
 'drop_rate': 0.1,
 'qkv_bias': False}

In [None]:
print(as)

SyntaxError: invalid syntax (ipython-input-3744142341.py, line 1)

# 预训练模型


In [None]:
import tiktoken
import torch

tokenizer = tiktoken.get_encoding("gpt2")

batch = []
txt1 = "Every effort moves you"
txt2 = "Every day holds a"
batch.append(torch.tensor(tokenizer.encode(txt1)))
batch.append(torch.tensor(tokenizer.encode(txt2)))
batch = torch.stack(batch, dim=0)

print(batch)

tensor([[6109, 3626, 6100,  345],
        [6109, 1110, 6622,  257]])


# 预训练模型
