In [9]:
import torch
from torch.utils.data import DataLoader
import time

In [10]:
MAX_LEN = 50
input_size = 50
hidden_size = 50
num_classes = 26
learning_rate = 0.001
batch_size = 10
num_epochs = 200
caesar_stride = 3

In [11]:
INDEX_TO_CHAR = sorted(list(set(' abcdefghijklmnopqrstuvwxyz')))
CHAR_TO_INDEX = {c: i for i, c in enumerate(INDEX_TO_CHAR)}

In [12]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [13]:
X = open('../data/caesar/encoded_text.txt', 'r').read().split(sep='\n')
y = open('../data/caesar/row_text.txt', 'r').read().split(sep='\n')

In [14]:
X_val, y_val = X[:len(X) // 5], y[:len(y) // 5]
X_test, y_test = X[len(X) // 5: (len(X) // 5) * 2], y[len(y) // 5: (len(X) // 5) * 2]
X_train, y_train = X[(len(X) // 5) * 2:], y[(len(X) // 5) * 2:]

In [15]:
def to_tensor(X, y):
    X_tens = torch.zeros(len(X), MAX_LEN, dtype=torch.long)
    Y_tens = torch.zeros(len(y), MAX_LEN, dtype=torch.long)
    for i in range(len(X)):
        for j in range(len(X[i])):
            X_tens[i, j] = CHAR_TO_INDEX[X[i][j]]
            Y_tens[i, j] = CHAR_TO_INDEX[y[i][j]]
    return X_tens, Y_tens

In [16]:
val_X, val_Y = to_tensor(X_val, y_val)
test_X, test_Y = to_tensor(X_test, y_test)
train_X, train_Y = to_tensor(X_train, y_train)

In [17]:
train_dl = torch.utils.data.DataLoader(
    torch.utils.data.TensorDataset(train_X, train_Y),
    batch_size=batch_size,
    shuffle=True,
    drop_last=True
)
test_dl = torch.utils.data.DataLoader(
    torch.utils.data.TensorDataset(test_X, test_Y),
    batch_size=batch_size,
    shuffle=True,
    drop_last=True
)
val_dl = torch.utils.data.DataLoader(
    torch.utils.data.TensorDataset(val_X, val_Y),
    batch_size=batch_size,
    shuffle=True,
    drop_last=True
)

In [18]:
class RNNModel(torch.nn.Module):

    def __init__(self):
        super().__init__()
        self.embed = torch.nn.Embedding(len(CHAR_TO_INDEX) + caesar_stride, 32)
        self.rnn = torch.nn.RNN(32, 128, batch_first=True)
        self.linear = torch.nn.Linear(128, len(CHAR_TO_INDEX) + caesar_stride)

    def forward(self, sentence, state=None):
        embed = self.embed(sentence)
        o, h = self.rnn(embed)
        return self.linear(o)

In [19]:
model = RNNModel().to(device)
loss = torch.nn.CrossEntropyLoss().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [20]:
for epoch in range(num_epochs):
    train_loss, train_acc, iter_num = .0, .0, .0
    start_epoch_time = time.time()
    model.train()
    for x_in, y_in in train_dl:
        x_in = x_in
        y_in = y_in.view(1, -1).squeeze()
        optimizer.zero_grad()
        out = model.forward(x_in).view(-1, len(CHAR_TO_INDEX) + caesar_stride)
        l = loss(out, y_in)
        train_loss += l.item()
        batch_acc = (out.argmax(dim=1) == y_in)
        train_acc += batch_acc.sum().item() / batch_acc.shape[0]
        l.backward()
        optimizer.step()
        iter_num += 1
    print(
        f"Epoch: {epoch}, loss: {train_loss:.4f}, acc: "
        f"{train_acc / iter_num:.4f}",
        end=" | "
    )
    val_loss, val_acc, iter_num = .0, .0, .0
    model.eval()
    for x_in, y_in in val_dl:
        x_in = x_in
        y_in = y_in.view(1, -1).squeeze()
        out = model.forward(x_in).view(-1, len(CHAR_TO_INDEX) + caesar_stride)
        l = loss(out, y_in)
        val_loss += l.item()
        batch_acc = (out.argmax(dim=1) == y_in)
        val_acc += batch_acc.sum().item() / batch_acc.shape[0]
        iter_num += 1
    print(
        f"val loss: {val_loss:.4f}, val acc: {val_acc / iter_num:.4f} | "
        f"{time.time() - start_epoch_time:.2f} sec."
    )

Epoch: 0, loss: 12.6823, acc: 0.3060 | val loss: 2.9592, val acc: 0.4020 | 0.25 sec.
Epoch: 1, loss: 10.6728, acc: 0.4590 | val loss: 2.5028, val acc: 0.3540 | 0.41 sec.
Epoch: 2, loss: 9.1326, acc: 0.3865 | val loss: 2.3100, val acc: 0.3280 | 0.53 sec.
Epoch: 3, loss: 8.3741, acc: 0.3925 | val loss: 2.1017, val acc: 0.4300 | 0.45 sec.
Epoch: 4, loss: 7.4099, acc: 0.5725 | val loss: 1.9103, val acc: 0.6080 | 0.35 sec.
Epoch: 5, loss: 6.7534, acc: 0.6955 | val loss: 1.7102, val acc: 0.6760 | 0.25 sec.
Epoch: 6, loss: 6.0458, acc: 0.7460 | val loss: 1.5156, val acc: 0.7280 | 0.24 sec.
Epoch: 7, loss: 5.2766, acc: 0.7940 | val loss: 1.3566, val acc: 0.7920 | 0.24 sec.
Epoch: 8, loss: 4.8429, acc: 0.8375 | val loss: 1.2263, val acc: 0.8440 | 0.24 sec.
Epoch: 9, loss: 4.1573, acc: 0.8845 | val loss: 1.1216, val acc: 0.8540 | 0.34 sec.
Epoch: 10, loss: 3.6749, acc: 0.9010 | val loss: 0.9338, val acc: 0.8860 | 0.31 sec.
Epoch: 11, loss: 3.1580, acc: 0.9195 | val loss: 0.8826, val acc: 0.8880 

In [25]:
test_loss, test_acc, iter_num = .0, .0, .0
model.eval()
for x_in, y_in in test_dl:
    x_in = x_in
    y_in = y_in.view(1, -1).squeeze()
    out = model.forward(x_in).view(-1, len(CHAR_TO_INDEX) + caesar_stride)
    l = loss(out, y_in)
    test_loss += l.item()
    batch_acc = (out.argmax(dim=1) == y_in)
    test_acc += batch_acc.sum().item() / batch_acc.shape[0]
    iter_num += 1
print(
    f"test loss: {test_loss:.4f}, test acc: {test_acc / iter_num:.4f}")

test loss: 0.0020, test acc: 1.0000
