### Libraries 

In [20]:
import torch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

import numpy as np
import pandas as pd
import time

import random
import string

### Caesar Cipher Implementation

In [21]:
# Demo sentence for the round-trip check below; its '-' is not in `alphabet`.
sample = 'Jim quickly realized that the beautiful gowns - are expensive'
# Number of positions each character is moved inside `alphabet`.
shift = 17
# Cipher alphabet: 26 upper-case letters, 26 lower-case letters, space and 10 digits
# (63 symbols). NOTE: 'V'/'U' and 'v'/'u' are listed in swapped order compared with
# the usual A-Z ordering; the position of a symbol in this string is what the
# cipher and the character dictionary use, so the order must not be changed
# without regenerating every encrypted sequence.
alphabet = 'ABCDEFGHIJKLMNOPQRSTVUWXYZabcdefghijklmnopqrstvuwxyz 0123456789'

In [22]:
#   simple Caesar cipher over a custom alphabet
def ceasar_cipher(text, shift, alphabet, decryption=False):
    """Shift every character of ``text`` by ``shift`` positions inside ``alphabet``.

    Characters that do not occur in ``alphabet`` are copied to the output
    unchanged. The shift wraps around the end of the alphabet.

    Args:
        text: string to transform.
        shift: number of positions to move each character; may be negative
            or larger than the alphabet length.
        alphabet: string of symbols whose order defines the cipher.
        decryption: when truthy the shift is applied in the opposite
            direction, undoing an encryption made with the same ``shift``.

    Returns:
        The transformed string.
    """
    if decryption:
        shift = -shift

    # Build the symbol -> position table once instead of scanning the alphabet
    # for every character. ``setdefault`` keeps the first position of a
    # repeated symbol, which is what ``str.index`` would return.
    position = {}
    for idx, symbol in enumerate(alphabet):
        position.setdefault(symbol, idx)

    size = len(alphabet)
    pieces = []
    for char in text:
        idx = position.get(char)
        if idx is None:
            # Unknown symbol: pass through untouched.
            pieces.append(char)
        else:
            pieces.append(alphabet[(idx + shift) % size])
    return ''.join(pieces)

# Round trip on the sample sentence: encrypt it, then decrypt the result again.
enc_data = ceasar_cipher(sample, shift, alphabet)
dec_data = ceasar_cipher(enc_data, shift, alphabet, decryption=True)

print('Original text:\n', sample)
print('Encrypted text:\n', enc_data)
print('Decrypted text:\n', dec_data)

Original text:
 Jim quickly realized that the beautiful gowns - are expensive
Encrypted text:
 az2G6Bzt01EG7ur1zFuvG9yr9G9yuGsurB9zwB1Gx4C38G-Gr7uGuD5u38zAu
Decrypted text:
 Jim quickly realized that the beautiful gowns - are expensive


In [23]:
#   function for generating random training sequences
def generate_random_string(length_sen, num, chars=None):
    """Return ``num`` random strings, each ``length_sen`` characters long.

    Characters are drawn uniformly with replacement through
    ``np.random.choice``, so results follow NumPy's global random state.

    Args:
        length_sen: length of every generated string.
        num: how many strings to generate.
        chars: symbols to draw from; when ``None`` the notebook-level
            ``alphabet`` is used.

    Returns:
        A list of ``num`` strings.
    """
    symbols = alphabet if chars is None else chars
    np_alphabet = np.array(list(symbols))
    np_codes = np.random.choice(np_alphabet, [num, length_sen])
    # Each row of the (num, length_sen) array is one sentence.
    return [''.join(row) for row in np_codes]

In [24]:
#    put the generated sentences into a DataFrame so they are easy to inspect
df = pd.DataFrame({'original_sentence': generate_random_string(50, 100)})
df.head()

Unnamed: 0,original_sentence
0,RkWzKCnIiz9bjUxqNDaFTzSw1dE23QbsBj4M8QF7FfQCTU...
1,r4HoBfbndlwk2TsmINNQplR2d6ZkKkrnljfHiabhgwu89j...
2,lzG31mDXFvth8oJYCgdcIW2ecr q1noWDb7O kbTtO8Qt9...
3,LJz7WYv2Tj1Nuz3u5jr33RGcp3lUc7swqbOI3yVlehEkaO...
4,MrbMYpJeDpllKgDBGJE0D9juMnfU0cpMUzmyFlWEPrSumf...


In [25]:
#    encrypt every original sentence and keep the result in a second column
df['encrypted_sentence'] = df['original_sentence'].map(
    lambda sentence: ceasar_cipher(sentence, shift, alphabet)
)
df.head()

Unnamed: 0,original_sentence,encrypted_sentence
0,RkWzKCnIiz9bjUxqNDaFTzSw1dE23QbsBj4M8QF7FfQCTU...,i0nFbT3ZzFQs mD6eVrWkFjCIvUJKhs8S LdPhWOWwhTkm...
1,r4HoBfbndlwk2TsmINNQplR2d6ZkKkrnljfHiabhgwu89j...,7LY4Sws3v1C0Jk82Zeeh51iJvNq0b0731 wYzrsyxCBPQ ...
2,lzG31mDXFvth8oJYCgdcIW2ecr q1noWDb7O kbTtO8Qt9...,1FXKI2VoWA9yP4apTxvtZnJut7G6I34nVsOfG0sk9fPh9Q...
3,LJz7WYv2Tj1Nuz3u5jr33RGcp3lUc7swqbOI3yVlehEkaO...,caFOnpAJk IeBFKBM 7KKiXt5K1mtO8C6sfZKEl1uyU0rf...
4,MrbMYpJeDpllKgDBGJE0D9juMnfU0cpMUzmyFlWEPrSumf...,d7sdp5auV511bxVSXaUHVQ Bd3wmHt5dmF2EW1nUg7jB2w...


### Preparation for training

In [26]:
#    character -> index lookup; index 0 belongs to the extra 'None' entry,
#    the characters of 'alphabet' follow in order starting at 1
dict_char = {char: i for i, char in enumerate(['None', *alphabet])}

In [27]:
#    split every original and encrypted sentence into a list of its characters
sentence_corpus_orig = [list(sentence) for sentence in df['original_sentence'].tolist()]
sentence_corpus_enc = [list(sentence) for sentence in df['encrypted_sentence'].tolist()]

#    show the first 15 characters of the first sentence of each corpus
print(sentence_corpus_orig[0][:15])
print(sentence_corpus_enc[0][:15])

['R', 'k', 'W', 'z', 'K', 'C', 'n', 'I', 'i', 'z', '9', 'b', 'j', 'U', 'x']
['i', '0', 'n', 'F', 'b', 'T', '3', 'Z', 'z', 'F', 'Q', 's', ' ', 'm', 'D']


In [28]:
#    function for creating tensors with fixed length
def to_torch(text, sen_len, dictionary):
    """Encode sequences of characters as a fixed-width tensor of indices.

    Args:
        text: sequence of sentences; each sentence is an iterable of
            characters (a string or a list of single characters).
        sen_len: number of columns of the result. Longer sentences are
            truncated, shorter ones are padded on the right.
        dictionary: mapping character -> integer index. The index stored
            under the key 'None' is used for padding and for characters
            missing from the mapping (0 if the key is absent).

    Returns:
        A ``torch.long`` tensor of shape ``(len(text), sen_len)``.
    """
    # Use the mapping that was passed in, not a notebook-level global.
    pad_index = dictionary.get('None', 0)
    # Pre-fill with the padding index so short sentences are padded explicitly
    # rather than relying on zero happening to be the 'None' index.
    X = torch.full((len(text), sen_len), pad_index, dtype=torch.long)
    for i, sentence in enumerate(text):
        for j, char in enumerate(sentence):
            if j >= sen_len:
                break
            X[i, j] = dictionary.get(char, pad_index)
    return X

In [31]:
#   fixed row length of the tensors built below (passed to to_torch as sen_len);
#   the generated sentences have 50 characters, so the remaining positions stay padding
len_tensor = 60

In [34]:
#   encode the encrypted character lists as a padded tensor of indices
X_enc = to_torch(sentence_corpus_enc, len_tensor, dict_char)
print(X_enc.shape)
print(X_enc[0])

torch.Size([100, 60])
tensor([35, 54, 40,  6, 28, 20, 57, 26, 52,  6, 17, 45, 53, 39,  4, 60, 31, 21,
        44, 23, 37,  6, 36,  3,  9, 47, 22, 10, 11, 34, 45, 62, 19, 53, 12, 30,
        16, 34, 23, 15, 23, 49, 34, 20, 37, 39, 15, 31, 38, 11,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0])


In [35]:
#   encode the original character lists as a padded tensor of indices
X_orig = to_torch(sentence_corpus_orig, len_tensor, dict_char)
print(X_orig.shape)
print(X_orig[0])

torch.Size([100, 60])
tensor([18, 37, 23, 52, 11,  3, 40,  9, 35, 52, 63, 28, 36, 22, 50, 43, 14,  4,
        27,  6, 20, 52, 19, 49, 55, 30,  5, 56, 57, 17, 28, 45,  2, 36, 58, 13,
        62, 17,  6, 61,  6, 32, 17,  3, 20, 22, 61, 14, 21, 57,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0])


### RNN 

In [36]:
#    define simple RNN with torch
class RNN(torch.nn.Module):
    """Character-level network: embedding -> single-layer RNN -> linear layer.

    For every input position the network outputs one score per vocabulary
    entry.

    Args:
        vocab_size: number of distinct indices; used as embedding size,
            RNN input size and output size. Defaults to ``len(dict_char)``.
        hidden_size: size of the RNN hidden state (default 256).
    """

    def __init__(self, vocab_size=None, hidden_size=256):
        super().__init__()
        if vocab_size is None:
            # Keep the original behavior of sizing the model from the
            # notebook-level character dictionary.
            vocab_size = len(dict_char)
        self.embed = torch.nn.Embedding(vocab_size, vocab_size)
        self.rnn = torch.nn.RNN(vocab_size, hidden_size, batch_first=True)
        self.linear = torch.nn.Linear(hidden_size, vocab_size)

    def forward(self, sentences, state=None):
        """Return per-position scores for a batch of index sequences.

        Args:
            sentences: integer tensor of shape ``(batch, seq_len)``.
            state: optional initial hidden state for the RNN, shape
                ``(1, batch, hidden_size)``; ``None`` starts from zeros.

        Returns:
            Tensor of shape ``(batch, seq_len, vocab_size)``.
        """
        embed = self.embed(sentences)
        # Forward the caller-supplied initial state; it used to be ignored.
        o, _ = self.rnn(embed, state)
        return self.linear(o)
    
# Instantiate the network and move it to the device selected in the imports cell.
model = RNN().to(device)

In [37]:
#    loss function (cross entropy) and optimizer (Adam) used for training
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.005)

In [38]:
#    training
# Move the data to the device once. The flattened targets get their own name so
# the 2-D X_orig tensor is not overwritten and the cell can be re-run safely.
inputs = X_enc.to(device)
targets = X_orig.flatten().to(device)

model.train()
for ep in range(10):
    start = time.time()
    train_loss = 0.
    train_passed = 0

    # NOTE: every step feeds the whole data set (full batch); the inner loop
    # repeats that step len(X_enc) times per epoch.
    for _ in range(len(X_enc)):
        optimizer.zero_grad()
        # Call the module itself rather than .forward() so hooks are honored.
        answers = model(inputs)
        # (batch, seq_len, vocab) -> (batch * seq_len, vocab) to match the flat targets.
        answers = answers.view(-1, answers.size(-1))
        loss = criterion(answers, targets)
        train_loss += loss.item()

        loss.backward()
        optimizer.step()
        train_passed += 1

    print("Epoch {}. Time: {:.3f}, Train loss: {:.6f}".format(ep+1, time.time() - start, train_loss / train_passed))

Epoch 1. Time: 3.400, Train loss: 0.218415
Epoch 2. Time: 3.291, Train loss: 0.000189
Epoch 3. Time: 3.186, Train loss: 0.000135
Epoch 4. Time: 3.185, Train loss: 0.000104
Epoch 5. Time: 3.180, Train loss: 0.000084
Epoch 6. Time: 3.184, Train loss: 0.000069
Epoch 7. Time: 3.184, Train loss: 0.000058
Epoch 8. Time: 3.225, Train loss: 0.000049
Epoch 9. Time: 3.183, Train loss: 0.000042
Epoch 10. Time: 3.187, Train loss: 0.000036


In [39]:
#   function for predicting (decrypting) a sentence with the trained model
def prediction(test, max_len=100):
    """Encrypt ``test`` with the Caesar cipher and let the model decrypt it.

    Args:
        test: plain-text sentence.
        max_len: width of the input tensor; the encrypted sentence is
            truncated or padded to this many positions (default 100).

    Returns:
        The decoded string. Positions where the model predicts the padding
        index (the 'None' entry of ``dict_char``) are dropped.
    """
    test_enc = [ceasar_cipher(test, shift, alphabet)]
    test_tensor = to_torch(test_enc, max_len, dict_char)

    # Inference only: no need to track gradients.
    with torch.no_grad():
        scores = model(test_tensor.to(device)).squeeze(0)

    # Build the reverse lookup once and take a single argmax over all
    # positions instead of re-scanning the dictionary for every character.
    index_to_char = {index: char for char, index in dict_char.items()}
    pad_index = dict_char['None']
    best = scores.argmax(dim=-1).tolist()
    return ''.join(index_to_char[i] for i in best if i != pad_index)

In [41]:
# Sanity check on a natural-language sentence: it is encrypted inside prediction()
# and the returned string is the model's decryption of it.
prediction("Gaius Julius Caesar was a Roman general and statesman")

'Gaius Julius Caesar was a Roman general and statesman'

### How to measure the quality of the Decryptor?

The quality of a decoder is determined by its ability to decode all characters in the input sequence. The distortion of at least one character in the output is unacceptable. Therefore, we will measure the quality of the model as follows: 

- we feed the encrypted sentence of symbols to the model input
- run it through the model and compare it with the original
- if at least one symbol is decoded incorrectly, we consider that the whole sentence is erroneous

In [42]:
# Sentence-level accuracy: a sentence counts as correct only when every
# character is recovered.
model.eval()
n_test = 100
k = 0
start = time.time()
with torch.no_grad():
    for i in range(n_test):
        # one fresh random sentence per trial
        true_sentence = generate_random_string(50, 1)
        predict_sentence = prediction(true_sentence[0])
        k += int(predict_sentence == true_sentence[0])

print('Correct answers: {:.2f}%'.format(k/n_test * 100))
print('{:.2f} seconds'.format(time.time() - start))

Correct answers: 100.00%
16.21 seconds
