# Recitation 6: Project 2 preview
_Date_: 2025-10-23

## Quick introduction

- **Task**: Recover text eliminated from a preprocessing pipeline
- **Model**: Transformer (Encoder-Decoder)
- **Evaluation**: BLEU score

## Transformer

<figure>
<img src="../assets/figures/transformer.png" width="400" align = "center"/>
<figcaption align = "center"> Source: https://arxiv.org/pdf/1706.03762 </figcaption>
</figure>

### Components
- Positional encoding
- Embedding
- Multihead attention
  - Self attention
  - Cross attention
- Feedforward network
- Residual connection & normalization
- Masks
  - key padding mask
  - causal mask

### Positional Encoding
$$
\begin{align}
\text{PE}(pos, 2i) &=  \sin \Big( \frac{pos}{10000^{2i / d_{m}}} \Big) \\
\text{PE}(pos, 2i+1) &=  \cos \Big( \frac{pos}{10000^{2i / d_{m}}} \Big) \\
\end{align}
$$

where $pos$ is the position of the token in the sequence, $i$ indexes a sine/cosine pair of embedding dimensions ($0 \le i < d_m / 2$), and $d_m$ is the embedding size

In [None]:
from typing import Optional, Tuple
import math
import torch
import torch.nn as nn

torch.manual_seed(42)

In [None]:
class PosEncoding(nn.Module):
    """Positional encoding module

    Precomputes a (1, seq_len, d_model) table of sinusoids: even feature
    indices hold sin(pos / 10000^(2i / d_model)) and odd feature indices hold
    the matching cosine. `forward` adds the leading rows of that table to the
    input embeddings.
    """
    def __init__(self, d_model: int, seq_len: int):
        """
        Assume a single data instance is a list of tokens

        Args:
            d_model (int): the size of the input embeddings (must be even)
            seq_len (int): the longest input sequence length that can be encoded

        Returns: None
        """
        super().__init__()
        assert d_model % 2 == 0, "The embedding size must be divisible by 2"

        pe = torch.zeros(seq_len, d_model)

        pos = torch.arange(0, seq_len, dtype=torch.float).unsqueeze(1)  # dim: (seq_len, 1)
        # exp(2i * -ln(10000) / d_model) == 1 / 10000^(2i / d_model)
        div_term = torch.exp(
            torch.arange(0, d_model, 2, dtype=torch.float) * (-math.log(10000) / d_model)
        )

        pe[:, 0::2] = torch.sin(pos * div_term)  # even-indexed embedding dimensions
        pe[:, 1::2] = torch.cos(pos * div_term)  # odd-indexed embedding dimensions

        pe = pe.unsqueeze(0)  # dim: (1, seq_len, d_model)
        self.register_buffer("pe", pe, persistent=False)  # PE will not be in state_dict and unlearnable

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Forward function
        Args:
            x (torch.Tensor): word embeddings for input sequence with the shape (B, seq_len, d_model)

        Returns: new embeddings that add positional embeddings

        Raises:
            ValueError: if the input sequence is longer than the precomputed table
        """
        seq_len = x.size(1)
        max_len = self.pe.size(1)
        # Slicing past the end of the table would not fail here: the addition
        # would either raise an opaque broadcast error or, for a table of
        # length 1, silently reuse position 0 for every token.
        if seq_len > max_len:
            raise ValueError(
                f"Input sequence length ({seq_len}) exceeds the maximum length "
                f"({max_len}) supported by this positional encoding"
            )
        x_new = x + self.pe[:, :seq_len, :]
        return x_new

In [None]:
# Toy input: a batch of one sequence with three tokens, each an 8-dimensional embedding.
batch_size = 1
seq_len = 3
embed_size = 8
x = torch.rand((batch_size, seq_len, embed_size))
x  # bare expression kept as the last statement of the cell

In [None]:
# Build the encoding table for the toy sequence length and add it to `x`.
pos_emb = PosEncoding(embed_size, seq_len)
res = pos_emb(x)
# Label uses "Shape: " so the output reads the same as the other demo cells.
print(f"{res}\nShape: {res.shape}")

## Multihead Attention
<table><tr>
    <td> <img src="../assets/figures/dot-prod-attn.png" style="width: 300px;"/> </td>
    <td> <img src="../assets/figures/mha.png" style="width: 300px;"/> </td>
</tr></table>

Mathematically, scaled dot-product attention is $$\text{Attention}(Q, K, V) = \text{Softmax} \Big( \frac{QK^T}{\sqrt{d_k}} \Big) V$$

### Prerequisites

- Reshape tensor
  - [`torch.Tensor.view(*shape)`](https://docs.pytorch.org/docs/stable/generated/torch.Tensor.view.html): Reshape a (contiguous) tensor, sharing the same data without making a new copy
- Masking
  - [`torch.Tensor.masked_fill(mask, value)`](https://docs.pytorch.org/docs/stable/generated/torch.Tensor.masked_fill.html): Fill the elements of the tensor where `mask` is True with `value`



### Exercise

In [None]:
def make_padding_mask(seq: torch.Tensor, pad_idx=0):
    """Mark which positions of a batch of token ids hold real tokens.

    Args:
        seq: token ids with shape (batch, seq_len)
        pad_idx: id that denotes padding

    Returns: boolean tensor with the shape of `seq`; True where the token
        differs from `pad_idx`, False at padding positions.
    """
    is_token = torch.ne(seq, pad_idx)
    return is_token


def scale_dotprod_attn(
    Q: torch.Tensor, 
    K: torch.Tensor, 
    V: torch.Tensor,
    mask: Optional[torch.Tensor] = None
) -> torch.Tensor:
    """Scaled dot-product attention: Softmax(Q K^T / sqrt(d_K)) V

    Args:
        Q: (target) input affine-transformed by W_Q, with shape (..., seq_len, d_Q)
        K: (source) input affine-transformed by W_K, with shape (..., seq_len, d_K)
        V: (source) input affine-transformed by W_V, with shape (..., seq_len, d_V)
        mask: (optional) tensor with as many dimensions as the Q K^T scores;
            positions where it equals 0 are excluded from the attention

    Returns: attention outputs
    """
    # Similarity of every query with every key, scaled by 1 / sqrt(d_K)
    d_k = K.size(-1)
    logits = torch.matmul(Q, K.transpose(-2, -1)) * d_k ** -0.5

    # Masked positions get -inf so that softmax assigns them zero weight
    if mask is not None:
        assert mask.dim() == logits.dim(), 'Mask has different dimension as the scaled score between Q, K'
        logits = logits.masked_fill(mask.eq(0), float("-inf"))

    # Normalise over the key axis and take the weighted sum of the values
    weights = torch.softmax(logits, dim=-1)
    return torch.matmul(weights, V)

In [None]:
# Self-attention on the toy batch: `x` serves as queries, keys and values at once.
res = scale_dotprod_attn(Q=x, K=x, V=x)
print(f"{res}\nShape: {res.shape}")

In [None]:
class MHA(nn.Module):
    """Multihead attention"""

    def __init__(self, d_model: int, num_heads: int, dropout: float = 0.1):
        """
        Args:
            d_model: embedding size; must be divisible by `num_heads`
            num_heads: number of attention heads
            dropout: dropout probability applied to the attention weights
        """
        super().__init__()
        per_head, leftover = divmod(d_model, num_heads)
        assert leftover == 0
        self.num_heads = num_heads
        self.d_head = per_head
        self.scale = per_head ** -0.5

        # Input projections for queries, keys and values, then the output projection
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_o = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def _split_heads(self, proj: torch.Tensor, batch: int, length: int) -> torch.Tensor:
        """Reshape a projection (B, T, d_model) into per-head form (B, heads, T, d_head)."""
        return proj.view(batch, length, self.num_heads, self.d_head).transpose(1, 2)

    def forward(
        self, q: torch.Tensor, k: torch.Tensor, v: torch.Tensor, mask=None
    ) -> Tuple[torch.Tensor, ...]:
        """
        Args:
            q: 'Query' matrix with shape (B, Tq, d_model)
            k: 'Key' matrix with shape (B, Tk, d_model)
            v: 'Value' matrix with shape (B, Tk, d_model)
            mask: (optional) attention mask with as many dimensions as the
                (B, heads, Tq, Tk) scores; positions where it equals 0 are not attended to

        Returns: a tuple of the attention output (B, Tq, d_model) and the
            attention weights (B, heads, Tq, Tk) after dropout

        """
        batch, len_q, _ = q.size()
        _, len_k, _ = k.size()

        queries = self._split_heads(self.w_q(q), batch, len_q)
        keys = self._split_heads(self.w_k(k), batch, len_k)
        values = self._split_heads(self.w_v(v), batch, len_k)

        logits = (queries @ keys.transpose(-2, -1)) * self.scale  # (B, heads, Tq, Tk)

        if mask is not None:
            assert mask.dim() == logits.dim(), (
                f"Provided mask has different dimension ({mask.dim()}) as QK scores ({logits.dim()})"
            )
            # -inf logits become zero weight after the softmax
            logits = logits.masked_fill(mask == 0, -torch.inf)

        weights = self.dropout(torch.softmax(logits, dim=-1))
        per_head_out = weights @ values  # (B, heads, Tq, d_head)
        # Move heads next to d_head and merge them back into d_model
        merged = per_head_out.transpose(1, 2).contiguous().view(
            batch, len_q, self.num_heads * self.d_head
        )
        return self.w_o(merged), weights

In [None]:
# Two-head self-attention over the toy batch; the returned attention weights are discarded.
mha = MHA(embed_size, num_heads=2)
o, _ = mha(q=x, k=x, v=x)
print(f"{o}\nShape: {o.shape}")

In [None]:
class FFN(nn.Module):
    """Feedforward neural network

    Two linear layers with a ReLU and dropout in between:
    d_model -> d_ff -> d_model.
    """
    def __init__(self, d_model: int, d_ff: int, dropout: float = 0.1):
        """
        Args:
            d_model: size of the input and output features
            d_ff: size of the hidden layer
            dropout: dropout probability applied to the hidden activations
        """
        super().__init__()
        self.lin1 = nn.Linear(d_model, d_ff)
        self.lin2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Apply lin1 -> ReLU -> dropout -> lin2 to the last dimension of `x`."""
        # Called through `nn.functional` so that no separate `F` alias is required
        x2 = nn.functional.relu(self.lin1(x))
        x2 = self.dropout(x2)
        x2 = self.lin2(x2)
        return x2

In [None]:
class EncoderLayer(nn.Module):
    """Single layer of Transformer encoder

    Self-attention followed by a feedforward network; each sub-layer output
    goes through dropout, is added to its input and is then layer-normalised.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        """
        Args:
            d_model: embedding size
            num_heads: number of attention heads
            d_ff: hidden size of the feedforward network
            dropout: dropout probability shared by the sub-layers
        """
        super().__init__()
        self.self_attn = MHA(d_model, num_heads, dropout=dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.ff = FFN(d_model, d_ff, dropout=dropout)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """
        Args:
            x: input embeddings
            mask: (optional) attention mask forwarded to the self-attention

        Returns: a tuple of the layer output and the attention weights
        """
        # Sub-layer 1: self-attention (x is query, key and value) + residual + norm
        attended, weights = self.self_attn(x, x, x, mask)
        hidden = self.norm1(x + self.dropout(attended))

        # Sub-layer 2: feedforward network + residual + norm
        result = self.norm2(hidden + self.dropout(self.ff(hidden)))
        return result, weights

In [None]:
class Encoder(nn.Module):
    """Transformer encoder

    Token embedding plus positional encoding, a stack of `EncoderLayer`s and
    a final layer normalisation.
    """

    def __init__(
        self,
        vocab_size: int,
        d_model: int,
        num_heads: int,
        d_ff: int,
        num_layers: int,
        max_len: int,
        pad_idx: int,
        dropout=0.1,
    ):
        """
        Args:
            vocab_size: number of rows in the token embedding table
            d_model: embedding size
            num_heads: number of attention heads in every layer
            d_ff: hidden size of the feedforward network in every layer
            num_layers: number of stacked encoder layers
            max_len: sequence length passed to the positional encoding
            pad_idx: id of the padding token
            dropout: dropout probability passed to every layer
        """
        super().__init__()
        self.token_emb = nn.Embedding(vocab_size, d_model, padding_idx=pad_idx)
        self.pos_emb = PosEncoding(d_model, max_len)
        stack = [
            EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)
        ]
        self.layers = nn.ModuleList(stack)
        self.norm = nn.LayerNorm(d_model)
        self.pad_idx = pad_idx

    def forward(self, input_ids: torch.Tensor):
        """
        Args:
            input_ids: token ids with shape (B, T)

        Returns: a tuple of the encoded sequence, the (B, T) padding mask
            (True for non-pad tokens) and a list with the attention weights
            of every layer
        """
        # Unpacking into exactly two names rejects anything but a 2-D batch
        _batch, _length = input_ids.size()

        hidden = self.pos_emb(self.token_emb(input_ids))
        keep = make_padding_mask(input_ids, pad_idx=self.pad_idx)  # (B, T)

        # (B, T) -> (B, 1, 1, T) so the mask broadcasts over heads and query positions
        attn_mask = keep[:, None, None, :]

        attns = []
        for layer in self.layers:
            hidden, layer_attn = layer(hidden, attn_mask)
            attns.append(layer_attn)

        return self.norm(hidden), keep, attns