In [1]:
import sys
import json
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim

from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import TensorDataset, DataLoader

np.set_printoptions(threshold=sys.maxsize)

In [2]:
def load_data(path):
    data = []
    with open(path, 'r') as file:
        for line in file:
            seq = json.loads(line)
            data.append(seq)
    return data

def get_input_vocab(data):
    char_vocab = set()
    for seq in data:
        for _, value in seq[0].items():
            char_vocab.update(str(value))
    return char_vocab

def get_output_vocab(data):
    char_vocab = set()
    for obj in data:
        char_vocab.update(json.dumps(obj))
    return char_vocab

def get_input_idx(data, ch2idx):
    result = []
    for seq in data:
        indices = []
        for _, value in seq[0].items():
            indices.extend([ch2idx[char] for char in str(value)])
        result.append(indices)
    return result

def get_output_idx(data, ch2idx):
    result = []
    for obj in data:
        indices = [ch2idx[char] for char in json.dumps(obj)]
        result.append(indices)
    return result

In [3]:
# Load all the data
train_path = ['A3 files/train.sources', 'A3 files/train.targets']
val_path = ['A3 files/dev.sources', 'A3 files/dev.targets']
test_path = ['A3 files/test.sources', 'A3 files/test.targets']

X, Y = load_data(train_path[0]), load_data(train_path[1])
x_valid, y_valid = load_data(val_path[0], val_path[1])
x_test, y_test = load_data(test_path[0], test_path[1])

In [4]:
# Data preprocessing
vocab_input = get_input_vocab(X)
vocab_output = get_output_vocab(Y)

special_tokens = ['<sos>', '<pad>', '<eos>']
vocab_input.update(special_tokens)
vocab_output.update(special_tokens)

ch2idx_input = {char: idx for idx, char in enumerate(vocab_input)}
idx2ch_input = {idx: char for char, idx in ch2idx_input.items()}

ch2idx_output = {char: idx for idx, char in enumerate(vocab_output)}
idx2ch_output = {idx: char for char, idx in ch2idx_output.items()}

x_train = get_input_idx(X, ch2idx_input)
y_train = get_output_idx(Y, ch2idx_output)
print(len(x_train))
print(len(y_train))

x_train = pad_sequence([torch.tensor(seq) for seq in x_train], batch_first=True, padding_value=ch2idx_input['<pad>'])
y_train = pad_sequence([torch.tensor(seq) for seq in y_train], batch_first=True, padding_value=ch2idx_output['<pad>'])

trainset = TensorDataset(x_train, y_train)
trainloader = DataLoader(trainset, batch_size=32, shuffle=True)

print(len(ch2idx_input))
print(len(ch2idx_output))

172719
172719
97
46


In [None]:
x_test = get_input_idx(X, ch2idx_input)
y_test = get_output_idx(Y, ch2idx_output)

x_test = pad_sequence([torch.tensor(seq) for seq in x_test], batch_first=True, padding_value=ch2idx_input['<pad>'])
y_test = pad_sequence([torch.tensor(seq) for seq in y_test], batch_first=True, padding_value=ch2idx_output['<pad>'])

testset = TensorDataset(x_test, y_test)
testloader = DataLoader(testset, batch_size=32, shuffle=True)

In [5]:
# Define all the classes
class Encoder(nn.Module):
    def __init__(self, embed_size, input_size, hidden_size, num_layers, rate):
        super(Encoder, self).__init__()
        self.dropout = nn.Dropout(rate)
        self.embedding = nn.Embedding(input_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, bidirectional=True)
        self.fc_hidden = nn.Linear(hidden_size*2, hidden_size)
        self.fc_cell = nn.Linear(hidden_size*2, hidden_size)

    def forward(self, x):
        embedded = self.dropout(self.embedding(x))
        en_states, (hidden, cell) = self.lstm(embedded)
        hidden = self.fc_hidden(torch.cat((hidden[0:1], hidden[1:2]), dim=2))
        cell = self.fc_cell(torch.cat((cell[0:1], cell[1:2]), dim=2))
        return en_states, hidden, cell
    
class Decoder(nn.Module):
    def __init__(self, embed_size, output_size, hidden_size, num_layers, rate):
        super(Decoder, self).__init__()
        self.output_size = output_size
        self.dropout = nn.Dropout(rate)
        self.embedding = nn.Embedding(output_size, embed_size)
        self.lstm = nn.LSTM(hidden_size*2 + embed_size, hidden_size, num_layers)
        self.energy = nn.Linear(hidden_size*3, 1)
        self.softmax = nn.Softmax(dim=0)
        self.relu = nn.ReLU()
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, en_states, hidden, cell):
        x = x.unsqueeze(0)
        embedded = self.dropout(self.embedding(x))
        sequence_length = en_states.shape[0]
        h_reshaped = hidden.repeat(sequence_length, 1, 1)
        energy = self.relu(self.energy(torch.cat((h_reshaped, en_states), dim=2)))
        attention = self.softmax(energy)
        attention = attention.permute(1, 2, 0)
        en_states = en_states.permute(1, 0, 2)
        context_vector = torch.bmm(attention, en_states).permute(1, 0, 2)
        de_input = torch.cat((context_vector, embedded), dim=2)
        outputs, (hidden, cell) = self.lstm(de_input, (hidden, cell))
        prediction = self.fc(outputs).squeeze(0)
        return prediction, hidden, cell
    
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, source, target):
        batch_size = source.shape[1]
        target_len = target.shape[0]
        tgt_vocab = self.decoder.output_size
        outputs = torch.zeros(target_len, batch_size, tgt_vocab)
        en_states, hidden, cell = self.encoder(source)
        x = target[0]
        for t in range(1, target_len):
            output, hidden, cell = self.decoder(x, en_states, hidden, cell)
            outputs[t] = output
            x = output.argmax(1)
        return outputs

In [1]:
input_vocab_size = len(ch2idx_input) + 1
output_vocab_size = len(ch2idx_output) + 1
hidden_size = 512
embed_size = 512
num_layers = 1
dropout_rate = 0.5
learn_rate = 0.0001
batch_size = 32
num_epochs = 10

encoder_net = Encoder(embed_size, input_vocab_size, hidden_size, num_layers, dropout_rate)
decoder_net = Decoder(embed_size, output_vocab_size, hidden_size, num_layers, dropout_rate)

model = Seq2Seq(encoder_net, decoder_net)
optimizer = optim.Adam(model.parameters(), lr=learn_rate)
loss_func = nn.CrossEntropyLoss(ignore_index=ch2idx_output['<eos>'])

for _ in range(num_epochs):
    t_loss = 0
    for (input, target) in trainloader:
        input = input.permute(1, 0)
        target = target.permute(1, 0)
        optimizer.zero_grad()
        output = model(input, target)
        output = output[1:].reshape(-1, output.shape[2])
        target = target[1:].reshape(-1)
        loss = loss_func(output, target)
        t_loss += loss.item()
        loss.backward()
        optimizer.step()
    print('Steps: {}/{} Loss: {:.4f}'.format(1+_, num_epochs, t_loss/len(trainloader)))

Epoch 1/10, Loss: 0.0813
Epoch 2/10, Loss: 0.0653
Epoch 3/10, Loss: 0.0549
Epoch 4/10, Loss: 0.0446
Epoch 5/10, Loss: 0.0369
Epoch 6/10, Loss: 0.0305
Epoch 7/10, Loss: 0.0259
Epoch 8/10, Loss: 0.0220
Epoch 9/10, Loss: 0.0186
Epoch 10/10, Loss: 0.0159


In [None]:
def log_perplexity(model, dataloader):
    log_ppx = 0
    for (input, target) in dataloader:
        input = input.permute(1, 0)
        target = target.permute(1, 0)
        output = model(input, target)
        output = output.reshape(-1, output.shape[2])
        target = target.reshape(-1)
        log_ppx += loss_func(output, target).item()
    return log_ppx / len(dataloader)

In [1]:
log_perplexity(model, testloader)

0.073368463047935463
