In [0]:
#########################################################
# Decryption of Enigma coded strings
#########################################################

# -----------------------------------------------------------
# Part of coding challenge for Scale AI ML Role
# Github link: https://github.com/hkhoont/scale_ai_engima_machine
# email: hsk2147@columbia.edu
# -----------------------------------------------------------

In [0]:
!pip install -U -q PyDrive
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials
# Authenticate the Colab user and create the PyDrive client.
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)
# Shareable Drive link; the file id is whatever follows the single "=" in the URL.
link="https://drive.google.com/open?id=1ApOOStHi7clwdZesQ_hNVWqjtEWa6-lp"
# NOTE(review): this rebinds the built-in name `id` at module level for the rest of the session.
_,id=link.split("=")
downloaded = drive.CreateFile({'id':id})
# Write the Drive file's content to the working directory under this name
# (predict() loads a file of the same name from /content).
downloaded.GetContentFile('layers2_hiddensize64_batchsize32_saved_model.pth')

## A - Installing and importing libraries

In [3]:
!pip install py-enigma
!pip install faker
!pip install tensorboardX==1.4
!pip install tqdm
!pip install hide_code



In [0]:
import warnings
warnings.filterwarnings("ignore")

from typing import List, Tuple
from enigma.machine import EnigmaMachine
from faker import Faker
import re
import numpy as np
import string
import random
import time
import math
import torch
from torch.autograd import Variable
import torch.nn as nn
from torch.nn import functional as F
import collections
from tqdm import tqdm
from tqdm import trange, tqdm
from time import sleep

## B - Helper functions already provided

In [0]:
class ConfiguredMachine:
    """Enigma machine wrapper with a fixed key sheet and a fixed start position."""

    def __init__(self):
        # Rotors, reflector, ring settings and plugboard pairs are all hard-wired here.
        key_sheet = dict(
            rotors='II IV V',
            reflector='B',
            ring_settings=[1, 20, 11],
            plugboard_settings='AV BS CG DL FU HZ IN KM OW RX',
        )
        self.machine = EnigmaMachine.from_key_sheet(**key_sheet)

    def reset(self):
        """Set the machine's display back to the fixed start value 'WXC'."""
        self.machine.set_display('WXC')

    def encode(self, plain_str: str) -> str:
        """Reset the display, then return ``plain_str`` processed by the machine."""
        self.reset()
        ciphertext = self.machine.process_text(plain_str)
        return ciphertext

    def batch_encode(self, plain_list: List[str]) -> List[str]:
        """Encode every string on its own; the display is reset before each one."""
        return [self.encode(text) for text in plain_list]

In [0]:
def pre_process(input_str):
    """Strip every character that is not an ASCII letter, then upper-case the result."""
    letters_only = re.sub('[^a-zA-Z]', '', input_str)
    return letters_only.upper()

def generate_data(batch_size: int, seq_len: int = 42) -> Tuple[List[str], List[str]]:
    """Build a batch of (plaintext, ciphertext) string lists.

    Faker is asked for ``batch_size`` texts with ``max_nb_chars=seq_len``; each
    text is passed through ``pre_process`` (letters only, upper-cased) and the
    cleaned texts are encoded with a freshly built ``ConfiguredMachine``.

    Returns:
        ``(plain_list, cipher_list)``; ``cipher_list`` is the result of
        ``batch_encode(plain_list)``.
    """
    text_source = Faker()
    enigma = ConfiguredMachine()

    raw_texts = text_source.texts(nb_texts=batch_size, max_nb_chars=seq_len)
    plain_list = list(map(pre_process, raw_texts))
    return plain_list, enigma.batch_encode(plain_list)

def str_score(str_a: str, str_b: str) -> float:
    """Return the fraction of positions at which two strings hold the same character.

    Args:
        str_a: First string.
        str_b: Second string.

    Returns:
        0.0 when the lengths differ, 1.0 when both strings are empty, otherwise
        (number of matching positions) / (string length).
    """
    if len(str_a) != len(str_b):
        return 0.0
    if not str_a:
        # Both strings are empty: they match trivially, and this avoids 0 / 0 below.
        return 1.0
    n_correct = sum(a == b for a, b in zip(str_a, str_b))
    return n_correct / len(str_a)

def score(predicted_plain: List[str], correct_plain: List[str]) -> float:
    """Return the fraction of reference strings whose prediction is close enough.

    A prediction counts as correct when ``str_score(prediction, reference)`` is
    strictly greater than 0.8. Pairs are formed with ``zip``, so references
    without a matching prediction simply do not count as correct.

    Args:
        predicted_plain: Predicted plaintext strings.
        correct_plain: Reference plaintext strings.

    Returns:
        (number of correct predictions) / len(correct_plain), or 0.0 when
        ``correct_plain`` is empty.
    """
    if not correct_plain:
        # Nothing to score; avoid dividing by zero.
        return 0.0
    n_hits = sum(
        1
        for predicted, reference in zip(predicted_plain, correct_plain)
        if str_score(predicted, reference) > 0.8
    )
    return n_hits / len(correct_plain)

In [0]:
# Symbol table for the model: position in this string == class index.
# Index 0 is '#', which coincides with the zero fill used for padded positions.
all_characters = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ"
n_characters = len(all_characters)

# Reverse lookup: class index -> character.
vocab = dict(enumerate(all_characters))

In [0]:
def predict(cipher_list: List[str],
            model_path: str = '/content/layers2_hiddensize64_batchsize32_saved_model.pth') -> List[str]:
    """
    Decrypt a batch of Enigma ciphertexts with the saved model.

    Input:
        cipher_list: Ciphertext strings; each is encoded into a fixed 42-slot
            index tensor by ``string_to_tensor``.
        model_path: Location of the saved model file (defaults to the path the
            notebook downloads it to on Colab).
    Output:
        One predicted plaintext per ciphertext, each with the same length as
        its ciphertext. An empty input yields an empty list.
    """
    if not cipher_list:
        return []

    # NOTE(review): torch.load unpickles the file, which can execute arbitrary
    # code -- only load model files from a trusted source.
    model = torch.load(model_path)
    model.eval()

    # Lengths come from the ciphertexts themselves (not from any global), as
    # integers because sequence packing works on integer lengths.
    x_length = torch.tensor([len(cipher) for cipher in cipher_list], dtype=torch.long)
    x = torch.stack([string_to_tensor(cipher) for cipher in cipher_list])

    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        y_pred = model(x, x_length)

    n_y_pred, _ = numerical_string(len(cipher_list), y_pred, cipher_list, x_length)
    return n_y_pred

In [0]:
def numerical_string(batch_size,y_pred,y_batch_string,x_length):
    """
    Converts tensor output to String form

    Input:
        batch_size: Number of sequences in the batch.
        y_pred: Tensor of per-position class scores that can be viewed as
            (batch_size, max(x_length), len(vocab)).
        y_batch_string: Passed through unchanged as the second return value.
        x_length: Tensor holding the length of each sequence.
    Output:
        (n_y_pred, y_batch_string), where n_y_pred[k] is the string formed by
        the highest-scoring character at each of the first x_length[k]
        positions of sequence k.
    """
    x_length_array = x_length.detach().numpy().astype('int')
    if batch_size == 0 or x_length_array.size == 0:
        # Empty batch: nothing to decode (max() of an empty array would raise).
        return [], y_batch_string

    max_len = int(x_length_array.max())
    # Convert to numpy once and take every argmax in a single call instead of
    # converting and reducing one character at a time.
    scores = y_pred.detach().view(batch_size, max_len, len(vocab)).numpy()
    best_index = scores.argmax(axis=2)

    n_y_pred = [
        "".join(vocab[idx] for idx in best_index[k, :x_length_array[k]])
        for k in range(batch_size)
    ]
    return n_y_pred,y_batch_string

In [0]:
# Turning a string into a tensor
def string_to_tensor(string, max_len=42):
    """
    Convert string into numerical torch tensor filled with index according to above mentioned 'all_characters' variable
    Input: String of capital letters to be decrypted; max_len is the fixed output length (default 42)
    Output: Torch long tensor of shape (max_len,)

    Characters that are not in 'all_characters' keep index 0, positions past
    the end of the string are 0, and characters beyond max_len are ignored.
    """
    # str.find returns -1 for unknown characters; clamp those to 0 so they are
    # treated as padding rather than relying on a catch-all exception handler.
    indices = [max(all_characters.find(ch), 0) for ch in string[:max_len]]
    indices.extend([0] * (max_len - len(indices)))
    return torch.tensor(indices, dtype=torch.long)

In [0]:
class CharLSTM(nn.Module):
    """Character-level model: fixed one-hot encoder -> LSTM -> linear decoder -> log-softmax.

    Args:
        input_size: Number of input symbols (also the one-hot width fed to the LSTM).
        hidden_size: LSTM hidden state size.
        output_size: Number of output classes per position.
        n_layers: Number of stacked LSTM layers.
    """

    def __init__(self, input_size, hidden_size,output_size, n_layers):
        super(CharLSTM, self).__init__()

        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.n_layers = n_layers

        padding_idx = 0

        # The encoder embedding turns symbol indices into one-hot vectors.
        # Its weight is the identity matrix and is frozen, so nothing is learned here.
        self.encoder = nn.Embedding(num_embeddings=self.input_size,
                                    embedding_dim=self.input_size,
                                    padding_idx=padding_idx,)
        self.encoder.weight.data = torch.eye(self.input_size)
        self.encoder.weight.requires_grad = False

        # LSTM unit (inputs are batch-first).
        self.lstm = nn.LSTM(self.input_size,
                            hidden_size,
                            n_layers,
                            batch_first=True)

        # Decoder: maps each LSTM output vector to class scores.
        self.decoder = nn.Linear(hidden_size,
                                 output_size)

    def forward(self, x, x_length):
        """Run the model on a padded batch.

        Args:
            x: Long tensor of symbol indices, batch first.
            x_length: Per-sequence lengths (tensor or sequence of numbers).

        Returns:
            Log-probabilities of shape (batch * max(x_length), output_size);
            rows are ordered sequence by sequence.
        """
        # Packing needs integer lengths on the CPU; callers may pass floats.
        lengths = torch.as_tensor(x_length, dtype=torch.int64).cpu()
        batch_size_input = lengths.shape[0]

        self.hidden = self.init_hidden(batch_size_input)     # fresh zero state for every call
        x = self.encoder(x)                                  # indices -> one-hot vectors

        # Packing makes the LSTM process only the real (unpadded) steps of each sequence.
        x = torch.nn.utils.rnn.pack_padded_sequence(x,
                                                    lengths,
                                                    batch_first=True,
                                                    enforce_sorted=False)

        x, self.hidden = self.lstm(x, self.hidden)
        # Unpacking pads back only up to the longest sequence in this batch.
        x, _ = torch.nn.utils.rnn.pad_packed_sequence(x, batch_first=True)

        # Flatten (batch, steps, hidden) -> (batch * steps, hidden) for the decoder.
        x = x.contiguous().view(-1, x.shape[2])
        x = self.decoder(x)
        Y_hat = F.log_softmax(x, dim=1)
        return Y_hat

    def init_hidden(self,batch_size_input):
        """
        Build the initial LSTM state.
        Method: all zeros, created with the same dtype and device as the decoder weights.

        Returns:
            Tuple (h_0, c_0), each of shape (n_layers, batch_size_input, hidden_size).
        """
        reference = self.decoder.weight
        shape = (self.n_layers, batch_size_input, self.hidden_size)
        hidden_a = reference.new_zeros(shape)
        hidden_b = reference.new_zeros(shape)

        return (hidden_a, hidden_b)

In [17]:
# Generate 16384 plaintext/ciphertext pairs with generate_data().
plain,cipher = generate_data(16384)
# Cell output: fraction of predictions whose per-character match with the plaintext exceeds 0.8.
score(predict(cipher), plain)

1.0