In [1]:
import numpy as np
import nltk


In [12]:
from nltk.tokenize import (word_tokenize)
from tokenizers import Tokenizer, models, trainers, pre_tokenizers, decoders, processors

In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F

In [203]:
class Encoder(nn.Module):
    def __init__(self,vocab_size,d_model=10,num_heads=5):
        super(Encoder,self).__init__()
        self.posi_enc = PositionalEncoding(10,vocab_size)
        self.vocab_size = vocab_size
        self.mha = MultiHeadAtten(d_model=d_model,num_heads=5,vocab_size=vocab_size)
        
        self.layer_norm = nn.LayerNorm(d_model)
        self.ffn = FeedForwardNetwork(input_size=10,hidden_size=5,output_size=10)
    def forward(self,embed):
        # print(self.vocab_size,'self.vocab_size')
        embeds_ = self.posi_enc(embed.unsqueeze(1))
        mha = self.mha(embeds_.squeeze(1))
        embed_layer =self.layer_norm(embeds_.squeeze(1) + mha)
        out = self.ffn(embed_layer)
        out = out + embed_layer
        out = self.layer_norm(out)
        return out

class FeedForwardNetwork(nn.Module):
    def __init__(self,input_size=10,hidden_size=5,output_size=10):
        super(FeedForwardNetwork,self).__init__()
        self.layer1 = nn.Linear(input_size,hidden_size)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(hidden_size,output_size)
    def forward(self,x):
        out = self.layer1(x)
        out=self.relu(out)
        out = self.layer2(out)
        return out
        

In [200]:
class ResidualBlock(nn.Module):
    def __init__(self, features:int,dropout:float)->None:
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        self.norm = nn.LinearNorm(features)
    def forward(self,x,sublayer):
        return x + self.dropout(sublayer(self.norm(x)))
        

class EncoderBlock(nn.Module):
    def __init__(self, self_attention:MultiHeadAtten,self_ffn: FeedForwardNetwork)->None:
        super().__init__()
        self.self_attention = self_attention
        self.self_ffn = self_ffn
        self.residual_blocks = nn.ModuleList([ResidualBlock(features,dropout) for _ in range(2)])
        
    def forward(self,embed):
        out = self.residual_blocks[0](embed, lambda embed: self.self_attention(embed))
        out = self.residual_blocks[1](embed,self.FeedForwardNetwork)
        return out

        
# class Encoder(nn.Module):
#     def __init__(self, features:int, layers:nn.ModuleList)->None:
#         super().__init__()
#     def forward(self,x):
#         for layer in self.layers:
#             x = layer(x, mask)
#         return x

class FeedForwardNetwork(nn.Module):
    def __init__(self,d_model:int,hidden_size:int,dropout:float)-> None:
        super().__init__()
        self.layer1 = nn.Linear(d_model,hidden_size)
        self.dropout = nn.Dropout(dropout)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(hidden_size,d_model)
    def forward(self,x):
        out = self.layer1(x)
        out=self.relu(out)
        out = self.layer2(out)
        return out

class Encoder(nn.Module):
    def __init__(self, features:int, self_encoder_block:EncoderBlock)->None:
        self.self_encoder_block = self_encoder_block
    def forward(self,embeds):
        embeds = self.self_encoder_block(embeds)
        return embeds
        

In [208]:
class MultiHeadAtten(nn.Module):
    def __init__(self,d_model: int,num_heads:int,vocab_size:int,dropout):
        super(MultiHeadAtten,self).__init__()
        self.vocab_size = vocab_size
        self.num_heads = num_heads
        self.d_model = d_model
        self.m = nn.Softmax(dim=-1)
        
    def initialize_weights(self,input_size,output_size):
    # Initialize weights with random values from a normal distribution
        weights = torch.rand(input_size, output_size)
        return weights
    
    def forward(self,x, encoder_out=None, mask=None):
        if encoder_out == None:
            Q = K = V = x
        else:
            Q = x
            K = V = encoder_out
            
            pad_size = K.size(0) - Q.size(0)

            Q = F.pad(Q, (0, 0, 0, pad_size), "constant", 0)

        d_k = self.d_model / self.num_heads
        query_weights = self.initialize_weights(self.d_model,self.d_model)

        # Initialize weights for key matrix
        key_weights = self.initialize_weights(self.d_model,self.d_model)
        
        # Initialize weights for value matrix
        value_weights = self.initialize_weights(self.d_model,self.d_model)
        Q_,K_,V_ = torch.matmul(Q,query_weights),torch.matmul(K,key_weights),torch.matmul(V,value_weights)
        Q_split = Q_.view(self.num_heads,10 , 10 // self.num_heads)
        K_split = K_.view(self.num_heads,10 , 10 // self.num_heads)
        V_split = V_.view(self.num_heads,10 , 10 // self.num_heads)
        heads = []
        for i in range(len(Q_split)):
            scaled_dot_product = torch.matmul(Q_split[i],K_split[i].T)/np.sqrt(d_k)
            if mask is not None:
                print(scaled_dot_product.shape,mask.shape,'shape')
                scaled_dot_product = scaled_dot_product.masked_fill(mask==0,float('-inf'))
            attn_weights = self.m(scaled_dot_product)
            heads.append(torch.matmul(attn_weights,V_split[i]))
        heads = torch.stack(heads)
        Head = heads.view(self.num_heads * 2, 10)
        # print(Head.shape,'head')
        
        Head_weights = self.initialize_weights(int(self.num_heads * d_k),self.d_model)
        multi_head = torch.matmul(Head,Head_weights)
        return multi_head
model = Model_mha(d_model=10, num_heads=5,input_size=10,hidden_size=5,output_size=10)
result = model(text_embedding,output_embedding,mask)

TypeError: MultiHeadAtten.__init__() missing 1 required positional argument: 'dropout'

In [205]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, vocab_size=5000):
        super(PositionalEncoding, self).__init__()
        
        pe = torch.zeros(vocab_size, d_model)
        position = torch.arange(0, vocab_size, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        return x + self.pe[:x.size(0), :]


        

        
class FeedForwardNetwork(nn.Module):
    def __init__(self,input_size=10,hidden_size=5,output_size=10):
        super(FeedForwardNetwork,self).__init__()
        self.layer1 = nn.Linear(input_size,hidden_size)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(hidden_size,output_size)
    def forward(self,x):
        out = self.layer1(x)
        out=self.relu(out)
        out = self.layer2(out)
        return out


class Decoder(nn.Module):
    def __init__(self,d_model, vocab_size, num_heads=5,input_size=10,hidden_size=5,output_size=10):
        super(Decoder,self).__init__()
        self.vocab_size = vocab_size
        self.posi_enc = PositionalEncoding(d_model, vocab_size)
        self.masked_mha = MultiHeadAtten(d_model,num_heads,vocab_size=vocab_size)
        self.mha = MultiHeadAtten(d_model,num_heads)
        self.layer_norm = nn.LayerNorm(d_model)
        self.ffn = FeedForwardNetwork(input_size,hidden_size,output_size)

    
    def forward(self,embeds,encoder_out=None,mask=None):
        out_pos = self.posi_enc(embeds.unsqueeze(1))
        out = self.masked_mha(out_pos.squeeze(1),mask=mask)
        out = out + out_pos.squeeze(1)
        out_layer1 = self.layer_norm(out)
        out = self.mha(out_layer1,encoder_out=encoder_out)
        out = out + out_layer1
        out_layer2 = self.layer_norm(out)
        out = self.ffn(out_layer2)
        out = out + out_layer2
        return out
        




In [190]:
class Model_mha(nn.Module):
    def __init__(self,d_model=10, num_heads=5,input_size=10,hidden_size=5,output_size=10,vocab_size=6):
        super(Model_mha,self).__init__()
        self.encoder = Encoder(d_model=10,num_heads=5,vocab_size=len(input_tokenizer.get_vocab()))
        self.decoder = Decoder(d_model, vocab_size=len(output_tokenizer.get_vocab()), num_heads=5,input_size=10,hidden_size=5,output_size=10)
        self.linear = nn.Linear(input_size,output_size)
        self.soft = nn.Softmax(dim=-1)
    
    def forward(self,input_embed,output_embed,mask):
        encoder_out = self.encoder(input_embed)
        decoder_out = self.decoder(output_embed,encoder_out,mask)
        return encoder_out



In [211]:
class MultiHeadAtten(nn.Module):
    def __init__(self,d_model = 10,num_heads=5,vocab_size=6):
        super(MultiHeadAtten,self).__init__()
        self.vocab_size = vocab_size
        self.num_heads = num_heads
        self.d_model = d_model
        self.m = nn.Softmax(dim=-1)
        
    def initialize_weights(self,input_size,output_size):
    # Initialize weights with random values from a normal distribution
        weights = torch.rand(input_size, output_size)
        return weights
    
    def forward(self,x, encoder_out=None, mask=None):
        if encoder_out == None:
            Q = K = V = x
        else:
            Q = x
            K = V = encoder_out
            
            pad_size = K.size(0) - Q.size(0)

            Q = F.pad(Q, (0, 0, 0, pad_size), "constant", 0)

        d_k = self.d_model / self.num_heads
        query_weights = self.initialize_weights(self.d_model,self.d_model)

        # Initialize weights for key matrix
        key_weights = self.initialize_weights(self.d_model,self.d_model)
        
        # Initialize weights for value matrix
        value_weights = self.initialize_weights(self.d_model,self.d_model)
        Q_,K_,V_ = torch.matmul(Q,query_weights),torch.matmul(K,key_weights),torch.matmul(V,value_weights)
        Q_split = Q_.view(self.num_heads,10 , 10 // self.num_heads)
        K_split = K_.view(self.num_heads,10 , 10 // self.num_heads)
        V_split = V_.view(self.num_heads,10 , 10 // self.num_heads)
        heads = []
        for i in range(len(Q_split)):
            scaled_dot_product = torch.matmul(Q_split[i],K_split[i].T)/np.sqrt(d_k)
            if mask is not None:
                print(scaled_dot_product.shape,mask.shape,'shape')
                scaled_dot_product = scaled_dot_product.masked_fill(mask==0,float('-inf'))
            attn_weights = self.m(scaled_dot_product)
            heads.append(torch.matmul(attn_weights,V_split[i]))
        heads = torch.stack(heads)
        Head = heads.view(self.num_heads * 2, 10)
        # print(Head.shape,'head')
        
        Head_weights = self.initialize_weights(int(self.num_heads * d_k),self.d_model)
        multi_head = torch.matmul(Head,Head_weights)
        return multi_head
model = Model_mha(d_model=10, num_heads=5,input_size=10,hidden_size=5,output_size=10)
result = model(text_embedding,output_embedding,mask)

torch.Size([10, 1, 10]) xs 10
tensor([[[ 1.8733, -0.7064, -0.7422, -0.0855, -1.4620,  0.8337, -0.6678,
          -0.3217,  0.2287,  0.9445]],

        [[ 0.0674,  0.9699,  1.0701,  0.2637,  0.6719,  0.6149,  1.4483,
           1.6997, -0.1566, -0.5222]],

        [[ 0.8231, -0.5778,  0.0616, -0.6926,  1.0001,  1.8811, -1.1803,
           3.1271, -0.9141,  1.9629]],

        [[-0.8954, -1.1705, -0.2625,  0.6475,  0.5567,  1.2419,  0.5073,
          -0.5007,  0.6811,  0.4418]],

        [[ 1.1115,  0.3736,  0.7952, -0.1086,  1.0646,  1.8434,  0.6801,
           0.4552,  0.9872, -1.0676]],

        [[-0.1827,  0.4770,  0.1068,  0.7795,  0.7511,  0.6717, -1.1287,
           0.4299, -1.4088,  3.0072]],

        [[-0.0433,  2.8450,  0.2376,  0.7353, -0.0384,  1.1251, -0.8960,
           1.8136,  1.4383,  0.0657]],

        [[ 0.8670, -0.2937,  1.2638,  0.4001, -0.4729,  1.2050,  0.0885,
          -1.5180, -0.0688,  1.6233]],

        [[ 1.9604,  0.5907,  0.8830, -1.6707,  0.7589,  2.3515, -0

In [210]:
result

tensor([[ 7.8785e-01, -1.5043e-01, -1.8734e+00, -1.0219e+00, -7.4881e-01,
          6.2113e-01, -3.0642e-02, -3.1917e-01,  1.4463e+00,  1.2891e+00],
        [ 1.0103e+00, -2.9173e-01, -1.4029e+00, -1.9209e+00,  5.1360e-01,
         -1.9873e-01,  2.1622e-01, -2.2741e-01,  1.4556e+00,  8.4602e-01],
        [ 8.8965e-01, -5.8871e-01, -1.4915e+00, -1.6298e+00,  2.9908e-01,
         -1.5210e-01, -9.9447e-02, -3.3562e-04,  1.3052e+00,  1.4680e+00],
        [ 7.1591e-01, -8.0708e-01, -1.4545e+00, -1.1414e+00,  1.7800e-01,
         -1.5304e-01,  2.4323e-01, -6.5507e-01,  1.8105e+00,  1.2634e+00],
        [ 1.1063e+00, -4.2132e-01, -1.2921e+00, -1.8655e+00,  5.5175e-01,
          2.8650e-01, -1.4560e-02, -4.4940e-01,  1.6230e+00,  4.7534e-01],
        [ 7.1090e-01, -2.9867e-01, -1.3932e+00, -1.4643e+00,  5.2168e-01,
         -3.0637e-01, -1.9400e-01, -5.2043e-01,  1.0779e+00,  1.8665e+00],
        [ 8.4408e-01, -2.6251e-02, -1.4827e+00, -1.7666e+00,  4.6324e-01,
         -1.5933e-01, -1.8276e-0

In [170]:
#input embeddings
text = ["I love you so much.",
    "This is an example sentence.",
    "Another sentence goes here.",
    "More text data for training.",
    "Natural Language Processing is fascinating.",
    "Machine learning provides systems the ability to learn.",
    "Deep learning is a subset of machine learning."
    "Transformers are powerful models for sequence tasks.",
    "The quick brown fox jumps over the lazy dog."]
# Tokenizer setup (optional but recommended for better tokenization)
tokenizer = Tokenizer(models.WordLevel(unk_token="[UNK]"))
trainer = trainers.WordLevelTrainer(special_tokens=["[UNK]", "[PAD]", "[SOS]", "[EOS]"], min_frequency=1)
tokenizer.pre_tokenizer = pre_tokenizers.Whitespace()

# Train the tokenizer on the corpus
tokenizer.train_from_iterator(text, trainer)
# Setup decoder and post-processor
tokenizer.decoder = decoders.WordPiece()
tokenizer.post_processor = processors.TemplateProcessing(
    single="[SOS] $A [EOS]",
    pair="[SOS] $A [EOS] [SOS] $B:1 [EOS]:1",
    special_tokens=[
        ("[SOS]", tokenizer.token_to_id("[SOS]")),
        ("[EOS]", tokenizer.token_to_id("[EOS]"))
    ],
)

# Save the tokenizer to disk
tokenizer.save("wordlevel_tokenizer1.json")
# Print the tokenizer's vocabulary
vocab = tokenizer.get_vocab()
print("Vocabulary:", vocab)
print("Vocabulary Size:", len(vocab))

# Load the tokenizer and use it
input_tokenizer = Tokenizer.from_file("wordlevel_tokenizer1.json")

# Encode some text with padding
encoded = input_tokenizer.encode("I love you so much.")
# print("Tokens:", encoded.tokens)

# Assuming you want to pad the sequence to a fixed length (e.g., 10)
max_length = 10
padding_token = "[PAD]"
padding_token_id = input_tokenizer.token_to_id(padding_token)

# Add padding
padded_tokens = encoded.tokens + [padding_token] * (max_length - len(encoded.tokens))
# print("Padded Tokens:", padded_tokens)

# Convert padded tokens back to IDs
padded_ids = [input_tokenizer.token_to_id(token) for token in padded_tokens]
# print("Padded Token IDs:", padded_ids)

embeddings = nn.Embedding(num_embeddings=len(input_tokenizer.get_vocab()),embedding_dim=embedding_dim)
indices_tensor = torch.tensor(padded_ids)
text_embedding = embeddings(indices_tensor) 

text_embedding.shape

Vocabulary: {'This': 19, 'Natural': 16, 'a': 21, 'learning': 6, 'sequence': 45, 'text': 50, 'data': 26, 'Another': 10, 'dog': 27, '[UNK]': 0, 'lazy': 34, 'learn': 35, 'over': 41, 'training': 52, 'sentence': 8, 'Language': 13, 'models': 38, 'powerful': 42, 'you': 53, '[PAD]': 1, 'here': 32, '[EOS]': 3, 'machine': 37, 'example': 28, 'I': 12, 'to': 51, 'systems': 48, 'for': 7, 'goes': 31, 'Deep': 11, 'the': 9, 'ability': 22, 'quick': 44, 'fascinating': 29, 'love': 36, 'jumps': 33, 'provides': 43, 'Processing': 17, '.': 4, 'an': 23, 'much': 39, 'of': 40, '[SOS]': 2, 'More': 15, 'so': 46, 'Machine': 14, 'subset': 47, 'tasks': 49, 'brown': 25, 'are': 24, 'Transformers': 20, 'fox': 30, 'The': 18, 'is': 5}
Vocabulary Size: 54


torch.Size([10, 10])

In [184]:
output = ["Ti amp molto",
         "Lorem ipsum dolor sit amet, consectetur adipiscing elit.",
    "Sed ut perspiciatis unde omnis iste natus error sit voluptatem accusantium doloremque laudantium.",
]
tokenizer = Tokenizer(models.WordLevel(unk_token="[UNK]"))
trainer = trainers.WordLevelTrainer(special_tokens=["[UNK]", "[PAD]", "[SOS]", "[EOS]"], min_frequency=1)
tokenizer.pre_tokenizer = pre_tokenizers.Whitespace()

tokenizer.train_from_iterator(output, trainer)
# Setup decoder and post-processor
tokenizer.decoder = decoders.WordPiece()
tokenizer.post_processor = processors.TemplateProcessing(
    single="[SOS] $A [EOS]",
    pair="[SOS] $A [EOS] [SOS] $B:1 [EOS]:1",
    special_tokens=[
        ("[SOS]", tokenizer.token_to_id("[SOS]")),
        ("[EOS]", tokenizer.token_to_id("[EOS]"))
    ],
)
# Save the tokenizer to disk
tokenizer.save("wordlevel_tokenizer2.json")

# Load the tokenizer and use it
output_tokenizer = Tokenizer.from_file("wordlevel_tokenizer2.json")

# Print the tokenizer's vocabulary
vocab = output_tokenizer.get_vocab()

encoded = output_tokenizer.encode("Ti amp molto.")

max_length = 10
padding_token = "[PAD]"
padding_token_id = output_tokenizer.token_to_id(padding_token)

# Add padding
padded_tokens = encoded.tokens + [padding_token] * (max_length - len(encoded.tokens))
# print("Padded Tokens:", padded_tokens)

# Convert padded tokens back to IDs
padded_ids = [output_tokenizer.token_to_id(token) for token in padded_tokens]
# print("Padded Token IDs:", padded_ids)

output_embeddings = nn.Embedding(num_embeddings=len(output_tokenizer.get_vocab()),embedding_dim=embedding_dim)
indices_tensor = torch.tensor(padded_ids)
output_embedding = output_embeddings(indices_tensor)

mask = torch.eye(10, dtype=torch.bool) | ~(torch.triu(torch.ones(10, 10)) == 1)
