In [25]:
from tokenize import tokenize
from torch import nn
import torch
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
import math

## Multi Head Attention

In [4]:
class OneHeadSelfAttentionQKV(nn.Module):
    def __init__(self, k, low_dim):
        super().__init__()
        # Check if input is divisible by number of heads
        self.k = k    
        self.low_dim = low_dim 
        # 1. Define linear transformations to reduce dimensionnalité of input
        # biais = False because we want only weights
        self.to_reduce_dim = nn.Linear(k, low_dim, bias=False)
        # 2. Define linear transformations to key, queries and values
        # biais = False because we want only weights
        self.to_queries = nn.Linear(low_dim, low_dim, bias=False)
        self.to_keys    = nn.Linear(low_dim, low_dim, bias=False) 
        self.to_values  = nn.Linear(low_dim, low_dim, bias=False)

    def forward(self, Q, K, V):
        # 3. Reduce dimensionnalité of input
        low_dim_Q = self.to_reduce_dim(Q)
        low_dim_K = self.to_reduce_dim(K)
        low_dim_V = self.to_reduce_dim(V)

        
        # 4. Apply the linear transformation associated to every input to obtain the key, query and value
        query = self.to_queries(low_dim_Q) 
        key = self.to_keys(low_dim_K)
        value = self.to_values(low_dim_V)

        # 5. Compute the raw weights w′ij=𝐪iT𝐤j and normalize them
        weights_raw = torch.bmm(query, key.transpose(1, 2))
        weights_raw_normalized = torch.div(weights_raw, torch.sqrt(torch.tensor(self.low_dim)))

        # 6. We apply the Softmax function to the similarity dimension (batch dim x input dim x sim dim)
        weights = nn.Softmax(dim=2)(weights_raw_normalized)

        # 7. Multiply weights of self attention to the values
        return torch.bmm(weights, value)
    

class MultiHeadSelfAttentionQKV(nn.Module):
    # 8.Define a head number that is divisible from the input 
    def __init__(self, k, heads=4):
        super().__init__()
        # Check if input is divisible by number of heads
        assert k % heads == 0

        self.k = k
        self.heads = heads  

        # 9. Instantiate OneHeadSelfAttention multiple times to have MultiHeadSelfAttention
        self.list_heads = []
        for head in range(self.heads):
            self.list_heads.append(OneHeadSelfAttentionQKV(k, k//heads))

        # This will be applied after the multi-head self-attention operation.
        self.unifyheads = nn.Linear(k, k)
    
    def forward(self, Q, K, V):
        # 10. Get all heads elements 
        list_to_concat = []
        for one_head in self.list_heads:
            list_to_concat.append((one_head(Q, K, V),))

        # 11. Concatenate all the heads
        multi_heads = sum(list_to_concat, ())        
        concatenated = torch.cat(multi_heads, dim=2)

        # 12. Linear transformation
        return self.unifyheads(concatenated)


## Embeddings

In [5]:
class Embedding(nn.Module):
    def __init__(self, vocab_size, dimension):
        super(Embedding, self).__init__()
        self.word_embedding = nn.Embedding(vocab_size, dimension)
        
    def forward(self, x):
        return self.word_embedding(x)

In [6]:
class PositionalEmbedding(nn.Module):
    def __init__(self, dimension, max_seq_length=2000):
        super(PositionalEmbedding, self).__init__()

        positional_encoding = torch.zeros(max_seq_length, dimension)    
        for pos in range(max_seq_length):
            for i in range(dimension):
                if i%2 == 0:
                    pe = math.sin(pos / 1000**(2*i/dimension))
                else:
                    pe = math.cos(pos / 1000**(2*i/dimension))
                positional_encoding[pos, i] = pe

        self.register_buffer('positional_encoding', positional_encoding)
        
    def forward(self, x):
        return x + self.positional_encoding[:x.size(1), :]

## Feed Forward

In [7]:
class FeedForward(nn.Module):
    def __init__(self, embed_dim, factor=2):
        super(FeedForward, self).__init__()
        self.feed_forward = nn.Sequential(
            nn.Linear(embed_dim, factor*embed_dim),
            nn.ReLU(),
            nn.Linear(factor*embed_dim, embed_dim)
        )  
    
    def forward(self, x):
        return self.feed_forward(x)

<img src="encoder.png" alt="drawing" width="200"/>

In [11]:
class TransformerEncodingBloc(nn.Module):
    def __init__(self, embedding_dim, num_heads, dropout, factor=1):
        super(TransformerEncodingBloc, self).__init__()
        # Mutli Head Attention with its notmalization and its dropout
        self.attention = MultiHeadSelfAttentionQKV(embedding_dim, num_heads)
        self.normalization_mha = nn.LayerNorm(embedding_dim)
        self.dropout_mha = nn.Dropout(dropout)

        # Feed Forward with its notmalization and its dropout
        self.feed_forward = FeedForward(embedding_dim, factor)
        self.normalization_ff = nn.LayerNorm(embedding_dim)
        self.dropout_ff = nn.Dropout(dropout)
        

    def forward(self, query, key, value):
        mha = self.attention(query, key, value)
        mha_residuals = mha + value
        mha_residuals_norm = self.normalization_mha(mha_residuals)
        mha_residuals_norm_dropout = self.dropout_mha(mha_residuals_norm)

        ff = self.feed_forward(mha_residuals_norm_dropout)
        ff_residuals = ff + mha_residuals_norm_dropout
        ff_residuals_norm = self.normalization_ff(ff_residuals)
        ff_residuals_norm_dropout = self.dropout_ff(ff_residuals_norm)

        return ff_residuals_norm_dropout


In [15]:
X = torch.rand(10, 1000, 256)

TEB = TransformerEncodingBloc(256, 4, 0.2, 1)
Z = TEB(X, X, X)

In [21]:
class Encoder(nn.Module):
    def __init__(self, vocab_size, embedding_dim, num_heads, dropout, num_layers, factor=1):
        super(Encoder, self).__init__()
        self.embedding = Embedding(vocab_size, embedding_dim)
        self.positional_embedding = PositionalEmbedding(embedding_dim)
        self.transformer_layers = nn.ModuleList([TransformerEncodingBloc(embedding_dim, num_heads, dropout) for i in range(num_layers)])

    def forward(self, X):
        encoded = self.embedding(X)
        output = self.positional_embedding(encoded)
        for transformer_layer in self.transformer_layers:
            output = transformer_layer(output, output, output)
        
        return output
        

In [26]:
X = np.random.randn(10, 1000)

encoder = Encoder(vocab_size=5000, 
                  embedding_dim=256, 
                  num_heads=4,
                  dropout=0.2,
                  num_layers=5
                  )

Y = encoder(X)

TypeError: embedding(): argument 'indices' (position 2) must be Tensor, not numpy.ndarray