In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import math

In [2]:
# Hyperparameters
d_model = 512  # Embedding size
nhead = 8      # Number of heads in multi-head attention
num_encoder_layers = 6
num_decoder_layers = 6
dim_feedforward = 2048
dropout = 0.1
max_seq_length = 100  # Max length of input sequences

In [3]:
# Positional Encoding
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=max_seq_length):
        super(PositionalEncoding, self).__init__()
        self.encoding = torch.zeros(max_len, d_model)
        self.encoding.requires_grad = False  # No gradient

        pos = torch.arange(0, max_len).unsqueeze(1)
        _2i = torch.arange(0, d_model, 2)

        self.encoding[:, 0::2] = torch.sin(pos / (10000 ** (_2i / d_model)))
        self.encoding[:, 1::2] = torch.cos(pos / (10000 ** (_2i / d_model)))

    def forward(self, x):
        seq_len = x.size(1)
        return x + self.encoding[:seq_len, :].to(x.device)

In [4]:
# Multi-Head Attention
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, nhead):
        super(MultiHeadAttention, self).__init__()
        self.nhead = nhead
        self.d_model = d_model

        self.query = nn.Linear(d_model, d_model)
        self.key = nn.Linear(d_model, d_model)
        self.value = nn.Linear(d_model, d_model)
        self.out = nn.Linear(d_model, d_model)

    def forward(self, query, key, value, mask=None):
        batch_size = query.size(0)

        def transform(x):
            x = x.view(batch_size, -1, self.nhead, self.d_model // self.nhead)
            return x.transpose(1, 2)

        query = transform(self.query(query))
        key = transform(self.key(key))
        value = transform(self.value(value))

        scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(self.d_model)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        attention = torch.nn.functional.softmax(scores, dim=-1)

        x = torch.matmul(attention, value)
        x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        return self.out(x)

In [5]:
# Feedforward Network
class FeedForward(nn.Module):
    def __init__(self, d_model, dim_feedforward, dropout):
        super(FeedForward, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(d_model, dim_feedforward),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(dim_feedforward, d_model)
        )

    def forward(self, x):
        return self.net(x)

In [6]:
# Encoder Layer
class EncoderLayer(nn.Module):
    def __init__(self, d_model, nhead, dim_feedforward, dropout):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, nhead)
        self.ff = FeedForward(d_model, dim_feedforward, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, src, src_mask=None):
        src2 = self.self_attn(src, src, src, src_mask)
        src = src + self.dropout1(src2)
        src = self.norm1(src)
        src2 = self.ff(src)
        src = src + self.dropout2(src2)
        return self.norm2(src)

In [7]:
# Decoder Layer
class DecoderLayer(nn.Module):
    def __init__(self, d_model, nhead, dim_feedforward, dropout):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, nhead)
        self.multihead_attn = MultiHeadAttention(d_model, nhead)
        self.ff = FeedForward(d_model, dim_feedforward, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None):
        tgt2 = self.self_attn(tgt, tgt, tgt, tgt_mask)
        tgt = tgt + self.dropout1(tgt2)
        tgt = self.norm1(tgt)
        tgt2 = self.multihead_attn(tgt, memory, memory, memory_mask)
        tgt = tgt + self.dropout2(tgt2)
        tgt = self.norm2(tgt)
        tgt2 = self.ff(tgt)
        tgt = tgt + self.dropout3(tgt2)
        return self.norm3(tgt)

In [9]:
# Encoder
class Encoder(nn.Module):
    def __init__(self, input_dim, d_model, nhead, num_layers, dim_feedforward, dropout):
        super(Encoder, self).__init__()
        self.embedding = nn.Embedding(input_dim, d_model)
        self.pos_encoding = PositionalEncoding(d_model)
        self.layers = nn.ModuleList([EncoderLayer(d_model, nhead, dim_feedforward, dropout) for _ in range(num_layers)])
        self.norm = nn.LayerNorm(d_model)

    def forward(self, src, src_mask=None):
        src = self.embedding(src) * math.sqrt(d_model)
        src = self.pos_encoding(src)
        for layer in self.layers:
            src = layer(src, src_mask)
        return self.norm(src)


In [10]:

# Decoder
class Decoder(nn.Module):
    def __init__(self, output_dim, d_model, nhead, num_layers, dim_feedforward, dropout):
        super(Decoder, self).__init__()
        self.embedding = nn.Embedding(output_dim, d_model)
        self.pos_encoding = PositionalEncoding(d_model)
        self.layers = nn.ModuleList([DecoderLayer(d_model, nhead, dim_feedforward, dropout) for _ in range(num_layers)])
        self.norm = nn.LayerNorm(d_model)
        self.fc_out = nn.Linear(d_model, output_dim)

    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None):
        tgt = self.embedding(tgt) * math.sqrt(d_model)
        tgt = self.pos_encoding(tgt)
        for layer in self.layers:
            tgt = layer(tgt, memory, tgt_mask, memory_mask)
        return self.fc_out(self.norm(tgt))

In [11]:
# Full Transformer Model
class Transformer(nn.Module):
    def __init__(self, input_dim, output_dim, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout):
        super(Transformer, self).__init__()
        self.encoder = Encoder(input_dim, d_model, nhead, num_encoder_layers, dim_feedforward, dropout)
        self.decoder = Decoder(output_dim, d_model, nhead, num_decoder_layers, dim_feedforward, dropout)

    def forward(self, src, tgt, src_mask=None, tgt_mask=None, memory_mask=None):
        memory = self.encoder(src, src_mask)
        output = self.decoder(tgt, memory, tgt_mask, memory_mask)
        return output

In [12]:
# Example usage
if __name__ == "__main__":
    # Example parameters (can be adjusted)
    input_dim = 10000  # Vocabulary size of the source language
    output_dim = 10000  # Vocabulary size of the target language

    model = Transformer(input_dim, output_dim, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout)

    # Dummy data (batch_size=2, sequence_length=10)
    src = torch.randint(0, input_dim, (2, 10))  # Source sentence
    tgt = torch.randint(0, output_dim, (2, 10))  # Target sentence

    # Forward pass
    output = model(src, tgt)

    # Output shape: (batch_size, tgt_sequence_length, output_dim)
    print(output.shape)

    # Defining the optimizer and loss function
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()

    # Dummy training loop
    for epoch in range(10):
        model.train()
        optimizer.zero_grad()

        output = model(src, tgt[:, :-1])  # Predict the next token
        loss = criterion(output.view(-1, output_dim), tgt[:, 1:].reshape(-1))  # Compare with the actual token

        loss.backward()
        optimizer.step()

        print(f"Epoch {epoch + 1}, Loss: {loss.item()}")


torch.Size([2, 10, 10000])
Epoch 1, Loss: 9.400156021118164
Epoch 2, Loss: 5.683928489685059
Epoch 3, Loss: 5.805520057678223
Epoch 4, Loss: 4.35227108001709
Epoch 5, Loss: 4.571855545043945
Epoch 6, Loss: 2.846435070037842
Epoch 7, Loss: 5.040071487426758
Epoch 8, Loss: 3.4016904830932617
Epoch 9, Loss: 2.0712385177612305
Epoch 10, Loss: 0.8312703967094421


In [13]:
import torch
import torch.nn as nn
import torch.optim as optim
import math

# Assuming the Transformer class and other related classes are already defined above.

# A helper function to create the target mask
def generate_square_subsequent_mask(sz):
    mask = torch.triu(torch.ones(sz, sz)) == 1
    mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
    return mask

# Function to perform inference and predict the next word
def predict_next_word(model, src_sentence, max_length=20):
    model.eval()

    src = torch.tensor(src_sentence).unsqueeze(0)  # Add batch dimension
    tgt = torch.zeros(1, 1).long()  # Start token for target sentence

    for _ in range(max_length):
        tgt_mask = generate_square_subsequent_mask(tgt.size(1)).to(tgt.device)

        output = model(src, tgt, tgt_mask=tgt_mask)
        next_token = output.argmax(dim=-1)[:, -1].unsqueeze(1)  # Get the highest probability word

        tgt = torch.cat((tgt, next_token), dim=1)  # Append the predicted word to the target sequence

        if next_token.item() == 3:  # Assuming 3 is the <eos> token
            break

    return tgt.squeeze().tolist()

# Example usage
if __name__ == "__main__":
    # Example parameters (can be adjusted)
    input_dim = 10000  # Vocabulary size of the source language
    output_dim = 10000  # Vocabulary size of the target language

    # Instantiate the model
    model = Transformer(input_dim, output_dim, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout)

    # Dummy data (for example purposes, usually it would be actual sentences)
    src_sentence = [1, 5, 6, 7, 2]  # Example tokenized source sentence (1=start, 2=end tokens)

    # Assume the model is already trained, or you can load pre-trained weights here.

    # Perform inference
    predicted_sentence = predict_next_word(model, src_sentence)

    print(f"Predicted tokens: {predicted_sentence}")


Predicted tokens: [0, 7708, 3685, 9907, 2743, 6693, 3357, 9907, 2743, 6693, 3357, 9907, 416, 1607, 1172, 937, 2241, 1939, 3466, 1155, 219]


In [14]:
import torch
import torch.nn as nn
import torch.optim as optim
import math

# Assuming the Transformer class and other related classes are already defined above.

# A helper function to create the target mask
def generate_square_subsequent_mask(sz):
    mask = torch.triu(torch.ones(sz, sz)) == 1
    mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
    return mask

# Function to perform inference and predict the next word
def predict_next_word(model, src_sentence, vocab, max_length=20):
    model.eval()

    src = torch.tensor(src_sentence).unsqueeze(0)  # Add batch dimension
    tgt = torch.zeros(1, 1).long()  # Start token for target sentence

    for _ in range(max_length):
        tgt_mask = generate_square_subsequent_mask(tgt.size(1)).to(tgt.device)

        output = model(src, tgt, tgt_mask=tgt_mask)
        next_token = output.argmax(dim=-1)[:, -1].unsqueeze(1)  # Get the highest probability word

        tgt = torch.cat((tgt, next_token), dim=1)  # Append the predicted word to the target sequence

        if next_token.item() == vocab['<eos>']:  # Assuming <eos> is the end token
            break

    # Convert tokens to words
    predicted_tokens = tgt.squeeze().tolist()
    predicted_words = [vocab.get(token, "<unk>") for token in predicted_tokens]

    return " ".join(predicted_words)

# Example usage
if __name__ == "__main__":
    # Example parameters (can be adjusted)
    input_dim = 10000  # Vocabulary size of the source language
    output_dim = 10000  # Vocabulary size of the target language

    # Example vocabulary (you should replace this with your actual vocabulary)
    vocab = {
        1: "<start>",
        2: "<eos>",
        3: "The",
        4: "dog",
        5: "barked",
        6: "at",
        7: "the",
        8: "cat",
        # ... (other tokens in your vocabulary)
    }
    # Reverse the vocabulary (token to word mapping)
    reverse_vocab = {v: k for k, v in vocab.items()}

    # Instantiate the model
    model = Transformer(input_dim, output_dim, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout)

    # Dummy data (for example purposes, usually it would be actual sentences)
    src_sentence = [reverse_vocab["<start>"], reverse_vocab["The"], reverse_vocab["dog"], reverse_vocab["barked"], reverse_vocab["<eos>"]]  # Example tokenized source sentence

    # Assume the model is already trained, or you can load pre-trained weights here.

    # Perform inference
    predicted_sentence = predict_next_word(model, src_sentence, reverse_vocab)

    print(f"Predicted sentence: {predicted_sentence}")


Predicted sentence: <unk> <unk> <unk> <unk> <unk> <unk> <unk> <unk> <unk> <unk> <unk> <unk> <unk> <unk> <unk> <unk> <unk> <unk> <unk> <unk> <unk>


In [15]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import math

# Assume the Transformer class, PositionalEncoding, MultiHeadAttention, etc., are already defined as per the previous code.

# Define a simple dataset for sequence-to-sequence tasks
class Seq2SeqDataset(Dataset):
    def __init__(self, source_sentences, target_sentences, src_vocab, tgt_vocab):
        self.source_sentences = source_sentences
        self.target_sentences = target_sentences
        self.src_vocab = src_vocab
        self.tgt_vocab = tgt_vocab

    def __len__(self):
        return len(self.source_sentences)

    def __getitem__(self, idx):
        # Ensure tokens are within the valid range
        src = [self.src_vocab.get(token, self.src_vocab["<unk>"]) for token in self.source_sentences[idx]]
        tgt = [self.tgt_vocab.get(token, self.tgt_vocab["<unk>"]) for token in self.target_sentences[idx]]
        return torch.tensor(src), torch.tensor(tgt)

# Custom collate function to handle variable length sequences
def collate_fn(batch):
    src_batch, tgt_batch = [], []
    for src_item, tgt_item in batch:
        src_batch.append(src_item)
        tgt_batch.append(tgt_item)
    # Pad sequences to the same length
    src_batch = nn.utils.rnn.pad_sequence(src_batch, batch_first=True, padding_value=0) # Assuming 0 is the padding token
    tgt_batch = nn.utils.rnn.pad_sequence(tgt_batch, batch_first=True, padding_value=0)
    return src_batch, tgt_batch

# A helper function to create the target mask
def generate_square_subsequent_mask(sz):
    mask = torch.triu(torch.ones(sz, sz)) == 1
    mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
    return mask

# Training loop
def train_model(model, data_loader, optimizer, criterion, num_epochs=10):
    model.train()

    for epoch in range(num_epochs):
        total_loss = 0
        for src, tgt in data_loader:
            src = src.to(device)
            tgt_input = tgt[:, :-1].to(device)  # Input for the model
            tgt_output = tgt[:, 1:].to(device)  # Expected output

            tgt_mask = generate_square_subsequent_mask(tgt_input.size(1)).to(device)

            optimizer.zero_grad()
            output = model(src, tgt_input, tgt_mask=tgt_mask)

            loss = criterion(output.view(-1, output.shape[-1]), tgt_output.contiguous().view(-1))
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        avg_loss = total_loss / len(data_loader)
        print(f"Epoch {epoch + 1}, Loss: {avg_loss:.4f}")

# Example usage
if __name__ == "__main__":
    # Example vocabularies (as provided above)
    src_vocab = {
        "<start>": 1, "<eos>": 2, "<unk>": 3,
        "The": 4, "dog": 5, "barked": 6, "at": 7, "the": 8, "cat": 9,
        "A": 10, "man": 11, "is": 12, "walking": 13, "in": 14, "park": 15,
        "Hello": 16, "world": 17
    }
    tgt_vocab = {
        "<start>": 1, "<eos>": 2, "<unk>": 3,
        "Le": 4, "chien": 5, "aboie": 6, "à": 7, "le": 8, "chat": 9,
        "Un": 10, "homme": 11, "marche": 12, "dans": 13, "le": 14, "parc": 15,
        "Bonjour": 16, "le": 17, "monde": 18
    }

    # Example sentences (as provided above)
    source_sentences = [
        ["<start>", "The", "dog", "barked", "at", "the", "cat", "<eos>"],
        ["<start>", "A", "man", "is", "walking", "in", "the", "park", "<eos>"],
        ["<start>", "Hello", "world", "<eos>"],
    ]
    target_sentences = [
        ["<start>", "Le", "chien", "aboie", "à", "le", "chat", "<eos>"],
        ["<start>", "Un", "homme", "marche", "dans", "le", "parc", "<eos>"],
        ["<start>", "Bonjour", "le", "monde", "<eos>"],
    ]

    # Prepare the dataset and data loader
    dataset = Seq2SeqDataset(source_sentences, target_sentences, src_vocab, tgt_vocab)
    # Use the custom collate function
    data_loader = DataLoader(dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)
    # Prepare the dataset and data loader
    #dataset = Seq2SeqDataset(source_sentences, target_sentences, src_vocab, tgt_vocab)
    #data_loader = DataLoader(dataset, batch_size=2, shuffle=True)


    # Define model parameters
    input_dim = len(src_vocab)  # Vocabulary size of the source language
    output_dim = len(tgt_vocab)  # Vocabulary size of the target language
    # Define model parameters
    #input_dim = len(src_vocab) + 1  # Vocabulary size of the source language
    #output_dim = len(tgt_vocab) + 1  # Vocabulary size of the target language
    #device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    #model = Transformer(input_dim, output_dim, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout).to(device)

    # Define optimizer and loss function
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss(ignore_index=src_vocab["<unk>"])  # Ignore the <unk> token during loss calculation

    # Train the model
    train_model(model, data_loader, optimizer, criterion, num_epochs=10)

    # Save the model
    torch.save(model.state_dict(), "transformer_model.pth")


NameError: name 'device' is not defined

In [None]:
# Inference example
reverse_vocab = {v: k for k, v in tgt_vocab.items()}
src_sentence = [src_vocab["<start>"], src_vocab["The"], src_vocab["dog"], src_vocab["barked"], src_vocab["<eos>"]]  # Example source sentence
# Pass an integer for max_length (e.g., 20)
predicted_sentence = predict_next_word(model, src_sentence, tgt_vocab, max_length=20) # Pass tgt_vocab instead of reverse_vocab

print(f"Predicted sentence: {predicted_sentence}")

Predicted sentence: <unk> <unk>
