In [3]:
import numpy as np
import torch
import pandas as pd
from torch.utils.data import DataLoader, Dataset
from collections import Counter
import re
import math
from torch import nn
from torch.nn import functional as F

In [58]:
# Read the comma-separated dataset and keep its values as a NumPy array.
data = pd.read_csv("data.txt", sep=",").to_numpy()

In [59]:
# First column goes to X, second column to y.
X = data[:, 0]
y = data[:, 1]

In [60]:
def tokenize(data):
    """Build a word -> integer-index vocabulary from an iterable of sentences.

    Every sentence is split on whitespace, punctuation is stripped from each
    token, and the distinct non-empty tokens are numbered in sorted order.

    Args:
        data: Iterable of sentence strings.

    Returns:
        dict mapping each distinct cleaned token to a unique index in
        ``range(len(vocabulary))``.
    """
    # Removes every character that is neither a word character nor whitespace.
    pattern = r"[^\w\s]|\.{1,}"
    cleaned_words = (
        re.sub(pattern, "", word)
        for sentence in data
        # split() (no argument) collapses runs of whitespace, so no empty
        # tokens are produced by double spaces, tabs or newlines.
        for word in sentence.split()
    )
    # Sort so indices are reproducible: iterating a bare set of strings
    # follows hash order, which changes between interpreter runs.
    # Tokens that were pure punctuation end up empty and are dropped.
    vocabulary = sorted({word for word in cleaned_words if word})
    return {word: index for index, word in enumerate(vocabulary)}

In [61]:
# Vocabulary (word -> index) built from the text column.
lookup_dictionary = tokenize(data=X)


In [62]:
class PositionalEmbedding:
    """Sinusoidal positional encodings plus vocabulary lookup for a sentence.

    For position ``pos`` and embedding width ``d`` the encoding is

        PE(pos, 2k)     = sin(pos / 10000 ** (2k / d))
        PE(pos, 2k + 1) = cos(pos / 10000 ** (2k / d))

    so each (even, odd) pair of dimensions shares one frequency.
    """

    def __init__(self, max_length, embedding_dim, lookup_dictionary):
        """Store the configuration.

        Args:
            max_length: Number of rows of the pre-allocated ``embedding_matrix``.
            embedding_dim: Width ``d`` of every positional vector.
            lookup_dictionary: Mapping from word to integer index.
        """
        # Allocated for callers that read it; nothing in this class fills it.
        self.embedding_matrix = np.zeros((max_length, embedding_dim))
        self.max_length = max_length
        self.embedding_dim = embedding_dim
        self.lookup_dictionary = lookup_dictionary

    def generate(self, sentence):
        """Return positional vectors and vocabulary indices for ``sentence``.

        Args:
            sentence: Sequence of word tokens.

        Returns:
            Tuple ``(sentence_pos_embeddings, indices)``: a list with one
            ``np.ndarray`` of length ``embedding_dim`` per token, and the list
            of vocabulary indices of the tokens.

        Raises:
            KeyError: If a token is missing from ``lookup_dictionary``.
        """
        indices = []
        for word in sentence:
            try:
                indices.append(self.lookup_dictionary[word])
            except KeyError as err:
                # Name the offending token instead of hiding it behind a
                # bare except.
                raise KeyError(f"new word was identified: {word!r}") from err

        sentence_pos_embeddings = []
        for pos in range(len(sentence)):
            word_pos_embedding = np.zeros(self.embedding_dim)
            for dim in range(self.embedding_dim):
                if dim % 2:
                    word_pos_embedding[dim] = self.odd_PE(pos, dim)
                else:
                    word_pos_embedding[dim] = self.even_PE(pos, dim)
            sentence_pos_embeddings.append(word_pos_embedding)
        return sentence_pos_embeddings, indices

    def even_PE(self, pos, i):
        """Encoding of even dimension ``i``: ``sin(pos / 10000 ** (i / d))``."""
        return math.sin(pos / (10000 ** (i / self.embedding_dim)))

    def odd_PE(self, pos, i):
        """Encoding of odd dimension ``i``: ``cos(pos / 10000 ** ((i - 1) / d))``.

        The ``i - 1`` makes the odd dimension reuse the frequency of the even
        dimension it is paired with.
        """
        return math.cos(pos / (10000 ** ((i - 1) / self.embedding_dim)))

In [63]:
# Encode one sample sentence: positional vectors plus vocabulary indices.
sentence = "its easy to carry".split()
embedding = PositionalEmbedding(
    max_length=100,
    embedding_dim=10,
    lookup_dictionary=lookup_dictionary,
)
pos_embeddings, indices = embedding.generate(sentence)



In [216]:
class MultiHeadAttention(nn.Module):
    """Scaled dot-product attention split across ``num_heads`` heads.

    Queries, keys and values are each projected from ``input_dim`` to
    ``output_dim``, split into ``num_heads`` heads of width
    ``output_dim // num_heads``, attended, concatenated and passed through a
    final ``output_dim -> output_dim`` projection.
    """

    def __init__(self, input_dim, output_dim, num_heads, indices=None, masking=False):
        """Create the projections.

        Args:
            input_dim: Width of the incoming embeddings (for example the
                positional word embedding).
            output_dim: Width of the projected queries/keys/values and of the
                returned tensor. Must be divisible by ``num_heads``.
            num_heads: Number of attention heads.
            indices: Optional vocabulary indices of the sentence. Only stored
                (as a tensor) on ``self.indices``; ``forward`` does not use it.
            masking: If True, apply a causal (lower-triangular) mask so a
                position cannot attend to later positions.

        Raises:
            ValueError: If ``num_heads`` is not positive or does not divide
                ``output_dim``.
        """
        super(MultiHeadAttention, self).__init__()
        if num_heads <= 0 or output_dim % num_heads != 0:
            raise ValueError(
                f"output_dim ({output_dim}) must be divisible by a positive "
                f"num_heads ({num_heads})"
            )
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.num_heads = num_heads
        self.key_linear = nn.Linear(input_dim, output_dim)
        self.value_linear = nn.Linear(input_dim, output_dim)
        self.query_linear = nn.Linear(input_dim, output_dim)
        # The final projection consumes the concatenated heads, which are
        # output_dim wide (not input_dim).
        self.last_linear = nn.Linear(output_dim, output_dim)
        self.indices = torch.tensor(indices) if indices is not None else None
        self.depth = self.output_dim // self.num_heads
        self.masking = masking

    def split_heads(self, x, batch_size):
        """Reshape ``(batch, seq, output_dim)`` to ``(batch, heads, seq, depth)``."""
        reshaped_x = x.view(batch_size, -1, self.num_heads, self.depth)
        return reshaped_x.transpose(1, 2)

    def forward(self, Q, K, V):
        """Attend ``Q`` over ``K``/``V``.

        Args:
            Q, K, V: Tensors of shape ``(batch, seq, input_dim)`` or unbatched
                ``(seq, input_dim)``; unbatched inputs are treated as a batch
                of one. ``K`` and ``V`` must share their sequence length.

        Returns:
            Tensor of shape ``(batch, query_len, output_dim)``.
        """
        # Promote unbatched inputs so the batch size can be read from Q
        # instead of being assumed to be 1.
        if Q.dim() == 2:
            Q = Q.unsqueeze(0)
        if K.dim() == 2:
            K = K.unsqueeze(0)
        if V.dim() == 2:
            V = V.unsqueeze(0)
        batch_size = Q.shape[0]

        K_splitted = self.split_heads(self.key_linear(K), batch_size)
        Q_splitted = self.split_heads(self.query_linear(Q), batch_size)
        V_splitted = self.split_heads(self.value_linear(V), batch_size)

        # Scores have shape (batch, heads, query_len, key_len).
        attention_weight = torch.matmul(
            Q_splitted, K_splitted.transpose(-2, -1)
        ) / math.sqrt(self.depth)

        if self.masking:
            query_len, key_len = attention_weight.shape[-2:]
            # Built on the scores' device; broadcasts over batch and heads.
            causal_mask = torch.tril(
                torch.ones(
                    query_len,
                    key_len,
                    dtype=torch.bool,
                    device=attention_weight.device,
                )
            )
            attention_weight = attention_weight.masked_fill(
                ~causal_mask, float("-inf")
            )

        normalized_attention_weight = F.softmax(attention_weight, -1)

        output = torch.matmul(normalized_attention_weight, V_splitted)

        # Merge the heads back: (batch, heads, seq, depth) -> (batch, seq, output_dim).
        output = output.transpose(1, 2).contiguous()
        output = output.view(batch_size, -1, self.output_dim)

        return self.last_linear(output)

In [211]:
# Example usage: masked self-attention over the sample sentence.
# Stack the per-word arrays into one array first, and cast to float32 because
# the nn.Linear weights are float32 while the NumPy encodings are float64.
x = torch.tensor(np.array(pos_embeddings), dtype=torch.float32)

seq_len = x.shape[0]
input_dim = x.shape[1]
output_dim = x.shape[1]
num_heads = 2


attention = MultiHeadAttention(input_dim, output_dim, num_heads, indices, masking=True)
output = attention(x, x, x)

tensor([[[[1.0000, 0.0000, 0.0000, 0.0000],
          [0.4993, 0.5007, 0.0000, 0.0000],
          [0.3410, 0.3344, 0.3246, 0.0000],
          [0.2656, 0.2574, 0.2429, 0.2341]],

         [[1.0000, 0.0000, 0.0000, 0.0000],
          [0.4866, 0.5134, 0.0000, 0.0000],
          [0.3276, 0.3309, 0.3415, 0.0000],
          [0.2570, 0.2529, 0.2471, 0.2429]]]], grad_fn=<SoftmaxBackward0>)
tensor([[[[-4.5705e-02,  1.9059e-01, -1.9999e-01,  1.2057e-01, -2.6404e-01],
          [ 7.6685e-04,  1.2746e-01, -1.6928e-01,  9.2891e-02, -2.0819e-01],
          [ 6.6716e-02,  1.7018e-02, -1.0096e-01,  6.0378e-02, -1.2653e-01],
          [ 1.2254e-01, -7.2927e-02, -4.7223e-02,  3.1676e-02, -5.7822e-02]],

         [[ 5.5688e-01, -1.3704e-01, -4.2619e-01,  1.0696e+00, -2.3870e-01],
          [ 4.9962e-01, -1.6114e-01, -3.8849e-01,  1.0126e+00, -2.0362e-01],
          [ 3.9732e-01, -2.1184e-01, -3.0959e-01,  9.3333e-01, -1.2787e-01],
          [ 3.1769e-01, -2.5027e-01, -2.4972e-01,  8.6859e-01, -7.0673e-02

In [190]:
class EncoderLayer(nn.Module):
    """Three residual Linear+ReLU sub-blocks, each followed by LayerNorm.

    The layer keeps the feature width unchanged: every projection and every
    normalisation works on ``input_dim`` features.
    """

    def __init__(self, input_dim, *args, **kwargs):
        """Create the three projections and their layer norms.

        Args:
            input_dim: Feature width of the incoming tensor (and of the output).
        """
        super().__init__(*args, **kwargs)
        width = input_dim
        self.input_dim = width
        self.output_dim = width
        self.linear1 = nn.Linear(width, width)
        self.linear2 = nn.Linear(width, width)
        self.linear3 = nn.Linear(width, width)
        self.layer_norm1 = nn.LayerNorm(width)
        self.layer_norm2 = nn.LayerNorm(width)
        self.layer_norm3 = nn.LayerNorm(width)

    def forward(self, multihead_att_out):
        """Refine the attention output.

        Each stage computes ``LayerNorm(ReLU(Linear(h)) + h)``; the layer norm
        acts over the last (feature) dimension.

        Args:
            multihead_att_out: Tensor whose last dimension is ``input_dim``.

        Returns:
            Tensor with the same shape as the input.
        """
        stages = (
            (self.linear1, self.layer_norm1),
            (self.linear2, self.layer_norm2),
            (self.linear3, self.layer_norm3),
        )
        hidden = multihead_att_out
        for projection, norm in stages:
            # Residual connection around the activated projection, then normalise.
            hidden = norm(F.relu(projection(hidden)) + hidden)
        return hidden

In [12]:
class EncoderTransformer(nn.Module):
    """A stack of ``num_encoders`` EncoderLayer modules applied in sequence."""

    def __init__(self, num_encoders, input_dim, output_dim=None, *args, **kwargs) -> None:
        """Build the stack.

        Args:
            num_encoders: Number of EncoderLayer instances to stack.
            input_dim: Feature width passed to every EncoderLayer.
            output_dim: Accepted for backward compatibility and ignored; each
                layer is built from ``input_dim`` only. Optional, so the stack
                can be created without it.
        """
        super().__init__(*args, **kwargs)
        self.stacked_encoders = nn.ModuleList(
            [EncoderLayer(input_dim) for _ in range(num_encoders)]
        )

    def forward(self, x):
        """Feed ``x`` through every encoder layer in order and return the result."""
        for encoder in self.stacked_encoders:
            x = encoder(x)
        return x

In [17]:
# output_dim is passed explicitly so the constructor call is complete.
encoder_transformer = EncoderTransformer(num_encoders=3, input_dim=10, output_dim=10)

In [18]:
# Run the attention output through the stacked encoder layers.
encoder_out = encoder_transformer(output)

In [212]:
class DecoderLayer(nn.Module):
    """One decoder block: masked self-attention, encoder-decoder attention and
    three residual feed-forward stages, each followed by LayerNorm.
    """

    def __init__(self, input_dim, output_dim, *args, num_heads=1, **kwargs) -> None:
        """Create the attention modules, projections and layer norms.

        Args:
            input_dim: Feature width of the target embeddings and of the
                encoder output.
            output_dim: Feature width produced by the layer. Must equal
                ``input_dim`` because every sub-block has a residual connection.
            num_heads: Number of heads for both attention modules
                (keyword-only). Must divide ``output_dim``.

        Raises:
            ValueError: If ``input_dim`` and ``output_dim`` differ.
        """
        super().__init__(*args, **kwargs)
        if input_dim != output_dim:
            raise ValueError(
                "DecoderLayer uses residual connections, so input_dim "
                f"({input_dim}) must equal output_dim ({output_dim})"
            )
        self.linear1 = nn.Linear(input_dim, output_dim)
        self.linear2 = nn.Linear(output_dim, output_dim)
        self.linear3 = nn.Linear(output_dim, output_dim)
        # The decoder has no vocabulary indices to hand over, so an empty list
        # is passed for the attention modules' `indices` argument.
        self.encoder_decoder_attention = MultiHeadAttention(
            input_dim=input_dim,
            output_dim=output_dim,
            num_heads=num_heads,
            indices=[],
            masking=False,
        )

        self.self_decoder_attention = MultiHeadAttention(
            input_dim=input_dim,
            output_dim=output_dim,
            num_heads=num_heads,
            indices=[],
            masking=True,
        )

        # LayerNorm has learnable parameters, so each one is a registered
        # sub-module rather than something built inside forward().
        self.self_att_norm = nn.LayerNorm(output_dim)
        self.enc_dec_att_norm = nn.LayerNorm(output_dim)
        self.layer_norm1 = nn.LayerNorm(output_dim)
        self.layer_norm2 = nn.LayerNorm(output_dim)
        self.layer_norm3 = nn.LayerNorm(output_dim)

    def forward(self, encoders_multihead_att_out, target_sequence_position_embedding):
        """Decode the target sequence against the encoder output.

        In the decoder, first we create a self-attention for the target
        sentence (e.g. "oi tudo bem") to get a context-rich embedding for it,
        and an attention map for the translated sentence. Then we use this
        self-attention output to generate another attention. This time we
        build the attention map between "oi tudo bem" and "how are you",
        saying: knowing who I am in the translated sentence, to whom in the
        original sentence should I give attention. The output of this second
        multi-head attention is a context-rich embedding in both languages.

        Summary:

        Self-attention for the target sequence:
            Generate a context-rich embedding for the translated sentence
            ("oi tudo bem") and an attention map for the sentence itself.

        Encoder-decoder attention:
            Use the self-attention output as queries and the encoder's output
            (context-rich embedding of "hey how are you") as keys and values,
            producing an embedding that combines both sentences.

        Args:
            encoders_multihead_att_out: Encoder output used as keys and values.
            target_sequence_position_embedding: Embedding of the target
                sequence, used as queries, keys and values of the masked
                self-attention.

        Returns:
            Tensor with ``output_dim`` features per target position.
        """
        decoders_self_attention_out = self.self_decoder_attention(
            Q=target_sequence_position_embedding,
            K=target_sequence_position_embedding,
            V=target_sequence_position_embedding,
        )

        # Residual connection: attention output plus the sub-block's input.
        layer_normed_self_att = self.self_att_norm(
            decoders_self_attention_out + target_sequence_position_embedding
        )

        encoder_decoder_attention = self.encoder_decoder_attention(
            Q=layer_normed_self_att,
            K=encoders_multihead_att_out,
            V=encoders_multihead_att_out,
        )

        layer_normed_enc_dec_att = self.enc_dec_att_norm(
            encoder_decoder_attention + layer_normed_self_att
        )

        # Three residual feed-forward stages, each with its own projection.
        x_res1 = self.layer_norm1(
            F.relu(self.linear1(layer_normed_enc_dec_att)) + layer_normed_enc_dec_att
        )

        x_res2 = self.layer_norm2(F.relu(self.linear2(x_res1)) + x_res1)

        x_res3 = self.layer_norm3(F.relu(self.linear3(x_res2)) + x_res2)

        return x_res3

In [213]:
class DecoderTransform(nn.Module):
    """A stack of ``num_decoders`` DecoderLayer modules applied in sequence."""

    def __init__(self, num_decoders, input_dim=None, output_dim=None, *args, **kwargs) -> None:
        """Build the stack.

        Args:
            num_decoders: Number of DecoderLayer instances to stack.
            input_dim: Feature width handed to every DecoderLayer. Required
                whenever ``num_decoders`` is greater than zero.
            output_dim: Output width handed to every DecoderLayer; defaults to
                ``input_dim``.

        Raises:
            ValueError: If layers are requested but ``input_dim`` is missing.
        """
        super().__init__(*args, **kwargs)
        if num_decoders > 0 and input_dim is None:
            raise ValueError("input_dim is required when num_decoders > 0")
        if output_dim is None:
            output_dim = input_dim
        self.decoder_list = [
            DecoderLayer(input_dim, output_dim) for _ in range(num_decoders)
        ]
        # ModuleList registers the layers so their parameters are tracked.
        self.stacked_decoders = nn.ModuleList(self.decoder_list)

    def forward(self, x, encoder_out=None):
        """Run the target embedding ``x`` through every decoder layer.

        Args:
            x: Target-sequence embedding; replaced by each layer's output.
            encoder_out: Encoder output given unchanged to every layer as the
                source of keys and values. Required when the stack is not empty.

        Returns:
            Output of the last decoder layer, or ``x`` itself for an empty stack.

        Raises:
            ValueError: If the stack has layers but ``encoder_out`` is None.
        """
        if encoder_out is None and len(self.stacked_decoders) > 0:
            raise ValueError("encoder_out is required to run the decoder layers")
        for decoder in self.stacked_decoders:
            # DecoderLayer.forward takes (encoder output, target embedding).
            x = decoder(encoder_out, x)
        return x