In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import numpy as np

In [2]:
class Embedding(nn.Module):
    """Token-embedding layer built on a trainable ``nn.Embedding`` table.

    Args:
        vocabulary_size: number of distinct token ids the table holds.
        emb_dim: width of each embedding vector.
    """

    def __init__(self, vocabulary_size, emb_dim):
        super(Embedding, self).__init__()
        self.vocabulary_size, self.emb_dim = vocabulary_size, emb_dim
        self.embedder = nn.Embedding(num_embeddings=vocabulary_size, embedding_dim=emb_dim)

    def forward(self, batch):
        """Return the embedding vector for every token id in ``batch``."""
        vectors = self.embedder(batch)
        return vectors
# The string literal below is a disabled alternative kept for reference: the
# same layer written as a one-hot encoding followed by nn.Linear. Python only
# evaluates and discards the string, so that class is never defined.
"""
class Embedding(nn.Module):
    def __init__(self, vocabulary_size, emb_dim):
        super().__init__()
        self.vocabulary_size = vocabulary_size
        self.emb_dim = emb_dim
        self.embedder = nn.Linear(vocabulary_size, emb_dim)

    def forward(self, batch):
        batch = F.one_hot(batch, self.vocabulary_size).float()
        return self.embedder(batch)
"""
class PositionalEmbedding(nn.Module):
    """Fixed sinusoidal positional encoding added to a batch of embeddings.

    For position ``pos`` and even feature index ``i`` the table holds::

        pe[pos, i]     = sin(pos / 10000 ** (i / emb_dim))
        pe[pos, i + 1] = cos(pos / 10000 ** (i / emb_dim))

    The previous implementation used ``1e5 ** (2 * i / emb_dim)``: ``i``
    already walks the even indices, so the extra factor of two doubled the
    exponent, and the base was ten times the usual 10000.

    Args:
        max_seq_len: number of positions to precompute.
        emb_dim: feature width; odd widths are supported (the last column
            is then a sine with no cosine partner).
    """

    def __init__(self, max_seq_len, emb_dim):
        super().__init__()
        self.max_seq_len = max_seq_len
        self.emb_dim = emb_dim

        # Column vector of positions times row vector of inverse frequencies
        # gives the whole (max_seq_len, ceil(emb_dim / 2)) angle table at once.
        positions = torch.arange(max_seq_len, dtype=torch.float32).unsqueeze(1)
        even_index = torch.arange(0, emb_dim, 2, dtype=torch.float32)
        inv_freq = torch.exp(-math.log(10000.0) * even_index / emb_dim)
        angles = positions * inv_freq

        pe = torch.zeros(max_seq_len, emb_dim)
        pe[:, 0::2] = torch.sin(angles)
        # With an odd emb_dim there is one fewer odd column than even columns.
        pe[:, 1::2] = torch.cos(angles[:, : emb_dim // 2])

        # Leading axis of size 1 lets the table broadcast over the batch.
        # A buffer follows the module across devices but is not trained.
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Add the encodings for the first ``x.size(1)`` positions to ``x``.

        Raises:
            ValueError: if the sequence is longer than ``max_seq_len``.
        """
        seq_len = x.size(1)
        if seq_len > self.max_seq_len:
            raise ValueError(
                f"sequence length {seq_len} exceeds max_seq_len {self.max_seq_len}"
            )
        return x + self.pe[:, :seq_len, :]

In [3]:
# Scratch instances for the checks below: a 3-token vocabulary embedded into
# 10 dimensions, and positional encodings for up to 10 positions.
embedding = Embedding(3, 10)
posEmb = PositionalEmbedding(10, 10)

In [4]:
# Build `x` in this cell so the shape check works on a fresh kernel; it was
# previously read before any cell had assigned it (NameError).
x = embedding(torch.tensor([[1, 1, 0, 2, 0]]))
x.shape

torch.Size([1, 5, 10])

In [None]:
# Embed one 5-token sequence (ids from the 3-token vocabulary), then add the
# positional encodings to it.
x = embedding(torch.tensor([[1, 1, 0, 2, 0]]))
posEmb(x)

In [10]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product self-attention.

    Keys, queries and values are all projected from the same input. Each
    head works on ``n_size = n_input // n_head`` features, so the heads
    together span ``n_head * n_size`` features. That is less than
    ``n_input`` when ``n_input`` is not a multiple of ``n_head``; the output
    projection maps the concatenated heads back to ``n_input``.

    Previously the output layer and the final reshape assumed exactly
    ``n_input`` concatenated features, so ``forward`` failed whenever
    ``n_input`` was not divisible by ``n_head``.

    Args:
        n_input: feature width of the input and of the output.
        n_head: number of attention heads (1 <= n_head <= n_input).

    Raises:
        ValueError: if ``n_head`` is outside ``[1, n_input]``.
    """

    def __init__(self, n_input, n_head = 3):
        super().__init__()
        if n_head < 1 or n_head > n_input:
            raise ValueError(
                f"n_head must be between 1 and n_input ({n_input}), got {n_head}"
            )
        self.n_input = n_input
        self.n_head = n_head
        self.n_size = n_input // n_head
        heads_width = self.n_size * n_head
        self.key = nn.Linear(n_input, heads_width)
        self.query = nn.Linear(n_input, heads_width)
        self.value = nn.Linear(n_input, heads_width)
        # Input width matches the concatenated heads, not n_input.
        self.out = nn.Linear(heads_width, n_input)

    def _split_heads(self, projected, batch_size, seq_len):
        """Reshape (batch, seq, heads*size) into (batch, heads, seq, size)."""
        per_head = projected.reshape(batch_size, seq_len, self.n_head, self.n_size)
        return per_head.transpose(1, 2)

    def forward(self, x):
        """Attend over ``x`` of shape (batch, seq, n_input); same shape out."""
        batch_size, seq_len = x.size(0), x.size(1)

        key = self._split_heads(self.key(x), batch_size, seq_len)
        query = self._split_heads(self.query(x), batch_size, seq_len)
        value = self._split_heads(self.value(x), batch_size, seq_len)

        # Scale by sqrt(head size) before normalising over the key axis.
        scores = (query @ key.transpose(-2, -1)) / math.sqrt(self.n_size)
        weights = F.softmax(scores, dim=-1)

        # (batch, heads, seq, size) -> (batch, seq, heads * size)
        context = (weights @ value).transpose(1, 2)
        context = context.reshape(batch_size, seq_len, self.n_head * self.n_size)
        return self.out(context)
class MultiHeadCrossAttention(nn.Module):
    """Multi-head scaled dot-product attention from a decoder onto an encoder.

    Queries are projected from ``x_decoder``; keys and values are projected
    from ``x_encoder``. Each head works on ``n_size = n_input // n_head``
    features and the output projection maps the ``n_head * n_size``
    concatenated features back to ``n_input``.

    Previously the output layer and the final reshape assumed exactly
    ``n_input`` concatenated features, so ``forward`` failed whenever
    ``n_input`` was not divisible by ``n_head``.

    Args:
        n_input: feature width of both inputs and of the output.
        n_head: number of attention heads (1 <= n_head <= n_input).

    Raises:
        ValueError: if ``n_head`` is outside ``[1, n_input]``.
    """

    def __init__(self, n_input, n_head = 3):
        super().__init__()
        if n_head < 1 or n_head > n_input:
            raise ValueError(
                f"n_head must be between 1 and n_input ({n_input}), got {n_head}"
            )
        self.n_input = n_input
        self.n_head = n_head
        self.n_size = n_input // n_head
        heads_width = self.n_size * n_head
        self.key = nn.Linear(n_input, heads_width)
        self.query = nn.Linear(n_input, heads_width)
        self.value = nn.Linear(n_input, heads_width)
        # Input width matches the concatenated heads, not n_input.
        self.out = nn.Linear(heads_width, n_input)

    def _split_heads(self, projected, batch_size, seq_len):
        """Reshape (batch, seq, heads*size) into (batch, heads, seq, size)."""
        per_head = projected.reshape(batch_size, seq_len, self.n_head, self.n_size)
        return per_head.transpose(1, 2)

    def forward(self, x_encoder, x_decoder):
        """Return one attended vector per decoder position.

        Args:
            x_encoder: (batch, seq_encoder, n_input) source of keys/values.
            x_decoder: (batch, seq_decoder, n_input) source of queries.

        Returns:
            Tensor of shape (batch, seq_decoder, n_input).
        """
        batch_size = x_encoder.size(0)
        seq_len_encoder = x_encoder.size(1)
        seq_len_decoder = x_decoder.size(1)

        key = self._split_heads(self.key(x_encoder), batch_size, seq_len_encoder)
        query = self._split_heads(self.query(x_decoder), batch_size, seq_len_decoder)
        value = self._split_heads(self.value(x_encoder), batch_size, seq_len_encoder)

        # scores: (batch, heads, seq_decoder, seq_encoder)
        scores = (query @ key.transpose(-2, -1)) / math.sqrt(self.n_size)
        weights = F.softmax(scores, dim=-1)

        # (batch, heads, seq_decoder, size) -> (batch, seq_decoder, heads * size)
        context = (weights @ value).transpose(1, 2)
        context = context.reshape(batch_size, seq_len_decoder, self.n_head * self.n_size)
        return self.out(context)

In [11]:
# Attention layer with 6 input features split across 3 heads (2 per head).
test = MultiHeadAttention(6, 3)

In [12]:
# Shape of the sample input: 1 batch x 2 positions x 6 features.
torch.tensor([[[0., 1, 1.5, 2, 3, 4], [4., 4., 3, 2.4, 1.8, 1]]]).shape

torch.Size([1, 2, 6])

In [13]:
# Forward pass on the (1, 2, 6) sample; the result has the same shape.
test(torch.tensor([[[0., 1, 1.5, 2, 3, 4], [4., 4., 3, 2.4, 1.8, 1]]]))

tensor([[[-0.3023, -0.4288,  0.9929,  1.5199,  0.1874,  1.3461],
         [-0.4214, -0.4523,  1.0127,  1.4972,  0.0902,  1.4811]]],
       grad_fn=<ViewBackward0>)

In [14]:
# Same numbers arranged as 2 batches x 1 position x 6 features.
torch.tensor([[[0., 1, 1.5, 2, 3, 4]], [[4., 4., 3, 2.4, 1.8, 1]]]).shape

torch.Size([2, 1, 6])

In [94]:
class EncoderBlock(nn.Module):
    """One encoder layer: self-attention followed by a Linear+ReLU step.

    Each sub-layer's output is layer-normalised, passed through dropout
    (p=0.2) and then added back onto that sub-layer's input.

    Args:
        n_input: feature width of the input and output.
        n_head: number of heads for the self-attention sub-layer.
    """

    def __init__(self, n_input, n_head):
        super().__init__()
        self.attention = MultiHeadAttention(n_input, n_head)
        self.layer_norm1 = nn.LayerNorm(n_input)
        self.fc = nn.Sequential(nn.Linear(n_input, n_input), nn.ReLU())
        self.layer_norm2 = nn.LayerNorm(n_input)
        self.dropout1 = nn.Dropout(0.2)
        self.dropout2 = nn.Dropout(0.2)

    def forward(self, x):
        """Apply both residual sub-layers to ``x`` and return the result."""
        # Sub-layer 1: self-attention -> norm -> dropout, plus residual.
        attended = self.attention(x)
        attended = self.dropout1(self.layer_norm1(attended))
        after_attention = x + attended

        # Sub-layer 2: Linear+ReLU -> norm -> dropout, plus residual.
        transformed = self.fc(after_attention)
        transformed = self.dropout2(self.layer_norm2(transformed))
        return after_attention + transformed


class Encoder(nn.Module):
    """Token embedding + positional encoding followed by a stack of EncoderBlocks.

    Args:
        vocab_size: number of token ids the embedding table holds.
        embedding_size: feature width used throughout the stack.
        max_len_seq: number of positions the positional table covers.
        n_encoders: number of EncoderBlock layers.
        n_head: attention heads per block.
    """

    def __init__(self,vocab_size, embedding_size, max_len_seq, n_encoders = 2, n_head = 3):
        super().__init__()
        self.pos_encoder = PositionalEmbedding(max_len_seq, embedding_size)
        self.embedding = Embedding(vocab_size, embedding_size)
        self.layers = nn.ModuleList(
            [EncoderBlock(embedding_size, n_head) for _ in range(n_encoders)]
        )

    def forward(self, x):
        """Encode a batch of token ids ``x``; returns the last block's output."""
        # The unused batch_size / len_seq locals were dropped; nothing read them.
        res = self.pos_encoder(self.embedding(x))
        for layer in self.layers:
            res = layer(res)
        return res
class DecoderBlock(nn.Module):
    """One decoder layer: self-attention, cross-attention, then Linear+ReLU.

    Each sub-layer's output is layer-normalised, passed through dropout
    (p=0.2) and then added back onto that sub-layer's input.

    NOTE(review): ``self.attention`` is called without any mask, so every
    decoder position attends to all decoder positions, including later ones.

    Args:
        n_input: feature width of the decoder and encoder tensors.
        n_head: number of heads for both attention sub-layers.
    """

    def __init__(self, n_input, n_head):
        super().__init__()
        self.attention = MultiHeadAttention(n_input, n_head)
        self.cross_attention = MultiHeadCrossAttention(n_input, n_head)
        self.layer_norm1 = nn.LayerNorm(n_input)
        self.fc = nn.Sequential(nn.Linear(n_input, n_input), nn.ReLU())
        self.layer_norm2 = nn.LayerNorm(n_input)
        self.layer_norm3 = nn.LayerNorm(n_input)
        self.dropout1 = nn.Dropout(0.2)
        self.dropout2 = nn.Dropout(0.2)
        self.dropout3 = nn.Dropout(0.2)

    def forward(self, x_decoder, x_encoder):
        """Run the three residual sub-layers.

        Args:
            x_decoder: decoder-side features fed to self-attention.
            x_encoder: encoder output supplying cross-attention keys/values.
        """
        # Sub-layer 1: self-attention over the decoder features.
        self_attended = self.attention(x_decoder)
        self_attended = self.dropout1(self.layer_norm1(self_attended))
        hidden = x_decoder + self_attended

        # Sub-layer 2: queries from the decoder, keys/values from the encoder.
        cross_attended = self.cross_attention(x_encoder, hidden)
        cross_attended = self.dropout2(self.layer_norm2(cross_attended))
        hidden = hidden + cross_attended

        # Sub-layer 3: position-wise Linear+ReLU.
        projected = self.dropout3(self.layer_norm3(self.fc(hidden)))
        return hidden + projected
class Decoder(nn.Module):
    """Embeds decoder tokens, runs a stack of DecoderBlocks, projects to vocabulary scores.

    Args:
        target_vocab_size: width of the final projection (one score per
            output token).
        input_vocab_size: number of token ids the decoder's own embedding
            table holds, i.e. the vocabulary of the tokens passed as
            ``x_decoder``.
        embedding_size: feature width used throughout the stack.
        max_len_seq: number of positions the positional table covers.
        n_decoders: number of DecoderBlock layers.
        n_head: attention heads per block.
    """

    def __init__(self,target_vocab_size, input_vocab_size, embedding_size, max_len_seq, n_decoders = 2, n_head = 3):
        super().__init__()
        self.pos_encoder = PositionalEmbedding(max_len_seq, embedding_size)
        self.embedding = Embedding(input_vocab_size, embedding_size)
        self.layers = nn.ModuleList()
        self.out = nn.Linear(embedding_size, target_vocab_size)

        # Blocks are created after `out`, keeping the original construction order.
        for _ in range(n_decoders):
            self.layers.append(DecoderBlock(embedding_size, n_head))

    def forward(self, x_decoder, x_encoder):
        """Decode token ids ``x_decoder`` against encoder features ``x_encoder``.

        Returns the unnormalised scores produced by the final linear layer,
        one vector of ``target_vocab_size`` values per decoder position.
        """
        # The unused batch_size / len_seq locals were dropped; nothing read them.
        res = self.pos_encoder(self.embedding(x_decoder))
        for layer in self.layers:
            res = layer(res, x_encoder)
        return self.out(res)
class Transformer(nn.Module):
    """Encoder-decoder model returning a probability distribution per target position.

    Args:
        vocab_size: source vocabulary size (encoder embedding table).
        target_vocab_size: target vocabulary size. Used both for the
            decoder's embedding table and for its output projection, because
            the decoder consumes and predicts target-vocabulary tokens.
        embedding_size: feature width shared by encoder and decoder.
        max_len_seq: number of positions the positional tables cover.
        n_encoders: number of encoder blocks.
        n_decoders: number of decoder blocks.
        n_head: attention heads per block.

    Note:
        The decoder embedding used to be sized with ``vocab_size``; any
        target id >= ``vocab_size`` then indexed past the end of the table.
        Checkpoints saved with differing vocabulary sizes will have a
        different ``decoder.embedding`` shape.
    """

    def __init__(self, vocab_size, target_vocab_size, embedding_size, max_len_seq, n_encoders = 2, n_decoders= 2, n_head = 3):
        super().__init__()
        self.encoder = Encoder(vocab_size = vocab_size, embedding_size = embedding_size,
                               max_len_seq = max_len_seq, n_encoders = n_encoders, n_head = n_head)
        # The decoder's input tokens are target-vocabulary ids.
        self.decoder = Decoder(target_vocab_size = target_vocab_size, input_vocab_size = target_vocab_size,
                               embedding_size = embedding_size, max_len_seq = max_len_seq, n_decoders = n_decoders,
                               n_head = n_head)
        self.vocab_size = vocab_size
        self.target_vocab_size = target_vocab_size
        self.embedding_size = embedding_size
        self.max_len_seq = max_len_seq
        self.softmax = nn.Softmax(dim = -1)

    def forward(self, x, target = None):
        """Encode ``x`` and decode ``target`` against it.

        Args:
            x: (batch, seq) source token ids.
            target: (batch, seq_target) target token ids. When omitted, a
                single token with id 0 per sequence is used.

        Returns:
            (batch, seq_target, target_vocab_size) probabilities. These are
            already softmax-normalised, so do not pass them to a loss that
            expects raw logits such as ``nn.CrossEntropyLoss``.
        """
        encoded = self.encoder(x)
        if target is None:
            # Same device as the input, so the default also works off-CPU.
            target = torch.zeros(x.size(0), 1, dtype = torch.long, device = x.device)
        decoded = self.decoder(target, encoded)
        return self.softmax(decoded)

In [95]:
# vocab_size=10, target_vocab_size=20, embedding_size=30, max_len_seq=100;
# layer counts and head count use the defaults.
t = Transformer(10, 20, 30, 100)

In [96]:
# Standalone decoder: target_vocab_size=10, input_vocab_size=10,
# embedding_size=9, max_len_seq=20.
dec = Decoder(10, 10, 9, 20, )

In [97]:
# Standalone encoder: vocab_size=20, embedding_size=9, max_len_seq=20.
enc = Encoder(20, 9, 20)

In [98]:
# Full model on a (2, 3) batch of token ids with no target supplied, so the
# decoder is fed a single 0 token per sequence.
res = t(torch.tensor([[1, 2, 3], [4, 5, 6]]))

In [99]:
# One decoded position per sequence, scored over the 20-token target vocabulary.
res.shape

torch.Size([2, 1, 20])

In [81]:
# Standalone decoder block: 9 features, 3 heads.
# NOTE(review): this cell's execution count (81) is lower than the cells
# around it, so it was last run before them.
block = DecoderBlock(9, 3)

In [302]:
# NOTE(review): the recorded output is a (2, 3, 9) tensor, not the (2, 1, 20)
# result assigned to `res` by the Transformer call earlier in the file.
# `res` was rebound in a cell that is no longer present (presumably an
# encoder output - confirm), so this display is not reproducible as-is.
res

tensor([[[ 2.6996, -0.0540, -0.6785,  1.8729,  2.5463,  2.1610,  0.5126,
           2.8026, -1.9383],
         [ 1.6356, -0.7436, -0.4365, -0.7483,  1.9983,  2.3589, -1.0551,
           3.3037, -1.2795],
         [ 2.3710,  0.7427, -0.6359,  2.6279, -0.0899,  3.2949, -0.8824,
           0.0907, -1.6748]],

        [[ 2.3579,  0.6638, -1.4764,  2.3927,  0.9059,  0.3122, -0.6362,
           2.6056, -2.0278],
         [ 1.3375,  0.1816, -2.2884, -0.0256,  0.0244,  0.3844, -0.8661,
           2.7096, -1.0883],
         [ 1.4010, -2.4964, -0.7134,  1.8807,  0.6373,  1.1526, -0.4668,
           3.5641, -2.9647]]], grad_fn=<AddBackward0>)

In [303]:
# NOTE(review): depends on `res` being a (2, 3, 9) tensor from stale kernel
# state. The recorded output also carries grad_fn=SoftmaxBackward0, whereas
# the Decoder defined in this file returns its final linear layer directly,
# so the output appears to come from an older Decoder definition.
dec(torch.tensor([[1, 2, 3], [4, 5, 6]]), res)

tensor([[[0.0523, 0.0027, 0.0343, 0.0366, 0.0273, 0.0695, 0.0621, 0.0055,
          0.0454, 0.6644],
         [0.0165, 0.0185, 0.0279, 0.0604, 0.0092, 0.1517, 0.1187, 0.0142,
          0.0573, 0.5257],
         [0.1876, 0.0133, 0.1345, 0.0163, 0.2110, 0.0795, 0.0503, 0.1099,
          0.0325, 0.1651]],

        [[0.2006, 0.0007, 0.0296, 0.0614, 0.0403, 0.0224, 0.1603, 0.0042,
          0.1473, 0.3330],
         [0.2278, 0.0144, 0.1093, 0.1420, 0.0250, 0.0488, 0.0267, 0.0125,
          0.3145, 0.0789],
         [0.0460, 0.0108, 0.0337, 0.0297, 0.0253, 0.0274, 0.1948, 0.0232,
          0.0782, 0.5309]]], grad_fn=<SoftmaxBackward0>)

In [304]:
Decoder(

SyntaxError: incomplete input (2730920761.py, line 1)