# Lecture 7: Attention Mechanisms

In this lecture, we will introduce attention mechanisms. Attention is the core module in the transformer model, which is the state-of-the-art model for many NLP tasks. Let's reproduce the attention mechanism from scratch step by step.

## Importing libraries

In [1]:
import os
import matplotlib.pyplot as plt
import itertools
from dataclasses import dataclass
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.nn import functional as F
from src.utils import load_text, set_seed, configure_device

## Configuration

In [2]:
@dataclass
class GPTConfig:
    root_dir: str = os.getcwd() + "/../../"
    dataset_path: str = "data/names.txt"
    device: torch.device = torch.device('cpu')  # Automatic device configuration

    # Tokenizer
    vocab_size: int = 0  # Set later

    # Model
    context_size: int = 12
    n_layer: int = 3
    n_head: int = 4
    d_embed: int = 64
    d_ff: int = 256
    dropout: float = 0.1

    # Training
    val_size: float = 0.1
    batch_size: int = 32
    max_steps: int = 1000
    lr: float = 0.01
    val_interval: int = 100

    seed: int = 101

config = GPTConfig()

## Reproducibility

In [3]:
set_seed(config.seed)

Random seed set to 101


## Device

In [4]:
config.device = configure_device()

Running on mps


## Dataset

In [5]:
names = load_text(config.root_dir + config.dataset_path).splitlines()

Loaded text data from /Users/pathfinder/Documents/GitHub/LLM101/notebooks/Lectures/../../data/names.txt (length: 228145 characters).


## Tokenizer

In [6]:
class Tokenizer:
    def __init__(self, names):
        self.names = names
        self.vocab = list(set("".join(self.names)))
        self.vocab.insert(0, ".")
        self.vocab_size = len(self.vocab)
        self.char2idx = {char: idx for idx, char in enumerate(self.vocab)}
        self.idx2char = {idx: char for idx, char in enumerate(self.vocab)}

    def encode(self, name):
        return [self.char2idx[char] for char in name]

    def decode(self, tokens):
        return "".join([self.idx2char[token] for token in tokens])

tokenizer = Tokenizer(names)
config.vocab_size = tokenizer.vocab_size

## Preprocessing

In [7]:
train_names, val_names = train_test_split(names, test_size=config.val_size, random_state=config.seed)

In [None]:
class NamesDataset(Dataset):
    def __init__(self, _names, context_size):
        self.inputs, self.targets = [], []

        for name in _names:
            context = [0] * context_size

            for char in name + ".":
                idx = str2idx[char]
                self.inputs.append(context)
                self.targets.append(idx)
                context = context[1:] + [idx]

    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, idx):
        input_ids = torch.tensor(self.inputs[idx])
        target_id = torch.tensor(self.targets[idx])
        return input_ids, target_id

train_dataset = NamesDataset(train_names, MLPConfig.context_size)
val_dataset = NamesDataset(val_names, MLPConfig.context_size)
train_loader = DataLoader(train_dataset, batch_size=MLPConfig.batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=MLPConfig.batch_size, shuffle=False)

In [8]:
class NamesDataset(Dataset):
    def __init__(self, _names, context_size):
        self.inputs, self.targets = [], []

        for name in _names:
            tokens = [tokenizer.encode(char) for char in name + "."]  # Convert name to token indices

            # Create context windows
            for i in range(len(tokens) - context_size):  # Ensure enough tokens for full sequence
                context = tokens[i:i + context_size]  # Input sequence
                target = tokens[i + 1:i + context_size + 1]  # Shifted target sequence
                self.inputs.append(context)
                self.targets.append(target)

    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, idx):
        input_ids = torch.tensor(self.inputs[idx])  # Shape: (context_size,)
        target_ids = torch.tensor(self.targets[idx])  # Shape: (context_size,)
        return input_ids, target_ids

train_dataset = NamesDataset(train_names, config.context_size)
val_dataset = NamesDataset(val_names, config.context_size)
train_loader = DataLoader(train_dataset, batch_size=config.batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=config.batch_size, shuffle=False)

In [9]:
for inputs, targets in train_loader:
    print(inputs.shape, targets.shape)
    break

torch.Size([32, 12, 1]) torch.Size([32, 12, 1])


In [10]:
adf

NameError: name 'adf' is not defined

## Model

### Casual Self-Attention

In [9]:
class CasualSelfAttention(nn.Module):
    def __init__(self, d_embed: int, n_head: int, dropout: float):
        super(CasualSelfAttention, self).__init__()
        self.n_head = n_head
        self.d_head = d_embed // n_head
        self.scale = self.d_head ** -0.5
        self.dropout = dropout

        self.query = nn.Linear(d_embed, d_embed, bias=False)
        self.key = nn.Linear(d_embed, d_embed, bias=False)
        self.value = nn.Linear(d_embed, d_embed, bias=False)
        self.out = nn.Linear(d_embed, d_embed, bias=False)

    def forward(self, x):
        batch_size, context_size, _ = x.size()

        # Query, Key, Value
        q = self.query(x)  # (batch_size, context_size, d_embed)
        k = self.key(x)  # (batch_size, context_size, d_embed)
        v = self.value(x)  # (batch_size, context_size, d_embed)
        q = q.view(batch_size, context_size, self.n_head, self.d_head).transpose(1, 2)  # (batch_size, n_head, context_size, d_head)
        k = k.view(batch_size, context_size, self.n_head, self.d_head).transpose(1, 2)  # (batch_size, n_head, context_size, d_head)
        v = v.view(batch_size, context_size, self.n_head, self.d_head).transpose(1, 2)  # (batch_size, n_head, context_size, d_head)

        # Scaled Dot-Product Attention
        attn_scores = torch.matmul(q, k.transpose(-2, -1)) * self.scale  # (batch_size, n_head, context_size, context_size)

        # Masking
        mask = torch.triu(torch.ones(context_size, context_size, device=x.device), diagonal=1).bool()  # (context_size, context_size)
        attn_scores = attn_scores.masked_fill(mask[None, None, :, :], float('-inf'))  # (batch_size, n_head, context_size, context_size)

        # Softmax
        attn_scores = F.softmax(attn_scores, dim=-1)  # (batch_size, n_head, context_size, context_size)

        # Dropout
        attn_scores = F.dropout(attn_scores, p=self.dropout, training=self.training)  # (batch_size, n_head, context_size, context_size)

        # Weighted Sum
        attn_output = torch.matmul(attn_scores, v)  # (batch_size, n_head, context_size, d_head)

        # Concatenation
        attn_output = attn_output.transpose(1, 2).contiguous().view(batch_size, context_size, self.n_head * self.d_head)  # (batch_size, context_size, d_embed)

        # Output Linear Layer
        x = self.out(attn_output)  # (batch_size, context_size, d_embed)
        return x


### Feed Forward

In [10]:
class FeedForward(nn.Module):
    def __init__(self, d_embed: int, d_ff: int, dropout: float):
        super(FeedForward, self).__init__()
        self.fc1 = nn.Linear(d_embed, d_ff)
        self.fc2 = nn.Linear(d_ff, d_embed)
        self.activation = nn.GELU()
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        x = self.fc1(x)
        x = self.activation(x)
        x = self.fc2(x)
        x = self.dropout(x)
        return x

### Decoder Layer

In [11]:
class DecoderLayer(nn.Module):
    def __init__(self, d_embed: int, n_head: int, d_ff: int, dropout: float):
        super(DecoderLayer, self).__init__()
        self.self_attention = CasualSelfAttention(d_embed, n_head, dropout)
        self.layer_norm1 = nn.LayerNorm(d_embed)
        self.feed_forward = FeedForward(d_embed, d_ff, dropout)
        self.layer_norm2 = nn.LayerNorm(d_embed)

    def forward(self, x):
        x = x + self.self_attention(self.layer_norm1(x))
        x = x + self.feed_forward(self.layer_norm2(x))
        return x

### GPT

In [12]:
class GPT(nn.Module):
    def __init__(self, vocab_size: int, context_size: int, d_embed: int, n_head: int, d_ff: int, n_layer: int, dropout: float):
        super(GPT, self).__init__()
        self.context_size = context_size
        self.token_embedding = nn.Embedding(vocab_size, d_embed)
        self.positional_embedding = nn.Embedding(context_size, d_embed)
        self.layers = nn.ModuleList([DecoderLayer(d_embed, n_head, d_ff, dropout) for _ in range(n_layer)])
        self.layer_norm = nn.LayerNorm(d_embed)
        self.linear = nn.Linear(d_embed, vocab_size)

    def forward(self, x):  # x: (batch_size, context_size)
        batch_size, context_size = x.size()
        assert context_size <= self.context_size, \
            f"context_size should be less than or equal to {self.config.context_size}, but got {context_size}"

        # Embedding
        token_embed = self.token_embedding(x)  # (batch_size, context_size, d_embed)
        pos_idx = torch.arange(context_size, device=x.device)  # (context_size)
        pos_embed = self.positional_embedding(pos_idx)  # (batch_size, context_size, d_embed)
        x = token_embed + pos_embed  # (batch_size, context_size, d_embed)

        # Decoder layers
        for layer in self.layers:
            x = layer(x)  # (batch_size, context_size, d_embed)

        # Output
        x = self.layer_norm(x)
        x = self.linear(x)  # (batch_size, context_size, vocab_size)
        return x

    def loss (self, logits, targets):
        logits = logits.view(-1, logits.size(-1))  # (batch_size * context_size, vocab_size)
        targets = targets.view(-1)  # (batch_size * context_size)
        return F.cross_entropy(logits, targets)

    @torch.no_grad()
    def generate(self, tokenizer, device):
        self.eval()
        end_id = tokenizer.encode(".")
        context = [end_id] * self.context_size
        context = torch.tensor(context, dtype=torch.long, device=device).unsqueeze(0)  # Shape: (1, context_size)

        # Generation loop
        while True:
            # Truncate
            context = context[:, -self.config.context_size:]  # (batch_size=1, context_size)

            # Forward
            logits = self.forward(context)[:, -1, :] # (batch_size=1, vocab_size)

            # Sample
            probs = F.softmax(logits, dim=-1)
            next_token = torch.multinomial(probs, num_samples=1)  # (batch_size=1, 1)

            # Concatenate
            context = torch.cat((context, next_token), dim=-1)  # (batch_size=1, context_size + 1)

            # Decode
            name_char = tokenizer.decode([next_token[0].item()])
            print(name_char, end='', flush=True)

            # Break if "."
            if next_token[0].item() == end_id:
                break


In [13]:
gpt = GPT(
    vocab_size=config.vocab_size,
    context_size=config.context_size,
    d_embed=config.d_embed,
    n_head=config.n_head,
    d_ff=config.d_ff,
    n_layer=config.n_layer,
    dropout=config.dropout
).to(config.device)
print(gpt)
print(f"Number of parameters: {sum(p.numel() for p in gpt.parameters() if p.requires_grad)}")

GPT(
  (token_embedding): Embedding(27, 64)
  (positional_embedding): Embedding(12, 64)
  (layers): ModuleList(
    (0-2): 3 x DecoderLayer(
      (self_attention): CasualSelfAttention(
        (query): Linear(in_features=64, out_features=64, bias=False)
        (key): Linear(in_features=64, out_features=64, bias=False)
        (value): Linear(in_features=64, out_features=64, bias=False)
        (out): Linear(in_features=64, out_features=64, bias=False)
      )
      (layer_norm1): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
      (feed_forward): FeedForward(
        (fc1): Linear(in_features=64, out_features=256, bias=True)
        (fc2): Linear(in_features=256, out_features=64, bias=True)
        (activation): GELU(approximate='none')
        (dropout): Dropout(p=0.1, inplace=False)
      )
      (layer_norm2): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
    )
  )
  (layer_norm): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
  (linear): Linear(in_features=64,

### Training

In [14]:
def train(
        model: nn.Module,
        train_loader: DataLoader,
        val_loader: DataLoader,
        max_steps: int,
        lr: float,
        val_interval: int,
        device: torch.device
):
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)
    steps = []
    train_losses = []
    val_losses = []
    train_iter = itertools.cycle(train_loader)  # Infinite dataloader

    for step in range(1, max_steps + 1):
        model.train()
        train_inputs, train_targets = next(train_iter)
        train_inputs, train_targets = train_inputs.to(device), train_targets.to(device)
        optimizer.zero_grad()
        logits = model(train_inputs)
        loss = model.loss(logits, train_targets)
        loss.backward()
        optimizer.step()
        steps.append(step)
        train_losses.append(loss.item())

        if step % val_interval == 0:
            model.eval()
            val_loss = 0.0
            total_samples = 0
            with torch.no_grad():
                for val_inputs, val_targets in val_loader:
                    val_inputs, val_targets = val_inputs.to(device), val_targets.to(device)
                    val_logits = model(val_inputs)
                    batch_loss = model.loss(val_logits, val_targets)
                    val_loss += batch_loss.item() * val_inputs.size(0)
                    total_samples += val_inputs.size(0)
            val_loss /= total_samples
            val_losses.append(val_loss)
            print(f"Step {step}: Train Loss = {loss.item():.4f}, Val Loss = {val_loss:.4f}")

        if step == 1:
            print(f"Initial Train Loss = {loss.item():.4f}")

    # Plot the loss
    plt.figure()
    plt.plot(steps, train_losses, label="Train")
    val_steps = [step for step in steps if step % val_interval == 0]
    plt.plot(val_steps, val_losses, label="Validation")
    plt.xlabel("Steps")
    plt.ylabel("Loss")
    plt.legend()
    plt.show()

In [15]:
train(
    model=gpt,
    train_loader=train_loader,
    val_loader=val_loader,
    max_steps=config.max_steps,
    lr=config.lr,
    val_interval=config.val_interval,
    device=config.device
)

ValueError: too many values to unpack (expected 2)

### Inference

In [17]:
for _ in range(5):
    gpt.generate(tokenizer, config.device)
    print()

.
.
.
.
kw.
