In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import random
import math

Loading Data

In [3]:
def _read_lines(path):
    """Return all lines of the text file at ``path`` (newline characters kept)."""
    with open(path) as handle:
        return handle.readlines()


def _tokenize(lines):
    """Remove newline characters from each line and split it on single spaces."""
    return [line.replace("\n", "").split(" ") for line in lines]


def _build_vocabulary(tokenized):
    """Return the distinct tokens of ``tokenized`` in first-occurrence order.

    ``dict`` keys are unique and keep insertion order, so this yields the same
    list as appending every not-yet-seen token, without rescanning the whole
    list for every token.
    """
    return list(dict.fromkeys(token for sentence in tokenized for token in sentence))


# Raw lines of the parallel corpus; line i of each file is read as one example.
train_sources = _read_lines("train.sources")
train_targets = _read_lines("train.targets")

# One list of tokens per line. Splitting on " " means an empty line becomes [""].
train_sources_tokenized = _tokenize(train_sources)
train_targets_tokenized = _tokenize(train_targets)

# A token's position in these lists is used later as its integer id.
# NOTE(review): no special tokens (e.g. '<OOV>' or a padding token) are added here.
train_sources_vocabulary = _build_vocabulary(train_sources_tokenized)
train_targets_vocabulary = _build_vocabulary(train_targets_tokenized)

In [4]:
MAX_SEQUENCE_LENGTH = 500


def _encode_sequences(tokenized, vocabulary, max_length):
    """Map tokenized sentences to a fixed-width array of vocabulary indices.

    Args:
        tokenized: list of token lists, one per sentence.
        vocabulary: list of tokens; a token's id is its position in this list.
        max_length: number of columns; longer sentences are truncated.

    Returns:
        A float ``numpy`` array of shape ``(len(tokenized), max_length)``.
        Positions past the end of a sentence keep the initial value 0.

    Raises:
        ValueError: if a token is missing from ``vocabulary`` and the
            vocabulary has no ``'<OOV>'`` entry to fall back on.
    """
    # One dict lookup per token instead of a linear `list.index` scan.
    # `setdefault` keeps the first position of a token, exactly like `list.index`.
    token_to_index = {}
    for index, token in enumerate(vocabulary):
        token_to_index.setdefault(token, index)
    oov_index = token_to_index.get('<OOV>')

    sequences = np.zeros((len(tokenized), max_length))
    for row, tokens in enumerate(tokenized):
        for col, token in enumerate(tokens[:max_length]):
            index = token_to_index.get(token)
            if index is None:
                if oov_index is None:
                    raise ValueError(
                        "token %r is not in the vocabulary and the vocabulary "
                        "has no '<OOV>' entry" % (token,)
                    )
                index = oov_index
            sequences[row, col] = index
    return sequences


# NOTE(review): the fill value 0 is also the id of the first vocabulary entry,
# so padded positions cannot be told apart from that token downstream.
train_sources_sequences = _encode_sequences(
    train_sources_tokenized, train_sources_vocabulary, MAX_SEQUENCE_LENGTH
)
train_targets_sequences = _encode_sequences(
    train_targets_tokenized, train_targets_vocabulary, MAX_SEQUENCE_LENGTH
)

# Integer tensors of shape (num_examples, MAX_SEQUENCE_LENGTH).
X = torch.from_numpy(train_sources_sequences).long()
Y = torch.from_numpy(train_targets_sequences).long()

In [14]:
class Encoder(torch.nn.Module):
    """Embeds token indices and runs them through a bidirectional LSTM.

    The LSTM is not ``batch_first``, so ``forward`` treats the first axis of
    its input as the time axis.
    """

    def __init__(self, input_size, embedding_size, hidden_size, num_layers, dropout):
        """Store the hyper-parameters and build the embedding and LSTM layers.

        Args:
            input_size: number of rows in the embedding table.
            embedding_size: width of each embedding vector.
            hidden_size: LSTM hidden width per direction.
            num_layers: number of stacked LSTM layers.
            dropout: dropout probability handed to ``torch.nn.LSTM``.
        """
        super().__init__()
        self.input_size = input_size
        self.embedding_size = embedding_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.dropout = dropout
        self.embedding = torch.nn.Embedding(
            num_embeddings=input_size,
            embedding_dim=embedding_size,
        )
        self.lstm = torch.nn.LSTM(
            input_size=embedding_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=dropout,
            bidirectional=True,
        )

    @staticmethod
    def _average_pairs(state):
        """Average the even-indexed and odd-indexed slices along dim 0.

        For the bidirectional LSTM state this halves the first dimension;
        presumably each even/odd pair holds the two directions of one layer.
        """
        return (state[0::2, :, :] + state[1::2, :, :]) / 2

    def forward(self, x):
        """Encode ``x`` and return ``(outputs, hidden, cell)``.

        ``outputs`` is the raw LSTM output; ``hidden`` and ``cell`` are the
        final LSTM states with adjacent slices averaged pairwise.
        """
        outputs, (final_hidden, final_cell) = self.lstm(self.embedding(x))
        return (
            outputs,
            self._average_pairs(final_hidden),
            self._average_pairs(final_cell),
        )
    
class Attention(torch.nn.Module):
    """Dot-product attention of one decoder state over the encoder outputs.

    The decoder state is narrower than the encoder outputs, so it is tiled
    along the feature axis until the widths match before the dot product.
    """

    def __init__(self, hid_dim):
        """Store ``hid_dim`` and create the parameter ``v``.

        ``v`` is registered on the module but is not read by ``forward``.
        """
        super(Attention, self).__init__()
        self.hid_dim = hid_dim
        self.v = torch.nn.Parameter(torch.rand(hid_dim))
        self.v.data.normal_(mean=0, std=1. / np.sqrt(self.v.size(0)))

    def forward(self, hidden, encoder_outputs):
        """Return attention weights over the source positions.

        Args:
            hidden: decoder state of shape ``(batch_size, hid_dim)``.
            encoder_outputs: tensor of shape ``(seq_length, batch_size, enc_dim)``
                where ``enc_dim`` is a multiple of ``hid_dim``.

        Returns:
            Tensor of shape ``(batch_size, 1, seq_length)`` whose last axis
            sums to 1.

        Raises:
            ValueError: if ``enc_dim`` is not a multiple of the width of
                ``hidden``.
        """
        # Batch-first view so the position axis lines up with the softmax axis.
        keys = encoder_outputs.transpose(0, 1)  # (batch_size, seq_length, enc_dim)
        enc_dim = keys.shape[2]
        state_dim = hidden.shape[-1]
        if state_dim == 0 or enc_dim % state_dim != 0:
            raise ValueError(
                "encoder feature size %d is not a multiple of the hidden size %d"
                % (enc_dim, state_dim)
            )

        # Tile the state along the feature axis only; broadcasting covers the
        # position axis. `repeat` multiplies sizes, so the factor must be
        # enc_dim // state_dim rather than the hidden size itself.
        query = hidden.unsqueeze(1).repeat(1, 1, enc_dim // state_dim)  # (batch_size, 1, enc_dim)

        attention_scores = torch.tanh(torch.sum(query * keys, dim=2))  # (batch_size, seq_length)
        attention_weights = F.softmax(attention_scores, dim=1).unsqueeze(1)  # (batch_size, 1, seq_length)
        return attention_weights

    
class Decoder(torch.nn.Module):
    """Single-step LSTM decoder that attends over the encoder outputs.

    Each call to ``forward`` consumes one token per batch element and returns
    the scores for the next token together with the updated LSTM state.
    """

    def __init__(self, input_size, embedding_size, hidden_size, num_layers, dropout):
        """Build the embedding, LSTM, attention and output layers.

        Args:
            input_size: number of rows in the embedding table; also the width
                of the prediction returned by ``forward``.
            embedding_size: width of each embedding vector.
            hidden_size: LSTM hidden width.
            num_layers: number of stacked LSTM layers.
            dropout: dropout probability handed to ``torch.nn.LSTM``.
        """
        super(Decoder, self).__init__()
        self.input_size = input_size
        self.embedding_size = embedding_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.dropout = dropout

        self.embedding = torch.nn.Embedding(input_size, embedding_size)
        # The LSTM input is the token embedding concatenated with the attention
        # context, whose width is hidden_size*2.
        self.lstm = torch.nn.LSTM(embedding_size+hidden_size*2, hidden_size, num_layers, dropout=dropout)
        self.attention = Attention(hidden_size)
        self.fc = torch.nn.Linear(hidden_size, input_size)

    def forward(self, x, hidden, cell, encoder_outputs):
        """Run one decoding step.

        Args:
            x: token indices for this step, shape ``(batch_size,)``.
            hidden: LSTM hidden state, ``(num_layers, batch_size, hidden_size)``.
            cell: LSTM cell state, ``(num_layers, batch_size, hidden_size)``.
            encoder_outputs: ``(seq_length, batch_size, hidden_size*2)``.

        Returns:
            ``(prediction, hidden, cell)`` where ``prediction`` has shape
            ``(batch_size, input_size)`` and the states are the updated ones.
        """
        x = x.unsqueeze(0)  # (batch_size) -> (1, batch_size)
        embedded = self.embedding(x)  # (1, batch_size, embedding_size)

        # The last layer's hidden state is the attention query.
        attention = self.attention(hidden[-1], encoder_outputs)  # (batch_size, 1, seq_length)
        encoder_outputs = encoder_outputs.transpose(0, 1)  # (seq_length, batch_size, hidden_size*2) -> (batch_size, seq_length, hidden_size*2)
        context = torch.bmm(attention, encoder_outputs)  # (batch_size, 1, hidden_size*2)
        context = context.transpose(0, 1)  # (1, batch_size, hidden_size*2)

        dec_input = torch.cat((embedded, context), dim=2)  # (1, batch_size, embedding_size+hidden_size*2)
        # Pass the incoming state explicitly. Without it the LSTM starts from
        # zeros on every step and ignores both the encoder's final state and
        # everything decoded so far.
        output, (hidden, cell) = self.lstm(dec_input, (hidden, cell))  # (1, batch_size, hidden_size), (num_layers, batch_size, hidden_size), (num_layers, batch_size, hidden_size)
        prediction = self.fc(output.squeeze(0))  # (batch_size, input_size)
        return prediction, hidden, cell
    
class Seq2Seq(torch.nn.Module):
    """Ties an encoder and a step-wise decoder into one trainable model."""

    def __init__(self, encoder, decoder, device):
        """Keep references to the two sub-modules and the target device."""
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, source, target, teacher_forcing_ratio=0.5):
        """Decode ``target`` step by step, conditioned on ``source``.

        Args:
            source: source token indices, ``(seq_length, batch_size)``.
            target: target token indices, ``(seq_length, batch_size)``.
            teacher_forcing_ratio: probability, drawn independently at each
                step, of feeding the ground-truth token as the next decoder
                input instead of the decoder's own argmax prediction.

        Returns:
            Tensor of shape ``(target_seq_length, batch_size, decoder.input_size)``.
            Row 0 is never written and stays all zeros.
        """
        num_steps = target.shape[0]
        batch_size = source.shape[1]
        vocab_size = self.decoder.input_size

        # Pre-allocated on the model's device; filled from step 1 onwards.
        step_outputs = torch.zeros(num_steps, batch_size, vocab_size, device=self.device)
        encoder_outputs, hidden, cell = self.encoder(source)

        # The first target row seeds the decoder.
        decoder_input = target[0]
        for step in range(1, num_steps):
            prediction, hidden, cell = self.decoder(decoder_input, hidden, cell, encoder_outputs)
            step_outputs[step] = prediction
            if random.random() < teacher_forcing_ratio:
                decoder_input = target[step]
            else:
                decoder_input = prediction.argmax(1)

        return step_outputs

In [6]:
def train(model, iterator, optimizer, criterion, clip, device, teacher_forcing_ratio=0.5):
    """Run one training pass over ``iterator`` and return the mean batch loss.

    Args:
        model: called as ``model(source, target, teacher_forcing_ratio)``; its
            output's last axis is the class axis.
        iterator: sized iterable of batches; ``batch[0]`` is the source tensor
            and ``batch[1]`` the target tensor.
        optimizer: optimizer over the model parameters.
        criterion: loss taking ``(N, num_classes)`` scores and ``(N,)`` labels.
        clip: maximum gradient norm passed to ``clip_grad_norm_``.
        device: device each batch is moved to.
        teacher_forcing_ratio: forwarded to the model; the default of 0.5 is
            the value this function previously hard-coded.

    Returns:
        Sum of the per-batch losses divided by ``len(iterator)``, as a float.

    Raises:
        ValueError: if ``iterator`` is empty, since no average exists.
    """
    num_batches = len(iterator)
    if num_batches == 0:
        raise ValueError("iterator yields no batches; cannot compute an average loss")

    model.train()
    epoch_loss = 0.0

    for batch in iterator:
        source = batch[0].to(device)
        target = batch[1].to(device)

        optimizer.zero_grad()
        output = model(source, target, teacher_forcing_ratio)
        output_dim = output.shape[-1]
        # Index 0 along the first axis is skipped for both scores and labels.
        # `reshape` is used instead of `view` so non-contiguous batches
        # (for example transposed ones) are accepted as well.
        output = output[1:].reshape(-1, output_dim)
        target = target[1:].reshape(-1)
        loss = criterion(output, target)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        epoch_loss += loss.item()

    return epoch_loss / num_batches

In [15]:
INPUT_ENC_DIM = len(train_sources_vocabulary)
INPUT_DEC_DIM = len(train_targets_vocabulary)
OUTPUT_DIM = len(train_targets_vocabulary)
HIDDEN_DIM = 512
EMBEDDING_DIM = 512
NUM_LAYERS = 2
DROPOUT = 0.5
BATCH_SIZE = 32
NUM_STEPS = 20000
CLIP = 1
LEARNING_RATE = 1e-4
SEED = 42
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Seed every RNG this cell relies on (weight init, batch shuffling and the
# teacher-forcing draws in Seq2Seq.forward) so a fresh run is repeatable.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)


def _collate_time_major(samples):
    """Stack (source, target) samples into two (seq_length, batch_size) tensors.

    The default collation stacks along dim 0 and yields (batch_size, seq_length),
    but Encoder, Decoder and Seq2Seq all index their inputs as
    (seq_length, batch_size), so the samples are stacked along dim 1 instead.
    """
    sources, targets = zip(*samples)
    return torch.stack(sources, dim=1), torch.stack(targets, dim=1)


train_dataset = torch.utils.data.TensorDataset(X, Y)
train_dataloader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    collate_fn=_collate_time_major,
)
NUM_EPOCHS = math.ceil(NUM_STEPS / len(train_dataloader))

encoder = Encoder(INPUT_ENC_DIM, EMBEDDING_DIM, HIDDEN_DIM, NUM_LAYERS, DROPOUT)
# Decoder's second argument is the embedding width, not the vocabulary size
# (the output layer is already sized by its first argument).
decoder = Decoder(INPUT_DEC_DIM, EMBEDDING_DIM, HIDDEN_DIM, NUM_LAYERS, DROPOUT)
model = Seq2Seq(encoder, decoder, DEVICE).to(DEVICE)

optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
# NOTE(review): 502 is unexplained. The sequences built above are padded with 0,
# so padded positions are NOT ignored by this loss - confirm the intended
# padding id.
criterion = nn.CrossEntropyLoss(ignore_index=502)

# NOTE(review): NUM_EPOCHS is computed above but the loop runs a fixed 10 epochs;
# confirm which of the two is intended.
for i in range(10):
    train_loss = train(model, train_dataloader, optimizer, criterion, CLIP, DEVICE)
    print('Epoch: {}, Train Loss: {}'.format(i, train_loss))

: 