<a href="https://colab.research.google.com/github/Anvians/Deep_Learning/blob/main/Transformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [272]:
import math
import re

import torch
import torch.nn as nn


In [349]:
from torch.utils.data import Dataset, DataLoader
from torchtext.data.utils import get_tokenizer

path = '/content/sherlock.txt'
tokenizer = get_tokenizer('basic_english')

# Load the corpus from the file as ONE string: re.split() needs a str, not the
# list of lines that readlines() returns.
# NOTE(review): assumes the file is UTF-8 encoded; the encoding is passed explicitly
# so decoding does not depend on the platform default.
with open(path, 'r', encoding='utf-8') as f:
    corpus_text = f.read()

# Split the lower-cased text on punctuation/whitespace. The capturing group keeps
# every delimiter as its own item in the result.
# NOTE(review): inside the character class, `?-_` is a RANGE (U+003F..U+005F), not
# three literal characters, so a hyphen is not a delimiter here - confirm intended.
result = re.split(r'([,.:;?-_!\'()\]\s])', corpus_text.lower())

# Drop empty strings and whitespace-only delimiters.
result = [item for item in result if item.strip()]
class TestDataset(Dataset):
    """Map-style dataset over pre-split corpus items.

    Each item is stripped and run through the module-level ``tokenizer`` when it
    is accessed, so ``dataset[i]`` returns whatever ``tokenizer`` returns for
    item ``i``.
    """

    def __init__(self, corpus):
        # `corpus` and `lines` are two names for the same underlying sequence.
        self.corpus = self.lines = corpus

    def __len__(self):
        """Number of corpus items."""
        return len(self.lines)

    def __getitem__(self, index):
        """Tokenize and return the item stored at ``index``."""
        raw_item = self.lines[index]
        return tokenizer(raw_item.strip())

# Wrap the split corpus so that items are tokenized on access by index.
dataset = TestDataset(result)

In [348]:
# Preview only the first few corpus items; printing the whole corpus floods the
# notebook output.
print(dataset.corpus[:20])



In [350]:
from torchtext.data.utils import get_tokenizer

# Tokenize every corpus item; each entry of tokenized_data is that item's token
# list, kept in corpus order.
tokenized_data = [tokenizer(line.strip()) for line in result]

# Build the vocabulary as a token -> integer id mapping. Ids are handed out in
# order of first appearance (0, 1, 2, ...); a dict keeps that insertion order and
# gives constant-time lookups.
vocab = {}
for sentence in tokenized_data:
    for token in sentence:
        # setdefault only inserts unseen tokens, so len(vocab) is the next free id.
        vocab.setdefault(token, len(vocab))

# Report the size and a small sample instead of dumping the whole mapping.
print(f"Vocabulary size: {len(vocab)}")
print(list(vocab.items())[:20])




In [351]:
# Number of consecutive corpus items in each input window. This must be defined
# here: the loop below reads it, and a fresh kernel has no other definition.
sequence_length = 5
input_output_pairs = []

# Slide a window over the tokenized corpus: `sequence_length` consecutive items
# form the input and the item right after the window is the prediction target.
# Every entry of tokenized_data is a token list, so the target is a list too.
for i in range(len(tokenized_data) - sequence_length):
    input_sequence = tokenized_data[i:i + sequence_length]
    target_word = tokenized_data[i + sequence_length]
    input_output_pairs.append((input_sequence, target_word))

# Show the pair count and a few examples rather than printing every pair.
print(f"Number of input-output pairs: {len(input_output_pairs)}")
print(input_output_pairs[:5])




In [354]:
PAD_IDX = len(vocab)  # PAD takes the first id past the vocabulary (real tokens use 0 .. len(vocab) - 1), so anything indexed by token id needs len(vocab) + 1 entries
max_sequence_length = 10  # Length (in token ids) that input sequences are padded to

# Padding function
def pad_sequence(sequence, max_length, pad_value=None):
    """Return a new list holding ``sequence`` right-padded up to ``max_length``.

    Args:
        sequence: Sized iterable of items to pad (it is not modified).
        max_length: Minimum length of the returned list.
        pad_value: Filler item. Defaults to the module-level ``PAD_IDX`` when
            omitted, which is the original behaviour.

    Returns:
        A new list. Sequences already at least ``max_length`` long are copied
        unchanged - this function never truncates.
    """
    if pad_value is None:
        pad_value = PAD_IDX
    missing = max_length - len(sequence)
    # A non-positive count means nothing to add.
    return list(sequence) + [pad_value] * max(missing, 0)

# Pair every padded input window with the vocabulary id of its target.
def _target_to_index(target_tokens):
    """Id of the first target token; PAD_IDX if the list is empty or the token is unknown."""
    if not target_tokens:
        return PAD_IDX
    return vocab.get(target_tokens[0], PAD_IDX)


padded_input_output_pairs = [
    (pad_sequence(window, max_sequence_length), _target_to_index(target_tokens))
    for window, target_tokens in input_output_pairs
]

In [355]:
class WordPredictionDataset(Dataset):
    """Dataset of ``(input_seq, target)`` pairs served as fixed-length id tensors.

    Args:
        data: Indexable collection of ``(input_seq, target)`` pairs. Elements of
            ``input_seq`` may be lists/tuples of tokens (they are flattened) or
            single items. ``target`` must be convertible to a long tensor.
        vocab: Mapping from token to integer id.
        pad_idx: Id used for padding and for items missing from ``vocab``.
            When omitted, the module-level ``PAD_IDX`` is read at access time
            (the original behaviour).
        max_len: Length every input is truncated/padded to. When omitted, the
            module-level ``max_sequence_length`` is read at access time.
    """

    def __init__(self, data, vocab, pad_idx=None, max_len=None):
        self.data = data
        self.vocab = vocab  # Store vocab
        self.pad_idx = pad_idx
        self.max_len = max_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(input_ids, target)`` as two ``torch.long`` tensors."""
        # Fall back to the notebook globals only when no explicit value was given.
        pad_idx = PAD_IDX if self.pad_idx is None else self.pad_idx
        max_len = max_sequence_length if self.max_len is None else self.max_len

        input_seq, target = self.data[idx]

        # Flatten one level and convert tokens to ids. Items that are not
        # lists/tuples (e.g. padding ids appended earlier) are looked up as-is
        # and therefore map to pad_idx unless they happen to be vocab keys.
        input_seq_indices = []
        for item in input_seq:
            tokens = item if isinstance(item, (list, tuple)) else [item]
            input_seq_indices.extend(self.vocab.get(token, pad_idx) for token in tokens)

        # Truncate if longer, pad if shorter.
        input_seq_indices = input_seq_indices[:max_len]
        input_seq_indices += [pad_idx] * (max_len - len(input_seq_indices))

        return torch.tensor(input_seq_indices, dtype=torch.long), torch.tensor(target, dtype=torch.long)

# Wrap the padded pairs in a Dataset and serve them as shuffled mini-batches of 32.
dataset = WordPredictionDataset(data=padded_input_output_pairs, vocab=vocab)
dataloader = DataLoader(dataset=dataset, shuffle=True, batch_size=32)

In [356]:
# Sanity check: pull only the first mini-batch and print its token-id tensors.
for batch_idx, (input_seq, target) in enumerate(dataloader):
    print(f"Batch {batch_idx + 1}")
    print(f"Input Sequence: {input_seq}")
    print(f"Target Word: {target}")
    break  # one batch is enough for inspection

Batch 1
Input Sequence: tensor([[  30,   76,  138, 4092,    7, 9240, 9240, 9240, 9240, 9240],
        [ 138,   77, 1127, 4690,  438, 9240, 9240, 9240, 9240, 9240],
        [  30,  447,  519, 1594,  358, 9240, 9240, 9240, 9240, 9240],
        [  30,  612,  657,  979,  358, 9240, 9240, 9240, 9240, 9240],
        [3126,    7,    5,  504,   79, 9240, 9240, 9240, 9240, 9240],
        [   5, 4158,    7,    5,  958, 9240, 9240, 9240, 9240, 9240],
        [ 405, 1389,   10,   25,  682, 9240, 9240, 9240, 9240, 9240],
        [ 404,  140,   10,  556,    5, 9240, 9240, 9240, 9240, 9240],
        [   7,    5,   82, 2955,    7, 9240, 9240, 9240, 9240, 9240],
        [1643,   30, 8764,   10,  447, 9240, 9240, 9240, 9240, 9240],
        [  31,   30,  447,   76,  580, 9240, 9240, 9240, 9240, 9240],
        [8412,  211,    5, 3818,   10, 9240, 9240, 9240, 9240, 9240],
        [  10, 5917,   30,  447, 2032, 9240, 9240, 9240, 9240, 9240],
        [8787,  140,  227,  138,   77, 9240, 9240, 9240, 9240, 924

# Transformer Encoder

## Positional Encoding

In [330]:
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position information to a batch of embeddings.

    For position ``pos`` and channel pair ``i``:
        PE[pos, 2i]     = sin(pos / 10000^(2i / embed_size))
        PE[pos, 2i + 1] = cos(pos / 10000^(2i / embed_size))

    Args:
        embed_size: Size of the embedding (last) dimension; odd sizes are supported.
        max_len: Number of positions precomputed in the table.
    """

    def __init__(self, embed_size, max_len=5000):
        super().__init__()
        self.embed_size = embed_size

        # Initialize the positional encoding matrix
        PE = torch.zeros(max_len, embed_size)
        pos = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # Shape: (max_len, 1)
        # exp(2i * -ln(10000) / d) == 10000^(-2i / d); one entry per even channel.
        div_term = torch.exp(torch.arange(0, embed_size, 2).float() * -(math.log(10000.0) / embed_size))

        PE[:, 0::2] = torch.sin(pos * div_term)  # even channels (0, 2, 4, ...)
        # With an odd embed_size there is one fewer odd channel than even ones,
        # so trim div_term to the number of odd channels before assigning.
        PE[:, 1::2] = torch.cos(pos * div_term[: embed_size // 2])  # odd channels (1, 3, 5, ...)

        # Register as a buffer so the table follows the module across
        # .to(device) / .cuda() calls. persistent=False keeps it out of
        # state_dict, which matches the original (plain attribute) behaviour.
        self.register_buffer("PE", PE.unsqueeze(0), persistent=False)  # Shape: (1, max_len, embed_size)

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions.

        ``x`` is indexed as (batch, position, channel); ``x.size(1)`` must not
        exceed ``max_len``.
        """
        return x + self.PE[:, :x.size(1), :]



## Multi-Head Attention

In [331]:

class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product self-attention.

    Args:
        embed_size: Size of the embedding dimension; must be divisible by ``heads``.
        heads: Number of attention heads. Each head works on
            ``embed_size // heads`` channels.
    """

    def __init__(self, embed_size, heads):
        super().__init__()
        self.embed_size = embed_size
        self.heads = heads
        self.head_dim = embed_size // heads  # channels handled by each head

        # Ensure the embed_size is divisible by the number of heads
        assert self.head_dim * heads == embed_size, 'Embedding size must be divisible by the number of heads'

        # Linear layers for query, key, and value
        self.query_matrix = nn.Linear(embed_size, embed_size)
        self.key_matrix = nn.Linear(embed_size, embed_size)
        self.value_matrix = nn.Linear(embed_size, embed_size)
        self.fc_out = nn.Linear(embed_size, embed_size)

    def forward(self, x, mask=None):
        """Apply self-attention to ``x``.

        Args:
            x: Tensor of shape (batch_size, seq_len, embed_size).
            mask: Optional tensor broadcastable to
                (batch_size, heads, seq_len, seq_len). Positions where
                ``mask == 0`` receive no attention weight - e.g. a
                (batch_size, 1, 1, seq_len) mask that is 0 at padded key
                positions. Omit it to attend everywhere (original behaviour).
                A query row whose keys are all masked yields NaN.

        Returns:
            Tensor of shape (batch_size, seq_len, embed_size).
        """
        batch_size, seq_len, embed_size = x.size()

        def split_heads(projected):
            # (batch, seq, embed) -> (batch, heads, seq, head_dim)
            return projected.view(batch_size, seq_len, self.heads, self.head_dim).transpose(1, 2)

        query = split_heads(self.query_matrix(x))
        key = split_heads(self.key_matrix(x))
        value = split_heads(self.value_matrix(x))

        # Scaled dot-product scores: (batch, heads, seq_len, seq_len)
        attention = torch.matmul(query, key.transpose(-1, -2)) / (self.head_dim ** 0.5)

        if mask is not None:
            # -inf becomes a weight of exactly 0 after the softmax.
            attention = attention.masked_fill(mask == 0, float('-inf'))

        attention = torch.softmax(attention, dim=-1)

        # Weighted sum of values: (batch, heads, seq_len, head_dim)
        outp = torch.matmul(attention, value)

        # Merge the heads back: (batch, seq_len, embed_size)
        outp = outp.transpose(1, 2).contiguous().view(batch_size, seq_len, embed_size)

        return self.fc_out(outp)


## Add and Normalize

In [332]:
class AddAndNorm(nn.Module):
    """Residual connection followed by layer normalization.

    Args:
        embed_size: Size of the last dimension that LayerNorm normalizes over.
    """

    def __init__(self, embed_size):
        super().__init__()
        self.norm = nn.LayerNorm(embed_size)

    def forward(self, x, sublayer_out=None):
        """Combine a sub-layer's input and output.

        Args:
            x: Input that was fed to the sub-layer (the residual branch).
            sublayer_out: Output of the sub-layer, same shape as ``x``. When
                given, the result is ``LayerNorm(x + sublayer_out)`` - the
                "Add & Norm" step of the Transformer architecture.

        Returns:
            ``LayerNorm(x + sublayer_out)``, or - when ``sublayer_out`` is
            omitted - the legacy ``x + LayerNorm(x)`` so that existing
            single-argument callers keep their behaviour.
        """
        if sublayer_out is None:
            # Legacy single-tensor form: there is no separate residual to add.
            return x + self.norm(x)
        return self.norm(x + sublayer_out)

## Feed Forward NN

In [333]:
class FeedForwardNN(nn.Module):
    """Two-layer position-wise network: Linear -> ReLU -> Linear.

    Args:
        embed_size: Size of the input and output feature dimension.
        hidden_size: Size of the intermediate (expanded) feature dimension.
    """

    def __init__(self, embed_size, hidden_size):
        super().__init__()
        self.fc1 = nn.Linear(embed_size, hidden_size)   # expand
        self.reLu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, embed_size)   # project back

    def forward(self, x):
        """Apply the network to the last dimension of ``x``; the shape is preserved."""
        hidden = self.reLu(self.fc1(x))
        return self.fc2(hidden)

# Transformer Decoder

## Masked Multi-Head Attention

In [334]:


class MaskedMultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product self-attention with an optional mask.

    Args:
        embed_size: Size of the embedding dimension; must be divisible by ``heads``.
        heads: Number of attention heads.
    """

    def __init__(self, embed_size, heads):
        super().__init__()
        self.embed_size = embed_size
        self.heads = heads
        self.head_size = embed_size // heads

        assert self.head_size * heads == embed_size, 'Embedding size must be divisible by head numbers'

        # Projections for queries, keys and values, then the output projection.
        self.queryMatrix = nn.Linear(self.embed_size, self.embed_size)
        self.keyMatrix = nn.Linear(self.embed_size, self.embed_size)
        self.valueMatrix = nn.Linear(self.embed_size, self.embed_size)
        self.fc_out = nn.Linear(self.embed_size, self.embed_size)

    def _to_heads(self, projected, n_batch, n_tokens):
        """(batch, seq, embed) -> (batch, heads, seq, head_size)."""
        return projected.view(n_batch, n_tokens, self.heads, self.head_size).transpose(1, 2)

    def forward(self, x, mask=None):
        """Attend over ``x`` of shape (batch_size, seq_len, embed_size).

        ``mask``, when given, must broadcast against the
        (batch_size, heads, seq_len, seq_len) score tensor; entries equal to 0
        are filled with -inf before the softmax.
        """
        n_batch, n_tokens, _ = x.size()

        q = self._to_heads(self.queryMatrix(x), n_batch, n_tokens)
        k = self._to_heads(self.keyMatrix(x), n_batch, n_tokens)
        v = self._to_heads(self.valueMatrix(x), n_batch, n_tokens)

        # Scaled dot-product scores.
        scale = torch.sqrt(torch.tensor(self.head_size, dtype=torch.float32))
        scores = torch.matmul(q, k.transpose(-1, -2)) / scale

        if mask is not None:
            scores = scores.masked_fill(mask == 0, float('-inf'))

        weights = torch.softmax(scores, dim=-1)

        # Weighted values, heads merged back to (batch, seq, embed).
        merged = torch.matmul(weights, v).transpose(1, 2).contiguous()
        merged = merged.view(n_batch, n_tokens, self.embed_size)

        return self.fc_out(merged)


## Cross Attention

In [335]:


class CrossAttention(nn.Module):
    """Multi-head attention whose queries come from ``x`` and whose keys/values
    come from separate encoder tensors.

    Args:
        embed_size: Feature size of ``x`` and of the output; divisible by ``heads``.
        heads: Number of attention heads.
        encoder_key_size: Feature size of the ``encoder_key`` input.
        encoder_value_size: Feature size of the ``encoder_value`` input.
    """

    def __init__(self, embed_size, heads, encoder_key_size, encoder_value_size):
        super().__init__()
        self.embed_size = embed_size
        self.heads = heads
        self.head_size = embed_size // heads

        assert self.head_size * heads == embed_size, 'embedding size should be divisible by heads'

        self.queryMatrix = nn.Linear(self.embed_size, self.embed_size)
        self.keyMatrix = nn.Linear(encoder_key_size, self.embed_size)
        self.valueMatrix = nn.Linear(encoder_value_size, self.embed_size)

        self.fc_out = nn.Linear(self.embed_size, self.embed_size)

    def forward(self, x, encoder_key, encoder_value):
        """Attend from ``x`` over the encoder tensors.

        Args:
            x: (batch_size, seq_len, embed_size) query source.
            encoder_key: (batch_size, encoder_seq_len, encoder_key_size).
            encoder_value: Reshaped with ``encoder_key``'s sequence length, so
                it must have the same number of positions as ``encoder_key``.

        Returns:
            Tensor of shape (batch_size, seq_len, embed_size).
        """
        n_batch, n_queries, _ = x.size()
        n_keys = encoder_key.size(1)  # encoder sequence length, used for K and V alike

        head_shape = (self.heads, self.head_size)
        # (batch, heads, n_queries, head_size)
        q = self.queryMatrix(x).view(n_batch, n_queries, *head_shape).transpose(1, 2)
        # (batch, heads, n_keys, head_size)
        k = self.keyMatrix(encoder_key).view(n_batch, n_keys, *head_shape).transpose(1, 2)
        v = self.valueMatrix(encoder_value).view(n_batch, n_keys, *head_shape).transpose(1, 2)

        # Scaled scores and weights: (batch, heads, n_queries, n_keys)
        scale = torch.sqrt(torch.tensor(self.head_size, dtype=torch.float32))
        weights = torch.softmax(torch.matmul(q, k.transpose(-1, -2)) / scale, dim=-1)

        # Weighted values, heads merged back: (batch, n_queries, embed_size)
        merged = torch.matmul(weights, v).transpose(1, 2).contiguous()
        merged = merged.view(n_batch, n_queries, self.embed_size)

        return self.fc_out(merged)

## Full Transformer Implementation

In [336]:
class Transformer(nn.Module):
    """Encoder-decoder Transformer built from the attention blocks in this notebook.

    Every sub-layer is wrapped as ``LayerNorm(x + sublayer(x))`` (post-norm
    residual block):
      * encoder layer: self-attention -> feed-forward
      * decoder layer: causally masked self-attention -> cross-attention over
        the encoder output -> feed-forward

    Args:
        embed_size: Embedding / model dimension.
        heads: Number of attention heads in every attention block.
        num_encoder_layers: Number of encoder layers.
        num_decoder_layers: Number of decoder layers.
        hidden_size: Inner dimension of the feed-forward networks.
        vocab_size: Number of rows in the embedding table and of output logits;
            must be larger than every token id passed in.
        max_len: Number of positions available in the positional encoding.

    NOTE(review): PAD positions are not masked in encoder self-attention or in
    cross-attention, so padded inputs still receive attention weight.
    """

    def __init__(self, embed_size, heads, num_encoder_layers, num_decoder_layers, hidden_size, vocab_size, max_len=5000):
        super().__init__()

        # Positional encoding and the token embedding shared by src and tgt.
        self.positional_encoding = PositionalEncoding(embed_size, max_len)
        self.embedding = nn.Embedding(vocab_size, embed_size)

        def stack(factory, count):
            # One independent module per layer.
            return nn.ModuleList([factory() for _ in range(count)])

        # Encoder: each sub-layer gets its own normalization block.
        self.encoder_layers = stack(lambda: MultiHeadAttention(embed_size, heads), num_encoder_layers)
        self.encoder_attn_norm = stack(lambda: AddAndNorm(embed_size), num_encoder_layers)
        self.encoder_ffn = stack(lambda: FeedForwardNN(embed_size, hidden_size), num_encoder_layers)
        self.encoder_norm = stack(lambda: AddAndNorm(embed_size), num_encoder_layers)

        # Decoder: self-attention, cross-attention and feed-forward sub-layers.
        self.decoder_layers = stack(lambda: MaskedMultiHeadAttention(embed_size, heads), num_decoder_layers)
        self.decoder_self_attn_norm = stack(lambda: AddAndNorm(embed_size), num_decoder_layers)
        self.decoder_cross_attention = stack(
            lambda: CrossAttention(embed_size, heads, embed_size, embed_size), num_decoder_layers
        )
        self.decoder_cross_attn_norm = stack(lambda: AddAndNorm(embed_size), num_decoder_layers)
        self.decoder_ffn = stack(lambda: FeedForwardNN(embed_size, hidden_size), num_decoder_layers)
        self.decoder_norm = stack(lambda: AddAndNorm(embed_size), num_decoder_layers)

        # Final projection from model dimension to vocabulary logits.
        self.fc_out = nn.Linear(embed_size, vocab_size)

    @staticmethod
    def _add_norm(block, x, sublayer_out):
        """Return LayerNorm(x + sublayer_out) using ``block``'s inner ``norm`` layer."""
        return block.norm(x + sublayer_out)

    def forward(self, src, tgt):
        """Compute vocabulary logits for every decoder position.

        Args:
            src: Long tensor of token ids, shape (batch, src_len).
            tgt: Long tensor of decoder input ids, shape (batch, tgt_len), or
                shape (batch,) for a single decoder token per example.

        Returns:
            Logits of shape (batch, tgt_len, vocab_size).

        NOTE(review): whatever is passed as ``tgt`` is the decoder INPUT. If
        the caller passes the token it wants predicted, the model sees its own
        label; confirm the intended decoder input (e.g. a start token or the
        shifted target sequence).
        """
        # Embedding and positional encoding
        src = self.positional_encoding(self.embedding(src))
        if tgt.dim() == 1:
            # One token per example -> sequence of length 1.
            tgt = tgt.unsqueeze(1)
        tgt = self.positional_encoding(self.embedding(tgt))

        # Encoder
        for i in range(len(self.encoder_layers)):
            src = self._add_norm(self.encoder_attn_norm[i], src, self.encoder_layers[i](src))
            src = self._add_norm(self.encoder_norm[i], src, self.encoder_ffn[i](src))

        # Lower-triangular mask: decoder position t may only attend to positions <= t.
        tgt_len = tgt.size(1)
        causal_mask = torch.tril(torch.ones(tgt_len, tgt_len, device=tgt.device))

        # Decoder
        for i in range(len(self.decoder_layers)):
            tgt = self._add_norm(
                self.decoder_self_attn_norm[i], tgt, self.decoder_layers[i](tgt, causal_mask)
            )
            tgt = self._add_norm(
                self.decoder_cross_attn_norm[i], tgt, self.decoder_cross_attention[i](tgt, src, src)
            )
            tgt = self._add_norm(self.decoder_norm[i], tgt, self.decoder_ffn[i](tgt))

        # Output layer
        return self.fc_out(tgt)


In [357]:
# Size the embedding table and the output layer from the data rather than a
# guessed constant: token ids run 0 .. len(vocab) - 1 and PAD_IDX == len(vocab)
# is also fed to the model, so PAD_IDX + 1 rows are needed. A fixed 10000 only
# works while the vocabulary stays below that size.
model = Transformer(
    embed_size=512,
    heads=8,
    num_encoder_layers=6,
    num_decoder_layers=6,
    hidden_size=2048,
    vocab_size=PAD_IDX + 1,
)



In [363]:
# Run the model on one batch and print each input fragment next to its predicted word.

# Reverse lookup built once: vocab ids were assigned 0, 1, 2, ... in insertion
# order, so position i of the key list is the token with id i.
index_to_word = list(vocab.keys())

# Inference only - no gradients needed.
with torch.no_grad():
    for batch_idx, (input_seq, target) in enumerate(dataloader):
        # NOTE(review): `target` is passed as the decoder input here, i.e. the
        # model is given the very token it is asked to predict - confirm intent.
        output = model(input_seq, target)  # Pass source and target to the model
        predicted_indices = torch.argmax(output, dim=-1)
        predicted_words = [
            index_to_word[idx] if idx < len(index_to_word) else "<unk>"
            for idx in predicted_indices.flatten().tolist()
        ]

        # Map predictions back to the input fragments they belong to.
        for i, word in enumerate(predicted_words):
            # Input ids for this prediction, with padding removed.
            input_sequence_indices = [index for index in input_seq[i].tolist() if index != PAD_IDX]

            # Convert the ids back to words and join them into a fragment.
            input_sequence_words = [index_to_word[index] for index in input_sequence_indices]
            input_sentence_fragment = " ".join(input_sequence_words)

            print(f"Input: {input_sentence_fragment}, Predicted: {word}")
        break  # Process only the first batch for demonstration

Input: the river . ” “but, Predicted: jaw
Input: feet down . i clambered, Predicted: jaw
Input: the inspector sat down at, Predicted: jaw
Input: that he was within earshot, Predicted: jaw
Input: , distinctly professional . ”, Predicted: groan
Input: lawn in front of the, Predicted: jaw
Input: petrarch , and not another, Predicted: jaw
Input: it seemed to me to, Predicted: jaw
Input: who could distinguish the two, Predicted: jaw
Input: _ dénouement _ of the, Predicted: groan
Input: laid down his arms my, Predicted: groan
Input: whistle . “by jove ,, Predicted: superb
Input: object . ‘the church of, Predicted: jaw
Input: “on monday . ” “then, Predicted: jaw
Input: walking alone . the game-keeper, Predicted: jaw
Input: station after eleven o’clock ., Predicted: groan
Input: , ” said holmes ., Predicted: groan
Input: began to ask about father, Predicted: jaw
Input: alive or dead , shall, Predicted: jaw
Input: year out , in such, Predicted: groan
Input: could it be , once, Predicted: jaw
In

In [359]:
# Highest-scoring id along the last axis of the model output.
predicted_indices = output.argmax(dim=-1)
print(predicted_indices)

tensor([[4159],
        [2558],
        [4159],
        [4159],
        [4159],
        [4159],
        [4159],
        [2558],
        [4159],
        [4159],
        [4159],
        [4159],
        [4159],
        [4159],
        [4159],
        [4159],
        [2558],
        [4159],
        [2558],
        [2558],
        [5471],
        [4159],
        [4159],
        [4159],
        [4159],
        [4159],
        [4934],
        [4159],
        [4159],
        [4159],
        [4159],
        [4159]])


In [362]:
predicted_indices = torch.argmax(output, dim=-1)

# Build the id -> token list once instead of once per predicted token; vocab
# ids follow insertion order, so position i holds the token with id i.
index_to_word = list(vocab.keys())
predicted_words = [
    index_to_word[idx] if idx < len(index_to_word) else "<unk>"  # ids past the vocabulary (e.g. PAD) have no word
    for idx in predicted_indices.flatten().tolist()
]
print(predicted_words)

['jaw', 'jaw', 'jaw', 'jaw', 'jaw', 'groan', 'jaw', 'jaw', 'jaw', 'superb', 'groan', 'jaw', 'groan', 'jaw', 'jaw', 'jaw', 'jaw', 'groan', 'groan', 'jaw', 'groan', 'superb', 'jaw', 'curling', 'jaw', 'jaw', 'jaw', 'jaw', 'jaw', 'jaw', 'jaw', 'groan']


## Try a Prebuilt Model to Compare Performance