# Transformer from Attention Is All You Need

In [19]:
# import necessary packages
%pip install torch
%pip install torchtext==0.6.0
%pip install -U spacy
import torch
import torch.nn as nn 
!python -m spacy download en_core_web_sm


Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Collecting en-core-web-sm==3.5.0
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.5.0/en_core_web_sm-3.5.0-py3-none-any.whl (12.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.8/12.8 MB[0m [31m11.0 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Installing collected packages: en-core-web-sm
Successfully installed en-core-web-sm-3.5.0
[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_sm')


## Attention

#### See section 3.2 

We'll start by building the self-attention module, which is what makes the transformer architecture so powerful. It takes an embedding size and a number of heads. We split the embedding into as many equal parts as there are heads; this is multi-head attention.

(e.g. with an embedding size of 256 and 8 heads, we split the embedding into 8 parts of size 32)

An attention layer is a component within neural network architectures that enables the model to focus on specific parts of the input data while processing it. The attention mechanism assigns different weights to the input elements based on their relative importance for the given context. This allows the model to selectively focus on relevant parts of the input when making predictions or generating outputs. Here's a high-level description of how an attention layer works in the context of the self-attention mechanism used in the Transformer architecture:

- Input: The attention layer takes three inputs: queries (Q), keys (K), and values (V). In the case of self-attention, all three of these inputs are derived from the same input sequence (e.g., a sentence in a natural language processing task). The input sequence is first converted into continuous embeddings.

- Linear Projections: The input embeddings are passed through separate linear layers to create query, key, and value matrices. These linear layers learn to project the input embeddings into different subspaces that capture different aspects of the data.

- Dot Product Attention: The dot product between the query matrix (Q) and the transpose of the key matrix (K) is computed. This results in an attention score matrix that measures the compatibility between each pair of query and key elements. The intuition behind this is that the model learns to attend to input elements with higher compatibility scores.

- Scaling: The dot product can result in large values, which can cause gradients to become too small when backpropagating through the softmax function. To mitigate this issue, the dot product attention scores are scaled by dividing them by the square root of the key's dimensionality.

- Masking (optional): In some cases, such as when processing input sequences in the decoder of a Transformer, certain positions should not be attended to (e.g., future positions). A mask can be applied to the scaled dot product attention scores to prevent the model from attending to these positions.

- Softmax: A softmax function is applied to the scaled (and masked) attention scores along the key dimension. This normalizes the scores, so they sum up to 1, producing the final attention weights.

- Weighted Sum: The attention weights are multiplied by the value matrix (V) to compute a weighted sum. This step essentially aggregates the values based on their computed attention weights, producing the final output of the attention layer.

- Output: The output of the attention layer is a continuous vector representation that combines the input elements based on their relative importance. This output can be further processed by the model or used as input for subsequent layers.



In [2]:
class SelfAttention(nn.Module):
  """Multi-head scaled dot-product attention.

  The embedding is split into ``heads`` chunks of size
  ``embed_size // heads``. The same three bias-free linear layers
  (values / keys / queries) are applied to every head chunk, attention is
  computed per head, and the heads are concatenated and passed through a
  final linear layer.

  Args:
    embed_size: size of the embedding dimension; must be divisible by
      ``heads``.
    heads: number of attention heads.
  """

  def __init__(self, embed_size, heads):
    super(SelfAttention, self).__init__()

    assert (embed_size % heads == 0), 'Embed size not divisible by heads'

    self.embed_size = embed_size
    self.heads = heads
    self.head_dimension = embed_size // heads

    # linear layers applied to each head-sized chunk of values, keys, queries
    self.values = nn.Linear(self.head_dimension, self.head_dimension, bias=False)
    self.keys = nn.Linear(self.head_dimension, self.head_dimension, bias=False)
    self.queries = nn.Linear(self.head_dimension, self.head_dimension, bias=False)

    # mixes the concatenated heads back into a single embedding
    self.fully_connected_out = nn.Linear(embed_size, embed_size)

  def forward(self, values, keys, query, mask):
    """Apply multi-head attention.

    Args:
      values: tensor of shape (N, value_len, embed_size).
      keys: tensor of shape (N, key_len, embed_size); ``key_len`` must
        equal ``value_len`` for the weighted sum below.
      query: tensor of shape (N, query_len, embed_size).
      mask: ``None`` or a tensor broadcastable to
        (N, heads, query_len, key_len); positions where ``mask == 0`` are
        excluded from attention.

    Returns:
      Tensor of shape (N, query_len, embed_size).
    """
    # number of examples in the batch
    N = query.shape[0]
    value_len, key_len, query_len = values.shape[1], keys.shape[1], query.shape[1]

    # split embeddings into heads: (N, len, heads, head_dimension)
    values = values.reshape(N, value_len, self.heads, self.head_dimension)
    keys = keys.reshape(N, key_len, self.heads, self.head_dimension)
    queries = query.reshape(N, query_len, self.heads, self.head_dimension)

    # send through linear layers
    values = self.values(values)
    keys = self.keys(keys)
    queries = self.queries(queries)

    #------- MatMul Q and K(Transposed) ---------#
    # (N, query_len, heads, d) x (N, key_len, heads, d)
    #   -> (N, heads, query_len, key_len)
    QdotKt = torch.einsum('nqhd,nkhd->nhqk', queries, keys)

    #------------ Scale ------------#
    # Scale by sqrt(d_k), the dimensionality of each head's keys, not by the
    # full embedding size: the dot product is taken over head_dimension
    # elements, so that is the quantity its magnitude grows with.
    QdotKt = QdotKt / (self.head_dimension ** 0.5)

    #----- Mask (padding / decoder look-ahead) ------#
    if mask is not None:
      # Use the most negative value representable in the score dtype so the
      # fill also works in reduced precision; softmax maps it to ~0.
      QdotKt = QdotKt.masked_fill(mask == 0, torch.finfo(QdotKt.dtype).min)

    #---------- Softmax over the key dimension ------------#
    soft = torch.softmax(QdotKt, dim=3)

    #------ MatMul soft and V ------#
    # (N, heads, query_len, key_len) x (N, value_len, heads, d)
    #   -> (N, query_len, heads, d), then flatten the last two dimensions
    out = torch.einsum('nhql,nlhd->nqhd', soft, values).reshape(
      N, query_len, self.heads * self.head_dimension)

    #------ Mix concatenated heads ------#
    return self.fully_connected_out(out)


## Transformer Block
Now that we have our attention mechanism, we can construct the transformer block, which will be used to create both the encoder and the decoder.

In [3]:
class TransformerBlock(nn.Module):
  """Attention followed by a position-wise feed-forward network.

  Data flow: attention -> add & layer-norm -> dropout -> feed forward
  -> add & layer-norm -> dropout.

  Args:
    embed_size: embedding dimension used by every sub-layer.
    heads: number of attention heads.
    dropout: dropout probability applied after each normalisation.
    forward_expansion: multiplier for the hidden width of the feed-forward
      network (hidden width is ``forward_expansion * embed_size``).
  """

  def __init__(self, embed_size, heads, dropout, forward_expansion):
    super().__init__()
    self.attention = SelfAttention(embed_size, heads)
    self.norm1 = nn.LayerNorm(embed_size)
    self.norm2 = nn.LayerNorm(embed_size)

    hidden_size = forward_expansion * embed_size
    self.feed_forward = nn.Sequential(
      nn.Linear(embed_size, hidden_size),
      nn.ReLU(),
      nn.Linear(hidden_size, embed_size),
    )

    self.dropout = nn.Dropout(dropout)

  def forward(self, value, key, query, mask):
    """Run one block; the residual connections are taken from ``query``.

    Returns a tensor with the same shape as ``query``.
    """
    # attention sub-layer with residual connection to the query
    attended = self.attention(value, key, query, mask)
    x = self.dropout(self.norm1(attended + query))

    # feed-forward sub-layer with residual connection to its own input
    projected = self.feed_forward(x)
    return self.dropout(self.norm2(projected + x))


## Encoder

This encoder represents the left part of the diagram in Figure 1.

Because the previous components we built will be used in the encoder, we have to pass through all of the hyperparameters.

There is a new parameter called max_length, which is related to the positional embedding. We have to tell the model the maximum sentence length it will see; this varies with the dataset.

In [4]:
class Encoder(nn.Module):
  """Stack of ``TransformerBlock`` layers over word + position embeddings.

  Args:
    src_vocab_size: number of rows in the word-embedding table.
    embed_size: embedding dimension.
    num_layers: number of transformer blocks to stack.
    heads: number of attention heads per block.
    device: stored on ``self.device`` for backward compatibility; the
      forward pass follows the device of its input instead.
    forward_expansion: feed-forward width multiplier for each block.
    dropout: dropout probability.
    max_length: number of rows in the positional-embedding table, i.e. the
      longest sequence the encoder can index.
  """

  def __init__(
    self,
    src_vocab_size,
    embed_size,
    num_layers,
    heads,
    device,
    forward_expansion,
    dropout,
    max_length
  ):

    super(Encoder, self).__init__()
    # define embeddings for input
    self.embed_size = embed_size
    self.device = device
    self.word_embedding = nn.Embedding(src_vocab_size, embed_size)
    self.positional_embedding = nn.Embedding(max_length, embed_size)

    # stack of num_layers transformer blocks
    self.layers = nn.ModuleList(
      [
        TransformerBlock(
          embed_size,
          heads,
          dropout=dropout,
          forward_expansion=forward_expansion
        )
        for _ in range(num_layers)
      ]
    )

    self.dropout = nn.Dropout(dropout)

  def forward(self, x, mask):
    """Encode a batch of token indices.

    Args:
      x: integer tensor of shape (N, seq_len).
      mask: attention mask passed unchanged to every block.

    Returns:
      The output of the last transformer block.
    """
    # number of examples and the sequence length
    N, seq_len = x.shape

    # Positions 0..seq_len-1 for every example. Created directly on the
    # input's device so the module keeps working after `.to(...)` even when
    # the constructor's `device` argument no longer matches.
    positions = torch.arange(seq_len, device=x.device).expand(N, seq_len)

    # word embedding + positional embedding tells the model where each word is
    out = self.dropout(self.word_embedding(x) + self.positional_embedding(positions))

    for layer in self.layers:
      # in the encoder, value, key and query are all the same tensor
      out = layer(out, out, out, mask)

    return out

## Decoder

First we will create the decoder block on its own; the embeddings, the final linear layer, and the softmax live outside it.

In [5]:
class DecoderBlock(nn.Module):
  """Masked self-attention followed by a ``TransformerBlock``.

  The self-attention output (after add, layer-norm and dropout) becomes the
  query of the inner transformer block, whose values and keys are supplied
  by the caller.

  Args:
    embed_size: embedding dimension.
    heads: number of attention heads.
    forward_expansion: feed-forward width multiplier of the inner block.
    dropout: dropout probability.
    device: accepted for interface compatibility; not used by this block.
  """

  def __init__(self, embed_size, heads, forward_expansion, dropout, device):
    super().__init__()

    self.attention = SelfAttention(embed_size, heads)
    self.norm = nn.LayerNorm(embed_size)
    self.transformer_block = TransformerBlock(
      embed_size,
      heads,
      dropout=dropout,
      forward_expansion=forward_expansion,
    )
    self.dropout = nn.Dropout(dropout)

  def forward(self, x, value, key, src_mask, trg_mask):
    """Attend over ``x`` with ``trg_mask``, then over ``value``/``key`` with ``src_mask``."""
    # masked self-attention over the decoder input
    self_attended = self.attention(x, x, x, trg_mask)

    # add & norm; the result is the query for the inner block
    query = self.dropout(self.norm(self_attended + x))

    return self.transformer_block(value, key, query, src_mask)


Next we can use the decoder block to make the whole decoder.

In [6]:
class Decoder(nn.Module):
  """Stack of ``DecoderBlock`` layers with a final vocabulary projection.

  Args:
    trg_vocab_size: number of rows in the word-embedding table and width of
      the output projection.
    embed_size: embedding dimension.
    num_layers: number of decoder blocks to stack.
    heads: number of attention heads per block.
    forward_expansion: feed-forward width multiplier for each block.
    dropout: dropout probability.
    device: stored on ``self.device`` for backward compatibility; the
      forward pass follows the device of its input instead.
    max_length: number of rows in the positional-embedding table.
  """

  def __init__(
    self,
    trg_vocab_size,
    embed_size,
    num_layers,
    heads,
    forward_expansion,
    dropout,
    device,
    max_length
  ):
    super(Decoder, self).__init__()

    # define embeddings
    self.device = device
    self.word_embedding = nn.Embedding(trg_vocab_size, embed_size)
    self.positional_embedding = nn.Embedding(max_length, embed_size)

    # create num_layers decoder blocks
    self.layers = nn.ModuleList(
      [
        DecoderBlock(
          embed_size, heads, forward_expansion, dropout, device
        )
        for _ in range(num_layers)
      ]
    )

    # final linear layer projecting to vocabulary size
    self.fully_connected_out = nn.Linear(embed_size, trg_vocab_size)

    self.dropout = nn.Dropout(dropout)

  def forward(self, x, encoder_out, src_mask, trg_mask):
    """Decode a batch of target token indices.

    Args:
      x: integer tensor of shape (N, seq_len).
      encoder_out: encoder output, used as both value and key in each block.
      src_mask: mask used when attending over ``encoder_out``.
      trg_mask: mask used for the self-attention over ``x``.

    Returns:
      Output of the final linear layer (no softmax is applied), with a last
      dimension of ``trg_vocab_size``.
    """
    # number of examples and the sequence length
    N, seq_len = x.shape

    # Positions 0..seq_len-1 for every example. Created directly on the
    # input's device so the module keeps working after `.to(...)` even when
    # the constructor's `device` argument no longer matches.
    positions = torch.arange(seq_len, device=x.device).expand(N, seq_len)

    # word embedding + positional embedding tells the model where each word is
    x = self.dropout(self.word_embedding(x) + self.positional_embedding(positions))

    # run every decoder block in turn
    for layer in self.layers:
      x = layer(x, encoder_out, encoder_out, src_mask, trg_mask)

    # final linear layer; the softmax is left to the caller / loss function
    return self.fully_connected_out(x)


## Transformer

Now that we've constructed all of the components, we can put them together.

In [7]:
class Transformer(nn.Module):
  """Encoder-decoder transformer built from ``Encoder`` and ``Decoder``.

  Args:
    src_vocab_size: source vocabulary size.
    trg_vocab_size: target vocabulary size.
    src_pad_idx: index of the padding token in the source vocabulary; these
      positions are masked out in attention over the source.
    trg_pad_idx: index of the padding token in the target vocabulary. It is
      stored on the instance but the target mask built here is purely
      causal and does not use it.
    embed_size: embedding dimension.
    num_layers: number of encoder layers and of decoder layers.
    forward_expansion: feed-forward width multiplier.
    heads: number of attention heads.
    dropout: dropout probability.
    device: forwarded to the encoder/decoder and stored on ``self.device``
      for backward compatibility; masks follow the device of the inputs.
    max_length: size of the positional-embedding tables.
  """

  def __init__(
    self,
    src_vocab_size,
    trg_vocab_size,
    src_pad_idx,
    trg_pad_idx,
    embed_size=256,
    num_layers=6,
    forward_expansion=4,
    heads=8,
    dropout=0,
    device='cpu',
    max_length=100
  ):
    super(Transformer, self).__init__()

    # define encoder
    self.encoder = Encoder(
      src_vocab_size,
      embed_size,
      num_layers,
      heads,
      device,
      forward_expansion,
      dropout,
      max_length
    )

    # define decoder
    self.decoder = Decoder(
      trg_vocab_size,
      embed_size,
      num_layers,
      heads,
      forward_expansion,
      dropout,
      device,
      max_length
    )

    self.src_pad_idx = src_pad_idx
    self.trg_pad_idx = trg_pad_idx
    self.device = device

  def make_src_mask(self, src):
    """Return a boolean padding mask of shape (N, 1, 1, src_len).

    Entries are True for real tokens and False where ``src`` equals
    ``src_pad_idx``. The mask lives on the same device as ``src``.
    """
    # comparison result already sits on src's device, so no transfer is needed
    return (src != self.src_pad_idx).unsqueeze(1).unsqueeze(2)

  def make_trg_mask(self, trg):
    """Return a lower-triangular (causal) mask of shape (N, 1, trg_len, trg_len).

    Ones on and below the diagonal let each position attend to itself and
    to earlier positions only. The mask lives on the same device as ``trg``.
    """
    N, trg_len = trg.shape

    # build the triangle directly on trg's device instead of the
    # constructor-time device, which may be stale after `.to(...)`
    causal = torch.tril(torch.ones((trg_len, trg_len), device=trg.device))
    return causal.expand(N, 1, trg_len, trg_len)

  def forward(self, src, trg):
    """Encode ``src``, then decode ``trg`` against the encoded source.

    Args:
      src: integer tensor of shape (N, src_len).
      trg: integer tensor of shape (N, trg_len).

    Returns:
      The decoder output for ``trg``.
    """
    src_mask = self.make_src_mask(src)
    trg_mask = self.make_trg_mask(trg)

    enc_src = self.encoder(src, src_mask)

    return self.decoder(trg, enc_src, src_mask, trg_mask)

## Testing

Now that we've created the transformer model, we can test it to see if it works.

In [10]:
# Smoke test: push a tiny batch through the full model and print the output shape.
if __name__ == '__main__':
  device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

  # two source sentences of 9 token ids (0 is the padding index)
  x = torch.tensor([[1,5,6,4,3,9,5,2,0], [1,8,7,3,4,5,6,7,2]]).to(device)

  # two target sentences of 8 token ids
  trg = torch.tensor([[1,7,4,3,5,9,2,0], [1,5,6,2,4,7,6,2]]).to(device)

  src_pad_idx = 0
  trg_pad_idx = 0
  src_vocab_size = 10
  trg_vocab_size = 10

  # Pass the device explicitly: the constructor default is 'cpu', which would
  # disagree with tensors living on CUDA when a GPU is available.
  model = Transformer(
    src_vocab_size, trg_vocab_size, src_pad_idx, trg_pad_idx, device=device
  ).to(device)

  # feed the target without its last token
  out = model(x, trg[:, :-1])

  print(out.shape)

torch.Size([2, 7, 10])


# Test Cases

## Self attention forward pass

In [11]:
# Shape check: SelfAttention must return (N, seq_len, embed_size)
# when given an all-ones (nothing masked) mask.
embed_size, heads = 32, 4
N, seq_len = 10, 20

values = torch.rand((N, seq_len, embed_size))
keys = torch.rand((N, seq_len, embed_size))
queries = torch.rand((N, seq_len, embed_size))
mask = torch.ones((seq_len, seq_len))

self_attn = SelfAttention(embed_size, heads)
out = self_attn(values, keys, queries, mask)
assert out.shape == torch.Size([N, seq_len, embed_size])


## TransformerBlock forward pass

In [12]:
# Shape check: TransformerBlock must return (N, seq_len, embed_size)
# when given an all-ones (nothing masked) mask.
embed_size, heads = 32, 4
N, seq_len = 10, 20
dropout, forward_expansion = 0.2, 2

value = torch.rand((N, seq_len, embed_size))
key = torch.rand((N, seq_len, embed_size))
query = torch.rand((N, seq_len, embed_size))
mask = torch.ones((seq_len, seq_len))

trans_block = TransformerBlock(embed_size, heads, dropout, forward_expansion)
out = trans_block(value, key, query, mask)
assert out.shape == torch.Size([N, seq_len, embed_size])


## Same tests but with Unittest

In [13]:
import torch
from torch.autograd import Variable
import numpy as np
import unittest

class TestSelfAttention(unittest.TestCase):
    """Shape test for ``SelfAttention`` when no mask is supplied."""

    def test_forward_pass(self):
        embed_size, heads = 8, 2
        num_examples = 3
        value_len = key_len = query_len = 5

        # random inputs, one tensor per attention role
        values = torch.rand(num_examples, value_len, embed_size)
        keys = torch.rand(num_examples, key_len, embed_size)
        queries = torch.rand(num_examples, query_len, embed_size)

        attention_layer = SelfAttention(embed_size, heads)
        output = attention_layer(values, keys, queries, None)

        expected_size = (num_examples, query_len, embed_size)
        self.assertEqual(output.size(), expected_size)

class TestTransformerBlock(unittest.TestCase):
    """Shape test for ``TransformerBlock`` when no mask is supplied."""

    def test_forward_pass(self):
        embed_size, heads = 8, 2
        dropout, forward_expansion = 0.1, 4
        num_examples = 3
        value_len = key_len = query_len = 5

        # random inputs, one tensor per attention role
        value = torch.rand(num_examples, value_len, embed_size)
        key = torch.rand(num_examples, key_len, embed_size)
        query = torch.rand(num_examples, query_len, embed_size)

        transformer_block = TransformerBlock(embed_size, heads, dropout, forward_expansion)
        output = transformer_block(value, key, query, None)

        expected_size = (num_examples, query_len, embed_size)
        self.assertEqual(output.size(), expected_size)

class TestEncoder(unittest.TestCase):
    """Shape test for ``Encoder`` on random token ids with a boolean mask."""

    def test_forward_pass(self):
        src_vocab_size, embed_size = 100, 16
        num_layers, heads = 2, 4
        forward_expansion, dropout = 4, 0.1
        max_length = 10
        num_examples, seq_len = 3, 6

        # random token ids in [0, src_vocab_size)
        x = torch.LongTensor(num_examples, seq_len).random_(0, src_vocab_size)

        # boolean (seq_len, seq_len) mask, True strictly above the diagonal
        upper = np.triu(np.ones((seq_len, seq_len)), k=1).astype('bool')
        mask = torch.from_numpy(upper).to(torch.bool)

        encoder = Encoder(
            src_vocab_size=src_vocab_size,
            embed_size=embed_size,
            num_layers=num_layers,
            heads=heads,
            device='cpu',
            forward_expansion=forward_expansion,
            dropout=dropout,
            max_length=max_length,
        )

        output = encoder(x, mask)

        expected_size = (num_examples, seq_len, embed_size)
        self.assertEqual(output.size(), expected_size)

# Run the test cases in-process. A placeholder argv is supplied so unittest
# does not parse the host process's own arguments, and exit=False keeps
# unittest from calling sys.exit() when the run finishes.
if __name__ == '__main__':
    unittest.main(
        argv=['first-arg-is-ignored'],
        exit=False,
    )




...
----------------------------------------------------------------------
Ran 3 tests in 0.011s

OK


# Run code below to train the model on IMDB dataset

In [23]:

# SentimentClassifier
class SentimentClassifier(nn.Module):
    """Sequence classifier: ``Encoder`` -> mean over positions -> linear layer.

    All constructor arguments except ``num_classes`` are forwarded to
    ``Encoder``; ``num_classes`` is the width of the output layer.
    """

    def __init__(self, src_vocab_size, embed_size, num_layers, heads, device, forward_expansion, dropout, max_length, num_classes):
        super().__init__()

        self.encoder = Encoder(
            src_vocab_size,
            embed_size,
            num_layers,
            heads,
            device,
            forward_expansion,
            dropout,
            max_length,
        )
        self.linear = nn.Linear(embed_size, num_classes)

    def forward(self, x, mask):
        """Return one row of ``num_classes`` scores per example.

        ``mask`` is only forwarded to the encoder; the pooling step averages
        over every position along dimension 1.
        """
        encoded = self.encoder(x, mask)
        pooled = encoded.mean(dim=1)
        return self.linear(pooled)

In [10]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchtext.data 
import torchtext.datasets 

In [21]:
# Prefer the GPU when one is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Model hyperparameters
src_vocab_size = 25_000
embed_size = 512
num_layers = 3
heads = 8
forward_expansion = 4
dropout = 0.1
max_length = 100
num_classes = 2

# Optimisation hyperparameters
num_epochs = 5
batch_size = 64
lr = 0.0003

# Text is tokenised with spaCy, lower-cased, and fixed to max_length tokens
TEXT = torchtext.data.Field(
    tokenize="spacy",
    tokenizer_language="en_core_web_sm",
    batch_first=True,
    lower=True,
    fix_length=max_length,
)
LABEL = torchtext.data.LabelField(dtype=torch.float)

train_data, test_data = torchtext.datasets.IMDB.splits(TEXT, LABEL)

# max_size is two below src_vocab_size — presumably leaving room for the
# vocabulary's special tokens; verify against the torchtext version in use
TEXT.build_vocab(train_data, max_size=src_vocab_size - 2)
LABEL.build_vocab(train_data)

train_iterator, test_iterator = torchtext.data.BucketIterator.splits(
    (train_data, test_data),
    batch_size=batch_size,
    device=device,
    sort_within_batch=True,
    sort_key=lambda example: len(example.text),
)


In [24]:
# Build the classifier from the hyperparameters above and move it to `device`
model = SentimentClassifier(
    src_vocab_size=src_vocab_size,
    embed_size=embed_size,
    num_layers=num_layers,
    heads=heads,
    device=device,
    forward_expansion=forward_expansion,
    dropout=dropout,
    max_length=max_length,
    num_classes=num_classes,
).to(device)

In [30]:
def train(model, iterator, criterion, optimizer, device):
    """Run one optimisation pass over ``iterator`` and return the mean batch loss.

    Each batch must expose ``.text`` and ``.label`` tensors. The model is
    put in training mode and called as ``model(text, mask)``, where ``mask``
    has shape (N, 1, 1, seq_len) and is False wherever ``text == 1``.
    """
    model.train()
    running_loss = 0

    for batch in iterator:
        tokens = batch.text.to(device)
        targets = batch.label.to(device)
        optimizer.zero_grad()

        # assumes index 1 is the padding token — confirm against the vocab
        mask = tokens.ne(1)[:, None, None, :].to(device)
        predictions = model(tokens, mask)

        # labels are cast to integer class indices for the criterion
        loss = criterion(predictions, targets.long())
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    return running_loss / len(iterator)

In [31]:
def evaluate(model, iterator, criterion, device):
    """Return the mean batch loss of ``model`` over ``iterator`` without updating it.

    Each batch must expose ``.text`` and ``.label`` tensors. The model is
    put in evaluation mode and called under ``torch.no_grad()`` as
    ``model(text, mask)``.

    Args:
        model: module called as ``model(text, mask)``.
        iterator: sized iterable of batches.
        criterion: loss called as ``criterion(predictions, label.long())``.
        device: device the batch tensors are moved to.

    Returns:
        Sum of the per-batch losses divided by ``len(iterator)``.
    """
    model.eval()
    epoch_loss = 0

    with torch.no_grad():
        for batch in iterator:
            text, label = batch.text.to(device), batch.label.to(device)

            # Build the mask exactly as `train` does: shape (N, 1, 1, seq_len)
            # so it broadcasts over heads and query positions. A flat
            # (N, seq_len) mask does not broadcast against 4-D attention
            # scores. Assumes index 1 is the padding token — confirm against
            # the vocab.
            mask = (text != 1).unsqueeze(1).unsqueeze(2)
            predictions = model(text, mask)

            loss = criterion(predictions, label.long())
            epoch_loss += loss.item()

    return epoch_loss / len(iterator)


In [33]:
# Loss and optimiser for the classifier
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=lr)

# One training pass and one evaluation pass over the test split per epoch
for epoch in range(num_epochs):
    train_loss = train(
        model=model,
        iterator=train_iterator,
        criterion=criterion,
        optimizer=optimizer,
        device=device,
    )
    valid_loss = evaluate(
        model=model,
        iterator=test_iterator,
        criterion=criterion,
        device=device,
    )

    print(f"Epoch [{epoch + 1}/{num_epochs}] - Train Loss: {train_loss:.4f}, Test Loss: {valid_loss:.4f}")


KeyboardInterrupt: 