In [1]:
import re
import time
import torch
from nltk import tokenize

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
FILE_NAME = './data/nietzsche.txt'
CAESAR_OFFSET = 2
CHARS = list('abcdefghijklmnopqrstuvwxyz ')
INDEX_TO_CHAR = [w for w in sorted(CHARS)]
CHAR_TO_INDEX = {w: i for i, w in enumerate(INDEX_TO_CHAR)}
BATCH_SIZE = 512
NUM_EPOCHS = 400

In [3]:
def load_and_vanilla_preprocess(txt_path):
    with open(txt_path, encoding='utf-8') as txt_file:
        text = txt_file.read().lower()
    text = text.replace('\n', ' ')
    sentences = tokenize.sent_tokenize(text)
    sentences = [s for s in sentences if len(s) < 1000]
    sentences = [re.sub('[^a-z ]', ' ', s) for s in sentences]
    return sentences

def caesor(s, shift):
    res = ''
    for c in s:
        if c not in CHARS:
            res += ' '
        else:
            res += CHARS[(CHARS.index(c.lower()) + shift) % len(CHARS)]
    return res

text = load_and_vanilla_preprocess(FILE_NAME)

In [4]:
def vectorize(text):
    max_length = len(max(text, key=len))
    X = torch.zeros((len(text), max_length), dtype=int)
    Y = torch.zeros((len(text), max_length), dtype=int)

    for i in range(len(text)):
        for j, w in enumerate(text[i]):
            X[i, j] = CHAR_TO_INDEX.get(caesor(w, CAESAR_OFFSET), CHAR_TO_INDEX[' '])
            Y[i, j] = CHAR_TO_INDEX.get(w, CHAR_TO_INDEX[' '])
    return X, Y

X, Y = vectorize(text)

dataset = torch.utils.data.TensorDataset(X, Y)

In [5]:
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])

train_dl = torch.utils.data.DataLoader(train_dataset, BATCH_SIZE, shuffle=True)
test_dl = torch.utils.data.DataLoader(test_dataset, BATCH_SIZE, shuffle=True)

In [6]:
class RNNModel(torch.nn.Module):

    def __init__(self):
        super().__init__()
        self.embed = torch.nn.Embedding(len(CHARS), 32)
        self.rnn = torch.nn.RNN(32, 128, batch_first=True)
        self.linear = torch.nn.Linear(128, len(CHARS))

    def forward(self, sentence, state=None):
        embed = self.embed(sentence)
        o, h = self.rnn(embed)
        return self.linear(o)

In [7]:
model = RNNModel()
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=.05)

In [8]:
for epoch in range(NUM_EPOCHS):
    train_loss, train_acc, iter_num = .0, .0, .0
    start_epoch_time = time.time()
    model.train()
    for x_in, y_in in train_dl:
        x_in = x_in
        y_in = y_in.view(1, -1).squeeze()
        optimizer.zero_grad()
        out = model.forward(x_in).view(-1, len(CHARS))
        l = criterion(out, y_in)
        train_loss += l.item()
        batch_acc = (out.argmax(dim=1) == y_in)
        train_acc += batch_acc.sum().item() / batch_acc.shape[0]
        l.backward()
        optimizer.step()
        iter_num += 1
    print(
        f"Epoch: {epoch}, loss: {train_loss:.4f}, acc: "
        f"{train_acc / iter_num:.4f}",
        end=" | "
    )
    test_loss, test_acc, iter_num = .0, .0, .0
    model.eval()
    for x_in, y_in in test_dl:
        x_in = x_in
        y_in = y_in.view(1, -1).squeeze()
        out = model.forward(x_in).view(-1, len(CHARS))
        l = criterion(out, y_in)
        test_loss += l.item()
        batch_acc = (out.argmax(dim=1) == y_in)
        test_acc += batch_acc.sum().item() / batch_acc.shape[0]
        iter_num += 1
    print(
        f"test loss: {test_loss:.4f}, test acc: {test_acc / iter_num:.4f} | "
        f"{time.time() - start_epoch_time:.2f} sec."
    )

Epoch: 0, loss: 9.8585, acc: 0.6646 | test loss: 1.4391, test acc: 0.8496 | 6.78 sec.
Epoch: 1, loss: 3.4123, acc: 0.8379 | test loss: 1.3063, test acc: 0.8302 | 6.77 sec.
Epoch: 2, loss: 3.0658, acc: 0.8384 | test loss: 1.1772, test acc: 0.8426 | 6.79 sec.
Epoch: 3, loss: 2.9296, acc: 0.8421 | test loss: 1.1244, test acc: 0.8475 | 7.44 sec.
Epoch: 4, loss: 2.8286, acc: 0.8460 | test loss: 1.0720, test acc: 0.8539 | 7.27 sec.
Epoch: 5, loss: 2.7241, acc: 0.8513 | test loss: 1.0846, test acc: 0.8515 | 7.14 sec.
Epoch: 6, loss: 2.6863, acc: 0.8533 | test loss: 1.0957, test acc: 0.8505 | 6.84 sec.
Epoch: 7, loss: 2.6019, acc: 0.8586 | test loss: 0.9401, test acc: 0.8725 | 7.10 sec.
Epoch: 8, loss: 2.5275, acc: 0.8634 | test loss: 1.0785, test acc: 0.8543 | 6.59 sec.
Epoch: 9, loss: 2.4814, acc: 0.8674 | test loss: 0.9372, test acc: 0.8754 | 8.01 sec.
Epoch: 10, loss: 2.4097, acc: 0.8747 | test loss: 0.9400, test acc: 0.8800 | 7.24 sec.
Epoch: 11, loss: 2.3191, acc: 0.8845 | test loss: 0.8

In [10]:
def encoder(phrase):
    return ''.join([caesor(k, CAESAR_OFFSET) for k in phrase])

def evaluate(phrase, model):
    print('Original: ', phrase)

    phrase_idx = torch.zeros(len(phrase), dtype=int)
    for i in range(len(phrase)):
        phrase_idx[i] = CHAR_TO_INDEX.get(phrase[i], CHAR_TO_INDEX[' '])
    enc_phrase = encoder(phrase)
    print('Encrypted: ', enc_phrase)

    enc_phrase_idx = [CHAR_TO_INDEX[k] for k in enc_phrase]
    model.eval()
    result = model.forward(torch.tensor([enc_phrase_idx])).argmax(dim=2)
    encoded_result = "".join([INDEX_TO_CHAR[item.item()] for i, item in enumerate(result[0])])
    print('Deencrypted: ', encoded_result)

    val_acc = (result == torch.tensor(phrase_idx)).flatten()
    val_acc = (val_acc.sum() / val_acc.shape[0]).item()
    print('Accuracy={:.2f}'.format(val_acc))

phrase = 'in a recurrent neural network we store the output activations from one or more of the layers of the network'
evaluate(phrase, model)


Original:  in a recurrent neural network we store the output activations from one or more of the layers of the network
Encrypted:  kpbcbtgewttgpvbpgwtcnbpgvyqtmbygbuvqtgbvjgbqwvrwvbcevkxcvkqpubhtqobqpgbqtboqtgbqhbvjgbnc gtubqhbvjgbpgvyqtm
Deencrypted:  in a recurrent neural networs we store the output activations from one or more of the layers of the networs
Accuracy=0.98


  val_acc = (result == torch.tensor(phrase_idx)).flatten()
