In [None]:
import torch
import torch.nn as nn
import math
import numpy

In [None]:
# Input Embeddings

class InputEmbeddings(nn.Module):
  """Token-ID to vector lookup whose output is scaled by sqrt(d_model).

  Args:
    d_model: width of each embedding vector.
    vocab_size: number of rows in the embedding table (one per token ID).
  """

  def __init__(self, d_model: int, vocab_size: int):
    super().__init__()
    self.d_model = d_model
    self.vocab_size = vocab_size
    # Learnable lookup table of shape (vocab_size, d_model).
    self.embeddings = nn.Embedding(
        num_embeddings=vocab_size,
        embedding_dim=d_model,
    )

  def forward(self, x):
    """Look up the embedding of every token ID in ``x`` and multiply by sqrt(d_model)."""
    scale = math.sqrt(self.d_model)
    looked_up = self.embeddings(x)
    return looked_up * scale




In [None]:
# Testing/Visualizing InputEmbeddings functionality on a tiny example.

# Hypothetical vocabulary size and model dimension, kept small so the output is readable.
ex_vocab_size = 10
dim_model = 4

# Build the layer and print the underlying nn.Embedding to show its (vocab_size, d_model) table.
_embeddings = InputEmbeddings(d_model=dim_model, vocab_size=ex_vocab_size)
print(_embeddings.embeddings)

# A "sentence" made of two tokens, with token IDs 1 and 2.
tokenized_sentence = torch.LongTensor([1, 2])

# Get the (scaled) embeddings for the tokenized sentence: one d_model-sized row per token.
embedded_sentence = _embeddings(tokenized_sentence)
print(embedded_sentence)
print(embedded_sentence.shape)



Embedding(10, 4)
tensor([[-1.5787,  3.0195,  3.2364,  0.4048],
        [ 3.1898, -2.2366,  0.4090,  1.7561]], grad_fn=<MulBackward0>)
torch.Size([2, 4])


In [None]:
# Positional Encoding

class PositionalEncoding(nn.Module):
  """Adds fixed sinusoidal positional encodings to embeddings, then applies dropout.

  The encoding table is computed once in ``__init__`` and stored as the
  non-trainable buffer ``pos_enc`` of shape (1, seq_len, d_model). Even feature
  columns hold sines and odd feature columns hold cosines.

  Args:
    d_model: size of the feature dimension; both even and odd values are supported.
    seq_len: maximum sentence length (one encoding vector is built per position).
    dropout: dropout probability applied to the sum of input and encoding.
  """

  def __init__(self, d_model: int, seq_len: int, dropout: float) -> None:
    super().__init__()
    self.d_model = d_model
    # max length of the sentence (need to create one vector for each position)
    self.seq_len = seq_len
    self.dropout = nn.Dropout(dropout)

    # matrix of shape (seq_len, d_model)
    pos_enc = torch.zeros(seq_len, d_model)

    # column vector of positions, shape (seq_len, 1)
    position = torch.arange(0, seq_len, dtype=torch.float).unsqueeze(1)

    # one frequency per even feature index: exp(-ln(10000) * i / d_model) for i = 0, 2, 4, ...
    div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

    # (seq_len, ceil(d_model / 2)) via broadcasting
    angles = position * div_term

    # from paper: sin goes to the even feature columns, cos to the odd ones
    pos_enc[:, 0::2] = torch.sin(angles)
    # When d_model is odd there is one fewer odd column than even columns, so the
    # surplus last frequency is dropped; for even d_model this slice is a no-op.
    pos_enc[:, 1::2] = torch.cos(angles[:, : d_model // 2])

    # we will have a batch of sentences, so add a leading batch dimension
    pos_enc = pos_enc.unsqueeze(0)

    # buffer: saved with the module state but not a trainable parameter
    self.register_buffer("pos_enc", pos_enc)

  def forward(self, x):
    """Add the encodings for the first ``x.shape[1]`` positions to ``x`` and apply dropout.

    ``x`` is indexed as (batch, sequence, feature); its sequence length should
    not exceed ``seq_len`` because the table is sliced to ``x.shape[1]`` rows.
    """
    # positional encoding is added to every word in the sentence; it is never trained
    x = x + (self.pos_enc[:, :x.shape[1], :]).requires_grad_(False)
    return self.dropout(x)


In [None]:
# End-to-end demo: token IDs -> InputEmbeddings -> PositionalEncoding.
# Small hypothetical hyperparameters so the printed tensors stay readable.
d_model_ex = 4
vocab_size_ex = 10
seq_len_ex = 5
batch_size_ex = 3
dropout_ex = 0.1

# Random token IDs in [0, vocab_size_ex) with shape (batch_size_ex, seq_len_ex).
# No seed is set, so the values differ from run to run.
dummy_input_ex = torch.randint(vocab_size_ex, (batch_size_ex, seq_len_ex))
print(dummy_input_ex.shape)

# Instantiate the classes with the parameters above
input_embeddings_ex = InputEmbeddings(d_model_ex, vocab_size_ex)
print(input_embeddings_ex.embeddings)
positional_encoding_ex = PositionalEncoding(d_model_ex, seq_len_ex, dropout_ex)

# Process the inputs through the classes: first the embedding lookup, then the positional encoding.
# NOTE(review): the module is never switched to eval(), so dropout is presumably active here — confirm.
embedded_input_ex = input_embeddings_ex(dummy_input_ex)
encoded_input_ex = positional_encoding_ex(embedded_input_ex)

# Show the raw token IDs and the result of each stage.
print("Dummy Input (Token IDs):", dummy_input_ex)
print("Output after InputEmbeddings (Token Embeddings):", embedded_input_ex)
print("with shape", embedded_input_ex.shape)
print("Output after PositionalEncoding (Positionally Encoded Embeddings):", encoded_input_ex)

torch.Size([3, 5])
Embedding(10, 4)
Dummy Input (Token IDs): tensor([[2, 7, 1, 2, 4],
        [9, 7, 8, 2, 4],
        [9, 2, 8, 6, 2]])
Output after InputEmbeddings (Token Embeddings): tensor([[[ 1.9724, -2.8027,  3.0187,  1.6199],
         [-2.0318, -1.3844, -0.7549, -0.0879],
         [ 0.6107, -2.4446,  2.5084,  0.9897],
         [ 1.9724, -2.8027,  3.0187,  1.6199],
         [ 1.4508, -1.7542, -0.4345,  0.0565]],

        [[ 2.5120,  0.2577,  2.9352,  1.2423],
         [-2.0318, -1.3844, -0.7549, -0.0879],
         [-2.3486,  2.3704,  0.6784, -2.7064],
         [ 1.9724, -2.8027,  3.0187,  1.6199],
         [ 1.4508, -1.7542, -0.4345,  0.0565]],

        [[ 2.5120,  0.2577,  2.9352,  1.2423],
         [ 1.9724, -2.8027,  3.0187,  1.6199],
         [-2.3486,  2.3704,  0.6784, -2.7064],
         [-0.6457,  0.5665,  1.5093,  1.6582],
         [ 1.9724, -2.8027,  3.0187,  1.6199]]], grad_fn=<MulBackward0>)
with shape torch.Size([3, 5, 4])
Output after PositionalEncoding (Positionally 

For this example, think of 3 sentences of 5 tokens each, drawn from a vocabulary of 10 tokens. The embedding layer is essentially a lookup table with one row per vocabulary entry, which is why its weight has a constant shape of (vocab size × d_model). So we first tokenize the sentence, look up the embedding of each token ID, and then add the positional encoding (using the formula from the paper).

In [None]:
# Layer normalization for Add and Norm (from layer normalization paper)

class LayerNormalization(nn.Module):
  """Layer normalization over the last dimension with a learnable scalar scale and shift.

  Computes ``alpha * (x - mean) / (std + eps) + bias`` where ``mean`` and ``std``
  are taken over the last dimension of ``x``.

  Args:
    eps: small constant added to the standard deviation in the denominator so
      the division stays finite when the standard deviation is 0.
  """

  def __init__(self, eps: float = 10**-6) -> None:
    super().__init__()
    # epsilon in the denominator of x_hat; needed if sigma happens to be 0
    self.eps = eps
    self.alpha = nn.Parameter(torch.ones(1))  # learnable multiplier
    # torch.zeros (not "zeroes") is the tensor constructor; the misspelling raised AttributeError
    self.bias = nn.Parameter(torch.zeros(1))  # learnable additive shift

  def forward(self, x):
    """Normalize ``x`` along its last dimension and apply the scale and shift."""
    # keepdim=True keeps the reduced axis so both statistics broadcast against x
    mean = x.mean(dim=-1, keepdim=True)
    # NOTE: Tensor.std defaults to the Bessel-corrected (n - 1) estimator
    std = x.std(dim=-1, keepdim=True)

    return self.alpha * (x - mean) / (std + self.eps) + self.bias

In [None]:
# From paper, FFN(x) = max(0, xW1 + b1)W2 + b2

class FeedForwardBlock(nn.Module):
  """Position-wise feed-forward network: FFN(x) = max(0, x W1 + b1) W2 + b2.

  Args:
    d_model: input and output feature size.
    d_ff: size of the hidden layer between the two linear maps.
    dropout: dropout probability applied after the ReLU.
  """

  def __init__(self, d_model: int, d_ff: int, dropout: float) -> None:
    super().__init__()
    # W1 and b1: expand d_model -> d_ff
    self.linear_1 = nn.Linear(in_features=d_model, out_features=d_ff)
    self.dropout = nn.Dropout(p=dropout)
    # W2 and b2: project d_ff -> d_model
    self.linear_2 = nn.Linear(in_features=d_ff, out_features=d_model)

  def forward(self, x):
    """Apply linear -> ReLU -> dropout -> linear; the last dimension goes d_model -> d_ff -> d_model."""
    hidden = self.linear_1(x)
    activated = torch.relu(hidden)
    regularized = self.dropout(activated)
    return self.linear_2(regularized)


Quick Notes on Attention: 
- For each word, we create a Query vector, a Key vector, and a Value vector. These vectors are created by multiplying the word's embedding by three weight matrices that are learned during training. So we end up with a “query”, a “key”, and a “value” projection of each word in the input sentence.
- Next we calculate a "score" which is just the dot product of the query vector with the key vector of the respective word we’re scoring. So if we’re processing the self-attention for the word in position #1, the first score would be the dot product of q1 and k1, then second q1 * k2, ... q1 * kn
- Lastly, we divide the scores by 8 (the square root of the dimension of the key vectors used in the paper, 64), which leads to more stable gradients, and then apply a softmax over all the scores. Can think of it as: "each of these scores determines how much each word will be expressed at this position".

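With these scores, we then just multiply each value vector by its softmax score and sum the weighted value vectors to get the self-attention output for that position.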

Note: also just need to recall multihead attention (see formula in paper)




In [None]:
# Attention

class MultiHeadAttentionBlock(nn.Module):
  """Multi-head scaled dot-product attention with learned Q/K/V and output projections.

  Args:
    d_model: feature size of the inputs and of the output; must be divisible by ``h``.
    h: number of attention heads.
    dropout: dropout probability applied to the softmaxed attention weights.
  """

  def __init__(self, d_model: int, h: int, dropout: float) -> None:
    super().__init__()
    self.d_model = d_model
    self.h = h
    # the feature dimension has to split evenly across the heads
    assert d_model % h == 0
    # per-head width (d_k in the paper)
    self.d_k = d_model // h

    # input projections for queries, keys and values
    self.w_q = nn.Linear(d_model, d_model)
    self.w_k = nn.Linear(d_model, d_model)
    self.w_v = nn.Linear(d_model, d_model)

    # output projection applied to the re-joined heads
    self.w_o = nn.Linear(d_model, d_model)
    self.dropout = nn.Dropout(dropout)

  @staticmethod
  def attention(query, key, value, mask, dropout: nn.Dropout):
    """Scaled dot-product attention: softmax(Q K^T / sqrt(d_k)) V.

    Positions where ``mask == 0`` are filled with -1e9 before the softmax;
    ``mask`` and ``dropout`` may each be None to skip that step.

    Returns:
      A tuple ``(weighted_values, attention_weights)``. The weights are
      returned after dropout and can be used for visualization.
    """
    head_dim = query.shape[-1]
    # (batch, h, seq_len, d_k) @ (batch, h, d_k, seq_len) --> (batch, h, seq_len, seq_len)
    scores = query @ key.transpose(-2, -1)
    scores = scores / math.sqrt(head_dim)

    if mask is not None:
      # a very large negative value stands in for -inf so masked positions get ~0 weight
      scores.masked_fill_(mask == 0, -1e9)

    # normalize over the key axis
    weights = scores.softmax(dim=-1)
    if dropout is not None:
      weights = dropout(weights)

    # (batch, h, seq_len, seq_len) @ (batch, h, seq_len, d_k) --> (batch, h, seq_len, d_k)
    return (weights @ value), weights

  def _split_heads(self, projected):
    """Reshape (batch, seq_len, d_model) into (batch, h, seq_len, d_k)."""
    batch, length = projected.shape[0], projected.shape[1]
    per_head = projected.view(batch, length, self.h, self.d_k)
    # move the head axis forward so every head sees a (seq_len, d_k) slice
    return per_head.transpose(1, 2)

  def _merge_heads(self, per_head):
    """Reshape (batch, h, seq_len, d_k) back into (batch, seq_len, h * d_k)."""
    batch = per_head.shape[0]
    # contiguous() is required before view() after a transpose
    return per_head.transpose(1, 2).contiguous().view(batch, -1, self.h * self.d_k)

  def forward(self, q, k, v, mask):
    """Project q/k/v, attend per head, re-join the heads and apply the output projection.

    ``mask`` blocks selected positions from interacting; it is applied to the
    scores before they are multiplied with the values. The attention weights
    of the most recent call are kept on ``self.attention_scores``.
    """
    query = self._split_heads(self.w_q(q))
    key = self._split_heads(self.w_k(k))
    value = self._split_heads(self.w_v(v))

    x, self.attention_scores = MultiHeadAttentionBlock.attention(
        query, key, value, mask, self.dropout
    )

    # (batch, h, seq_len, d_k) --> (batch, seq_len, d_model)
    joined = self._merge_heads(x)

    # multiply by Wo: (batch, seq_len, d_model) --> (batch, seq_len, d_model)
    return self.w_o(joined)