In [14]:
import torch
import torch.optim as optim
import torch.nn as nn
import math
import torch.nn.functional as F

In [15]:
class SelfAttention(nn.Module):
    """Single-head scaled dot-product self-attention with square projections.

    The query, key and value projections are ``(embedding_dim, embedding_dim)``
    matrices drawn from a standard normal distribution. They are wrapped in
    ``nn.Parameter`` so that ``parameters()``, ``state_dict()`` and
    ``.to(device)`` see them; as plain tensors they would be invisible to an
    optimizer and could never be trained.
    """

    def __init__(self, embedding_dim):
        """Create the three projection matrices.

        Args:
            embedding_dim: size of the last dimension of the inputs; also the
                size of the projected queries, keys and values.
        """
        super().__init__()
        self.embedding_dim = embedding_dim
        # Same draw order as before (query, key, value) so seeded runs match.
        self.W_query = nn.Parameter(torch.randn(embedding_dim, embedding_dim))
        self.W_key = nn.Parameter(torch.randn(embedding_dim, embedding_dim))
        self.W_value = nn.Parameter(torch.randn(embedding_dim, embedding_dim))

    def forward(self, X):
        """Attend every row of ``X`` to every other row.

        Args:
            X: tensor whose last dimension is ``embedding_dim``; the
                second-to-last dimension is the one attended over.

        Returns:
            ``(output, attention_weights)`` where ``attention_weights`` is the
            softmax over the last axis of the scaled ``Q @ K^T`` scores and
            ``output`` is ``attention_weights @ V``.
        """
        Q = torch.matmul(X, self.W_query)
        K = torch.matmul(X, self.W_key)
        V = torch.matmul(X, self.W_value)

        attention_scores = torch.matmul(Q, K.transpose(-2, -1))
        # A Python float scale avoids building an integer tensor on every call.
        attention_scores = attention_scores / math.sqrt(self.embedding_dim)
        attention_weights = F.softmax(attention_scores, dim=-1)
        output = torch.matmul(attention_weights, V)
        return output, attention_weights

# Process sentences
sentence1 = ["I", "like", "Batman"]
sentence2 = ["I", "like", "Dwight", "Schrute", "from", "The", "Office"]

# Combine unique words from both sentences.
# dict.fromkeys de-duplicates while keeping first-seen order, so the
# word -> index mapping is identical on every kernel restart (iteration order
# of a set of strings is not stable across Python processes).
unique_words = list(dict.fromkeys(sentence1 + sentence2))
vocab_size = len(unique_words)
word_to_idx = {word: idx for idx, word in enumerate(unique_words)}

# Create one-hot encodings
def create_one_hot(sentence, word_to_idx, vocab_size):
    """Return a ``(len(sentence), vocab_size)`` float tensor of one-hot rows.

    Row ``i`` has a single 1 in column ``word_to_idx[sentence[i]]``; a word
    missing from ``word_to_idx`` raises ``KeyError``.
    """
    columns = torch.tensor([word_to_idx[token] for token in sentence], dtype=torch.long)
    rows = torch.arange(len(sentence))
    one_hot = torch.zeros(len(sentence), vocab_size)
    # One vectorised write instead of a per-word Python loop.
    one_hot[rows, columns] = 1
    return one_hot

# One-hot encode each sentence on its own
X1 = create_one_hot(sentence1, word_to_idx, vocab_size)
X2 = create_one_hot(sentence2, word_to_idx, vocab_size)

# The one-hot width doubles as the attention embedding size
attention = SelfAttention(embedding_dim=vocab_size)

# Run both sentences through the same (randomly initialised) attention module
output1, attention_weights1 = attention(X1)
output2, attention_weights2 = attention(X2)

# Print every query -> key weight, one sentence after the other
for heading, tokens, weight_matrix in (
    ("\nAttention weights for 'I like Batman':", sentence1, attention_weights1),
    ("\nAttention weights for 'I like Dwight Schrute from The Office':", sentence2, attention_weights2),
):
    print(heading)
    for query_word, weight_row in zip(tokens, weight_matrix):
        for key_word, weight in zip(tokens, weight_row):
            print(f"{query_word} -> {key_word}: {weight:.3f}")


Attention weights for 'I like Batman':
I -> I: 0.766
I -> like: 0.124
I -> Batman: 0.110
like -> I: 0.455
like -> like: 0.160
like -> Batman: 0.385
Batman -> I: 0.334
Batman -> like: 0.452
Batman -> Batman: 0.214

Attention weights for 'I like Dwight Schrute from The Office':
I -> I: 0.357
I -> like: 0.058
I -> Dwight: 0.090
I -> Schrute: 0.270
I -> from: 0.101
I -> The: 0.083
I -> Office: 0.041
like -> I: 0.103
like -> like: 0.036
like -> Dwight: 0.010
like -> Schrute: 0.184
like -> from: 0.430
like -> The: 0.043
like -> Office: 0.194
Dwight -> I: 0.038
Dwight -> like: 0.367
Dwight -> Dwight: 0.075
Dwight -> Schrute: 0.163
Dwight -> from: 0.035
Dwight -> The: 0.241
Dwight -> Office: 0.081
Schrute -> I: 0.575
Schrute -> like: 0.022
Schrute -> Dwight: 0.061
Schrute -> Schrute: 0.065
Schrute -> from: 0.095
Schrute -> The: 0.050
Schrute -> Office: 0.132
from -> I: 0.176
from -> like: 0.174
from -> Dwight: 0.100
from -> Schrute: 0.112
from -> from: 0.061
from -> The: 0.044
from -> Office:

In [16]:
class Dropout:
    """Inverted dropout: zero elements with probability ``p`` and rescale the rest.

    Surviving elements are divided by ``1 - p`` so the expected value of the
    output equals the input.
    """

    def __init__(self, p=0.3):  # Increased dropout for small data
        """Store the drop probability.

        Args:
            p: probability of zeroing each element, in ``[0, 1]``.

        Raises:
            ValueError: if ``p`` lies outside ``[0, 1]``.
        """
        if not 0.0 <= p <= 1.0:
            raise ValueError(f"dropout probability must be in [0, 1], got {p}")
        self.p = p

    def forward(self, x, training=True):
        """Apply dropout to ``x``.

        Args:
            x: input tensor.
            training: when false the input is returned unchanged.

        Returns:
            ``x`` itself when ``training`` is false or ``p == 0``; an all-zero
            tensor when ``p == 1``; otherwise ``x`` times a rescaled Bernoulli
            keep-mask of the same shape.
        """
        if not training or self.p == 0:
            return x
        if self.p >= 1:
            # Everything is dropped; the general formula would divide by zero
            # and produce NaNs.
            return torch.zeros_like(x)
        keep_prob = 1 - self.p
        mask = torch.bernoulli(torch.full_like(x, keep_prob)) / keep_prob
        return x * mask

In [17]:
class MultiHeadAttention:
    """Multi-head scaled dot-product attention built from raw ``nn.Parameter`` tensors.

    Each head owns its own ``(dmodel, head_dim)`` query/key/value projection,
    with ``head_dim = dmodel // num_heads``. Head outputs are concatenated and
    projected back to ``dmodel`` by ``W_output``. Dropout is applied to every
    head's output and once more to the projected result.
    """

    def __init__(self, dmodel, num_heads, dropout=0.3, device=None):
        """Allocate and Xavier-initialise all projection matrices.

        Args:
            dmodel: size of the model (last) dimension of the inputs.
            num_heads: number of attention heads.
            dropout: drop probability handed to ``Dropout``.
            device: device for the parameters; defaults to CUDA when
                available, otherwise CPU.
        """
        self.dmodel = dmodel
        self.num_heads = num_heads
        self.head_dim = dmodel // num_heads
        self.device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Xavier initialization
        self.W_query = self._new_head_weights()
        self.W_key = self._new_head_weights()
        self.W_value = self._new_head_weights()
        self.W_output = nn.Parameter(torch.empty(self.head_dim * num_heads, dmodel, device=self.device))

        # Initialisation order (queries, keys, values, output) is kept fixed so
        # seeded runs stay reproducible.
        for w in self.W_query + self.W_key + self.W_value + [self.W_output]:
            nn.init.xavier_uniform_(w, gain=1.0)

        self.dropout = Dropout(dropout)

    def _new_head_weights(self):
        """Return one uninitialised ``(dmodel, head_dim)`` parameter per head."""
        return [
            nn.Parameter(torch.empty(self.dmodel, self.head_dim, device=self.device))
            for _ in range(self.num_heads)
        ]

    def softmax(self, x):
        """Numerically stable softmax over the last dimension of ``x``."""
        exp_x = torch.exp(x - torch.max(x, dim=-1, keepdim=True)[0])
        return exp_x / exp_x.sum(dim=-1, keepdim=True)

    def forward(self, X, memory=None, mask=None, training=True):
        """Run attention for every head and merge the results.

        Args:
            X: query-side input whose last dimension is ``dmodel``.
            memory: optional key/value-side input (cross-attention). When
                ``None`` the keys and values are computed from ``X``.
            mask: optional tensor broadcastable to the score matrix; positions
                where ``mask == 0`` are excluded from attention.
            training: forwarded to dropout.

        Returns:
            ``(final_output, all_attention_weights)`` where
            ``all_attention_weights`` is a list with one weight tensor per head.
        """
        # Keys and values come from the same source for every head; pick it once.
        kv_source = X if memory is None else memory
        scale = self.head_dim ** 0.5

        head_outputs = []
        all_attention_weights = []

        for W_q, W_k, W_v in zip(self.W_query, self.W_key, self.W_value):
            Q = torch.matmul(X, W_q)
            K = torch.matmul(kv_source, W_k)
            V = torch.matmul(kv_source, W_v)

            scores = torch.matmul(Q, K.transpose(-2, -1)) / scale
            if mask is not None:
                # Use the most negative value the score dtype can hold: a fixed
                # -1e9 cannot be represented in float16.
                scores = scores.masked_fill(mask == 0, torch.finfo(scores.dtype).min)
            weights = self.softmax(scores)
            output = torch.matmul(weights, V)
            head_outputs.append(self.dropout.forward(output, training))
            all_attention_weights.append(weights)

        multi_head_output = torch.cat(head_outputs, dim=-1)
        final_output = torch.matmul(multi_head_output, self.W_output)
        final_output = self.dropout.forward(final_output, training)

        return final_output, all_attention_weights

    def parameters(self):
        """Return all trainable tensors: per-head Q, K, V weights, then ``W_output``."""
        params = []
        params.extend(self.W_query)
        params.extend(self.W_key)
        params.extend(self.W_value)
        params.append(self.W_output)
        return params

In [18]:
class FFN:
    """Position-wise feed-forward block: ``relu(X @ W1 + b1) @ W2 + b2``."""

    def __init__(self, dmodel, d_ff, device=None):
        """Create the two affine layers.

        Args:
            dmodel: size of the input and output (last) dimension.
            d_ff: size of the hidden dimension.
            device: device for the parameters; defaults to CUDA when
                available, otherwise CPU.
        """
        self.device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.W1 = nn.Parameter(torch.empty(dmodel, d_ff, device=self.device))
        self.W2 = nn.Parameter(torch.empty(d_ff, dmodel, device=self.device))
        self.b1 = nn.Parameter(torch.zeros(d_ff, device=self.device))
        self.b2 = nn.Parameter(torch.zeros(dmodel, device=self.device))

        # Weights are Xavier-uniform; biases start at zero.
        nn.init.xavier_uniform_(self.W1, gain=1.0)
        nn.init.xavier_uniform_(self.W2, gain=1.0)

    def forward(self, X):
        """Apply the feed-forward block to the last dimension of ``X``.

        Args:
            X: tensor whose last dimension is ``dmodel``.

        Returns:
            Tensor of the same shape as ``X``.
        """
        Z1 = torch.matmul(X, self.W1) + self.b1
        # Built-in ReLU: no per-call zero tensor is allocated.
        A1 = torch.relu(Z1)
        Z2 = torch.matmul(A1, self.W2) + self.b2
        return Z2

    def parameters(self):
        """Return the trainable tensors in the order ``[W1, W2, b1, b2]``."""
        return [self.W1, self.W2, self.b1, self.b2]

In [19]:
class PositionalEncoding:
    """Fixed sinusoidal position table added to embedded sequences.

    Even columns hold ``sin(position * div_term)`` and odd columns hold the
    matching ``cos`` values, for positions ``0 .. max_seq_length - 1``.
    """

    def __init__(self, dmodel, max_seq_length=5000, device=None):
        """Precompute the ``(max_seq_length, dmodel)`` encoding table.

        Args:
            dmodel: number of columns in the table.
            max_seq_length: number of positions to precompute.
            device: device for the table; defaults to CUDA when available,
                otherwise CPU.
        """
        self.device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.encoding = torch.zeros(max_seq_length, dmodel, device=self.device)
        position = torch.arange(max_seq_length, device=self.device).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, dmodel, 2, device=self.device) * (-math.log(10000.0) / dmodel))
        # Shared by the sine and cosine halves, so compute it once.
        angles = position * div_term
        self.encoding[:, 0::2] = torch.sin(angles)
        # For odd dmodel there is one fewer odd column than even columns.
        self.encoding[:, 1::2] = torch.cos(angles[:, :dmodel // 2])

    def forward(self, X):
        """Add the first ``X.shape[1]`` rows of the table to ``X``.

        Args:
            X: tensor whose dimension 1 is the sequence axis and whose last
                dimension matches ``dmodel``.

        Returns:
            ``X`` plus the positional encoding, same shape as ``X``.

        Raises:
            ValueError: if the sequence is longer than the precomputed table.
                Without this check the addition would fail with an opaque
                broadcasting error, or broadcast silently when the table has a
                single row.
        """
        seq_length = X.shape[1]
        max_seq_length = self.encoding.shape[0]
        if seq_length > max_seq_length:
            raise ValueError(
                f"sequence length {seq_length} exceeds max_seq_length {max_seq_length}"
            )
        return X + self.encoding[:seq_length]

    def parameters(self):
        """The table is fixed, so there is nothing to train."""
        return []

In [20]:
class Embedding:
    """Lookup table mapping token ids to ``dmodel``-sized vectors.

    Looked-up rows are multiplied by ``sqrt(dmodel)`` before being returned.
    """

    def __init__(self, vocab_size, dmodel, device=None):
        """Allocate a Xavier-uniform ``(vocab_size, dmodel)`` table.

        ``device`` defaults to CUDA when available, otherwise CPU.
        """
        fallback = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = device or torch.device(fallback)
        self.dmodel = dmodel
        table = torch.empty(vocab_size, dmodel, device=self.device)
        self.embedding_matrix = nn.Parameter(table)
        nn.init.xavier_uniform_(self.embedding_matrix, gain=1.0)

    def forward(self, x):
        """Index the table with ``x`` and scale the result by ``dmodel ** 0.5``."""
        scale = self.dmodel ** 0.5
        return self.embedding_matrix[x] * scale

    def parameters(self):
        """Return the table as the only trainable tensor."""
        return [self.embedding_matrix]

In [21]:
class LayerNormalization:
    """Normalise the last dimension to zero mean / unit variance, then scale and shift."""

    def __init__(self, dmodel, epsilon=1e-6, device=None):
        """Create the learnable scale (ones) and shift (zeros) vectors of size ``dmodel``.

        ``epsilon`` is added to the variance before the square root;
        ``device`` defaults to CUDA when available, otherwise CPU.
        """
        fallback = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = device or torch.device(fallback)
        self.epsilon = epsilon
        self.gamma = nn.Parameter(torch.ones(dmodel, device=self.device))
        self.beta = nn.Parameter(torch.zeros(dmodel, device=self.device))

    def forward(self, x):
        """Return ``gamma * (x - mean) / sqrt(var + epsilon) + beta``.

        Mean and (biased) variance are taken over the last dimension.
        """
        mu = x.mean(dim=-1, keepdim=True)
        sigma_sq = x.var(dim=-1, keepdim=True, unbiased=False)
        x_hat = (x - mu) / torch.sqrt(sigma_sq + self.epsilon)
        return self.gamma * x_hat + self.beta

    def parameters(self):
        """Return ``[gamma, beta]``."""
        return [self.gamma, self.beta]

In [22]:
class EncoderLayer:
    """One encoder block: self-attention then feed-forward.

    Each sub-layer's output passes through dropout, is added back to the
    sub-layer's input, and the sum is layer-normalised.
    """

    def __init__(self, dmodel, num_heads, d_ff, dropout=0.3, device=None):
        """Build the attention, feed-forward, normalisation and dropout parts.

        ``device`` defaults to CUDA when available, otherwise CPU, and is
        passed down to every sub-module that owns parameters.
        """
        fallback = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = device or torch.device(fallback)
        self.mha = MultiHeadAttention(dmodel, num_heads, dropout, self.device)
        self.ffn = FFN(dmodel, d_ff, self.device)
        self.norm1 = LayerNormalization(dmodel, device=self.device)
        self.norm2 = LayerNormalization(dmodel, device=self.device)
        self.dropout = Dropout(dropout)

    def forward(self, X, training=True):
        """Return ``(layer_output, attention_weights)`` for input ``X``.

        ``training`` is forwarded to the attention module and to dropout.
        """
        # Sub-layer 1: self-attention with residual + norm
        attended, attn_weights = self.mha.forward(X, training=training)
        attended = self.dropout.forward(attended, training)
        hidden = self.norm1.forward(X + attended)

        # Sub-layer 2: feed-forward with residual + norm
        transformed = self.dropout.forward(self.ffn.forward(hidden), training)
        result = self.norm2.forward(hidden + transformed)

        return result, attn_weights

    def parameters(self):
        """Collect parameters in the order: attention, feed-forward, norm1, norm2."""
        collected = []
        for part in (self.mha, self.ffn, self.norm1, self.norm2):
            collected = collected + part.parameters()
        return collected

In [23]:
class Encoder:
    """A stack of ``num_layers`` encoder layers applied one after another."""

    def __init__(self, num_layers, dmodel, num_heads, d_ff, device=None):
        """Build the layer stack; ``device`` defaults to CUDA when available, else CPU."""
        fallback = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = device or torch.device(fallback)
        self.layers = []
        for _ in range(num_layers):
            self.layers.append(EncoderLayer(dmodel, num_heads, d_ff, device=self.device))

    def forward(self, X, training=True):
        """Feed ``X`` through every layer in order.

        Returns:
            ``(output, attention_weights)`` where ``attention_weights`` holds
            one entry per layer, in layer order.
        """
        hidden = X
        per_layer_weights = []
        for block in self.layers:
            hidden, block_weights = block.forward(hidden, training)
            per_layer_weights.append(block_weights)
        return hidden, per_layer_weights

    def parameters(self):
        """Return the parameters of all layers as one flat list, in layer order."""
        return [param for block in self.layers for param in block.parameters()]

In [24]:
class DecoderLayer:
    """One decoder block: masked self-attention, cross-attention, feed-forward.

    Every sub-layer's output goes through dropout, is added to the sub-layer's
    input, and the sum is layer-normalised.
    """

    def __init__(self, dmodel, num_heads, d_ff, dropout=0.3, device=None):
        """Build both attention modules, the feed-forward block, three norms and dropout.

        ``device`` defaults to CUDA when available, otherwise CPU.
        """
        fallback = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = device or torch.device(fallback)
        self.masked_mha = MultiHeadAttention(dmodel, num_heads, dropout, self.device)
        self.mha = MultiHeadAttention(dmodel, num_heads, dropout, self.device)
        self.ffn = FFN(dmodel, d_ff, self.device)
        self.norm1 = LayerNormalization(dmodel, device=self.device)
        self.norm2 = LayerNormalization(dmodel, device=self.device)
        self.norm3 = LayerNormalization(dmodel, device=self.device)
        self.dropout = Dropout(dropout)

    def create_mask(self, size):
        """Return a ``(size, size)`` lower-triangular matrix of ones on ``self.device``."""
        full = torch.ones(size, size, device=self.device)
        return torch.tril(full)

    def forward(self, X, encoded_output, training=True):
        """Run the three sub-layers.

        Args:
            X: decoder-side input; dimension 1 is used as the sequence length
                for the lower-triangular mask.
            encoded_output: tensor used as keys/values in cross-attention.
            training: forwarded to the attention modules and to dropout.

        Returns:
            ``(output, (masked_self_attention_weights, cross_attention_weights))``.
        """
        causal_mask = self.create_mask(X.shape[1])

        # Sub-layer 1: masked self-attention
        self_attended, masked_weights = self.masked_mha.forward(X, mask=causal_mask, training=training)
        self_attended = self.dropout.forward(self_attended, training)
        after_self = self.norm1.forward(X + self_attended)

        # Sub-layer 2: attention over the encoder output
        cross_attended, attn_weights = self.mha.forward(after_self, memory=encoded_output, training=training)
        cross_attended = self.dropout.forward(cross_attended, training)
        after_cross = self.norm2.forward(after_self + cross_attended)

        # Sub-layer 3: feed-forward
        transformed = self.dropout.forward(self.ffn.forward(after_cross), training)
        result = self.norm3.forward(after_cross + transformed)

        return result, (masked_weights, attn_weights)

    def parameters(self):
        """Collect parameters: masked attention, cross-attention, feed-forward, norm1-3."""
        parts = (self.masked_mha, self.mha, self.ffn, self.norm1, self.norm2, self.norm3)
        collected = []
        for part in parts:
            collected = collected + part.parameters()
        return collected

In [25]:
class Decoder:
    """A stack of ``num_layers`` decoder layers that all read the same encoder output."""

    def __init__(self, num_layers, dmodel, num_heads, d_ff, device=None):
        """Build the layer stack; ``device`` defaults to CUDA when available, else CPU."""
        fallback = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = device or torch.device(fallback)
        self.layers = []
        for _ in range(num_layers):
            self.layers.append(DecoderLayer(dmodel, num_heads, d_ff, device=self.device))

    def forward(self, X, encoded_output, training=True):
        """Feed ``X`` through every layer in order, passing ``encoded_output`` to each.

        Returns:
            ``(output, attention_weights)`` where ``attention_weights`` holds
            one entry per layer, in layer order.
        """
        hidden = X
        per_layer_weights = []
        for block in self.layers:
            hidden, block_weights = block.forward(hidden, encoded_output, training)
            per_layer_weights.append(block_weights)
        return hidden, per_layer_weights

    def parameters(self):
        """Return the parameters of all layers as one flat list, in layer order."""
        return [param for block in self.layers for param in block.parameters()]

In [26]:
class Transformer:
    """Encoder-decoder sequence model assembled from the hand-written layers above.

    Source and target tokens get separate embedding tables but share one
    positional-encoding table. The decoder output is projected to target
    vocabulary scores by ``final_layer``.
    """

    def __init__(self, num_layers, dmodel, num_heads, d_ff, input_vocab_size, target_vocab_size, dropout=0.3, device=None):
        """Build embeddings, positional encoding, encoder, decoder and output projection.

        ``device`` defaults to CUDA when available, otherwise CPU. Components
        are created in a fixed order so that seeded initialisation is stable.
        """
        fallback = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = device or torch.device(fallback)
        self.dmodel = dmodel
        self.input_embedding = Embedding(input_vocab_size, dmodel, self.device)
        self.output_embedding = Embedding(target_vocab_size, dmodel, self.device)
        self.positional_encoding = PositionalEncoding(dmodel, device=self.device)
        self.dropout = Dropout(dropout)
        self.encoder = Encoder(num_layers, dmodel, num_heads, d_ff, self.device)
        self.decoder = Decoder(num_layers, dmodel, num_heads, d_ff, self.device)
        projection = torch.empty(dmodel, target_vocab_size, device=self.device)
        self.final_layer = nn.Parameter(projection)
        nn.init.xavier_uniform_(self.final_layer, gain=1.0)

    def _embed_with_positions(self, embedding, token_ids):
        """Embed ``token_ids`` with ``embedding`` and add the positional encoding."""
        return self.positional_encoding.forward(embedding.forward(token_ids))

    def softmax(self, x):
        """Numerically stable softmax over the last dimension of ``x``."""
        shifted = x - torch.max(x, dim=-1, keepdim=True)[0]
        exponentials = torch.exp(shifted)
        return exponentials / exponentials.sum(dim=-1, keepdim=True)

    def forward(self, input_seq, target_seq, training=True):
        """Teacher-forced pass over a source / target pair.

        Args:
            input_seq: source token ids.
            target_seq: decoder-input token ids.
            training: forwarded to dropout, encoder and decoder.

        Returns:
            ``(probs, (encoder_attention, decoder_attention))`` where ``probs``
            is the softmax over the target vocabulary for each decoder position.
        """
        # Source side: embed -> positions -> dropout -> encoder
        source = self._embed_with_positions(self.input_embedding, input_seq)
        source = self.dropout.forward(source, training)
        encoded_output, encoder_attention = self.encoder.forward(source, training)

        # Target side: embed -> positions -> dropout -> decoder
        target = self._embed_with_positions(self.output_embedding, target_seq)
        target = self.dropout.forward(target, training)
        decoder_output, decoder_attention = self.decoder.forward(target, encoded_output, training)

        probs = self.softmax(torch.matmul(decoder_output, self.final_layer))
        return probs, (encoder_attention, decoder_attention)

    def parameters(self):
        """Return every trainable tensor: embeddings, encoder, decoder, then ``final_layer``."""
        collected = []
        for part in (
            self.input_embedding,
            self.output_embedding,
            self.positional_encoding,
            self.encoder,
            self.decoder,
        ):
            collected.extend(part.parameters())
        collected.append(self.final_layer)
        return collected

    def translate(self, input_seq, max_length=20, start_token=1, end_token=2):
        """Autoregressive greedy decoding.

        Starts every row with ``start_token`` and appends the arg-max token at
        each step, for at most ``max_length`` steps, stopping early once every
        row's newest token equals ``end_token``. Runs under ``torch.no_grad()``
        with ``training=False``.

        Returns:
            Long tensor of generated ids, including the leading start token.
        """
        with torch.no_grad():
            # The source sentence is encoded once and reused at every step.
            source = self._embed_with_positions(self.input_embedding, input_seq)
            encoded_output, _ = self.encoder.forward(source, training=False)

            generated = torch.full(
                (input_seq.shape[0], 1), start_token, dtype=torch.long, device=self.device
            )
            for _ in range(max_length):
                decoder_input = self._embed_with_positions(self.output_embedding, generated)
                decoder_output, _ = self.decoder.forward(decoder_input, encoded_output, training=False)
                # Only the last position predicts the next token.
                last_logits = torch.matmul(decoder_output[:, -1:], self.final_layer)
                next_token = torch.argmax(last_logits, dim=-1)
                generated = torch.cat([generated, next_token], dim=1)
                if (next_token == end_token).all():
                    break
            return generated

def cross_entropy_loss(predictions, targets, smoothing=0.1, ignore_index=None):
    """Label-smoothed cross entropy computed from probabilities (not logits).

    The target distribution puts ``1 - smoothing`` on the true class and
    spreads ``smoothing`` uniformly over all ``vocab_size`` classes.

    Args:
        predictions: probabilities whose last dimension is the vocabulary;
            all leading dimensions are flattened.
        targets: integer class ids with one entry per flattened prediction row.
        smoothing: total probability mass spread uniformly over the vocabulary.
        ignore_index: optional class id (for example a padding id) whose
            positions are left out of the average. ``None`` (the default)
            averages over every position.

    Returns:
        Scalar tensor: the mean per-position loss over the positions that are
        kept, or zero if ``ignore_index`` removes every position.
    """
    vocab_size = predictions.shape[-1]
    confidence = 1.0 - smoothing
    smoothing_labels = smoothing / vocab_size

    # reshape (not view) so non-contiguous inputs such as ``labels[:, 1:]`` or
    # transposed predictions are accepted.
    predictions = predictions.reshape(-1, vocab_size)
    targets = targets.reshape(-1)

    if ignore_index is None:
        keep = None
        scatter_targets = targets
    else:
        keep = targets != ignore_index
        # Ignored ids may be out of range (e.g. -100); point them at class 0
        # for the scatter only -- their rows are dropped below.
        scatter_targets = targets.masked_fill(~keep, 0)

    one_hot = torch.zeros_like(predictions)
    one_hot.scatter_(1, scatter_targets.unsqueeze(-1), 1)
    smoothed_targets = one_hot * confidence + smoothing_labels

    # The small constant keeps log() finite when a probability is exactly 0.
    log_probs = -torch.log(predictions + 1e-10)
    per_position = (smoothed_targets * log_probs).sum(dim=1)

    if keep is None:
        return per_position.mean()
    if not keep.any():
        # Nothing to average; return a zero that is still attached to the graph.
        return per_position.sum() * 0.0
    return per_position[keep].mean()

def train(model, input_seq, target_seq, target_labels, epochs=50, warmup_steps=200):
    """Full-batch training loop with Adam and a warm-up / inverse-sqrt LR schedule.

    Every epoch performs one optimizer step on the whole batch. The learning
    rate for step ``s`` (1-based) is
    ``dmodel ** -0.5 * min(s ** -0.5, s * warmup_steps ** -1.5)`` and is set
    *before* the step is taken, so the value printed for an epoch is the one
    that was actually used for that epoch's update.

    Args:
        model: object exposing ``parameters()``, ``forward(input_seq,
            target_seq, training=...)`` and ``dmodel``; its ``device``
            attribute, when present, decides where the data is moved.
        input_seq: source token ids.
        target_seq: decoder-input token ids.
        target_labels: ground-truth token ids scored by ``cross_entropy_loss``.
        epochs: number of optimizer steps.
        warmup_steps: length of the linear warm-up; values below 1 are
            treated as 1 (no warm-up).

    Returns:
        The same ``model`` object, trained in place.
    """
    # Keep the data on the model's own device; guess only if it has none.
    device = getattr(model, "device", None)
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    input_seq = input_seq.to(device)
    target_seq = target_seq.to(device)
    target_labels = target_labels.to(device)

    params = model.parameters()
    # The lr given here is only a placeholder: the schedule overwrites it
    # before every step.
    optimizer = optim.Adam(params, lr=0.0005, betas=(0.9, 0.98), eps=1e-9)

    # Guard against 0 ** -1.5 (ZeroDivisionError) when warm-up is disabled.
    warmup = max(warmup_steps, 1)

    for epoch in range(epochs):
        step = epoch + 1
        lr = (model.dmodel ** -0.5) * min(step ** -0.5, step * warmup ** -1.5)
        for param_group in optimizer.param_groups:
            param_group['lr'] = lr

        optimizer.zero_grad()
        output, _ = model.forward(input_seq, target_seq, training=True)
        loss = cross_entropy_loss(output, target_labels)
        loss.backward()
        optimizer.step()

        print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item():.4f}, LR: {lr:.6f}")
    return model

# Small English-to-French translation demo
if __name__ == "__main__":
    # Hyperparameters, deliberately small for the tiny data set
    num_layers = 2
    dmodel = 128  # embedding size
    num_heads = 4
    d_ff = 512    # feed-forward hidden size
    batch_size = 4
    max_seq_length = 5

    # English and French share a single vocabulary for simplicity
    vocab = {"<pad>": 0, "<start>": 1, "<end>": 2, "I": 3, "like": 4, "to": 5, "read": 6,
             "j": 7, "aime": 8, "lire": 9}
    vocab_size = len(vocab)
    idx_to_word = {index: word for word, index in vocab.items()}

    # Training data: the same sentence pair repeated batch_size times.
    # "I like to read" -> "J'aime lire"
    source_row = [3, 4, 5, 6, 0]         # I like to read <pad>
    decoder_input_row = [1, 7, 8, 9, 0]  # <start> j aime lire <pad>
    label_row = [7, 8, 9, 2, 0]          # j aime lire <end> <pad>

    # Each tensor has shape [batch_size, seq_length]
    input_seq = torch.tensor([source_row] * batch_size, dtype=torch.long)
    target_seq = torch.tensor([decoder_input_row] * batch_size, dtype=torch.long)   # decoder input
    target_labels = torch.tensor([label_row] * batch_size, dtype=torch.long)        # ground truth

    # Build the model and train it
    transformer = Transformer(num_layers, dmodel, num_heads, d_ff, vocab_size, vocab_size)
    trained_model = train(transformer, input_seq, target_seq, target_labels, epochs=50)

    # Greedy-decode the first training sentence
    with torch.no_grad():
        translated = trained_model.translate(input_seq[:1], max_length=5)
        translated_words = [idx_to_word[token_id] for token_id in translated[0].tolist()]
        print("\nInput: I like to read")
        print("Predicted translation:", " ".join(translated_words))

Epoch 1/50, Loss: 2.8731, LR: 0.000031
Epoch 2/50, Loss: 2.3279, LR: 0.000063
Epoch 3/50, Loss: 2.5458, LR: 0.000094
Epoch 4/50, Loss: 2.6139, LR: 0.000125
Epoch 5/50, Loss: 2.4611, LR: 0.000156
Epoch 6/50, Loss: 2.5144, LR: 0.000188
Epoch 7/50, Loss: 2.6135, LR: 0.000219
Epoch 8/50, Loss: 2.4996, LR: 0.000250
Epoch 9/50, Loss: 2.3353, LR: 0.000281
Epoch 10/50, Loss: 1.9717, LR: 0.000313
Epoch 11/50, Loss: 2.2963, LR: 0.000344
Epoch 12/50, Loss: 2.5468, LR: 0.000375
Epoch 13/50, Loss: 2.2445, LR: 0.000406
Epoch 14/50, Loss: 2.0813, LR: 0.000438
Epoch 15/50, Loss: 2.1527, LR: 0.000469
Epoch 16/50, Loss: 2.2111, LR: 0.000500
Epoch 17/50, Loss: 2.1250, LR: 0.000531
Epoch 18/50, Loss: 2.2225, LR: 0.000563
Epoch 19/50, Loss: 1.9191, LR: 0.000594
Epoch 20/50, Loss: 2.4152, LR: 0.000625
Epoch 21/50, Loss: 2.0863, LR: 0.000656
Epoch 22/50, Loss: 2.2209, LR: 0.000688
Epoch 23/50, Loss: 1.8868, LR: 0.000719
Epoch 24/50, Loss: 2.1686, LR: 0.000750
Epoch 25/50, Loss: 1.9884, LR: 0.000781
Epoch 26/