In [20]:
import torch
import torch.nn as nn

# ENCODER

In [18]:
class TransformerEncoder(nn.Module):
    """Token encoder: learned token + position embeddings followed by a stack
    of ``nn.TransformerEncoderLayer`` blocks.

    Args:
        input_dim: Size of the source vocabulary (rows of the token embedding).
        hidden_dim: Model width; must be divisible by ``num_heads``.
        num_layers: Number of stacked encoder layers.
        num_heads: Number of attention heads in every layer.
        max_len: Number of learned positions, i.e. the longest sequence the
            encoder accepts. Defaults to 1000, the value that used to be
            hard-coded.
    """

    def __init__(self, input_dim, hidden_dim, num_layers, num_heads, max_len=1000):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, hidden_dim)
        self.pos_embedding = nn.Embedding(max_len, hidden_dim)
        # batch_first=True: forward() works on (batch, seq, dim) tensors, but
        # the layer default is (seq, batch, dim). Without the flag attention
        # is computed across the batch axis and samples leak into each other.
        self.layers = nn.ModuleList([
            nn.TransformerEncoderLayer(hidden_dim, num_heads, batch_first=True)
            for _ in range(num_layers)
        ])

    def forward(self, x):
        """Encode a batch of token ids.

        Args:
            x: Integer tensor of shape ``(batch, seq_len)`` with
                ``seq_len <= max_len``.

        Returns:
            Float tensor of shape ``(batch, seq_len, hidden_dim)``.
        """
        x = self.embedding(x)
        # One position index per time step; broadcast over the batch axis.
        positions = torch.arange(x.size(1), device=x.device).unsqueeze(0)
        x = x + self.pos_embedding(positions)
        for layer in self.layers:
            x = layer(x)
        return x

In [16]:
# Smoke test: build a small encoder and push one random batch through it.
input_dim, hidden_dim = 100, 32
num_layers, num_heads = 2, 4
encoder = TransformerEncoder(
    input_dim=input_dim,
    hidden_dim=hidden_dim,
    num_layers=num_layers,
    num_heads=num_heads,
)
# Random token ids drawn from [0, input_dim), arranged as a (5, 10) tensor.
x = torch.randint(low=0, high=input_dim, size=(5, 10))
output = encoder(x)
print(output.shape)  # should be (5, 10, 32)

torch.Size([5, 10, 32])


# nn.Embedding

In [29]:
# nn.Embedding is a lookup table: every integer id selects one learned row.
vocab_size, embedding_size = 1000, 128
embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_size)
# Two sequences of three token ids each.
input_tokens = torch.tensor([
    [1, 4, 2],
    [3, 0, 5],
])
embeddings = embedding(input_tokens)  # one embedding_size-wide vector per id
print(embeddings)

tensor([[[-1.1586e-01,  1.4418e-01,  2.4824e-01,  7.9643e-01, -1.1111e+00,
          -1.7674e+00,  1.3876e+00,  2.7908e-01,  1.3656e-01,  2.5834e-03,
          -1.8136e+00, -1.9596e-02,  3.0940e-01,  9.6444e-01, -9.2067e-01,
           9.8577e-02, -3.1250e-01, -1.9562e-01, -5.6895e-01, -2.4601e-02,
          -1.0320e+00,  8.2333e-01, -4.8053e-01, -7.8006e-01,  3.1310e-02,
          -2.4122e-01, -7.7215e-01, -1.6397e-01,  1.0781e+00,  1.0855e+00,
          -1.1037e+00,  3.6565e-01, -1.2201e+00,  9.8392e-01,  2.1615e-01,
           1.7461e+00, -2.7702e-01, -3.6400e-01,  1.8724e+00, -8.9769e-01,
           1.9036e-01, -7.8654e-02, -4.3329e-01, -6.6862e-01, -1.7167e+00,
          -1.4221e-01,  8.0571e-01, -1.5285e+00, -1.4034e+00, -5.0367e-02,
          -5.0362e-01,  1.0217e+00, -1.7950e-01, -5.6588e-01,  1.5529e-01,
           1.6634e+00, -6.0327e-01, -1.0066e-01, -2.0894e+00,  1.2368e+00,
           7.1615e-01,  5.4580e-01,  3.2003e-01,  8.7431e-01, -8.3630e-01,
           7.3131e-01,  1

In [25]:
# Same setup as the shape check, but this time print the encoded values.
input_dim, hidden_dim, num_layers, num_heads = 100, 32, 2, 4
encoder = TransformerEncoder(input_dim, hidden_dim,
                             num_layers=num_layers, num_heads=num_heads)
x = torch.randint(0, input_dim, size=(5, 10))  # random ids in [0, input_dim)
output = encoder(x)
print(output)

tensor([[[-1.6081e+00, -5.2491e-01, -1.0215e+00,  ..., -8.5316e-01,
          -1.2378e+00,  1.3094e+00],
         [ 7.2766e-01, -8.1402e-01,  1.2481e+00,  ...,  3.4231e-01,
           1.9388e+00,  4.8365e-02],
         [ 1.7458e-01, -1.3275e+00,  8.7460e-01,  ...,  1.5677e+00,
           5.7695e-01, -6.0695e-01],
         ...,
         [-3.3355e-01, -1.7720e+00,  9.2804e-01,  ...,  5.8213e-01,
          -4.7191e-01,  1.0192e+00],
         [-2.4009e-01, -3.0194e-02, -1.6326e-01,  ...,  1.3162e+00,
           2.8604e+00, -1.0205e+00],
         [-1.7223e-02,  1.6216e-01,  1.4983e+00,  ..., -1.2546e+00,
           1.2675e-01,  5.8488e-01]],

        [[-1.6182e+00, -3.9983e-01, -5.6120e-01,  ..., -1.1844e+00,
          -4.7705e-01, -4.0962e-01],
         [-3.4997e-01, -1.8715e+00,  1.3966e+00,  ..., -5.2869e-02,
           1.2092e+00,  1.1818e+00],
         [-2.2094e-01, -1.9799e+00,  4.1086e-01,  ...,  1.3935e+00,
           9.0611e-01,  3.8902e-01],
         ...,
         [-4.3519e-01, -6

# DECODER

In [26]:
import torch
import torch.nn as nn

class TransformerDecoder(nn.Module):
    """Autoregressive decoder: learned token + position embeddings, a stack of
    ``nn.TransformerDecoderLayer`` blocks and a projection to vocabulary logits.

    Args:
        output_dim: Size of the target vocabulary (embedding rows and number
            of output logits).
        hidden_dim: Model width; must be divisible by ``num_heads`` and equal
            to the last dimension of the encoder output.
        num_layers: Number of stacked decoder layers.
        num_heads: Number of attention heads in every layer.
        max_len: Number of learned positions, i.e. the longest target
            sequence accepted. Defaults to 1000, the value that used to be
            hard-coded.
    """

    def __init__(self, output_dim, hidden_dim, num_layers, num_heads, max_len=1000):
        super().__init__()
        self.embedding = nn.Embedding(output_dim, hidden_dim)
        self.pos_embedding = nn.Embedding(max_len, hidden_dim)
        # batch_first=True: forward() works on (batch, seq, dim) tensors, but
        # the layer default is (seq, batch, dim), which would make attention
        # run across the batch axis.
        self.layers = nn.ModuleList([
            nn.TransformerDecoderLayer(hidden_dim, num_heads, batch_first=True)
            for _ in range(num_layers)
        ])
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x, encoder_output):
        """Decode target tokens while attending to the encoder output.

        Args:
            x: Integer tensor of target token ids, shape ``(batch, tgt_len)``.
            encoder_output: Float tensor of shape
                ``(batch, src_len, hidden_dim)``.

        Returns:
            Float tensor of logits, shape ``(batch, tgt_len, output_dim)``.
        """
        x = self.embedding(x)
        tgt_len = x.size(1)
        positions = torch.arange(tgt_len, device=x.device).unsqueeze(0)
        x = x + self.pos_embedding(positions)
        # Causal mask: True above the diagonal blocks attention from position
        # i to any j > i, so a prediction never sees the tokens that follow it.
        causal_mask = torch.triu(
            torch.ones(tgt_len, tgt_len, dtype=torch.bool, device=x.device),
            diagonal=1,
        )
        for layer in self.layers:
            x = layer(x, encoder_output, tgt_mask=causal_mask)
        return self.fc(x)


In [27]:
# Smoke test: run a random source batch through the encoder, then decode a
# random target batch against the encoder output and report the logits' shape.
input_dim = output_dim = 100
hidden_dim, num_layers, num_heads = 32, 2, 4
encoder = TransformerEncoder(input_dim, hidden_dim, num_layers, num_heads)
decoder = TransformerDecoder(output_dim, hidden_dim, num_layers, num_heads)
x = torch.randint(low=0, high=input_dim, size=(5, 10))   # source token ids
encoder_output = encoder(x)
y = torch.randint(low=0, high=output_dim, size=(5, 10))  # target token ids
output = decoder(y, encoder_output)
print(output.shape)

torch.Size([5, 10, 100])


# ENCODER & DECODER LAYER IN PYTORCH

In [54]:
class Transformer(nn.Module):
    """Encoder-decoder Transformer assembled from PyTorch's layer modules.

    Source and target tokens each get their own learned token and position
    embeddings. The source runs through a stack of
    ``nn.TransformerEncoderLayer``; the target runs through a stack of
    ``nn.TransformerDecoderLayer`` that attends to the encoder output; a final
    linear layer maps decoder states to target-vocabulary logits.

    Args:
        input_dim: Size of the source vocabulary.
        output_dim: Size of the target vocabulary (also the number of logits).
        hidden_dim: Model width; must be divisible by ``num_heads``.
        num_layers: Number of encoder layers and of decoder layers.
        num_heads: Number of attention heads in every layer.
        max_len: Number of learned positions for both source and target.
            Defaults to 1000, the value that used to be hard-coded.
    """

    def __init__(self, input_dim, output_dim, hidden_dim, num_layers, num_heads,
                 max_len=1000):
        super().__init__()
        self.encoder_embedding = nn.Embedding(input_dim, hidden_dim)
        self.decoder_embedding = nn.Embedding(output_dim, hidden_dim)
        self.encoder_pos_embedding = nn.Embedding(max_len, hidden_dim)
        self.decoder_pos_embedding = nn.Embedding(max_len, hidden_dim)
        # batch_first=True: forward() works on (batch, seq, dim) tensors, but
        # the layer default is (seq, batch, dim), which would make attention
        # run across the batch axis.
        self.encoder_layers = nn.ModuleList([
            nn.TransformerEncoderLayer(hidden_dim, num_heads, batch_first=True)
            for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([
            nn.TransformerDecoderLayer(hidden_dim, num_heads, batch_first=True)
            for _ in range(num_layers)])
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, encoder_input, decoder_input):
        """Compute target-vocabulary logits for every decoder position.

        Args:
            encoder_input: Integer tensor of source ids, ``(batch, src_len)``.
            decoder_input: Integer tensor of target ids, ``(batch, tgt_len)``.

        Returns:
            Float tensor of logits, shape ``(batch, tgt_len, output_dim)``.
        """
        src_len = encoder_input.size(1)
        tgt_len = decoder_input.size(1)

        # Token embedding + learned position embedding (broadcast over batch).
        encoder_positions = torch.arange(
            src_len, device=encoder_input.device).unsqueeze(0)
        decoder_positions = torch.arange(
            tgt_len, device=decoder_input.device).unsqueeze(0)
        encoder_embedded = (self.encoder_embedding(encoder_input)
                            + self.encoder_pos_embedding(encoder_positions))
        decoder_embedded = (self.decoder_embedding(decoder_input)
                            + self.decoder_pos_embedding(decoder_positions))

        for layer in self.encoder_layers:
            encoder_embedded = layer(encoder_embedded)
        encoder_output = encoder_embedded

        # Causal mask: True above the diagonal blocks attention from position
        # i to any j > i. Without it, teacher-forced training lets each
        # position look at the very token it is supposed to predict.
        causal_mask = torch.triu(
            torch.ones(tgt_len, tgt_len, dtype=torch.bool,
                       device=decoder_input.device),
            diagonal=1,
        )
        for layer in self.decoder_layers:
            decoder_embedded = layer(decoder_embedded, encoder_output,
                                     tgt_mask=causal_mask)
        return self.fc(decoder_embedded)


# TRAIN

In [47]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchtext.datasets import Multi30k
from torchtext.data import Field, BucketIterator

# Fields tokenise with spaCy, lowercase the text and wrap every sentence in
# <sos>/<eos>. batch_first=True makes the iterators yield (batch, seq_len)
# tensors: train() slices time steps with trg[:, :-1] and the model derives
# positions from size(1), whereas Field's default layout is (seq_len, batch).
SRC = Field(tokenize="spacy", tokenizer_language="fr", init_token="<sos>", eos_token="<eos>",
            lower=True, batch_first=True)
TRG = Field(tokenize="spacy", tokenizer_language="en", init_token="<sos>", eos_token="<eos>",
            lower=True, batch_first=True)
# NOTE(review): confirm that the Multi30k archive fetched by the installed
# torchtext version actually contains the ".fr" files requested here.
train_data, valid_data, test_data = Multi30k.splits(exts=(".fr", ".en"), fields=(SRC, TRG))
# Vocabularies come from the training split only; tokens seen fewer than
# twice are excluded by min_freq=2.
SRC.build_vocab(train_data, min_freq=2)
TRG.build_vocab(train_data, min_freq=2)
INPUT_DIM = len(SRC.vocab)
OUTPUT_DIM = len(TRG.vocab)
HIDDEN_DIM = 256
NUM_LAYERS = 3
NUM_HEADS = 8
model = Transformer(INPUT_DIM, OUTPUT_DIM, HIDDEN_DIM, NUM_LAYERS, NUM_HEADS)
# Keep the model on the same device that BucketIterator places batches on;
# otherwise the first forward pass fails on a CUDA machine.
model = model.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))
optimizer = optim.Adam(model.parameters())
# Padding positions in the target do not contribute to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=TRG.vocab.stoi[TRG.pad_token])

def train(model, iterator, optimizer, criterion):
    """Run one teacher-forced training epoch and return the mean batch loss.

    Args:
        model: Callable as ``model(src, trg_in)`` returning logits whose last
            dimension is the vocabulary size.
        iterator: Sized iterable of batches exposing ``.src`` and ``.trg``.
            Targets are sliced along dimension 1, so they are presumably
            laid out as (batch, seq_len) -- confirm against the data pipeline.
        optimizer: Optimizer that updates ``model``'s parameters.
        criterion: Loss taking ``(N, vocab)`` logits and ``(N,)`` labels.

    Returns:
        Sum of the per-batch losses divided by ``len(iterator)``.
    """
    model.train()
    batch_losses = []
    for batch in iterator:
        source, target = batch.src, batch.trg
        optimizer.zero_grad()
        # Feed every target token except the last; the labels are the same
        # sequence shifted left by one, so step t predicts token t + 1.
        logits = model(source, target[:, :-1])
        flat_logits = logits.reshape(-1, logits.shape[-1])
        flat_labels = target[:, 1:].reshape(-1)
        loss = criterion(flat_logits, flat_labels)
        loss.backward()
        optimizer.step()
        batch_losses.append(loss.item())
    return sum(batch_losses) / len(iterator)

BATCH_SIZE = 32
N_EPOCHS = 10
# One iterator per split. Batches are placed on the GPU when one is available
# and examples are ordered by source length (sort_key / sort_within_batch).
train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
    datasets=(train_data, valid_data, test_data),
    batch_size=BATCH_SIZE,
    device=torch.device("cuda" if torch.cuda.is_available() else "cpu"),
    sort_within_batch=True,
    sort_key=lambda example: len(example.src),
)
# Train for N_EPOCHS passes over the training split, reporting the mean loss.
for epoch in range(N_EPOCHS):
    train_loss = train(
        model=model,
        iterator=train_iterator,
        optimizer=optimizer,
        criterion=criterion,
    )
    print(f"Epoch {epoch+1} | Train Loss: {train_loss:.3f}")

TypeError: Unable to convert function return value to a Python type! The signature was
	() -> handle

# TEST 

In [45]:
# Test the model
def translate_sentence(model, sentence, src_field, trg_field, max_len=50):
    model.eval()
    
    if isinstance(sentence, str):
        tokens = [token.text.lower() for token in spacy_fr(sentence)]
    else:
        tokens = [token.lower() for token in sentence]
    
    tokens = [src_field.init_token] + tokens + [src_field.eos_token]
    src_indexes = [src_field.vocab.stoi[token] for token in tokens]
    src_tensor = torch.LongTensor(src_indexes).unsqueeze(0)
    
    src_mask = model.make_src_mask(src_tensor)
    
    with torch.no_grad():
        encoder_outputs = model.encoder(src_tensor, src_mask)
    
    trg_indexes = [trg_field.vocab.stoi[trg_field.init_token]]
    
    for i in range(max_len):
        trg_tensor = torch.LongTensor(trg_indexes).unsqueeze(0)
        
        trg_mask = model.make_trg_mask(trg_tensor)
        print(trg_mask)