# imports

practicing implementing a transformer with an encoder, a decoder, and an encoder+decoder  

In [2]:
import torch 
import math 
# import torch.nn.functional as F 
from torch import nn  

print(torch.__version__)

2.1.1


# Positional Encoder Class 

In [3]:
class PositionalEncoder(nn.Module):
    def __init__(self, embedding_size, max_seq_len=512):
        """
        embedding_size == d_model (model dimensions)
        """
        super(PositionalEncoder, self).__init__()
        self.embedding_size = embedding_size
        self.max_seq_len = max_seq_len 
        
        # positional encoder tensor
        pe = torch.zeros(max_seq_len, embedding_size)
        position = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)
        multiplier = -(math.log(10000.0)/embedding_size)
        div_term = torch.exp(torch.arange(
            0, embedding_size, 2, dtype=torch.float) * multiplier)
        
        # using sin and cos to encode positional information
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        
        # set pe as a non-trainable parameter
        self.register_buffer('pe', pe)
        
    def forward(self, x):
        x = x + self.pe[:, :x.size(1)]
        return x 

In [4]:
z = torch.zeros(4, 8)
print(type(z))
z 

<class 'torch.Tensor'>


tensor([[0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0.]])

In [5]:
z[:, 0::2] = 1
z[:, 1::2] = 2
z

tensor([[1., 2., 1., 2., 1., 2., 1., 2.],
        [1., 2., 1., 2., 1., 2., 1., 2.],
        [1., 2., 1., 2., 1., 2., 1., 2.],
        [1., 2., 1., 2., 1., 2., 1., 2.]])

In [6]:
z[:, :4]

tensor([[1., 2., 1., 2.],
        [1., 2., 1., 2.],
        [1., 2., 1., 2.],
        [1., 2., 1., 2.]])

In [7]:
torch.arange(0, 12, dtype=torch.float).unsqueeze(1)

tensor([[ 0.],
        [ 1.],
        [ 2.],
        [ 3.],
        [ 4.],
        [ 5.],
        [ 6.],
        [ 7.],
        [ 8.],
        [ 9.],
        [10.],
        [11.]])

In [8]:
a = torch.arange(0, 8, 2, dtype=torch.float)
print(a)
torch.exp(a)

tensor([0., 2., 4., 6.])


tensor([  1.0000,   7.3891,  54.5982, 403.4288])

In [9]:
math.log(10.)

2.302585092994046

In [10]:
2.718 ** 4

54.575510850575995

# Multi-headed Attention Class 

In [11]:
class MultiHeadAttention(nn.Module):
    def __init__(self, embedding_size, num_heads):
        """
        embedding_size: embedding dimension size, model dimensions 
        """
        super(MultiHeadAttention, self).__init__()
        self.embedding_size = embedding_size
        self.num_heads = num_heads
        self.head_dim = embedding_size // num_heads
        
        self.query_linear = nn.Linear(embedding_size, embedding_size)
        self.key_linear = nn.Linear(embedding_size, embedding_size)
        self.value_linear = nn.Linear(embedding_size, embedding_size)
        self.output_linear = nn.Linear(embedding_size, embedding_size)
        
    def split_heads(self, x, batch_size):
        x = x.view(batch_size, -1, self.num_heads, self.head_dim) 
        return x.permute(0, 2, 1, 3).contiguous().view(batch_size * self.num_heads, -1, self.head_dim)
    
    @staticmethod
    def compute_attention(query, key, mask=None):
        scores = torch.matmul(query, key.permute(1,2,0))
        if mask is not None:
            scores = scores.masked_fill(mask==0, float("-1e9"))
        attention_weights = torch.nn.functional.softmax(scores, dim=-1)
        return attention_weights 
    
    def forward(self, query, key, value, mask=None):
        batch_size = query.size(0)
        
        query = self.split_heads(self.query_linear(query), batch_size)
        key = self.split_heads(self.key_linear(key), batch_size)
        value = self.split_heads(self.value_linear(value), batch_size)
        
        attention_weights = self.compute_attention(query, key, mask)
        
        output = torch.matmul(attention_weights, value)
        output = (output
                  .view(batch_size, self.num_heads, -1, self.head_dim)
                  .permute(0, 2, 1, 3)
                  .contiguous()
                  .view(batch_size, -1, self.embedding_size))
        
        return self.output_linear(output)

# Encoder Only Transformer Class

In [12]:
class FeedForwardSublayer(nn.Module):
    def __init__(self, model_dimensions, dim_between_layers):
        super(FeedForwardSublayer, self).__init__()
        self.fc1 = nn.Linear(model_dimensions, dim_between_layers)
        self.fc2 = nn.Linear(dim_between_layers, model_dimensions)
        self.relu = nn.ReLU()
    
    def forward(self, x):
        return self.fc2(self.relu(self.fc1(x)))
    

class EncoderLayer(nn.Module):
    def __init__(self, model_dimensions, num_heads, dim_between_layers, dropout):
        """
        Args:
            model_dimensions: d_model
            num_heads: 
            dim_between_layers: d_ff
            dropout: 
        """
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(model_dimensions, num_heads)
        self.feed_forward = FeedForwardSublayer(model_dimensions, dim_between_layers)
        self.norm1 = nn.LayerNorm(model_dimensions)
        self.norm2 = nn.LayerNorm(model_dimensions)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x, mask):
        attn_output = self.self_attn(x, x, x, mask) # ??? 
        x = self.norm1(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))
        return x 
    
    
class TransformerEncoder(nn.Module):
    def __init__(self, vocab_size, d_model, num_layers, num_heads, d_ff, dropout, max_seq_len):
        """
        Args:
            vocab_size: 
            d_model: model_dimensions
            num_layers: 
            num_heads: 
            d_ff: dim_between_layers
            dropout: 
            max_seq_len: 
        """
        super(TransformerEncoder, self).__init__() 
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoder(embedding_size=d_model, max_seq_len=max_seq_len)
        self.layers = nn.ModuleList([
            EncoderLayer(model_dimensions=d_model, num_heads=num_heads, dim_between_layers=d_ff, dropout=dropout)
            for _ in range(num_layers)
        ])
        
    def forward(self, x, mask):
        x = self.embedding(x)
        x = self.positional_encoding(x)
        for layer in self.layers:
            x = layer(x, mask)
        return x     

In [13]:
class ClassifierHead(nn.Module):
    def __init__(self, d_model, num_classes):
        super(ClassifierHead, self).__init__()
        self.fc = nn.Linear(d_model, num_classes)
        
    def forward(self, x):
        logits = self.fc(x)
        return torch.nn.functional.log_softmax(logits, dim=-1)


class RegressionHead(nn.Module):
    def __init__(self, d_model, output_dim):
        super(RegressionHead, self).__init__()
        self.fc = nn.Linear(d_model, output_dim)
        
    def forward(self, x):
        return self.fc(x) 

# - use the TransformerEncoder 

In [14]:
num_classes = 3
vocab_size = 10000
batch_size = 8
d_model = 512
num_heads = 8
num_layers = 6
d_ff = 2048
sequence_length = 256
dropout = 0.1

In [15]:
input_sequence = torch.randint(0, vocab_size, (batch_size, sequence_length))
mask = torch.randint(0, 2, (sequence_length, sequence_length))

display(input_sequence)
display(mask)

tensor([[5745, 5481, 9081,  ..., 1325, 4717, 1093],
        [5433, 6022, 1421,  ..., 8081, 9967, 7382],
        [9417,  824, 6403,  ..., 6317, 7269, 6616],
        ...,
        [9749, 7232, 5846,  ..., 5896,  539, 5191],
        [8603, 4108, 2288,  ..., 3268, 6223, 2204],
        [7711, 3315, 9603,  ..., 6887, 8022, 2197]])

tensor([[0, 0, 0,  ..., 1, 0, 1],
        [0, 1, 1,  ..., 0, 0, 0],
        [1, 1, 0,  ..., 0, 0, 0],
        ...,
        [0, 1, 1,  ..., 1, 0, 1],
        [0, 0, 0,  ..., 0, 1, 1],
        [0, 0, 0,  ..., 0, 0, 0]])

In [16]:
encoder = TransformerEncoder(vocab_size, d_model, num_layers, num_heads, d_ff, dropout, sequence_length)
classifier = ClassifierHead(d_model, num_classes)

output = encoder(input_sequence, mask)
classification_logits = classifier(output)

print("Classification Logits: ", classification_logits)

RuntimeError: The size of tensor a (64) must match the size of tensor b (256) at non-singleton dimension 0

# Decoder Only Transformer Class
Both the body (decoder) and the head (classifier/regressor) are implemented in the Decoder class.

In [ ]:
# not sure if this implementation is correct??? 
class DecoderLayer(nn.Module):
    def __init__(self, model_dimensions, num_heads, dim_between_layers, dropout):
        """
        Args:
            model_dimensions: d_model
            num_heads: 
            dim_between_layers: d_ff
            dropout: 
        """
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(model_dimensions, num_heads)
        self.feed_forward = FeedForwardSublayer(model_dimensions, dim_between_layers)
        self.norm1 = nn.LayerNorm(model_dimensions)
        self.norm2 = nn.LayerNorm(model_dimensions)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        attn_output = self.self_attn(x, x, x, mask) # ??? 
        x = self.norm1(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))
        return x
    
    
class TransformerDecoder(nn.Module):
    def __init__(self, vocab_size, d_model, num_layers, num_heads, d_ff, dropout, max_sequence_len):
        super(TransformerDecoder, self).__init__() 
        self.embedding = nn.Embedding(embedding_dim=d_model, # ? 
                                      num_embeddings=vocab_size) # ? 
        self.positional_encoding = PositionalEncoder(d_model, max_sequence_len)
        
        self.layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        
        # the linear layer head for next word prediction 
        self.fc = 0
        
    def forward(self, x, self_mask):
        x = self.embedding(x)
        x = self.positional_encoding(x)
        for layer in self.layers:
            x = layer(x, self_mask)
        x = self.fc(x)
        return torch.nn.functional.log_softmax(x, dim=-1)
        
        

# - use the decoder only transform

In [ ]:
num_classes = 3
vocab_size = 10000
batch_size = 8
d_model = 512
num_heads = 8
num_layers = 6
d_ff = 2048
sequence_length = 256
dropout = 0.1

input_sequence = torch.randint(0, vocab_size, (batch_size, sequence_length))

self_attention_mask = (
    1 - torch.triu(torch.ones(1, sequence_length, sequence_length),
                   diagonal=1)
).bool()

decoder = TransformerDecoder(vocab_size, d_model, num_layers, num_heads, d_ff, dropout, sequence_length)

output = decoder(input_sequence, self_attention_mask)

print(output.shape)