<a href="https://colab.research.google.com/github/Pooret/resume/blob/main/pytorch_transformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import numpy as np
np.arange(0, 50).unsqeeze(1)

AttributeError: 'numpy.ndarray' object has no attribute 'unsqeeze'

**Positional Encoding**

The purpose of these positional encodings is to inject some information about the relative or absolute position of the tokens in the sequence, since the transformer has no built-in notion of order.

*pos* - position

*i* - dimension

Each dimenson of the positional encoding corresponds to a sinusoid and have wavelengths that form a geometric progression from $2\pi$ to $10000 * 2\pi$

$${PE}_{(pos, 2i)} = \sin(\frac{pos}{10000^{2i / d_{model}}})$$
$${PE}_{(pos, 2i+1)} = \cos(\frac{pos}{10000^{2i / d_{model}}})$$



In [6]:
import torch
import torch.nn as nn
import math

d_model = 512
max_len = 100
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

class PositionalEncoding(nn.Module):
  def __init__(self, d_model, max_len=500):
    super(PositionalEncoding, self).__init__()
    self.encoding = torch.zeros(1, max_len, d_model)
    position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) # (max_len, 1)
    div_term = torch.exp(torch.arange(0, d_model, 2).float()* -(math.log(10000.0) / d_model)) # decreasing scaling factor length of d_model//2

    self.encoding[:, :, 0::2] = torch.sin(position * div_term)
    self.encoding[:, :, 1::2] = torch.cos(position * div_term)

  def forward(self, x):
    """
    x - (batch_size, seq_len, d_model)
    """
    seq_len = x.size(1)
    return x + self.encoding[:, :seq_len, :].to(device)

pos_encoder = PositionalEncoding(d_model, max_len)
input_tensor = torch.zeros(1, max_len, d_model)
output = pos_encoder(input_tensor)
print(output.shape)  # (1, max_len, d_model)

torch.Size([1, 100, 512])


$$\mathbf{Attention}(Q, K, V) = \mathsf{softmax}(\frac{QK^T}{\sqrt{d_{k}}})V$$

In [29]:
d_model = 512
max_len = 100
dropout = 0.1
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

class ScaledDotProductAttention(nn.Module):
  def __init__(self, d_model, dropout=0.1):
    super(ScaledDotProductAttention, self).__init__()
    self.temperature = math.sqrt(d_model) # scaling factor
    self.dropout = nn.Dropout(dropout)
    self.softmax = nn.Softmax(dim=2)

  def forward(self, q, k, v, mask=None):
    """
    q, k, v (batch_size, n_heads, seq_len, d_k)
    """
    # k transpose (batch_size, n_heads, d_k, seq_len)
    attn = torch.matmul(q, k.transpose(-2, -1)) / self.temperature # attn - (batch_size, n_heads, seq_len, seq_len)
    if mask is not None:
      attn = attn.masked_fill(mask == 0, -1e9) # large negative numbers for softmax
    attn = self.softmax(attn)
    attn = self.dropout(attn)
    output = torch.matmul(attn, v)
    return output, attn # (batch_size, n_heads, seq_len, d_k) (batch_size, n_heads, seq_len, seq_len)

attn = ScaledDotProductAttention(d_model, dropout)
q = torch.rand(64, 10, d_model)  # (batch_size, seq_len, n_heads * d_k)
k = torch.rand(64, 10, d_model)
v = torch.rand(64, 10, d_model)
output, attn_weights = attn(q, k, v)
print(output.shape)  # (batch_size, seq_len, n_heads * d_k)
print(attn_weights.shape)  # (batch_size, seq_len, seq_len)

torch.Size([64, 10, 512])
torch.Size([64, 10, 10])


In [30]:
class MultiHeadAttention(nn.Module):
  def __init__(self, d_model, n_heads, dropout = 0.1):
    super(MultiHeadAttention, self).__init__()
    self.n_heads = n_heads
    self.d_model = d_model
    self.d_k = d_model // n_heads

    self.q_linear = nn.Linear(d_model, d_model)
    self.k_linear = nn.Linear(d_model, d_model)
    self.v_linear = nn.Linear(d_model, d_model)
    self.fc = nn.Linear(d_model, d_model)

    self.attention = ScaledDotProductAttention(self.d_k, dropout)
    self.dropout = nn.Dropout(dropout)
    self.layer_norm = nn.LayerNorm(d_model) # normalize across features dimension

  def forward(self, q, k, v, mask=None):
    batch_size = q.size(0) # (batch_size, seq_len, d_model)

    # q, k, v - (batch_size, seq_len, n_heads, d_k)
    q = self.q_linear(q).view(batch_size, -1, self.n_heads, self.d_k) # n_heads * d_k = d_model
    k = self.k_linear(k).view(batch_size, -1, self.n_heads, self.d_k)
    v = self.v_linear(v).view(batch_size, -1, self.n_heads, self.d_k)

    # (batch_size, seq_len, n_heads, d_k) -> (batch_size, n_heads, seq_len, d_k)
    q = q.transpose(1,2)
    k = k.transpose(1,2)
    v = v.transpose(1,2)

    attn_output, attn_weights = self.attention(q, k, v, mask) # outputs - (batch_size, n_heads, seq_len, d_k) (batch_size, n_heads, seq_len, seq_len)

    # The contiguous() function is required because transpose may change the memory layout, making it non-contiguous. The view function requires a contiguous tensor
    attn_output = attn_output.transpose(1,2).contiguous().view(batch_size, -1, self.d_model) #  (batch_size, n_heads, seq_len, d_k) - > (batch_size, seq_len, n_heads * d_k)
    output = self.dropout(self.fc(attn_output)) # (batch_size, seq_len, d_model)
    output = self.layer_norm(output + q.reshape(batch_size, -1, self.d_model))

    return output, attn_weights # (batch_size, seq_len, d_model), (batch_size, n_heads, seq_len, seq_len)

d_model = 512
n_heads = 8
dropout = 0.1
multi_head_attn = MultiHeadAttention(d_model, n_heads, dropout)
q = torch.rand(64, 10, d_model)  # (batch_size, seq_len, n_heads * d_k)
k = torch.rand(64, 10, d_model)
v = torch.rand(64, 10, d_model)
output, attn_weights = multi_head_attn(q, k, v)
print(output.shape)  # (batch_size, seq_len, d_model)
print(attn_weights.shape)  #(batch_size, n_heads, seq_len, seq_len)

torch.Size([64, 10, 512])
torch.Size([64, 8, 10, 10])


In [31]:
import torch.nn.functional as F

class PositionwiseFeedForward(nn.Module):
  def __init__(self, d_model, d_ff, dropout=0.1):
    super(PositionwiseFeedForward, self).__init__()
    self.linear1 = nn.Linear(d_model, d_ff)
    self.dropout = nn.Dropout(dropout)
    self.linear2 = nn.Linear(d_ff, d_model)
    self.layer_norm = nn.LayerNorm(d_model)

  def forward(self, x):
    residual = x
    x = self.linear1(x)
    x = F.relu(x)
    x = self.dropout(x)
    x = self.linear2(x)
    x = self.dropout(x)
    x = self.layer_norm(x + residual) # normalize across features dimension
    return x

d_model = 512
d_ff = 2048
dropout = 0.1
ffn = PositionwiseFeedForward(d_model, d_ff, dropout)
input_tensor = torch.rand(64, 10, d_model)  # (batch_size, seq_len, d_model)
output = ffn(input_tensor)
print(output.shape)  # (batch_size, seq_len, d_model)

torch.Size([64, 10, 512])


In [34]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, n_heads, dropout)
        self.feed_forward = PositionwiseFeedForward(d_model, d_ff, dropout)
        self.layer_norm = nn.LayerNorm(d_model)

    def forward(self, x, mask=None):
        # Self-attention sublayer
        attn_output, _ = self.self_attn(x, x, x, mask)
        x = self.layer_norm(x + attn_output)  # Add & Norm

        # Feed-forward sublayer
        x = self.feed_forward(x)

        return x

d_model = 512
n_heads = 8
d_ff = 2048
num_layers = 1
dropout = 0.1
encoder_layer = EncoderLayer(d_model, n_heads, d_ff, dropout)
input_tensor = torch.rand(64, 10, d_model)  # (batch_size, seq_len, d_model)
output = encoder_layer(input_tensor)
print(output.shape)  # (batch_size, seq_len, d_model)

torch.Size([64, 10, 512])


In [35]:
class DecoderLayer(nn.Module):
  def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
    super(DecoderLayer, self).__init__()
    self.self_attn = MultiHeadAttention(d_model, n_heads, dropout)
    self.cross_attn = MultiHeadAttention(d_model, n_heads, dropout)
    self.feed_forward = PositionwiseFeedForward(d_model, d_ff, dropout)
    self.layer_norm = nn.LayerNorm(d_model)

  def forward(self, x, enc_output, src_mask=None, tgt_mask=None):
    # self-attention sublayer
    self_attn_output, _ = self.self_attn(x, x, x, tgt_mask)
    x = self.layer_norm(x + self_attn_output) # add and norm

    # cross-attention sublayer (q->x, k,v -> enc_output)
    cross_attn_output, _ = self.cross_attn(x, enc_output, enc_output, src_mask)

    # feed-forward sublayer
    x = self.feed_forward(x)

    return x # (batch_size, seq_len, d_model)

d_model = 512
n_heads = 8
d_ff = 2048
dropout = 0.1
decoder_layer = DecoderLayer(d_model, n_heads, d_ff, dropout)
input_tensor = torch.rand(64, 10, d_model)  # (batch_size, seq_len, d_model)
enc_output = torch.rand(64, 10, d_model)  # (batch_size, seq_len, d_model)
output = decoder_layer(input_tensor, enc_output)
print(output.shape)  # (batch_size, seq_len, d_model)

torch.Size([64, 10, 512])


In [37]:
class Encoder(nn.Module):
  def __init__(self, d_model, n_heads, d_ff, num_layers, dropout=0.1):
    super(Encoder, self).__init__()
    self.layers = nn.ModuleList([EncoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(num_layers)])
    self.layer_norm = nn.LayerNorm(d_model)

  def forward(self, x, mask=None):
    for layer in self.layers:
      x = layer(x, mask)
    x = self.layer_norm(x)
    return x

class Decoder(nn.Module):
  def __init__(self, d_model, n_heads, d_ff, num_layers, dropout=0.1):
    super(Decoder, self).__init__()
    self.layers = nn.ModuleList([DecoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(num_layers)])
    self.layer_norm = nn.LayerNorm(d_model)

  def forward(self, x, enc_output, src_mask=None, tgt_mask=None):
    for layer in self.layers:
      x = layer(x, enc_output, src_mask, tgt_mask)
    x = self.layer_norm(x)
    return x

class Transformer(nn.Module):
  def __init__(self, d_model, n_heads, d_ff, num_layers, src_vocab_size, tgt_vocab_size, max_len, dropout=0.1):
    super(Transformer, self).__init__()
    self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
    self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
    self.positional_encoding = PositionalEncoding(d_model, max_len)
    self.encoder = Encoder(d_model, n_heads, d_ff, num_layers, dropout)
    self.decoder = Decoder(d_model, n_heads, d_ff, num_layers, dropout)
    self.fc = nn.Linear(d_model, tgt_vocab_size)

  def forward(self, src, tgt, src_mask=None, tgt_mask=None):
    # Encoder
    enc_output = self.encoder_embedding(src) # enc_output - (batch_size, src_len, d_model)
    enc_output = self.positional_encoding(enc_output)
    enc_output = self.encoder(enc_output, src_mask)

    # Decoder
    dec_output = self.decoder_embedding(tgt) # dec_output - (batch_size, tgt_len, d_model)
    dec_output = self.positional_encoding(enc_output)
    dec_output = self.decoder(dec_output, enc_output, src_mask, tgt_mask)

    # Final layer
    output = self.fc(dec_output) # (batch_size, tgt_len, tgt_vocab_size)
    return output

d_model = 512
n_heads = 8
d_ff = 2048
num_layers = 6
src_vocab_size = 10000
tgt_vocab_size = 10000
max_len = 100
dropout = 0.1
transformer = Transformer(d_model, n_heads, d_ff, num_layers, src_vocab_size, tgt_vocab_size, max_len, dropout)
src = torch.randint(0, src_vocab_size, (64, 10))  # (batch_size, src_len)
tgt = torch.randint(0, tgt_vocab_size, (64, 10))  # (batch_size, tgt_len)
output = transformer(src, tgt)
print(output.shape) # (batch_size, tgt_len, tgt_vocab_size)

torch.Size([64, 10, 10000])
