In [2]:
import sys
import math
import torch
import warnings
import numpy as np
import pandas as pd
import torch.nn as nn
import torch.optim as optim

from prettytable import PrettyTable
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset, DataLoader

warnings.filterwarnings('ignore')
np.set_printoptions(threshold=sys.maxsize)

In [3]:
# Load train data
train_data = pd.read_csv('A3 files/train_data.csv')
X = train_data['Sentence']
Y = train_data['Transformed sentence'].apply(lambda x: '<'+x)

ch2_idx = {char: idx for idx, char in enumerate(set(char for string in X for char in string))}
ch2_idx.update({'<': len(ch2_idx)})

X = [[ch2_idx[char] for char in string] for string in X]
Y = [[ch2_idx[char] for char in string] for string in Y]

x_train, x_valid, y_train, y_valid = train_test_split(X, Y, train_size=0.8)
trainset = TensorDataset(torch.LongTensor(x_train), torch.LongTensor(y_train))
valset = TensorDataset(torch.LongTensor(x_valid), torch.LongTensor(y_valid))
max_len = max(len(i) for i in x_train)

trainloader = DataLoader(trainset, batch_size=64, shuffle=True)
valloader = DataLoader(valset, batch_size=64, shuffle=False)

In [4]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, nhead):
        super(MultiHeadAttention, self).__init__()
        self.d_model = d_model
        self.nhead = nhead
        self.d_k = d_model // nhead

        self.W_q = nn.Linear(d_model ,d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def scaled_dot_product_attention(self, Q, K, V):
        attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        attn_probs = torch.softmax(attn_scores, dim=-1)
        output = torch.matmul(attn_probs, V)
        return output
    
    def split_heads(self, x):
        batch_size, seq_length, _ = x.size()
        return x.view(batch_size, seq_length, self.nhead, self.d_k).transpose(1, 2)
    
    def combine_heads(self, x):
        batch_size, _, seq_length, _ = x.size()
        return x.transpose(1, 2).contiguous().view(batch_size, seq_length, self.d_model)
    
    def forward(self, Q, K, V):
        Q = self.split_heads(self.W_q(Q))
        K = self.split_heads(self.W_k(K))
        V = self.split_heads(self.W_v(V))

        attn_output = self.scaled_dot_product_attention(Q, K, V)
        output = self.W_o(self.combine_heads(attn_output))
        return output

In [5]:
class PositionWiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(PositionWiseFeedForward, self).__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len):
        super(PositionalEncoding, self).__init__()

        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
        encoding = torch.zeros(max_len, d_model)

        encoding[:, 0::2] = torch.sin(position * div_term)
        encoding[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer('encoding', encoding.unsqueeze(0))
    
    def forward(self, x):
        x = x + self.encoding[:, :x.size(1)]
        return x

In [6]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model, nhead, d_ff, rate):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, nhead)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(rate)

    def forward(self, x):
        attn_output = self.self_attn(x, x, x)
        x = self.norm1(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))
        return x
    
class DecoderLayer(nn.Module):
    def __init__(self, d_model, nhead, d_ff, rate):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, nhead)
        self.cross_attn = MultiHeadAttention(d_model, nhead)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(rate)

    def forward(self, x, enc_output):
        attn_output = self.self_attn(x, x, x)
        x = self.norm1(x + self.dropout(attn_output))
        attn_output = self.cross_attn(x, enc_output, enc_output)
        x = self.norm2(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm3(x + self.dropout(ff_output))
        return x

In [7]:
class Transformer(nn.Module):
    def __init__(self, vocab_size, d_model, nhead, num_layers, d_ff, max_len, rate):
        super(Transformer, self).__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.encoding = PositionalEncoding(d_model, max_len)
        
        self.encoder = nn.ModuleList([EncoderLayer(d_model, nhead, d_ff, rate) for _ in range(num_layers)])
        self.decoder = nn.ModuleList([DecoderLayer(d_model, nhead, d_ff, rate) for _ in range(num_layers)])

        self.fc = nn.Linear(d_model, vocab_size)
        self.dropout = nn.Dropout(rate)

    def forward(self, src, tgt):
        x = self.dropout(self.encoding(self.embedding(src)))
        y = self.dropout(self.encoding(self.embedding(tgt)))
        
        enc_output = x
        for en_layer in self.encoder:
            enc_output = en_layer(enc_output)

        dec_output = y
        for dec_layer in self.decoder:
            dec_output = dec_layer(dec_output, enc_output)

        output = self.fc(dec_output)
        return output

In [8]:
def train(model, n_epochs, loss_func, optimizer, vocab_size, trainloader, valloader):
    for _ in range(n_epochs):
        model.train()
        for (src, tgt) in trainloader:
            optimizer.zero_grad()
            output = model(src, tgt[:, :-1])
            loss = loss_func(output.contiguous().view(-1, vocab_size), tgt[:,1:].contiguous().view(-1))
            loss.backward()
            optimizer.step()
        t_loss, t_acc = evaluate(model, loss_func, vocab_size, trainloader)
        v_loss, v_acc = evaluate(model, loss_func, vocab_size, valloader)
        print('Epoch: {}/{} Train [Loss: {:.4f} Accuracy: {:.4f}] Validation [Loss: {:.4f} Accuracy: {:.4f}]'.format(1+_, n_epochs, t_loss, t_acc, v_loss, v_acc))

def evaluate(model, loss_func, vocab_size, dataloader):
    model.eval()
    with torch.no_grad():
        t_loss, correct, total = 0, 0, 0
        for (src, tgt) in dataloader:
            output = model(src, tgt[:, :-1])
            loss = loss_func(output.contiguous().view(-1, vocab_size), tgt[:,1:].contiguous().view(-1))
            t_loss += loss.item()
            _, predicted = torch.max(output.data, dim=2)
            correct += (predicted == tgt[:,1:]).sum().item()
            total += predicted.shape[0]
    avg_loss = t_loss / len(dataloader)
    accuracy = correct / (total * (tgt.size(1)-1))
    return avg_loss, accuracy

In [9]:
nhead = 8
d_ff = 2048 # dimension of inner fully connected layer
d_model = 512 # dimension of model sub-layers' outputs
num_layers = 6
vocab_size = 28
dropout_rate = 0.2
learn_rate = 0.0001

model = Transformer(vocab_size, d_model, nhead, num_layers, d_ff, max_len, dropout_rate)
loss_func = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learn_rate)

n_epochs = 10
train(model, n_epochs, loss_func, optimizer, vocab_size, trainloader, valloader)

Epoch: 1/10 Train [Loss: 1.3399 Accuracy: 0.5853] Validation [Loss: 1.3659 Accuracy: 0.5710]
Epoch: 2/10 Train [Loss: 0.2870 Accuracy: 0.9119] Validation [Loss: 0.3017 Accuracy: 0.9052]
Epoch: 3/10 Train [Loss: 0.1092 Accuracy: 0.9452] Validation [Loss: 0.1261 Accuracy: 0.9357]
Epoch: 4/10 Train [Loss: 0.0960 Accuracy: 0.9466] Validation [Loss: 0.1048 Accuracy: 0.9396]
Epoch: 5/10 Train [Loss: 0.0928 Accuracy: 0.9463] Validation [Loss: 0.1031 Accuracy: 0.9375]
Epoch: 6/10 Train [Loss: 0.0848 Accuracy: 0.9499] Validation [Loss: 0.0985 Accuracy: 0.9351]
Epoch: 7/10 Train [Loss: 0.0845 Accuracy: 0.9491] Validation [Loss: 0.0954 Accuracy: 0.9376]
Epoch: 8/10 Train [Loss: 0.0905 Accuracy: 0.9477] Validation [Loss: 0.1046 Accuracy: 0.9376]
Epoch: 9/10 Train [Loss: 0.0819 Accuracy: 0.9537] Validation [Loss: 0.0969 Accuracy: 0.9360]
Epoch: 10/10 Train [Loss: 0.0823 Accuracy: 0.9533] Validation [Loss: 0.1006 Accuracy: 0.9372]


In [10]:
print(evaluate(model, loss_func, vocab_size, trainloader))
print(evaluate(model, loss_func, vocab_size, valloader))

(0.08244324340061708, 0.9533482142857143)
(0.10064782947301865, 0.9372321428571428)


In [15]:
def check(pred, true):
    correct = 0
    for a, b in zip(pred, true):
        if a == b:
            correct += 1
    return correct

def predict(model, idx2_ch, ch2_idx, dataloader):
    model.eval()
    y_pred = []
    with torch.no_grad():
        for (src, _) in dataloader:
            max_len = src.size(1)
            pred = torch.zeros(1, max_len, dtype=torch.long)
            pred[0][0] = ch2_idx['<']
            for i in range(max_len):
                output = model(src, pred)
                output = torch.argmax(output, dim=-1).tolist()[0]
                pred[0, i] = output[i]
                # print(output, pred)
            output = pred[0].tolist()
            result = [idx2_ch[idx] for idx in output]
            # print(result)
            y_pred.append(''.join(result))
    return y_pred

In [17]:
# Load test data
test_data = pd.read_csv('A3 files/eval_data.csv')
X = test_data['Sentence']
Y = test_data['Transformed sentence'].apply(lambda x: '<'+x)

ch2_idx = {char: idx for idx, char in enumerate(set(char for string in X for char in string))}
ch2_idx.update({'<': len(ch2_idx)})
idx2_ch = {idx: char for char, idx in ch2_idx.items()}

x_test = [[ch2_idx[char] for char in string] for string in X]
y_test = [[ch2_idx[char] for char in string] for string in Y]

testset = TensorDataset(torch.LongTensor(x_test), torch.LongTensor(y_test))
testloader = DataLoader(testset, batch_size=1, shuffle=False)
print(evaluate(model, loss_func, vocab_size, testloader))
predicted = predict(model, idx2_ch, ch2_idx, testloader)

freq = {}
for i in range(max_len+1):
    freq[i] = 0

# print('Original\tPredicted')
for (y_true, y_pred) in zip(Y, predicted):
    num = check(y_true[1:], y_pred)
    freq[num] += 1
    # print('{}\t{}\t{}'.format(y_true[1:], y_pred, num))

t = PrettyTable(['Length', '#Correct'])
for i in range(max_len+1):
    t.add_row([i, freq[i]])
print(t)

(0.5260695640041958, 0.909125)
+--------+----------+
| Length | #Correct |
+--------+----------+
|   0    |   996    |
|   1    |   816    |
|   2    |   169    |
|   3    |    16    |
|   4    |    3     |
|   5    |    0     |
|   6    |    0     |
|   7    |    0     |
|   8    |    0     |
+--------+----------+


In [20]:
Y = train_data['Transformed sentence'].apply(lambda x: '<'+x)
trainloader = DataLoader(trainset, batch_size=1, shuffle=True)
predicted = predict(model, idx2_ch, ch2_idx, trainloader)

freq = {}
for i in range(max_len+1):
    freq[i] = 0

# print('Original\tPredicted')
for (y_true, y_pred) in zip(Y, predicted):
    num = check(y_true[1:], y_pred)
    freq[num] += 1
    # print('{}\t{}\t{}'.format(y_true[1:], y_pred, num))

t = PrettyTable(['Length', '#Correct'])
for i in range(max_len+1):
    t.add_row([i, freq[i]])
print(t)

+--------+----------+
| Length | #Correct |
+--------+----------+
|   0    |   4088   |
|   1    |   1320   |
|   2    |   180    |
|   3    |    11    |
|   4    |    1     |
|   5    |    0     |
|   6    |    0     |
|   7    |    0     |
|   8    |    0     |
+--------+----------+
