In [11]:
from torch import nn
import torch
from torch.utils.data import DataLoader

In [12]:
class PositionalEncoding(nn.Module): # RoPE
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-torch.log(torch.tensor(10000.0)) / d_model))

        # Apply sine to even indices in the array
        pe[:, 0::2] = torch.sin(position * div_term)
        # Apply cosine to odd indices in the array
        pe[:, 1::2] = torch.cos(position * div_term)

        # Add batch dimension
        pe = pe.unsqueeze(0)

        # Register pe as a buffer so it is not a model parameter but still part of the model's state
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x shape: (batch_size, seq_len, d_model)
        return x + self.pe[:, :x.size(1), :]
    


class MLP(nn.Module):
    def __init__(self, d_model, d_ff):
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()
        
    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x
    

def create_padding_mask(seq, pad_idx=0):
    # Create a mask for padding tokens: 1 for non-pad, 0 for pad
    return (seq != pad_idx).unsqueeze(1).unsqueeze(2)

def create_look_ahead_mask(size):
    # Create a triangular mask to hide future tokens
    mask = torch.triu(torch.ones(size, size), diagonal=1).bool()
    return ~mask

In [13]:
def translate(model, sentence, src_vocab, tgt_idx2word, device, max_len=100):
    model.eval()
    
    # Tokenize and convert to indices
    tokens = sentence.split()
    src_indices = [src_vocab.get(word, src_vocab['']) for word in tokens]
    src_indices = [src_vocab['']] + src_indices + [src_vocab['']]
    
    # Pad source sequence
    src_indices = src_indices + [src_vocab['']] * (max_len - len(src_indices))
    src_indices = src_indices[:max_len]
    
    # Convert to tensor
    src_tensor = torch.tensor([src_indices], dtype=torch.long).to(device)
    
    # Create mask
    src_mask = create_padding_mask(src_tensor)
    
    # Get encoder output
    enc_output = model.encode(src_tensor, src_mask)
    
    # Initialize decoder input with  token
    dec_input = torch.tensor([[src_vocab['']]], dtype=torch.long).to(device)
    
    # Generate translation
    output_indices = []
    
    for _ in range(max_len):
        # Create mask for decoder input
        tgt_mask = create_look_ahead_mask(dec_input.size(1)).to(device)
        
        # Get decoder output
        dec_output = model.decode(dec_input, enc_output, src_mask, tgt_mask)
        
        # Get predicted token
        pred = model.linear(dec_output[:, -1])
        pred_idx = pred.argmax(dim=-1).item()
        
        # Add predicted token to output
        output_indices.append(pred_idx)
        
        # Check if end of sequence
        if pred_idx == src_vocab['']:
            break
        
        # Update decoder input
        dec_input = torch.cat([dec_input, torch.tensor([[pred_idx]], dtype=torch.long).to(device)], dim=1)
    
    # Convert indices to words
    output_words = [tgt_idx2word.get(idx, '') for idx in output_indices]
    
    # Remove special tokens
    output_words = [word for word in output_words if word not in ['', '', '']]
    
    return ' '.join(output_words)

In [14]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads, dropout=0.1):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        # Linear Projections for Q, K, V
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        # Calculate attention scores
        # Q K V Shape: (batch_size, num_heads, seq_len, d_k)
        scores = Q @ K.transpose(-2, -1) / (self.d_k ** 0.5)

        # Apply mask if provided
        if mask is not None:
            scores = scores.masked_fill(mask == 0, float('-inf'))

        # Softmax to get attention weights
        attn_weights = scores.softmax(dim=-1)

        # Apply attention weights to V
        output = attn_weights @ V
        
        return output 

    def forward(self, Q, K, V, attn_mask=None):
        batch_size = Q.size(0)

        # Linear projections
        Q = self.W_q(Q).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        K = self.W_k(K).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        V = self.W_v(V).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)

        # Scaled dot-product attention
        attn_output = self.scaled_dot_product_attention(Q, K, V, mask=attn_mask)

        # Concatenate heads and apply final linear projection
        attn_output = attn_output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        output = self.W_o(attn_output)
        
        return output       

In [15]:
from torch.utils.data import Dataset

class TranslationDataset(Dataset):
    def __init__(self, src_sentences, tgt_sentences, src_vocab, tgt_vocab, max_len=100):
        self.src_sentences = src_sentences
        self.tgt_sentences = tgt_sentences
        self.src_vocab = src_vocab
        self.tgt_vocab = tgt_vocab
        self.max_len = max_len
        
    def __len__(self):
        return len(self.src_sentences)
    
    def __getitem__(self, idx):
        # Convert source sentence to indices
        src_indices = [self.src_vocab.get(word, self.src_vocab['']) for word in self.src_sentences[idx].split()]
        src_indices = [self.src_vocab['']] + src_indices + [self.src_vocab['']]
        
        # Convert target sentence to indices
        tgt_indices = [self.tgt_vocab.get(word, self.tgt_vocab['']) for word in self.tgt_sentences[idx].split()]
        tgt_indices = [self.tgt_vocab['']] + tgt_indices + [self.tgt_vocab['']]
        
        # Pad sequences
        src_indices = src_indices[:self.max_len]
        tgt_indices = tgt_indices[:self.max_len]
        
        src_indices = src_indices + [self.src_vocab['']] * (self.max_len - len(src_indices))
        tgt_indices = tgt_indices + [self.tgt_vocab['']] * (self.max_len - len(tgt_indices))
        
        return {
            'src': torch.tensor(src_indices, dtype=torch.long),
            'tgt': torch.tensor(tgt_indices[:-1], dtype=torch.long), # Input to decoder
            'tgt_y': torch.tensor(tgt_indices[1:], dtype=torch.long) # Expected output
        }
    
def create_toy_dataset():
    # Simple English to French translation pairs
    eng_sentences = [
        'hello how are you',
        'i am fine thank you',
        'what is your name',
        'my name is john',
        'where do you live',
        'i live in new york',
        'i love programming',
        'this is a test',
        'please translate this',
        'thank you very much'
    ]
    
    fr_sentences = [
        'bonjour comment vas tu',
        'je vais bien merci',
        'quel est ton nom',
        'je m appelle john',
        'où habites tu',
        'j habite à new york',
        'j aime programmer',
        'c est un test',
        's il te plaît traduis cela',
        'merci beaucoup'
    ]
    
    # Create vocabularies
    src_vocab = {'': 0, '': 1, '': 2, '': 3}
    tgt_vocab = {'': 0, '': 1, '': 2, '': 3}
    
    # Add words to vocabularies
    i = 4
    for sent in eng_sentences:
        for word in sent.split():
            if word not in src_vocab:
                src_vocab[word] = i
                i += 1
    
    i = 4
    for sent in fr_sentences:
        for word in sent.split():
            if word not in tgt_vocab:
                tgt_vocab[word] = i
                i += 1
    
    # Create reverse vocabularies for decoding
    src_idx2word = {idx: word for word, idx in src_vocab.items()}
    tgt_idx2word = {idx: word for word, idx in tgt_vocab.items()}
    
    return eng_sentences, fr_sentences, src_vocab, tgt_vocab, src_idx2word, tgt_idx2word

In [16]:
from torch import nn

class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):  
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.mlp = MLP(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x, enc_output, src_mask=None, tgt_mask=None):
        # Self-attention with mask + residual connection + layer normalization
        self_attn_output = self.self_attn(x, x, x, attn_mask=tgt_mask)
        x = self.norm1(x + self.dropout(self_attn_output))

        # Cross-attention with mask + residual connection + layer normalization
        cross_attn_output = self.cross_attn(x, enc_output, enc_output, attn_mask=src_mask)
        x = self.norm2(x + self.dropout(cross_attn_output))

        # Feed-forward network with residual connection and layer normalization
        ff_output = self.mlp(x)
        x = self.norm3(x + self.dropout(ff_output))

        return x
    

class Decoder(nn.Module):
    def __init__(self, vocab_size, d_model, num_heads, d_ff, num_layers, dropout=0.1):
        super(Decoder, self).__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)
        self.layers = nn.ModuleList([
            DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)
        ])
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_output, src_mask=None, tgt_mask=None):
        # apply embedding and positional encoding
        x = self.embedding(x) * (self.embedding.embedding_dim ** 0.5)
        x = self.positional_encoding(x)
        x = self.dropout(x)

        # Pass through decoder layers
        for layer in self.layers:
            x = layer(x, enc_output, src_mask=src_mask, tgt_mask=tgt_mask)

        return x

In [17]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.mlp = MLP(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        attn_output = self.self_attn(x, x, x, attn_mask=mask)
        x = self.norm1(x + self.dropout(attn_output)) # Add residual connection

        # Feed-forward network with residual connection and layer normalization
        ff_output = self.mlp(x)
        x = self.norm2(x + self.dropout(ff_output))

        return x


class Encoder(nn.Module):
    def __init__(self, vocab_size, d_model, num_heads, d_ff, num_layers, dropout=0.1):
        super(Encoder, self).__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)
        self.layers = nn.ModuleList([
            EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)
        ])
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        # Apply embedding and positional encoding
        x = self.embedding(x) * (self.embedding.embedding_dim ** 0.5)
        x = self.positional_encoding(x)
        x = self.dropout(x)

        # Pass through encoder layers
        for layer in self.layers:
            x = layer(x, mask=mask)

        return x
        

In [18]:
class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model=512, num_heads=8,
                  d_ff=2048, num_layers=6, dropout=0.1):
        super(Transformer, self).__init__()
        self.encoder = Encoder(src_vocab_size, d_model, num_heads, d_ff, num_layers, dropout)
        self.decoder = Decoder(tgt_vocab_size, d_model, num_heads, d_ff, num_layers, dropout)
        self.linear = nn.Linear(d_model, tgt_vocab_size)

    def forward(self, src, tgt, src_mask=None, tgt_mask=None):
        # Encode source sequence
        enc_output = self.encoder(src, mask=src_mask)
        
        # Decode target sequence
        dec_output = self.decoder(tgt, enc_output, src_mask=src_mask, tgt_mask=tgt_mask)

        # Final linear layer to project to target vocabulary size
        output = self.linear(dec_output)
        return output
    
    def encode(self, src, src_mask=None):
        return self.encoder(src, mask=src_mask)
    
    def decode(self, tgt, enc_output, src_mask=None, tgt_mask=None):
        return self.decoder(tgt, enc_output, src_mask=src_mask, tgt_mask=tgt_mask)

In [19]:
def train_transformer(model, train_loader, optimizer, criterion, device):
    model.train()
    epoch_loss = 0
    
    for batch in train_loader:
        src = batch['src'].to(device)
        tgt = batch['tgt'].to(device)
        tgt_y = batch['tgt_y'].to(device)
        
        # Create masks
        src_mask = create_padding_mask(src)
        tgt_mask = create_padding_mask(tgt) & create_look_ahead_mask(tgt.size(1)).to(device)
        
        # Forward pass
        output = model(src, tgt, src_mask, tgt_mask)
        
        # Reshape output and target for loss calculation
        output = output.view(-1, output.size(-1))
        tgt_y = tgt_y.view(-1)
        
        # Calculate loss
        loss = criterion(output, tgt_y)
        
        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        epoch_loss += loss.item()
    
    return epoch_loss / len(train_loader)


def evaluate_transformer(model, val_loader, criterion, device):
    model.eval()
    epoch_loss = 0
    
    with torch.no_grad():
        for batch in val_loader:
            src = batch['src'].to(device)
            tgt = batch['tgt'].to(device)
            tgt_y = batch['tgt_y'].to(device)
            
            # Create masks
            src_mask = create_padding_mask(src)
            tgt_mask = create_padding_mask(tgt) & create_look_ahead_mask(tgt.size(1)).to(device)
            
            # Forward pass
            output = model(src, tgt, src_mask, tgt_mask)
            
            # Reshape output and target for loss calculation
            output = output.view(-1, output.size(-1))
            tgt_y = tgt_y.view(-1)
            
            # Calculate loss
            loss = criterion(output, tgt_y)
            
            epoch_loss += loss.item()
    
    return epoch_loss / len(val_loader)

In [20]:
import matplotlib.pyplot as plt


def main():
    # Set device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")
    
    # Create dataset
    eng_sentences, fr_sentences, src_vocab, tgt_vocab, src_idx2word, tgt_idx2word = create_toy_dataset()
    
    # Create train and validation datasets
    train_size = int(0.8 * len(eng_sentences))
    train_dataset = TranslationDataset(
        eng_sentences[:train_size], 
        fr_sentences[:train_size], 
        src_vocab, 
        tgt_vocab
    )
    val_dataset = TranslationDataset(
        eng_sentences[train_size:], 
        fr_sentences[train_size:], 
        src_vocab, 
        tgt_vocab
    )
    
    # Create data loaders
    batch_size = 2
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)
    
    # Initialize model
    src_vocab_size = len(src_vocab)
    tgt_vocab_size = len(tgt_vocab)
    
    # Use smaller model for toy dataset
    model = Transformer(
        src_vocab_size=src_vocab_size,
        tgt_vocab_size=tgt_vocab_size,
        d_model=64,
        num_heads=2,
        d_ff=128,
        num_layers=2,
        dropout=0.1
    ).to(device)
    
    # Define optimizer and loss function
    optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
    criterion = nn.CrossEntropyLoss(ignore_index=src_vocab[''])
    
    # Training loop
    num_epochs = 100
    best_val_loss = float('inf')
    
    train_losses = []
    val_losses = []
    
    for epoch in range(num_epochs):
        train_loss = train_transformer(model, train_loader, optimizer, criterion, device)
        val_loss = evaluate_transformer(model, val_loader, criterion, device)
        
        train_losses.append(train_loss)
        val_losses.append(val_loss)
        
        print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")
        
        # Save best model
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best_transformer_model.pth')
    
    # Plot training and validation losses
    plt.figure(figsize=(10, 6))
    plt.plot(train_losses, label='Train Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training and Validation Losses')
    plt.legend()
    plt.savefig('transformer_loss.png')
    plt.show()
    
    # Test translation
    test_sentences = [
        'hello how are you',
        'i love programming',
        'thank you very much'
    ]
    
    model.load_state_dict(torch.load('best_transformer_model.pth'))
    
    print("\nTest Translations:")
    for sentence in test_sentences:
        translation = translate(model, sentence, src_vocab, tgt_idx2word, device)
        print(f"English: {sentence}")
        print(f"French: {translation}")
        print()

if __name__ == "__main__":
    main()

Using device: cpu


IndexError: index out of range in self