In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import math
import copy
from collections import defaultdict
import re

In [2]:
# Corpus and vocab
corpus = [
    'The quick brown fox jumps over the lazy dog.',
    'I love machine learning and artificial intelligence.',
    'The dog chased the cat around the yard.',
    'Deep learning is a subset of machine learning.',
    'Artificial intelligence is transforming industries.',
    'Natural language processing is a key area of AI research.',
    'The fox and the dog quickly became friends.',
    'I enjoy reading about the latest AI breakthroughs.'
]


In [3]:

# Function to tokenize the sentences. You can use more advanced tokenizers like HuggingFace's tokenizers for better performance.
def simple_tokenizer(text):
    text = text.lower()  # Convert text to lowercase for uniformity
    text = re.sub(r'[^a-zA-Z0-9\s]', '', text)  # Remove non-alphanumeric characters
    tokens = text.split()  # Split the cleaned text into words (tokens)
    return tokens  # Return the list of tokens

# Function to build a vocabulary from a given corpus of sentences.
def build_vocab(corpus):
    word_to_index = defaultdict(lambda: len(word_to_index))  # Create a default dictionary for words to indices
    word_to_index["<pad>"] = 0  # Add padding token with index 0
    word_to_index["<sos>"] = 1  # Add start of sentence token with index 1
    word_to_index["<eos>"] = 2  # Add end of sentence token with index 2
    word_to_index["<unk>"] = 3  # Add unknown token with index 3

    # Iterate through each sentence in the corpus to populate the vocabulary
    for sentence in corpus:
        tokens = simple_tokenizer(sentence)  # Tokenize the sentence
        for token in tokens:
            _ = word_to_index[token]  # Add the token to the vocabulary if it's not already present

    return dict(word_to_index)  # Return the complete vocabulary as a dictionary

# Function to prepare data by converting text into indexed tokens and applying padding/truncation.
def prepare_data(corpus, word_to_index, max_seq_length):
    data = []  # Initialize a list to hold the processed data
    for sentence in corpus:
        tokens = simple_tokenizer(sentence)  # Tokenize the sentence
        indexed_sentence = [word_to_index.get(token, word_to_index["<unk>"]) for token in tokens]  # Convert tokens to their corresponding indices
        indexed_sentence = [word_to_index["<sos>"]] + indexed_sentence + [word_to_index["<eos>"]]  # Add start and end tokens

        # Padding or truncating to max_seq_length
        if len(indexed_sentence) < max_seq_length:
            # If the indexed sentence is shorter than max_seq_length, pad it
            indexed_sentence += [word_to_index["<pad>"]] * (max_seq_length - len(indexed_sentence))
        else:
            # If the indexed sentence is longer, truncate it to max_seq_length
            indexed_sentence = indexed_sentence[:max_seq_length]

        data.append(indexed_sentence)  # Append the processed sentence to the data list

    return torch.tensor(data)  # Convert the list of indexed sentences to a PyTorch tensor and return


In [4]:
simple_tokenizer('who are you')

['who', 'are', 'you']

In [5]:
[12,44,88]

[12, 44, 88]

In [6]:
# Define the maximum sequence length for padding/truncation of sentences
max_seq_length = 20  

# Build the vocabulary from the provided corpus of sentences
vocab = build_vocab(corpus)  # Call the build_vocab function to generate the vocabulary



In [7]:
vocab

{'<pad>': 0,
 '<sos>': 1,
 '<eos>': 2,
 '<unk>': 3,
 'the': 4,
 'quick': 5,
 'brown': 6,
 'fox': 7,
 'jumps': 8,
 'over': 9,
 'lazy': 10,
 'dog': 11,
 'i': 12,
 'love': 13,
 'machine': 14,
 'learning': 15,
 'and': 16,
 'artificial': 17,
 'intelligence': 18,
 'chased': 19,
 'cat': 20,
 'around': 21,
 'yard': 22,
 'deep': 23,
 'is': 24,
 'a': 25,
 'subset': 26,
 'of': 27,
 'transforming': 28,
 'industries': 29,
 'natural': 30,
 'language': 31,
 'processing': 32,
 'key': 33,
 'area': 34,
 'ai': 35,
 'research': 36,
 'quickly': 37,
 'became': 38,
 'friends': 39,
 'enjoy': 40,
 'reading': 41,
 'about': 42,
 'latest': 43,
 'breakthroughs': 44}

In [8]:

# Prepare source data by converting sentences into indexed tokens with padding/truncation
src_data = prepare_data(corpus, vocab, max_seq_length)  # Process the corpus for source data

# Prepare target data using the same corpus and vocabulary
tgt_data = prepare_data(corpus, vocab, max_seq_length)  # Process the corpus for target data

![Alt Text](embedding_pic.png)

![Alt Text](pos_enc_pic.png)

![Alt Text](mutihead_pic.png)

![Alt Text](cross_attention_pic.jpg)

In [9]:

# Multi-Head Attention class that extends nn.Module
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        # Ensure the model dimension is divisible by the number of heads
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        
        self.d_model = d_model  # Model dimension
        self.num_heads = num_heads  # Number of attention heads
        self.d_k = d_model // num_heads  # Dimension of each attention head
        
        # Linear transformations for query, key, value, and output
        self.W_q = nn.Linear(d_model, d_model)  # Linear layer for queries
        self.W_k = nn.Linear(d_model, d_model)  # Linear layer for keys
        self.W_v = nn.Linear(d_model, d_model)  # Linear layer for values
        self.W_o = nn.Linear(d_model, d_model)  # Linear layer for output

    # Scaled dot-product attention mechanism
    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        # Compute attention scores
        attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            # Apply mask to attention scores to prevent attending to certain positions
            attn_scores = attn_scores.masked_fill(mask == 0, -1e9)
        attn_probs = torch.softmax(attn_scores, dim=-1)  # Convert scores to probabilities
        output = torch.matmul(attn_probs, V)  # Compute weighted sum of values
        return output  # Return the attention output

    # Function to split the input into multiple heads
    def split_heads(self, x):
        batch_size, seq_length, d_model = x.size()  # Get input dimensions
        # Reshape and transpose to get heads in the correct format
        return x.view(batch_size, seq_length, self.num_heads, self.d_k).transpose(1, 2)
    
    # Function to combine the heads back into a single output
    def combine_heads(self, x):
        batch_size, _, seq_length, d_k = x.size()  # Get input dimensions
        # Transpose and reshape to combine heads
        return x.transpose(1, 2).contiguous().view(batch_size, seq_length, self.d_model)

    # Forward pass through the Multi-Head Attention module
    def forward(self, Q, K, V, mask=None):
        # Split inputs into heads
        Q = self.split_heads(self.W_q(Q))  # Transform and split queries
        K = self.split_heads(self.W_k(K))  # Transform and split keys
        V = self.split_heads(self.W_v(V))  # Transform and split values
        
        # Compute attention output
        attn_output = self.scaled_dot_product_attention(Q, K, V, mask)
        # Combine heads and transform to output dimension
        output = self.W_o(self.combine_heads(attn_output))
        return output  # Return the final output


# Position-wise Feed Forward Network class that extends nn.Module
class PositionWiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(PositionWiseFeedForward, self).__init__()
        # First linear transformation from model dimension to feed-forward dimension
        self.fc1 = nn.Linear(d_model, d_ff)
        # Second linear transformation back to model dimension
        self.fc2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()  # ReLU activation function

    # Forward pass through the feed-forward network
    def forward(self, x):
        # Apply first linear layer, then ReLU, then second linear layer
        return self.fc2(self.relu(self.fc1(x)))

# Positional Encoding class that extends nn.Module
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_seq_length):
        super(PositionalEncoding, self).__init__()

        # Initialize positional encoding tensor
        pe = torch.zeros(max_seq_length, d_model)  # Create tensor for positional encodings
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)  # Position indices
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))  # Compute the division term

        # Apply sine to even indices and cosine to odd indices for positional encoding
        pe[:, 0::2] = torch.sin(position * div_term)  # Apply sine for even dimensions
        pe[:, 1::2] = torch.cos(position * div_term)  # Apply cosine for odd dimensions

        self.register_buffer('pe', pe.unsqueeze(0))  # Register pe as a buffer for persistent storage

    # Forward pass to add positional encodings to input embeddings
    def forward(self, x):
        return x + self.pe[:, :x.size(1)]  # Add positional encodings to the input

# Encoder layer class that extends nn.Module
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(EncoderLayer, self).__init__()
        # Multi-Head Attention for self-attention mechanism
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        # Position-wise Feed Forward network
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        # Layer normalization for the first and second sub-layers
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        # Dropout layer for regularization
        self.dropout = nn.Dropout(dropout)

    # Forward pass through the Encoder layer
    def forward(self, x, mask):
        # Compute self-attention output
        attn_output = self.self_attn(x, x, x, mask)
        # Apply residual connection, dropout, and normalization
        x = self.norm1(x + self.dropout(attn_output))
        # Pass through feed-forward network
        ff_output = self.feed_forward(x)
        # Apply another residual connection, dropout, and normalization
        x = self.norm2(x + self.dropout(ff_output))
        return x  # Return the output of the Encoder layer


# Decoder layer class that extends nn.Module
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(DecoderLayer, self).__init__()
        # Multi-Head Attention for self-attention mechanism in the decoder
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        # Multi-Head Attention for cross-attention between decoder and encoder outputs
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        # Position-wise Feed Forward network
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        # Layer normalization for each of the three sub-layers
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        # Dropout layer for regularization
        self.dropout = nn.Dropout(dropout)

    # Forward pass through the Decoder layer
    def forward(self, x, enc_output, src_mask, tgt_mask):
        # Compute self-attention output for the decoder
        attn_output = self.self_attn(x, x, x, tgt_mask)
        # Apply residual connection, dropout, and normalization
        x = self.norm1(x + self.dropout(attn_output))
        # Compute cross-attention output using encoder output
        attn_output = self.cross_attn(x, enc_output, enc_output, src_mask)
        # Apply another residual connection, dropout, and normalization
        x = self.norm2(x + self.dropout(attn_output))
        # Pass through feed-forward network
        ff_output = self.feed_forward(x)
        # Apply a final residual connection, dropout, and normalization
        x = self.norm3(x + self.dropout(ff_output))
        return x  # Return the output of the Decoder layer


# Transformer class that extends nn.Module
class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout, vocab=None):
        super(Transformer, self).__init__()

        # Embedding layers for source and target vocabularies
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)  # Source language embeddings
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)  # Target language embeddings
        self.positional_encoding = PositionalEncoding(d_model, max_seq_length)  # Positional encoding layer

        # Stack of Encoder layers
        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        # Stack of Decoder layers
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])

        # Final linear layer to project output to target vocabulary size
        self.fc = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)  # Dropout layer for regularization

        # Store vocabulary if provided
        self.vocab = vocab

    # Generate source and target masks to avoid using padding tokens and future tokens
    def generate_mask(self, src, tgt):
        # Source mask: 1 for valid tokens (not padding), 0 for padding
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2)
        # Target mask: 1 for valid tokens (not padding), 0 for padding
        tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(3)
        seq_length = tgt.size(1)
        # Create a no-peak mask to prevent attending to future tokens in the target sequence
        nopeak_mask = (1 - torch.triu(torch.ones(1, seq_length, seq_length), diagonal=1)).bool()
        tgt_mask = tgt_mask & nopeak_mask  # Combine with target mask
        return src_mask, tgt_mask  # Return both masks

    # Forward pass through the Transformer
    def forward(self, src, tgt):
        src_mask, tgt_mask = self.generate_mask(src, tgt)  # Generate masks for source and target
        # Embed source and target sequences, then apply positional encoding and dropout
        src_embedded = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
        tgt_embedded = self.dropout(self.positional_encoding(self.decoder_embedding(tgt)))

        # Pass through Encoder layers
        enc_output = src_embedded
        for enc_layer in self.encoder_layers:
            enc_output = enc_layer(enc_output, src_mask)

        # Pass through Decoder layers
        dec_output = tgt_embedded
        for dec_layer in self.decoder_layers:
            dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)

        # Final output projection to target vocabulary size
        output = self.fc(dec_output)
        return output  # Return the output logits for each token in the target sequence

    # Method to retrieve word vectors for the vocabulary
    def get_word_vectors(self):
        if self.vocab:
            # Create a dictionary of word vectors using the encoder embeddings
            word_vectors = {word: self.encoder_embedding(torch.tensor([idx])).detach().numpy()
                            for word, idx in self.vocab.items()}
            return word_vectors  # Return the dictionary of word vectors
        else:
            raise ValueError("Vocabulary not provided!")  # Raise an error if no vocabulary is given



In [10]:
# Define hyperparameters for the Transformer model
src_vocab_size = len(vocab)  # Source vocabulary size derived from the vocab dictionary
tgt_vocab_size = len(vocab)  # Target vocabulary size; using the same vocab for both source and target
d_model = 512  # Dimension of the model (embedding size)
num_heads = 8  # Number of attention heads in multi-head attention
num_layers = 6  # Number of encoder and decoder layers in the Transformer
d_ff = 2048  # Dimension of the feedforward network (hidden layer size)
dropout = 0.1  # Dropout rate for regularization

# Initialize Transformer model with vocab and parameters
transformer = Transformer(
    src_vocab_size,  # Source vocabulary size
    tgt_vocab_size,  # Target vocabulary size
    d_model,         # Model dimension
    num_heads,       # Number of attention heads
    num_layers,      # Number of encoder/decoder layers
    d_ff,            # Feedforward dimension
    max_seq_length,  # Maximum sequence length for padding
    dropout,         # Dropout rate
    vocab=vocab      # Vocabulary to be used for embedding
)


In [12]:
# Define the loss function for training
criterion = nn.CrossEntropyLoss(ignore_index=vocab["<pad>"])  # Use CrossEntropyLoss and ignore the padding index for loss computation

# Initialize the optimizer for the Transformer model
optimizer = optim.Adam(transformer.parameters(), lr=0.0001)  # Use Adam optimizer with a learning rate of 0.0001

In [13]:
# Define the number of training epochs
num_epochs = 10

# Start the training loop
for epoch in range(num_epochs):
    transformer.train()  # Set the model to training mode
    optimizer.zero_grad()  # Zero the gradients from the previous step
    
    # Forward pass: feed input and target data (excluding the last token from target data)
    output = transformer(src_data, tgt_data[:, :-1])
    
    # Calculate loss: compare model output with the shifted target (excluding the first token)
    loss = criterion(output.view(-1, tgt_vocab_size), tgt_data[:, 1:].contiguous().view(-1))
    
    # Backward pass: compute gradients
    loss.backward()
    
    # Update the model parameters
    optimizer.step()
    
    # Print the epoch number and the current loss
    print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}")


Epoch [1/10], Loss: 3.9269
Epoch [2/10], Loss: 3.5114
Epoch [3/10], Loss: 3.0512
Epoch [4/10], Loss: 2.7600
Epoch [5/10], Loss: 2.2986
Epoch [6/10], Loss: 1.8157
Epoch [7/10], Loss: 1.4394
Epoch [8/10], Loss: 1.1323
Epoch [9/10], Loss: 0.9541
Epoch [10/10], Loss: 0.7421


In [14]:
# Extract word embeddings from the transformer's embedding layer
word_embeddings = transformer.encoder_embedding.weight.detach().cpu().numpy()

# Map each word in the vocabulary to its corresponding embedding vector
word_vectors = {word: word_embeddings[idx] for word, idx in vocab.items()}

# Print each word along with its embedding vector
for word, vector in word_vectors.items():
    print(f"Word: {word}, Vector: {vector}")


Word: <pad>, Vector: [-6.20904677e-02 -1.70037544e+00 -1.08908936e-01  3.23058590e-02
  1.02819920e+00  1.43218660e+00 -8.22772861e-01  9.40254152e-01
  2.68493271e+00 -8.04498374e-01 -9.51462686e-02 -1.26836252e+00
 -5.68448067e-01 -1.41737759e+00  3.42007726e-01  1.38256013e+00
 -8.90113056e-01  2.77356476e-01  3.63071471e-01  6.91394150e-01
  4.89379227e-01  3.06690782e-01 -4.84400332e-01 -1.47234118e+00
  9.36924368e-02  6.76452219e-01  1.12127423e+00 -8.11688960e-01
  5.29021978e-01  4.63937461e-01  7.42318809e-01 -2.69797206e-01
 -9.92968142e-01 -8.45172465e-01 -1.71575356e+00  8.56682777e-01
 -3.47541511e-01  2.20942870e-01  1.32173514e+00 -1.19997871e+00
  9.90974426e-01  7.68319547e-01 -7.01737642e-01  6.93175375e-01
  1.25227344e+00  2.49091670e-01 -4.45819765e-01  1.64440739e+00
  1.80927098e+00  2.20219985e-01  1.61950082e-01  4.41609383e-01
  9.72308517e-01 -6.09114803e-02 -8.14907789e-01 -7.63841927e-01
  2.54727662e-01  1.52574670e+00 -2.83956289e-01  3.25065345e-01
  2.

In [20]:
def inference(word):

    # Inference for the word "intelligence" with multiple predictions
    num_predictions = 5  # Number of words to predict

    # Convert the word to its corresponding token index
    word_index = vocab.get(word, vocab["<unk>"])  # Use <unk> token if word not found in vocab

    # Prepare the input tensor (add batch and sequence dimensions)
    input_tensor = torch.tensor([[vocab["<sos>"], word_index]])  # Start with <sos> and the input word
    input_tensor = input_tensor.to(next(transformer.parameters()).device)  # Send input to the same device as the model

    predicted_words = []  # List to store predicted words
    # Convert the predicted index back to a word using reverse vocabulary
    reverse_vocab = {index: word for word, index in vocab.items()}
    # Perform inference using the Transformer model
    transformer.eval()  # Set the model to evaluation mode
    with torch.no_grad():  # Disable gradient calculation for inference
        for _ in range(num_predictions):
            # Perform inference for the current input
            output = transformer(input_tensor, input_tensor[:, :-1])  # Same input for src and tgt during inference

            # Get the prediction for the last token in the output
            predicted_logits = output[:, -1, :]  # Shape: (batch_size, tgt_vocab_size)

            # Get the predicted token index with the highest probability
            predicted_index = predicted_logits.argmax(dim=-1).item()  # Get the index of the highest probability token

            # Convert the predicted index back to a word using reverse vocabulary
            predicted_word = reverse_vocab.get(predicted_index, "<unk>")  # Use <unk> if index is not found
            predicted_words.append(predicted_word)  # Append the predicted word to the list

            # Update the input tensor with the newly predicted word
            input_tensor = torch.cat((input_tensor, torch.tensor([[predicted_index]]).to(input_tensor.device)), dim=1)  # Add the predicted word to the input tensor
            
    return predicted_words
# Print the input word and predicted words
word='language'
print(f"Input word: {word}")
print(f"Predicted words: {inference(word)}")


Input word: language
Predicted words: ['natural', 'processing', 'language', 'is', 'processing']
