# Decoding an Autokey Cipher with Recurrent Neural Networks

A project by Sam Greydanus. November 2016

In [1]:
import time
import copy

import numpy as np
import tensorflow as tf

from dataloader import Dataloader

In [3]:
A = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
tsteps = 7
max_key_len = 5

dataloader = Dataloader(A, tsteps=tsteps, max_key_len=max_key_len)
dataloader.next_batch(10, info=True)

NameError: global name 'max_key_len' is not defined

## Class for generating Autokey training data

In [2]:
class Autokey():
    def __init__(self, alphabet, tsteps, key_len):
        self.A = alphabet
        self.tsteps = tsteps
        self.key_len = key_len
    
    def encode(self, key, plain):
        cipher = '' ; key = copy.deepcopy(key)
        for i in range(len(plain)):
            plain_ix = self.A.find(plain[i])
            shift = self.A.find(key[i])
            assert plain_ix >=0, 'plaintext characters must exist in alphabet'
            assert shift >=0, 'key characters must exist in alphabet'
            cipher_ix = (plain_ix + shift) % len(self.A)
            cipher += self.A[cipher_ix]
            key += self.A[plain_ix]
        return cipher
            
    def decode(self, key, cipher):
        plain = '' ; key = copy.deepcopy(key)
        for i in range(len(cipher)):
            cipher_ix = self.A.find(cipher[i])
            shift = self.A.find(key[i])
            assert cipher_ix >=0, 'ciphertext characters must exist in alphabet'
            assert shift >=0, 'key characters must exist in alphabet'
            plain_ix = (cipher_ix - shift) % len(self.A)
            plain += self.A[plain_ix]
            key += self.A[plain_ix]
        return plain
    
    def rands(self, size):
        ix = np.random.randint(len(self.A),size=size)
        return ''.join([self.A[i] for i in ix])
    
    def one_hot(self, s):
        ix = [self.A.find(l) for l in s]
        z = np.zeros((len(s),len(self.A)))
        z[range(len(s)),ix] = 1
        return z
    
    def next_batch(self, batch_size, verbose=False):
        batch_X = [] ; batch_y = [] ; batch_Xs = [] ; batch_ks = [] ; batch_ys = []
        for _ in range(batch_size):
            ys = self.rands(self.tsteps)
            ks = self.rands(self.key_len)
            Xs = self.encode(ks, ys)
            if verbose: print Xs, ks, ys
            X = self.one_hot(ks + Xs)
            y = self.one_hot(ks + ys)
            batch_X.append(X)
            batch_y.append(y)
            batch_Xs.append(Xs) ; batch_ks.append(ks) ; batch_ys.append(ys)
        
        if not verbose:
            return (batch_X,batch_y)
        else:
            return (batch_X, batch_y, batch_Xs, batch_ks, batch_ys)

## Stateful modular implementation of a recurrent neural network

In [3]:
class SequentialModel():
    def __init__(self, batch_size, tsteps, xlen, ylen, lr):
        self.sess = tf.InteractiveSession()
        self.batch_size = batch_size
        self.tsteps = tsteps
        self.xlen = xlen
        self.ylen = ylen
        self.lr = lr
        self.x = x = tf.placeholder(tf.float32, shape=[None, None, xlen], name="x")
        self.y = y = tf.placeholder(tf.float32, shape=[None, None, ylen], name="y")

        self.params = params = {}
        print 'hidden layer dim: ', xlen
        self.fc1_size = fc1_size = xlen
        self.rnn_size = rnn_size = 200
        with tf.variable_scope('model',reuse=False):
            xavier_l1 = tf.truncated_normal_initializer(mean=0, stddev=1./np.sqrt(ylen), dtype=tf.float32)
            params['W1'] = tf.get_variable("W1", [xlen, fc1_size], initializer=xavier_l1)

            rnn_init = tf.truncated_normal_initializer(stddev=0.075, dtype=tf.float32)
            params['rnn'] = tf.nn.rnn_cell.LSTMCell(rnn_size, state_is_tuple=True, initializer=rnn_init)

            params['istate_batch'] = params['rnn'].zero_state(batch_size=batch_size, dtype=tf.float32)
            params['istate'] = params['rnn'].zero_state(batch_size=1, dtype=tf.float32)

            xavier_l3 = tf.truncated_normal_initializer(stddev=1./np.sqrt(rnn_size), dtype=tf.float32)
            params['W3'] = tf.get_variable("W3", [rnn_size, ylen], initializer=xavier_l3)
            
        y_hat_batch, params['fstate_batch'] = self.forward(params['istate_batch'], tsteps=tsteps, reuse=False)
        self.y_hat, params['fstate'] = self.forward(params['istate'], tsteps=1, reuse=True)
        
        self.loss = tf.nn.l2_loss(y - y_hat_batch) / batch_size
        self.optimizer = tf.train.AdamOptimizer(lr)
        self.grads = self.optimizer.compute_gradients(self.loss, var_list=tf.trainable_variables())
        self.train_op = self.optimizer.apply_gradients(self.grads)

        self.sess.run(tf.initialize_all_variables())
        
        self.c = self.params['istate'].c.eval()
        self.h = self.params['istate'].h.eval()
            
    def forward(self, state, tsteps, reuse=False):
        with tf.variable_scope('model', reuse=reuse):
            x = tf.reshape(self.x, [-1, self.xlen])
            h = tf.matmul(x, self.params['W1'])
            h = tf.nn.relu(h) # ReLU nonlinearity

            hs = [tf.squeeze(h_, [1]) for h_ in tf.split(1, tsteps, tf.reshape(h, [-1, tsteps, self.fc1_size]))]
            rnn_outs, state = tf.nn.seq2seq.rnn_decoder(hs, state, self.params['rnn'], scope='model')
            rnn_out = tf.reshape(tf.concat(1, rnn_outs), [-1, self.rnn_size])

            logps = tf.matmul(rnn_out, self.params['W3'])
            logps = tf.nn.softmax(logps)
            p = tf.reshape(logps, [-1, tsteps, self.ylen])
        return p, state
    
    def train_step(self, batch):
        feed = {self.x: batch[0], self.y: batch[1]}
        train_loss, _ = self.sess.run([self.loss, self.train_op], feed_dict=feed)
        return train_loss
    
    def step(self, x):
        feed = {self.x: x, self.params['istate'].c: self.c, self.params['istate'].h: self.h}
        fetch = [self.y_hat, self.params['fstate'].c, self.params['fstate'].h]
        [y_hat, self.c, self.h] = self.sess.run(fetch, feed)
        return y_hat
    
    def generate(self, steps_forward):
        prev_x = np.asarray([[[0,1]]]) #np.zeros((1,1,self.xlen))
        xs = np.zeros((1,0,self.ylen))

        for t in range(steps_forward):
            xs = np.concatenate((xs,prev_x), axis=1)
            prev_x = self.step(prev_x)
        return xs

## Initialize model and data loader

In [4]:
global_step = 0
save_path = 'models/model.ckpt'
A = 'abcdefghijklmnopqrstuvwxyz'
tsteps = 10
key_len = 5
autokey = Autokey(A, tsteps=tsteps, key_len=key_len)
model = SequentialModel(batch_size=32, tsteps=tsteps+key_len, xlen=len(A), ylen=len(A), lr=1e-3)

hidden layer dim:  26


### Overview of TensorFlow parameters

In [5]:
total_parameters = 0 ; print "Model overview:"
for variable in tf.trainable_variables():
    shape = variable.get_shape()
    variable_parameters = 1
    for dim in shape:
        variable_parameters *= dim.value
    print '\tvariable "{}" has {} parameters' \
        .format(variable.name, variable_parameters)
    total_parameters += variable_parameters
print "Total of {} parameters".format(total_parameters)

Model overview:
	variable "model/W1:0" has 676 parameters
	variable "model/W3:0" has 5200 parameters
	variable "model/model/LSTMCell/W_0:0" has 180800 parameters
	variable "model/model/LSTMCell/B:0" has 800 parameters
Total of 187476 parameters


## Train loop

In [9]:
begin = time.time()
for i in range(global_step, 40000):
    start = time.time()
    
    batch = autokey.next_batch(model.batch_size)
    train_loss = model.train_step(batch)
    
    if global_step%100 == 0:
        print "\tstep {}, loss {:3f}, batch time {:3f}".format(i, train_loss, time.time()-start)
    global_step += 1
        
print "\ntotal runtime: {:3f} seconds".format(time.time()-begin)

	step 30000, loss 1.261393, batch time 0.044088
	step 30100, loss 1.373854, batch time 0.035164
	step 30200, loss 1.339942, batch time 0.036970
	step 30300, loss 1.187450, batch time 0.035823
	step 30400, loss 1.308978, batch time 0.038308
	step 30500, loss 1.119762, batch time 0.045648
	step 30600, loss 1.254778, batch time 0.050540
	step 30700, loss 1.128731, batch time 0.039066
	step 30800, loss 1.220700, batch time 0.038307
	step 30900, loss 1.227064, batch time 0.039420
	step 31000, loss 1.116932, batch time 0.034871
	step 31100, loss 1.065990, batch time 0.034955
	step 31200, loss 1.067873, batch time 0.033892
	step 31300, loss 0.968747, batch time 0.042924
	step 31400, loss 0.801423, batch time 0.037802
	step 31500, loss 1.065521, batch time 0.039608
	step 31600, loss 0.923276, batch time 0.037412
	step 31700, loss 0.886054, batch time 0.043036
	step 31800, loss 1.013661, batch time 0.035664
	step 31900, loss 0.941201, batch time 0.039171
	step 32000, loss 1.007557, batch time 0

## Estimate accuracy

In [10]:
batch = autokey.next_batch(1, verbose=True)

model.c = model.params['istate'].c.eval()
model.h = model.params['istate'].h.eval()

cipher = ''
for i in range(tsteps):
    p = np.tile(batch[0][0][i,:],[1,1,1])
    c = model.step(p)
#     print y_hat[0,0,:]
    ix = np.where(c[0,0,:] == np.amax(c[0,0,:]))
    cipher += autokey.A[ix[0]]
cipher = cipher[autokey.key_len:]
print cipher, '\n', batch[4][0]
t = np.asarray([c==batch[4][0][i] for (i,c) in enumerate(cipher)])
print t + 0
print "{} % correct".format(float(np.sum(t))/np.shape(t)[0]*100)

kuzzufjzle dpevk hfvekyeehu
hfvek 
hfvekyeehu
[1 1 1 1 1]
100.0 % correct




## Decode some text!

In [15]:
plaintext = 'youknownothingjonsnow'
key = 'bozos'
ciphertext = key + autokey.encode(key, plaintext)
print ciphertext

model.c = model.params['istate'].c.eval()
model.h = model.params['istate'].h.eval()

decoded = ''
for i in range(len(ciphertext)):
    p = np.tile(autokey.one_hot(ciphertext[i]),[1,1,1])
    c = model.step(p)
    ix = np.where(c[0,0,:] == np.amax(c[0,0,:]))
    decoded += autokey.A[ix[0]]
decoded = decoded[autokey.key_len:]
print "decoded sequence is: '{}'".format(decoded)
print "NOTE: errors appear towards end of sequence where it exceeds # time steps used for training"

bozoszctyfmkhygveaucvvftxk
decoded sequence is: 'youknownothinggonfnrw'
NOTE: errors appear towards end of sequence where it exceeds # time steps used for training


