In [2]:
import pandas as pd
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pad_sequence
from sklearn.model_selection import train_test_split
import string


Cleaning the text

In [3]:
# Read and preprocess dataset
df = pd.read_csv('/kaggle/input/aaaaaa/sentences.csv')
df = df.dropna()
df = df.drop_duplicates()

In [4]:
# Remove punctuation
def remove_punctuation(text):
    return text.translate(str.maketrans('', '', string.punctuation))

df['eng'] = df['eng'].apply(remove_punctuation)
df['darija'] = df['darija'].apply(remove_punctuation)

Train-test split

In [5]:

SPLIT_SIZE = 0.2
train_data, test_data = train_test_split(df, test_size=SPLIT_SIZE, random_state=4)
train_data, val_data = train_test_split(train_data, test_size=SPLIT_SIZE, random_state=4)


Tokenizing sentences.

In [6]:

X = train_data['eng'].str.lower().str.split().tolist()
Y = train_data['darija'].str.split().tolist()

Building vocabularies.

In [7]:

vocab_eng = set(word for sentence in X for word in sentence)
vocab_dari = set(word for sentence in Y for word in sentence)

word_to_ix_eng = {word: i for i, word in enumerate(vocab_eng, start=1)}
word_to_ix_eng['<PAD>'] = 0
word_to_ix_eng['<UNK>'] = len(word_to_ix_eng)

word_to_ix_dari = {word: i for i, word in enumerate(vocab_dari, start=1)}
word_to_ix_dari['<PAD>'] = 0
word_to_ix_dari['<UNK>'] = len(word_to_ix_dari)

Prepare sequences

In [8]:

def prepare_sequence(seq, to_ix):
    return torch.tensor([to_ix.get(word, to_ix['<UNK>']) for word in seq], dtype=torch.long)

X_train_encoded = [prepare_sequence(seq, word_to_ix_eng) for seq in X]
Y_train_encoded = [prepare_sequence(seq, word_to_ix_dari) for seq in Y]

X_train_padded = pad_sequence(X_train_encoded, batch_first=True, padding_value=word_to_ix_eng["<PAD>"])
Y_train_padded = pad_sequence(Y_train_encoded, batch_first=True, padding_value=word_to_ix_dari["<PAD>"])


In [11]:
print(f"Shape of X_train_padded: {X_train_padded.shape}")
print(f"Shape of Y_train_padded: {Y_train_padded.shape}")


Shape of X_train_padded: torch.Size([8108, 38])
Shape of Y_train_padded: torch.Size([8108, 29])


In [12]:
from torch.nn.functional import pad

# Determine the max sequence length
max_seq_length = max(X_train_padded.size(1), Y_train_padded.size(1))

# Pad English sequences to max_seq_length
X_train_padded = pad(X_train_padded, (0, max_seq_length - X_train_padded.size(1)), value=word_to_ix_eng['<PAD>'])

# Pad Darija sequences to max_seq_length
Y_train_padded = pad(Y_train_padded, (0, max_seq_length - Y_train_padded.size(1)), value=word_to_ix_dari['<PAD>'])

# Check new shapes
print(f"New Shape of X_train_padded: {X_train_padded.shape}")
print(f"New Shape of Y_train_padded: {Y_train_padded.shape}")


New Shape of X_train_padded: torch.Size([8108, 38])
New Shape of Y_train_padded: torch.Size([8108, 38])


Hyperparameters

In [15]:
input_dim = len(word_to_ix_eng)
output_dim = len(word_to_ix_dari)
embed_dim = 128  # Embedding dimension
hidden_dim = 256  # Hidden state size of LSTM
n_layers = 2  # Number of LSTM layers
learning_rate = 0.001
num_epochs = 10
batch_size = 64


LSTM Model Implementation

In [27]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# Define VanillaLSTMCell as per the given equations
class VanillaLSTMCell(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(VanillaLSTMCell, self).__init__()
        self.hidden_size = hidden_size
        
        # Input gate parameters
        self.Wxi = nn.Linear(input_size, hidden_size)
        self.Whi = nn.Linear(hidden_size, hidden_size)
        self.bi = nn.Parameter(torch.zeros(hidden_size))
        
        # Forget gate parameters
        self.Wxf = nn.Linear(input_size, hidden_size)
        self.Whf = nn.Linear(hidden_size, hidden_size)
        self.bf = nn.Parameter(torch.zeros(hidden_size))
        
        # Output gate parameters
        self.Wxo = nn.Linear(input_size, hidden_size)
        self.Who = nn.Linear(hidden_size, hidden_size)
        self.bo = nn.Parameter(torch.zeros(hidden_size))
        
        # Cell gate (g_t) parameters
        self.Wxg = nn.Linear(input_size, hidden_size)
        self.Whg = nn.Linear(hidden_size, hidden_size)
        self.bc = nn.Parameter(torch.zeros(hidden_size))

    def forward(self, x_t, prev_state):
        h_prev, c_prev = prev_state
        
        # Compute gates
        i_t = torch.sigmoid(self.Whi(h_prev) + self.Wxi(x_t) + self.bi)
        f_t = torch.sigmoid(self.Whf(h_prev) + self.Wxf(x_t) + self.bf)
        o_t = torch.sigmoid(self.Who(h_prev) + self.Wxo(x_t) + self.bo)
        g_t = torch.tanh(self.Whg(h_prev) + self.Wxg(x_t) + self.bc)
        
        # Compute cell state and hidden state
        c_t = f_t * c_prev + i_t * g_t
        h_t = o_t * torch.tanh(g_t)
        
        return h_t, c_t

# Define the Seq2Seq model using VanillaLSTMCell
class LSTMSeq2Seq(nn.Module):
    def __init__(self, input_dim, output_dim, embed_dim, hidden_dim, n_layers):
        super(LSTMSeq2Seq, self).__init__()
        self.embed_dim = embed_dim
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers

        self.encoder_embedding = nn.Embedding(input_dim, embed_dim)
        self.decoder_embedding = nn.Embedding(output_dim, embed_dim)
        self.fc = nn.Linear(hidden_dim, output_dim)

        # Encoder and Decoder use VanillaLSTMCell
        self.encoder_cells = nn.ModuleList([VanillaLSTMCell(embed_dim if i == 0 else hidden_dim, hidden_dim) for i in range(n_layers)])
        self.decoder_cells = nn.ModuleList([VanillaLSTMCell(embed_dim if i == 0 else hidden_dim, hidden_dim) for i in range(n_layers)])

    def forward(self, source, target):
        batch_size = source.size(0)
        seq_len = target.size(1)

        # Embedding for the encoder
        encoder_input = self.encoder_embedding(source)

        # Initialize states
        h, c = [torch.zeros(batch_size, self.hidden_dim).to(encoder_input.device) for _ in range(self.n_layers)], \
               [torch.zeros(batch_size, self.hidden_dim).to(encoder_input.device) for _ in range(self.n_layers)]

        # Encoder
        for t in range(source.size(1)):
            x_t = encoder_input[:, t, :]
            for i, cell in enumerate(self.encoder_cells):
                h[i], c[i] = cell(x_t, (h[i], c[i]))
                x_t = h[i]  # Pass the hidden state to the next layer

        # Initialize decoder states with encoder final states
        decoder_input = self.decoder_embedding(target)
        outputs = []
        for t in range(seq_len):
            x_t = decoder_input[:, t, :]
            for i, cell in enumerate(self.decoder_cells):
                h[i], c[i] = cell(x_t, (h[i], c[i]))
                x_t = h[i]
            outputs.append(self.fc(x_t))

        # Stack outputs
        outputs = torch.stack(outputs, dim=1)
        return outputs

# Define training parameters
input_dim = len(word_to_ix_eng)
output_dim = len(word_to_ix_dari)
embed_dim = 128
hidden_dim = 256
n_layers = 2
batch_size = 32
learning_rate = 0.001

# Dataset and DataLoader
train_dataset = TensorDataset(X_train_padded, Y_train_padded)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Model, Loss, and Optimizer
device = "cuda" if torch.cuda.is_available() else "cpu"
model = LSTMSeq2Seq(input_dim, output_dim, embed_dim, hidden_dim, n_layers).to(device)
criterion = nn.CrossEntropyLoss(ignore_index=word_to_ix_dari['<PAD>'])
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training Loop
def train_model(model, train_loader, num_epochs=10):
    model.train()
    for epoch in range(num_epochs):
        total_loss = 0
        for source, target in train_loader:
            source, target = source.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(source, target[:, :-1])
            loss = criterion(output.reshape(-1, output_dim), target[:, 1:].reshape(-1))
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Epoch {epoch+1}, Loss: {total_loss / len(train_loader)}")

train_model(model, train_loader, num_epochs=10)


Epoch 1, Loss: 8.576262982811516
Epoch 2, Loss: 7.838153232739666
Epoch 3, Loss: 7.26143274157066
Epoch 4, Loss: 6.389396898389801
Epoch 5, Loss: 5.409328004506629
Epoch 6, Loss: 4.463167216834121
Epoch 7, Loss: 3.639712684736477
Epoch 8, Loss: 2.960057603092644
Epoch 9, Loss: 2.449950614313441
Epoch 10, Loss: 2.0689985719252757


In [28]:
# Save the trained model
torch.save(model.state_dict(), "hardcoded_lstm_model.pth")

Training the Model

In [19]:
# Create DataLoader
batch_size = 32
train_dataset = TensorDataset(X_train_padded, Y_train_padded)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Define hyperparameters
input_dim = len(word_to_ix_eng)
output_dim = len(word_to_ix_dari)
embed_dim = 256
hidden_dim = 512
n_layers = 2
learning_rate = 0.001
num_epochs = 20

# Define the LSTM-based Seq2Seq model
class LSTMSeq2Seq(nn.Module):
    def __init__(self, input_dim, output_dim, embed_dim, hidden_dim, n_layers):
        super(LSTMSeq2Seq, self).__init__()
        self.encoder_embedding = nn.Embedding(input_dim, embed_dim, padding_idx=0)
        self.decoder_embedding = nn.Embedding(output_dim, embed_dim, padding_idx=0)
        self.encoder_lstm = nn.LSTM(embed_dim, hidden_dim, num_layers=n_layers, batch_first=True)
        self.decoder_lstm = nn.LSTM(embed_dim, hidden_dim, num_layers=n_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)
    
    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        # Encoder
        embedded_src = self.encoder_embedding(src)
        _, (hidden, cell) = self.encoder_lstm(embedded_src)
        
        # Decoder
        trg_len = trg.size(1)
        batch_size = trg.size(0)
        trg_vocab_size = self.fc.out_features
        outputs = torch.zeros(batch_size, trg_len, trg_vocab_size).to(trg.device)
        
        decoder_input = trg[:, 0].unsqueeze(1)  # Start token
        for t in range(1, trg_len):
            embedded_trg = self.decoder_embedding(decoder_input)
            output, (hidden, cell) = self.decoder_lstm(embedded_trg, (hidden, cell))
            pred = self.fc(output.squeeze(1))
            outputs[:, t, :] = pred
            
            # Teacher forcing
            teacher_force = torch.rand(1).item() < teacher_forcing_ratio
            top1 = pred.argmax(1)
            decoder_input = trg[:, t].unsqueeze(1) if teacher_force else top1.unsqueeze(1)
        
        return outputs

# Initialize model, criterion, optimizer
model = LSTMSeq2Seq(input_dim, output_dim, embed_dim, hidden_dim, n_layers).to(device)
criterion = nn.CrossEntropyLoss(ignore_index=word_to_ix_dari['<PAD>'])
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training and validation
for epoch in range(num_epochs):
    # Training
    model.train()
    train_loss = 0
    for src, trg in train_loader:
        src, trg = src.to(device), trg.to(device)
        optimizer.zero_grad()
        output = model(src, trg)
        
        # Reshape output for loss calculation
        output = output[:, 1:].reshape(-1, output_dim)
        trg = trg[:, 1:].reshape(-1)
        
        # Compute loss and update weights
        loss = criterion(output, trg)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    
    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss / len(train_loader):.4f}")

# Save the trained model
torch.save(model.state_dict(), "lstm_seq2seq_model.pth")

Epoch 1/20, Train Loss: 8.5023
Epoch 2/20, Train Loss: 7.9829
Epoch 3/20, Train Loss: 7.7891
Epoch 4/20, Train Loss: 7.6187
Epoch 5/20, Train Loss: 7.4422
Epoch 6/20, Train Loss: 7.2629
Epoch 7/20, Train Loss: 7.0694
Epoch 8/20, Train Loss: 6.8755
Epoch 9/20, Train Loss: 6.5985
Epoch 10/20, Train Loss: 6.1829
Epoch 11/20, Train Loss: 5.7548
Epoch 12/20, Train Loss: 5.2243
Epoch 13/20, Train Loss: 4.7434
Epoch 14/20, Train Loss: 4.1681
Epoch 15/20, Train Loss: 3.6505
Epoch 16/20, Train Loss: 3.2617
Epoch 17/20, Train Loss: 2.9764
Epoch 18/20, Train Loss: 2.7335
Epoch 19/20, Train Loss: 2.5964
Epoch 20/20, Train Loss: 2.4479


test

In [None]:
# Create DataLoader
batch_size = 32
train_dataset = TensorDataset(X_train_padded, Y_train_padded)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Define hyperparameters
input_dim = len(word_to_ix_eng)
output_dim = len(word_to_ix_dari)
embed_dim = 256
hidden_dim = 512
n_layers = 2
learning_rate = 0.001
num_epochs = 20

In [None]:
# Define the LSTM-based Seq2Seq model
class LSTMSeq2Seq(nn.Module):
    def __init__(self, input_dim, output_dim, embed_dim, hidden_dim, n_layers):
        super(LSTMSeq2Seq, self).__init__()
        self.encoder_embedding = nn.Embedding(input_dim, embed_dim, padding_idx=0)
        self.decoder_embedding = nn.Embedding(output_dim, embed_dim, padding_idx=0)
        self.encoder_lstm = nn.LSTM(embed_dim, hidden_dim, num_layers=n_layers, batch_first=True)
        self.decoder_lstm = nn.LSTM(embed_dim, hidden_dim, num_layers=n_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)
    
    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        # Encoder
        embedded_src = self.encoder_embedding(src)
        _, (hidden, cell) = self.encoder_lstm(embedded_src)
        
        # Decoder
        trg_len = trg.size(1)
        batch_size = trg.size(0)
        trg_vocab_size = self.fc.out_features
        outputs = torch.zeros(batch_size, trg_len, trg_vocab_size).to(trg.device)
        
        decoder_input = trg[:, 0].unsqueeze(1)  # Start token
        for t in range(1, trg_len):
            embedded_trg = self.decoder_embedding(decoder_input)
            output, (hidden, cell) = self.decoder_lstm(embedded_trg, (hidden, cell))
            pred = self.fc(output.squeeze(1))
            outputs[:, t, :] = pred
            
            # Teacher forcing
            teacher_force = torch.rand(1).item() < teacher_forcing_ratio
            top1 = pred.argmax(1)
            decoder_input = trg[:, t].unsqueeze(1) if teacher_force else top1.unsqueeze(1)
        
        return outputs

# Initialize model, criterion, optimizer
model = LSTMSeq2Seq(input_dim, output_dim, embed_dim, hidden_dim, n_layers).to(device)
criterion = nn.CrossEntropyLoss(ignore_index=word_to_ix_dari['<PAD>'])
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
# Training and validation
for epoch in range(num_epochs):
    # Training
    model.train()
    train_loss = 0
    for src, trg in train_loader:
        src, trg = src.to(device), trg.to(device)
        optimizer.zero_grad()
        output = model(src, trg)
        
        # Reshape output for loss calculation
        output = output[:, 1:].reshape(-1, output_dim)
        trg = trg[:, 1:].reshape(-1)
        
        # Compute loss and update weights
        loss = criterion(output, trg)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    
    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss / len(train_loader):.4f}")

# Save the trained model
torch.save(model.state_dict(), "lstm_seq2seq_model.pth")