In [None]:
import torch
import torch.nn as nn
from architectures.scaled_dot_product import scaled_dot_product_attention
from architectures.TransformerLayer import TransformerLayer
from architectures.MHA import MultiheadAttention

# Testing Transformer

In this section, I'll test the Transformer step by step.

Starting with scaled dot-product attention and multi-head attention, I'll then progress to Transformer layers and ultimately the Transformer encoder-decoder model.

### Scaled Dot-Product Attention

In this section, I'll implement Scaled Dot-Product Attention. The input types and shapes are:

- `q`: `Tensor[n, tgt_len, d_head]`
- `k`: `Tensor[n, src_len, d_head]`
- `v`: `Tensor[n, src_len, d_head]`
- `key_padding_mask`: `Tensor[n, src_len]`
- `causal`: `bool`

`n` represents the total number of attention operations calculated in parallel.

In multi-head attention, it's usually the product of the batch size and the number of attention heads. For each of the `n` operations, we compute the attention scores:

$$s_{i,j} = \mathbf{q}_i^T \mathbf{k}_j / \sqrt{d_\text{head}}$$

We then apply softmax to the attention scores and use the result as weights to linearly combine the values in `v`:

$$a_{i,j} = \dfrac{\exp(s_{i,j})}{\sum_k \exp(s_{i, k})}$$

$$\mathrm{Attention}(\mathbf{Q}, \mathbf{K}, \mathbf{V})_i = \sum_j a_{i,j} \mathbf{v}_j$$
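As a sanity check on the formulas above, here is a small worked example in plain Python for a single query. The helper name `attention_row` is hypothetical and not part of the repository; the inputs are the `q1`, `k1`, and `v1` values used in the test further down, so the numbers can be compared against the expected tensors there.

```python
import math

def attention_row(q, keys, values):
    """Attention output for one query vector (hypothetical helper, no masking)."""
    d_head = len(q)
    # s_j = q . k_j / sqrt(d_head)
    scores = [sum(qc * kc for qc, kc in zip(q, key)) / math.sqrt(d_head) for key in keys]
    # Softmax, with the max subtracted for numerical stability
    peak = max(scores)
    exps = [math.exp(s - peak) for s in scores]
    total = sum(exps)
    weights = [e / total for e in exps]
    # Weighted sum of the value vectors, one output channel at a time
    return [sum(w * val[c] for w, val in zip(weights, values)) for c in range(len(values[0]))]

keys = [[1, 0, 0], [0, 1, 0], [0, 0, 1]]
values = [[3, 0, 0], [0, 5, 0], [0, 0, 7]]
out = attention_row([1, 0, 0], keys, values)
print([round(x, 4) for x in out])  # -> [1.4132, 1.3223, 1.8512]
```

The first key matches the query, so it receives the largest weight (about 0.47), while the other two keys share the remainder equally.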

We consider two essential details: *padding* and *causal masking*.

- *padding*: The Transformer input usually contains sentences of varying lengths, padded into a tensor. Attention should ignore pad tokens, so they don't impact the results. `key_padding_mask` is a byte tensor set to **1** in pad token positions. If `key_padding_mask` is `None`, there's no padding in the input.

- *causal masking*: Autoregressive generation in the decoder uses causal attention masks, meaning position $i$ can only attend to position $j$ if $i \ge j$. If `causal` is set to true, apply causal attention masking. The provided `future_mask` may be useful.

In [1]:
future_mask = torch.triu(torch.zeros([1024, 1024]).fill_(float("-inf")), 1)

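The two masking rules can be sketched as follows. This is a minimal illustration, not the code in `architectures/scaled_dot_product.py`: the function name `sdpa_sketch` is hypothetical, and it assumes both masks are applied by setting the affected scores to `-inf` before the softmax.

```python
import math
import torch

def sdpa_sketch(q, k, v, key_padding_mask=None, causal=False):
    """q: [n, tgt_len, d_head]; k, v: [n, src_len, d_head] (hypothetical sketch)."""
    d_head = q.size(-1)
    # Scaled scores, shape [n, tgt_len, src_len]
    scores = torch.bmm(q, k.transpose(1, 2)) / math.sqrt(d_head)
    if key_padding_mask is not None:
        # 1 marks a pad key; broadcast the mask over all query positions
        pad = key_padding_mask.bool().unsqueeze(1)  # [n, 1, src_len]
        scores = scores.masked_fill(pad, float("-inf"))
    if causal:
        tgt_len, src_len = scores.shape[-2:]
        # -inf strictly above the diagonal, 0 elsewhere: position i sees j <= i
        future = torch.triu(
            torch.full((tgt_len, src_len), float("-inf"), device=scores.device),
            diagonal=1,
        )
        scores = scores + future
    weights = torch.softmax(scores, dim=-1)
    return torch.bmm(weights, v)

q = torch.tensor([[1, 0, 0], [0.5, 0.5, 0]]).view(1, 2, 3)
k = torch.eye(3).view(1, 3, 3)
v = torch.tensor([[3.0, 0, 0], [0, 5.0, 0], [0, 0, 7.0]]).view(1, 3, 3)
masked = sdpa_sketch(q, k, v, key_padding_mask=torch.tensor([[0, 0, 1]]))
```

With the third key masked out, its value never contributes, so the last output channel is exactly zero for both queries. One caveat of this approach: a query whose keys are all masked yields `NaN`, because the softmax is taken over a row of `-inf`.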

#### Testing Implementation of scaled dot product

In [2]:
def test_scaled_dot_product_attention():
    q1 = torch.tensor([[1, 0, 0], [0.5, 0.5, 0]]).view(1, 2, 3).float()
    k1 = torch.tensor([[1, 0, 0], [0, 1, 0], [0, 0, 1]]).view(1, 3, 3).float()
    v1 = torch.tensor([[3, 0, 0], [0, 5, 0], [0, 0, 7]]).view(1, 3, 3).float()
    o1 = scaled_dot_product_attention(q1, k1, v1)
    assert list(o1.shape) == [1, 2, 3]
    assert torch.allclose(
        o1.view(-1)[:5],
        torch.tensor([1.413249135017395, 1.3222922086715698, 1.8512091636657715, 1.0912044048309326, 1.818674087524414]).float(),
        rtol=1e-3
    )

    torch.manual_seed(100)
    q2 = torch.randn(3, 5, 7).float()
    k2 = torch.randn(3, 11, 7).float()
    v2 = torch.randn(3, 11, 7).float()
    o2 = scaled_dot_product_attention(q2, k2, v2)
    assert list(o2.shape) == [3, 5, 7]
    assert torch.allclose(
        o2.view(-1)[6: 11],
        torch.tensor([-0.40304261445999146, -0.2931785583496094, 0.20563912391662598, 0.08719107508659363, 0.08274038136005402]).float(),
        rtol=1e-3
    )

    key_padding_mask = torch.tensor([[0, 0, 1]]).byte()
    o4 = scaled_dot_product_attention(q1, k1, v1, key_padding_mask=key_padding_mask)
    assert list(o4.shape) == [1, 2, 3]
    assert torch.allclose(
        o4.view(-1)[:5],
        torch.tensor([1.921372413635254, 1.7977124452590942, 0.0, 1.5, 2.5]),
        rtol=1e-3
    )

    torch.manual_seed(210)
    q5 = torch.randn(2, 4, 3).float()
    k5 = torch.randn(2, 4, 3).float()
    v5 = torch.randn(2, 4, 3).float()
    o5 = scaled_dot_product_attention(q5, k5, v5, causal=True)
    assert list(o5.shape) == [2, 4, 3]
    assert torch.allclose(
        o5.view(-1)[2: 7],
        torch.tensor([0.9079901576042175, -0.573272705078125, -1.1765587329864502, 0.7771514058113098, -0.3235766291618347]),
        rtol=1e-3
    )


test_scaled_dot_product_attention()


### Multi-head Attention

In this section, I'll implement multi-head attention.

The input to this layer has types and shapes:

- `q`: `Tensor[bsz, tgt_len, d_model]`
- `k`: `Tensor[bsz, src_len, d_model]`
- `v`: `Tensor[bsz, src_len, d_model]`
- `key_padding_mask`: `Tensor[bsz, src_len]`
- `causal`: `bool`

A multi-head attention layer has four linear projection layers (including biases): `q_proj`, `k_proj`, `v_proj`, and `o_proj`. `q_proj`, `k_proj`, and `v_proj` project `q`, `k`, and `v` respectively into `n_heads` `d_head` vectors. The shapes of the projected query, key, and value will be `[bsz, tgt_len, n_heads, d_head]`, `[bsz, src_len, n_heads, d_head]`, and `[bsz, src_len, n_heads, d_head]` respectively.

In the provided code below, instead of creating `n_heads` projection matrices of `d_model -> d_head` for the query/key/value, we use a single projection matrix of `d_model -> d_model`. This means the first `d_head` channels correspond to the first attention head, and channels from `d_head + 1` to `2 * d_head` correspond to the second attention head, and so on. The same rule applies to the input channels of `o_proj`.

Next, we rearrange the projected query, key, and value tensors appropriately and feed them into the previously implemented `scaled_dot_product_attention` function.

The output of `scaled_dot_product_attention` should then be projected by `o_proj` to produce the final output. The output shape should be `[bsz, tgt_len, d_model]`.
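The rearrangement step can be sketched like this. The helper names (`split_heads`, `merge_heads`, `expand_padding_mask`) are hypothetical and the actual `MultiheadAttention` class may organize this differently; the sketch only assumes the channel layout described above, where consecutive blocks of `d_head` channels belong to consecutive heads.

```python
import torch

def split_heads(x, n_heads):
    """[bsz, seq_len, d_model] -> [bsz * n_heads, seq_len, d_head]."""
    bsz, seq_len, d_model = x.shape
    d_head = d_model // n_heads
    x = x.reshape(bsz, seq_len, n_heads, d_head).transpose(1, 2)  # [bsz, n_heads, seq_len, d_head]
    return x.reshape(bsz * n_heads, seq_len, d_head)

def merge_heads(x, n_heads):
    """[bsz * n_heads, seq_len, d_head] -> [bsz, seq_len, d_model]."""
    n, seq_len, d_head = x.shape
    bsz = n // n_heads
    x = x.reshape(bsz, n_heads, seq_len, d_head).transpose(1, 2)  # [bsz, seq_len, n_heads, d_head]
    return x.reshape(bsz, seq_len, n_heads * d_head)

def expand_padding_mask(mask, n_heads):
    """[bsz, src_len] -> [bsz * n_heads, src_len], repeating each row once per head."""
    return mask.repeat_interleave(n_heads, dim=0)

x = torch.arange(2 * 3 * 8, dtype=torch.float32).view(2, 3, 8)
heads = split_heads(x, n_heads=4)  # shape [8, 3, 2]
```

Row `b * n_heads + h` of the split tensor holds head `h` of batch element `b`, which is the same ordering `repeat_interleave` produces for the padding mask, so the two stay aligned when passed to the attention function.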

#### Testing Implementation

In [None]:
def test_multihead_attention():
    torch.manual_seed(350)
    mha0 = nn.MultiheadAttention(embed_dim=128, num_heads=4, batch_first=True)
    nn.init.normal_(mha0.in_proj_weight, mean=0.0, std=0.05)
    nn.init.normal_(mha0.in_proj_bias, mean=0.0, std=0.05)
    nn.init.normal_(mha0.out_proj.weight, mean=0.0, std=0.05)
    nn.init.normal_(mha0.out_proj.bias, mean=0.0, std=0.05)
    mha1 = MultiheadAttention(128, 4)
    mha1.q_proj.weight.data.copy_(mha0.in_proj_weight.data[:128, :])
    mha1.q_proj.bias.data.copy_(mha0.in_proj_bias.data[:128])
    mha1.k_proj.weight.data.copy_(mha0.in_proj_weight.data[128:256, :])
    mha1.k_proj.bias.data.copy_(mha0.in_proj_bias.data[128:256])
    mha1.v_proj.weight.data.copy_(mha0.in_proj_weight.data[256:, :])
    mha1.v_proj.bias.data.copy_(mha0.in_proj_bias.data[256:])
    mha1.o_proj.weight.data.copy_(mha0.out_proj.weight.data)
    mha1.o_proj.bias.data.copy_(mha0.out_proj.bias.data)

    torch.manual_seed(400)
    q1 = torch.randn(4, 6, 128).float()
    k1 = torch.randn(4, 6, 128).float()
    v1 = torch.randn(4, 6, 128).float()
    assert torch.allclose(
        mha0(q1, k1, v1)[0].contiguous(),
        mha1(q1, k1, v1).contiguous(),
        rtol=1e-3
    )

    torch.manual_seed(600)
    q3 = torch.randn(4, 6, 128).float()
    k3 = torch.randn(4, 6, 128).float()
    v3 = torch.randn(4, 6, 128).float()
    key_padding_mask = torch.tensor([
        [0, 0, 1, 1, 1, 1],
        [0, 0, 0, 0, 1, 1],
        [0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 1]
    ]).byte()

    o30 = mha0(
        q3, k3, v3,
        key_padding_mask=key_padding_mask.to(torch.bool),
        attn_mask=future_mask[:6, :6]
    )[0].contiguous()

    o31 = mha1(q3, k3, v3, key_padding_mask=key_padding_mask, causal=True).contiguous()

    assert torch.allclose(o30[0, :2], o31[0, :2], rtol=1e-3)
    assert torch.allclose(o30[0, :4], o31[0, :4], rtol=1e-3)
    assert torch.allclose(o30[0, :6], o31[0, :6], rtol=1e-3)
    assert torch.allclose(o30[0, :5], o31[0, :5], rtol=1e-3)

test_multihead_attention()

### Transformer Layers

In this section, I'll **implement Transformer Encoder/Decoder layers** according to Figure 1 of "Attention is All You Need".

I also apply residual dropout (Section 5.4 of the paper) and attention dropout.

The `is_decoder` flag determines if this Transformer layer is an encoder or a decoder.

If it's an encoder, the input types and shapes will be:

- `x`: `Tensor[bsz, src_len, d_model]`
- `padding_mask`: `Tensor[bsz, src_len]`

If it's a decoder, the input types and shapes will be:

- `x`: `Tensor[bsz, tgt_len, d_model]`
- `padding_mask`: `Tensor[bsz, tgt_len]`
- `encoder_out`: `Tensor[bsz, src_len, d_model]`
- `encoder_padding_mask`: `Tensor[bsz, src_len]`

The output is a tensor of the same shape as `x`.
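For orientation, here is a minimal sketch of the encoder case. It is not the repository's `TransformerLayer`: it uses PyTorch's built-in `nn.MultiheadAttention` in place of the custom class, and it assumes the post-layer-norm ordering and ReLU feed-forward of the original paper, which are also the defaults of the `nn.TransformerEncoderLayer` the tests compare against. The names `self_attn`, `fc1`, and `fc2` follow the tests; `norm1`, `norm2`, and the class name are hypothetical.

```python
import torch
import torch.nn as nn

class PostLNEncoderLayerSketch(nn.Module):
    """Encoder layer sketch: self-attention and feed-forward sublayers, each as LayerNorm(x + Dropout(f(x)))."""

    def __init__(self, d_model, n_heads, d_ff, dropout):
        super().__init__()
        # `dropout` passed here is the attention dropout
        self.self_attn = nn.MultiheadAttention(d_model, n_heads, dropout=dropout, batch_first=True)
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)  # residual dropout

    def forward(self, x, padding_mask=None):
        mask = padding_mask.bool() if padding_mask is not None else None
        attn_out, _ = self.self_attn(x, x, x, key_padding_mask=mask, need_weights=False)
        x = self.norm1(x + self.dropout(attn_out))
        ff_out = self.fc2(torch.relu(self.fc1(x)))
        return self.norm2(x + self.dropout(ff_out))

layer = PostLNEncoderLayerSketch(d_model=16, n_heads=4, d_ff=32, dropout=0.0)
x = torch.randn(2, 5, 16)
padding_mask = torch.tensor([[0, 0, 0, 0, 0], [0, 0, 0, 1, 1]])
y = layer(x, padding_mask)  # same shape as x
```

A decoder layer under the same assumptions would make the self-attention causal and insert a cross-attention sublayer, with queries from `x` and keys and values from `encoder_out`, between the two sublayers shown here.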

#### Testing Implementation

In [3]:
def test_transformer_layer():
    torch.manual_seed(750)
    enc_layer0 = nn.TransformerEncoderLayer(128, 4, dim_feedforward=512, dropout=0.0, batch_first=True)
    nn.init.normal_(enc_layer0.self_attn.in_proj_weight, mean=0.0, std=0.05)
    nn.init.normal_(enc_layer0.self_attn.in_proj_bias, mean=0.0, std=0.05)
    nn.init.normal_(enc_layer0.self_attn.out_proj.weight, mean=0.0, std=0.05)
    nn.init.normal_(enc_layer0.self_attn.out_proj.bias, mean=0.0, std=0.05)
    nn.init.normal_(enc_layer0.linear1.weight, mean=0.0, std=0.05)
    nn.init.normal_(enc_layer0.linear1.bias, mean=0.0, std=0.05)
    nn.init.normal_(enc_layer0.linear2.weight, mean=0.0, std=0.05)
    nn.init.normal_(enc_layer0.linear2.bias, mean=0.0, std=0.05)
    enc_layer1 = TransformerLayer(False, 128, 4, 512, 0.0)
    enc_layer1.self_attn.q_proj.weight.data.copy_(enc_layer0.self_attn.in_proj_weight.data[:128, :])
    enc_layer1.self_attn.q_proj.bias.data.copy_(enc_layer0.self_attn.in_proj_bias.data[:128])
    enc_layer1.self_attn.k_proj.weight.data.copy_(enc_layer0.self_attn.in_proj_weight.data[128:256, :])
    enc_layer1.self_attn.k_proj.bias.data.copy_(enc_layer0.self_attn.in_proj_bias.data[128:256])
    enc_layer1.self_attn.v_proj.weight.data.copy_(enc_layer0.self_attn.in_proj_weight.data[256:, :])
    enc_layer1.self_attn.v_proj.bias.data.copy_(enc_layer0.self_attn.in_proj_bias.data[256:])
    enc_layer1.self_attn.o_proj.weight.data.copy_(enc_layer0.self_attn.out_proj.weight.data)
    enc_layer1.self_attn.o_proj.bias.data.copy_(enc_layer0.self_attn.out_proj.bias.data)
    enc_layer1.fc1.weight.data.copy_(enc_layer0.linear1.weight.data)
    enc_layer1.fc1.bias.data.copy_(enc_layer0.linear1.bias.data)
    enc_layer1.fc2.weight.data.copy_(enc_layer0.linear2.weight.data)
    enc_layer1.fc2.bias.data.copy_(enc_layer0.linear2.bias.data)

    torch.manual_seed(800)
    x = torch.randn(4, 5, 128).float()
    x_mask = torch.tensor([[0, 0, 0, 0, 0], [0, 0, 0, 1, 1], [0, 0, 0, 0, 1], [0, 0, 1, 1, 1]]).byte()
    y10 = enc_layer0(x, src_key_padding_mask=x_mask.to(torch.bool)).contiguous()
    y11 = enc_layer1(x, x_mask).contiguous()
    assert torch.allclose(y10[0], y11[0], rtol=1e-3)
    assert torch.allclose(y10[1, :3], y11[1, :3], rtol=1e-3)
    assert torch.allclose(y10[2, :4], y11[2, :4], rtol=1e-3)
    assert torch.allclose(y10[3, :2], y11[3, :2], rtol=1e-3)

    torch.manual_seed(950)
    dec_layer0 = nn.TransformerDecoderLayer(128, 4, dim_feedforward=512, dropout=0.0, batch_first=True)
    nn.init.normal_(dec_layer0.self_attn.in_proj_weight, mean=0.0, std=0.05)
    nn.init.normal_(dec_layer0.self_attn.in_proj_bias, mean=0.0, std=0.05)
    nn.init.normal_(dec_layer0.self_attn.out_proj.weight, mean=0.0, std=0.05)
    nn.init.normal_(dec_layer0.self_attn.out_proj.bias, mean=0.0, std=0.05)
    nn.init.normal_(dec_layer0.multihead_attn.in_proj_weight, mean=0.0, std=0.05)
    nn.init.normal_(dec_layer0.multihead_attn.in_proj_bias, mean=0.0, std=0.05)
    nn.init.normal_(dec_layer0.multihead_attn.out_proj.weight, mean=0.0, std=0.05)
    nn.init.normal_(dec_layer0.multihead_attn.out_proj.bias, mean=0.0, std=0.05)
    nn.init.normal_(dec_layer0.linear1.weight, mean=0.0, std=0.05)
    nn.init.normal_(dec_layer0.linear1.bias, mean=0.0, std=0.05)
    nn.init.normal_(dec_layer0.linear2.weight, mean=0.0, std=0.05)
    nn.init.normal_(dec_layer0.linear2.bias, mean=0.0, std=0.05)
    dec_layer1 = TransformerLayer(True, 128, 4, 512, 0.0)
    dec_layer1.self_attn.q_proj.weight.data.copy_(dec_layer0.self_attn.in_proj_weight.data[:128, :])
    dec_layer1.self_attn.q_proj.bias.data.copy_(dec_layer0.self_attn.in_proj_bias.data[:128])
    dec_layer1.self_attn.k_proj.weight.data.copy_(dec_layer0.self_attn.in_proj_weight.data[128:256, :])
    dec_layer1.self_attn.k_proj.bias.data.copy_(dec_layer0.self_attn.in_proj_bias.data[128:256])
    dec_layer1.self_attn.v_proj.weight.data.copy_(dec_layer0.self_attn.in_proj_weight.data[256:, :])
    dec_layer1.self_attn.v_proj.bias.data.copy_(dec_layer0.self_attn.in_proj_bias.data[256:])
    dec_layer1.self_attn.o_proj.weight.data.copy_(dec_layer0.self_attn.out_proj.weight.data)
    dec_layer1.self_attn.o_proj.bias.data.copy_(dec_layer0.self_attn.out_proj.bias.data)
    dec_layer1.cross_attn.q_proj.weight.data.copy_(dec_layer0.multihead_attn.in_proj_weight.data[:128, :])
    dec_layer1.cross_attn.q_proj.bias.data.copy_(dec_layer0.multihead_attn.in_proj_bias.data[:128])
    dec_layer1.cross_attn.k_proj.weight.data.copy_(dec_layer0.multihead_attn.in_proj_weight.data[128:256, :])
    dec_layer1.cross_attn.k_proj.bias.data.copy_(dec_layer0.multihead_attn.in_proj_bias.data[128:256])
    dec_layer1.cross_attn.v_proj.weight.data.copy_(dec_layer0.multihead_attn.in_proj_weight.data[256:, :])
    dec_layer1.cross_attn.v_proj.bias.data.copy_(dec_layer0.multihead_attn.in_proj_bias.data[256:])
    dec_layer1.cross_attn.o_proj.weight.data.copy_(dec_layer0.multihead_attn.out_proj.weight.data)
    dec_layer1.cross_attn.o_proj.bias.data.copy_(dec_layer0.multihead_attn.out_proj.bias.data)
    dec_layer1.fc1.weight.data.copy_(dec_layer0.linear1.weight.data)
    dec_layer1.fc1.bias.data.copy_(dec_layer0.linear1.bias.data)
    dec_layer1.fc2.weight.data.copy_(dec_layer0.linear2.weight.data)
    dec_layer1.fc2.bias.data.copy_(dec_layer0.linear2.bias.data)

    torch.manual_seed(1000)
    x = torch.randn(4, 5, 128).float()
    e = torch.randn(4, 3, 128).float()
    x_mask = torch.tensor([[0, 0, 0, 0, 0], [0, 0, 0, 1, 1], [0, 0, 0, 0, 1], [0, 0, 1, 1, 1]]).byte()
    e_mask = torch.tensor([[0, 0, 0], [0, 0, 0], [0, 0, 0], [0, 0, 1]]).byte()
    y30 = dec_layer0(x, e, tgt_mask=future_mask[:5, :5], tgt_key_padding_mask=x_mask.to(torch.bool), memory_key_padding_mask=e_mask.to(torch.bool)).contiguous()
    y31 = dec_layer1(x, x_mask, e, e_mask).contiguous()
    assert torch.allclose(y30[0], y31[0], rtol=1e-3)
    assert torch.allclose(y30[1, :3], y31[1, :3], rtol=1e-3)
    assert torch.allclose(y30[2, :4], y31[2, :4], rtol=1e-3)
    assert torch.allclose(y30[3, :2], y31[3, :2], rtol=1e-3)


test_transformer_layer()


### Putting them together: Transformer

In this section, I will implement a Transformer encoder-decoder model for sequence-to-sequence tasks using the building blocks we've already created: scaled dot-product attention, multi-head attention, and Transformer encoder/decoder layers.

Model Overview:

- Encoder: `n_layers` layers
- Decoder: `n_layers` layers
- Learned positional embeddings instead of sinusoidal positional encodings (as in "Attention is All You Need")
- Shared embedding matrices to reduce the number of parameters and to improve training stability:
  - Encoder input embeddings
  - Decoder input embeddings
  - Decoder output layer weights: a linear classifier over the `n_words` vocabulary whose weight matrix has the same shape as the word embeddings, so the two can be shared (Section 3.4 of the paper)
- Layer normalization after the embedding layers
- Shared positional embedding matrix and normalization layer for both encoder and decoder
- Decoder's first input token: `[EOS]`
- Handling of pad tokens in labels (-100). Hugging Face `transformers` pads labels with the index -100 instead of `pad_id`, so we process them as follows:
  - Replace -100 in decoder input
  - Ignore -100 in labels when calculating loss
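The label handling and weight sharing in this list could look roughly like the sketch below. All function names, and the `eos_id` and `pad_id` values in the usage lines, are hypothetical; the sketch assumes the decoder input is the label sequence shifted right by one position with `[EOS]` in front.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

IGNORE_INDEX = -100  # label padding value used by Hugging Face `transformers`

def shift_labels_right(labels, eos_id, pad_id):
    """Build decoder inputs: [EOS] first, then the labels shifted right by one."""
    dec_in = torch.full_like(labels, pad_id)
    dec_in[:, 0] = eos_id
    dec_in[:, 1:] = labels[:, :-1]
    # -100 is not a valid embedding index, so replace it with the real pad id
    return dec_in.masked_fill(dec_in == IGNORE_INDEX, pad_id)

def tied_logits(hidden, embed):
    """Output layer that reuses the embedding matrix [n_words, d_model] as its weight."""
    return F.linear(hidden, embed.weight)

def seq2seq_loss(logits, labels):
    """Token-level cross entropy that skips positions labeled -100."""
    return F.cross_entropy(
        logits.reshape(-1, logits.size(-1)),
        labels.reshape(-1),
        ignore_index=IGNORE_INDEX,
    )

labels = torch.tensor([[5, 6, 2, -100, -100]])
dec_in = shift_labels_right(labels, eos_id=2, pad_id=1)  # tensor([[2, 5, 6, 2, 1]])
embed = nn.Embedding(10, 4)
logits = tied_logits(torch.randn(1, 5, 4), embed)        # stand-in for decoder states
loss = seq2seq_loss(logits, labels)
```

Because `tied_logits` reads `embed.weight` directly, gradients from the output layer and from both embedding lookups accumulate in the same parameter.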

We implement the `make_positions` method to generate the input for the positional embedding layer. Pad tokens are handled by repeating the last non-pad position.

Given a `padding_mask`:

```
[[0, 0, 0, 0, 1],
 [0, 0, 0, 1, 1]]
```
We would return:

```
[[0, 1, 2, 3, 3],
 [0, 1, 2, 2, 2]]
```
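One possible way to produce these positions is a cumulative sum over the non-pad entries. The function name `make_positions_sketch` is hypothetical, and this is only one of several ways the real `make_positions` method might be written.

```python
import torch

def make_positions_sketch(padding_mask):
    """[bsz, seq_len] mask (1 = pad) -> position ids, repeating the last real position on pads."""
    non_pad = 1 - padding_mask.long()
    # The running count of real tokens, minus one, is the 0-based position;
    # pad positions add nothing to the count, so they repeat the previous value.
    positions = non_pad.cumsum(dim=1) - 1
    # Guard against rows that start with padding (would otherwise give -1)
    return positions.clamp(min=0)

padding_mask = torch.tensor([[0, 0, 0, 0, 1],
                             [0, 0, 0, 1, 1]])
positions = make_positions_sketch(padding_mask)
```

For the mask above, `positions` reproduces the expected output shown in the example.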

#### Testing Implementation

To test the final Transformer implementation, we will train it on the XSum dataset. We expect a validation loss of around 4.5.

Run the `main` function using a bash command inside Colab or Kaggle.