In [None]:
import random

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset, DataLoader

In [None]:
import numpy as np
import torch
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split

def tokenize_and_build_vocab(data, max_sequence_length):
    """Tokenize lines on single spaces and encode them as fixed-width index rows.

    Args:
        data: Iterable of text lines. Each line is stripped and then split on
            a single space character.
        max_sequence_length: Number of columns in the returned tensor. Lines
            with more tokens are truncated; shorter lines are right-padded
            with 0.

    Returns:
        A tuple ``(sequences, vocabulary)``:

        * ``sequences`` -- ``torch.long`` tensor of shape
          ``(number of lines, max_sequence_length)`` holding vocabulary indices.
        * ``vocabulary`` -- list in which a token's position is its index.
          ``'<OOV>'`` is at index 0, followed by the distinct tokens of
          ``data`` in sorted order.

    Note:
        Padding cells hold 0, which is also the index of ``'<OOV>'``.
    """
    tokenized_data = [line.strip().split(" ") for line in data]

    # Sorted so the token -> index mapping is identical on every run; plain
    # set iteration order for strings changes between interpreter sessions.
    vocabulary = ['<OOV>'] + sorted({token for line in tokenized_data for token in line})

    # Dictionary lookup replaces a linear list scan per token.
    token_to_index = {}
    for index, token in enumerate(vocabulary):
        # setdefault keeps the first position, so a data token that is
        # literally '<OOV>' still maps to index 0.
        token_to_index.setdefault(token, index)
    oov_index = token_to_index['<OOV>']

    integer_sequences_padded = np.zeros(
        (len(tokenized_data), max_sequence_length), dtype=np.int64
    )
    for row, tokens in enumerate(tokenized_data):
        for col, token in enumerate(tokens[:max_sequence_length]):
            integer_sequences_padded[row, col] = token_to_index.get(token, oov_index)

    return torch.from_numpy(integer_sequences_padded).long(), vocabulary

# --- Load the raw source / target text -----------------------------------------
# Explicit encoding so the files decode the same way on every platform.
with open("train.sources", encoding="utf-8") as f:
    train_sources = f.readlines()

with open("train.targets", encoding="utf-8") as f:
    train_targets = f.readlines()

# Rows are paired by position below (train_test_split / TensorDataset), so the
# two files must contain the same number of lines.
assert len(train_sources) == len(train_targets), (
    f"train.sources has {len(train_sources)} lines but "
    f"train.targets has {len(train_targets)}"
)

MAX_SEQUENCE_LENGTH = 500  # every sequence is truncated / zero-padded to this width
SPLIT_SEED = 42            # fixes the train/test partition across kernel restarts
BATCH_SIZE = 32

# --- Encode -------------------------------------------------------------------
# Separate names for the full tensors keep the unsplit data available and make
# it obvious what the split below operates on.
X_all, train_sources_vocabulary = tokenize_and_build_vocab(train_sources, MAX_SEQUENCE_LENGTH)
Y_all, train_targets_vocabulary = tokenize_and_build_vocab(train_targets, MAX_SEQUENCE_LENGTH)

# --- Split and wrap in loaders ------------------------------------------------
X_train, X_test, Y_train, Y_test = train_test_split(
    X_all, Y_all, test_size=0.2, random_state=SPLIT_SEED
)

train_data = TensorDataset(X_train, Y_train)
test_data = TensorDataset(X_test, Y_test)

train_loader = DataLoader(train_data, shuffle=True, batch_size=BATCH_SIZE)
# Held-out batches are served in a fixed order; shuffling is only useful for training.
test_loader = DataLoader(test_data, shuffle=False, batch_size=BATCH_SIZE)


In [None]:

class Encoder(nn.Module):
    """Embedding layer followed by a bidirectional LSTM.

    Args:
        input_size: Number of rows in the embedding table.
        embedding_size: Width of each embedding vector.
        hidden_size: Hidden width of the LSTM (per direction).
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout value passed straight to ``nn.LSTM``.
    """

    def __init__(self, input_size, embedding_size, hidden_size, num_layers, dropout):
        super().__init__()
        self.embedding = nn.Embedding(
            num_embeddings=input_size,
            embedding_dim=embedding_size,
        )
        # batch_first is left at its default, so the LSTM reads dim 0 as time.
        self.lstm = nn.LSTM(
            input_size=embedding_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=dropout,
            bidirectional=True,
        )

    @staticmethod
    def _average_directions(state):
        """Average even-indexed and odd-indexed entries of ``state`` pairwise along dim 0.

        For a bidirectional ``nn.LSTM`` these are the forward and backward
        states of each layer, so the result has one entry per layer.
        """
        forward_part = state[0::2]
        backward_part = state[1::2]
        return (forward_part + backward_part) / 2

    def forward(self, x):
        """Encode a tensor of token indices.

        Args:
            x: Integer index tensor; presumably ``(seq_len, batch)`` given the
                LSTM's default layout -- confirm against the caller.

        Returns:
            ``(outputs, hidden, cell)`` where ``outputs`` is the raw LSTM output
            and ``hidden`` / ``cell`` are the final states with the two
            directions averaged.
        """
        outputs, (final_hidden, final_cell) = self.lstm(self.embedding(x))
        return (
            outputs,
            self._average_directions(final_hidden),
            self._average_directions(final_cell),
        )

class Attention(nn.Module):
    """Additive attention over encoder outputs for a single decoder state.

    The decoder state is concatenated with every encoder output (combined width
    ``hidden_size * 3``), projected to ``hidden_size``, passed through ``tanh``,
    reduced to one score per position with the learned vector ``v``, and
    normalised with a softmax.
    """

    def __init__(self, hidden_size):
        super().__init__()
        self.attention = nn.Linear(in_features=hidden_size * 3, out_features=hidden_size)
        self.v = nn.Parameter(torch.rand(hidden_size))
        # Re-draw v from a zero-mean normal with std 1/sqrt(hidden_size).
        scale = 1. / np.sqrt(self.v.size(0))
        nn.init.normal_(self.v.data, mean=0, std=scale)

    def forward(self, hidden, encoder_outputs):
        """Return attention weights over the positions of ``encoder_outputs``.

        Args:
            hidden: Decoder state; the decoder in this file passes
                ``hidden[-1]`` -- presumably ``(batch, hidden_size)``.
            encoder_outputs: Encoder outputs with positions on dim 0 and
                batch on dim 1.

        Returns:
            Weights with a singleton dim inserted at position 1 (ready for
            ``torch.bmm``); they sum to 1 along the last dim.
        """
        steps = encoder_outputs.shape[0]

        # Copy the decoder state once per encoder position, then go batch-major.
        tiled_state = hidden.repeat(steps, 1, 1)
        state_per_step = tiled_state.transpose(0, 1)
        memory = encoder_outputs.transpose(0, 1)

        features = torch.cat((state_per_step, memory), dim=2)
        energy = torch.tanh(self.attention(features))

        scores = torch.sum(self.v * energy, dim=2)
        weights = F.softmax(scores, dim=1)
        return weights.unsqueeze(1)

class Decoder(nn.Module):
    """One-step LSTM decoder that attends over the encoder outputs.

    Args:
        input_size: Size of the embedding table and of the output projection.
        embedding_size: Width of each embedding vector.
        hidden_size: Hidden width of the LSTM.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout value passed straight to ``nn.LSTM``.
    """

    def __init__(self, input_size, embedding_size, hidden_size, num_layers, dropout):
        super().__init__()
        self.embedding = nn.Embedding(
            num_embeddings=input_size,
            embedding_dim=embedding_size,
        )
        # The LSTM input is the token embedding concatenated with the attention
        # context, whose width is hidden_size * 2.
        self.lstm = nn.LSTM(
            input_size=embedding_size + hidden_size * 2,
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=dropout,
        )
        self.attention = Attention(hidden_size)
        self.fc = nn.Linear(in_features=hidden_size, out_features=input_size)

    def forward(self, x, hidden, cell, encoder_outputs):
        """Run a single decoding step.

        Args:
            x: Indices of the current input tokens, one per batch element.
            hidden: LSTM hidden state carried over from the previous step.
            cell: LSTM cell state carried over from the previous step.
            encoder_outputs: Encoder outputs with positions on dim 0.

        Returns:
            ``(prediction, hidden, cell)``: scores over ``input_size`` classes
            for this step and the updated LSTM states.
        """
        # Add a leading length-1 dim so the token forms a one-step sequence.
        token_embedding = self.embedding(x.unsqueeze(0))

        # Attention is computed from the last layer's hidden state.
        weights = self.attention(hidden[-1], encoder_outputs)
        batch_major_outputs = encoder_outputs.transpose(0, 1)
        context = torch.bmm(weights, batch_major_outputs).transpose(0, 1)

        lstm_input = torch.cat((token_embedding, context), dim=2)
        output, (hidden, cell) = self.lstm(lstm_input, (hidden, cell))

        return self.fc(output.squeeze(0)), hidden, cell

class Seq2Seq(nn.Module):
    """Ties an encoder and a step-wise decoder together with teacher forcing.

    Args:
        encoder: Module returning ``(encoder_outputs, hidden, cell)``.
        decoder: Module called as ``decoder(x, hidden, cell, encoder_outputs)``;
            must expose ``fc.out_features``.
        device: Device the output buffer is moved to.
    """

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, source, target, teacher_forcing_ratio=0.5):
        """Decode ``target.shape[0] - 1`` steps and collect the per-step scores.

        Args:
            source: Source indices; dim 1 is read as the batch size.
            target: Target indices; dim 0 is read as the number of steps.
            teacher_forcing_ratio: Probability, per step, of feeding the
                ground-truth token instead of the model's own argmax.

        Returns:
            Tensor of shape ``(steps, batch, decoder.fc.out_features)``.
            Row 0 is never written and stays zero.
        """
        steps = target.shape[0]
        n_sequences = source.shape[1]
        n_classes = self.decoder.fc.out_features
        outputs = torch.zeros(steps, n_sequences, n_classes).to(self.device)

        encoder_outputs, hidden, cell = self.encoder(source)

        # The first target row seeds the decoder.
        decoder_input = target[0]
        for step in range(1, steps):
            scores, hidden, cell = self.decoder(decoder_input, hidden, cell, encoder_outputs)
            outputs[step] = scores
            if random.random() < teacher_forcing_ratio:
                decoder_input = target[step]
            else:
                decoder_input = scores.argmax(1)
        return outputs


In [None]:

# --- Training setup -----------------------------------------------------------
# Everything the loop needs is created here so the cell runs on a fresh kernel.
# The sizes and learning rate below are untuned starting points.
EMBEDDING_SIZE = 256
HIDDEN_SIZE = 512
NUM_LAYERS = 2
DROPOUT = 0.5
LEARNING_RATE = 1e-3
PAD_INDEX = 0  # tokenize_and_build_vocab fills unused positions with 0

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

encoder = Encoder(len(train_sources_vocabulary), EMBEDDING_SIZE, HIDDEN_SIZE, NUM_LAYERS, DROPOUT)
decoder = Decoder(len(train_targets_vocabulary), EMBEDDING_SIZE, HIDDEN_SIZE, NUM_LAYERS, DROPOUT)
model = Seq2Seq(encoder, decoder, device).to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
# Padded positions carry no information, so they are excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=PAD_INDEX)

# --- Training loop ------------------------------------------------------------
num_epochs = 1
clip = 1  # max gradient norm
for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0
    for source, target in train_loader:
        # The loader yields (batch, seq_len) tensors, but Seq2Seq reads dim 0 as
        # time and dim 1 as batch, so both tensors are transposed first.
        source = source.t().to(device)
        target = target.t().to(device)

        optimizer.zero_grad()
        output = model(source, target)
        output_dim = output.shape[-1]

        # Step 0 is never predicted (Seq2Seq leaves outputs[0] at zero), so it
        # is dropped from both sides. reshape() is used instead of view()
        # because the transposed target is not contiguous.
        output = output[1:].reshape(-1, output_dim)
        target = target[1:].reshape(-1)

        loss = criterion(output, target)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        epoch_loss += loss.item()
    print("Epoch:", epoch + 1, "Loss:", epoch_loss / len(train_loader))