### Adversarial Neural Crypto

#### Project Description
Adversarial neural network model described in *Learning to Protect Communications with Adversarial Neural Cryptography* (Martín Abadi & David G. Andersen, 2016), as implemented by Liam Schoneveld (https://nlml.github.io/neural-networks/adversarial-neural-cryptography/). I adapted that code to generate Alice's output (the ciphertext) from a fixed key and a fixed input-message generator. My main contribution is to build the truth tables of all Boolean functions of the form L(x) = a0 + a1x1 + a2x2 + ... + a8x8 (with addition taken modulo 2, i.e. XOR) and to use them to compute the nonlinearity of each ciphertext bit.

In [1]:
import theano
import theano.tensor as T
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
import itertools
from lasagne.updates import adam
import sys
# Always print arrays in full. NumPy documents sys.maxsize for this purpose;
# a NaN threshold is rejected with a ValueError by recent NumPy releases.
np.set_printoptions(threshold=sys.maxsize)
        
def get_activation(inp, act_fn, name):
    """Apply the activation selected by ``act_fn`` to ``inp``.

    Recognised values of ``act_fn`` are 'tanh', 'relu' and 'sigmoid'.
    For any other value a note mentioning ``name`` is printed and ``inp``
    is returned unchanged.
    """
    if act_fn == 'tanh':
        return T.tanh(inp)
    if act_fn == 'relu':
        return T.nnet.relu(inp)
    if act_fn == 'sigmoid':
        return T.nnet.sigmoid(inp)
    # Unknown activation: say so and fall back to the identity.
    print ('Note: no valid activation specified for ' + name)
    return inp

def get_source(source):
    """Return the tensor to feed into a layer.

    A layer may be given either a raw tensor or another layer object. Any
    object whose class name contains 'Layer' is replaced by its ``output``
    attribute; everything else is passed through untouched.
    """
    looks_like_layer = 'Layer' in source.__class__.__name__
    return source.output if looks_like_layer else source
    
def get_weights(in_dim, out_dim, name):
    """Return a Glorot-initialised shared weight matrix.

    Entries are drawn uniformly from [-limit, limit] with
    limit = sqrt(6 / (in_dim + out_dim)), cast to theano.config.floatX and
    wrapped in a Theano shared variable called ``name`` with shape
    (in_dim, out_dim).
    """
    limit = np.sqrt(6. / (in_dim + out_dim))
    draws = np.random.uniform(low=-limit, high=limit, size=(in_dim, out_dim))
    W_val = np.asarray(draws, dtype=theano.config.floatX)
    return theano.shared(value=W_val, name=name, borrow=True)

def get_bias(d, name):
    """Return a shared bias vector of length ``d``, initialised to zeros."""
    zeros = np.zeros((d,), dtype=theano.config.floatX)
    return theano.shared(value=zeros, name=name, borrow=True)

def get_all_params(layers):
    """Return the ``params`` of every layer, flattened into one list.

    Parameters keep the order of ``layers`` and, within a layer, the order
    of that layer's ``params`` attribute.
    """
    return [param for layer in layers for param in layer.params]

class ConvLayer(object):
    """2-D convolution over ``source`` followed by an activation.

    ``source`` may be a raw tensor or another layer (see get_source).
    ``filter_shape`` is indexed as (output maps, input maps, height, width)
    and ``image_shape`` as (batch, input maps, height, width); the two
    input-map counts must agree. The layer exposes ``output`` (activated),
    ``output_pre_activ`` and ``params`` ([W, b]).
    """

    def __init__(self, source, filter_shape, image_shape, stride,
                 act_fn, border_mode='full', name='conv'):

        # The filters must expect as many input maps as the image provides.
        assert image_shape[1] == filter_shape[1]

        # Remember the configuration.
        self.name = name
        self.act_fn = act_fn
        self.stride = stride
        self.border_mode = border_mode
        self.filter_shape = filter_shape
        self.image_shape = image_shape

        # Keep the object we were given as well as the tensor behind it.
        self.parent = source
        self.source = get_source(source)

        n_out_maps = filter_shape[0]
        # Inputs per hidden unit: input maps * filter height * filter width.
        fan_in = np.prod(filter_shape[1:])
        # Gradient sources per lower-layer unit:
        # output maps * filter height * filter width.
        fan_out = (n_out_maps * np.prod(filter_shape[2:]))
        # Uniform initialisation within +/- sqrt(6 / (fan_in + fan_out)).
        W_bound = np.sqrt(6. / (fan_in + fan_out))
        initial_filters = np.random.uniform(low=-W_bound, high=W_bound,
                                            size=filter_shape)
        self.W = theano.shared(
            np.asarray(initial_filters, dtype=theano.config.floatX),
            borrow=True,
            name=name + '_W'
        )

        # One zero-initialised bias per output feature map.
        self.b = theano.shared(
            value=np.zeros((n_out_maps,), dtype=theano.config.floatX),
            borrow=True,
            name=name + '_b'
        )

        # Convolve the input feature maps with the filters.
        convolved = T.nnet.conv2d(
            input=self.source,
            filters=self.W,
            filter_shape=self.filter_shape,
            input_shape=self.image_shape,
            border_mode=self.border_mode,
            subsample=self.stride
        )

        # Broadcast the bias over the batch and both spatial axes.
        self.output_pre_activ = convolved + self.b.dimshuffle('x', 0, 'x', 'x')
        self.output = get_activation(self.output_pre_activ,
                                     act_fn=self.act_fn,
                                     name=self.name)

        self.params = [self.W, self.b]

class HiddenLayer():
    """Fully connected layer: activation(source . W + b).

    ``source`` may be a raw tensor or another layer (see get_source). ``W``
    has shape (input_size, hidden_size) and ``b`` has length hidden_size.
    The layer exposes ``output`` (activated), ``output_pre_activ`` and
    ``params`` ([W, b]).
    """

    def __init__(self, source, input_size, hidden_size, name, act_fn):
        # Remember the configuration.
        self.name = name
        self.act_fn = act_fn
        self.input_size = input_size
        self.hidden_size = hidden_size

        # Keep the object we were given as well as the tensor behind it.
        self.parent = source
        self.source = get_source(source)

        # Trainable parameters.
        self.W = get_weights(self.input_size, self.hidden_size, 'W_' + name)
        self.b = get_bias(self.hidden_size, 'b_' + name)

        # Affine map; the bias is broadcast over the batch axis.
        affine = T.dot(self.source, self.W)
        self.output_pre_activ = affine + self.b.dimshuffle('x', 0)
        self.output = get_activation(self.output_pre_activ,
                                     act_fn=self.act_fn,
                                     name=self.name)
        self.params = [self.W, self.b]
        
       
class StandardConvSetup():
    """Stack of four ConvLayers shared by the Alice, Bob and Eve networks.

    The first three layers use relu and the last uses tanh. The layers are
    available as ``conv_layer1`` .. ``conv_layer4`` and, in order, in
    ``layers``; ``output`` is the last layer's output and ``params`` is
    every layer's parameters concatenated.
    """

    def __init__(self, reshaped_input, name='unnamed'):

        self.name = name

        # One row per layer, in stacking order:
        # (name suffix, filter_shape (num outs, num ins, size),
        #  stride, border_mode, act_fn)
        layer_specs = [
            ('_conv1', (2, 1, 4, 1), (1,1), (2,0), 'relu'),
            ('_conv2', (4, 2, 2, 1), (2,1), (0,0), 'relu'),
            ('_conv3', (4, 4, 1, 1), (1,1), (0,0), 'relu'),
            ('_conv4', (1, 4, 1, 1), (1,1), (0,0), 'tanh'),
        ]

        stack = []
        upstream = reshaped_input
        for suffix, filter_shape, stride, border_mode, act_fn in layer_specs:
            # Each layer consumes the previous one; the image's channel
            # count always equals the filter's number of input maps.
            upstream = ConvLayer(upstream,
                                 filter_shape=filter_shape,
                                 image_shape=(None, filter_shape[1], None, 1),
                                 stride=stride,
                                 name=self.name + suffix,
                                 border_mode=border_mode,
                                 act_fn=act_fn)
            stack.append(upstream)

        (self.conv_layer1, self.conv_layer2,
         self.conv_layer3, self.conv_layer4) = stack

        self.output = self.conv_layer4.output
        self.layers = stack
        self.params = [p for layer in self.layers for p in layer.params]


In [None]:
# Parameters definition
# batch_size is baked into the reshape calls of the network graph below, and
# gen_msgfk() yields 2**8 = 256 rows, so one batch holds every 8-bit message.
batch_size = 256
msg_len = 8   # bits per plaintext message
key_len = 8   # bits per key
comm_len = 8  # bits per ciphertext ("communication") sent by Alice

# fixed key
# The same 8-bit key accompanies every message (see gen_msgfk).
fkey = np.array([1,0,1,0,0,0,0,1])


def gen_msgfk():
    """Return every possible message paired with the fixed key.

    Returns:
        A ``(messages, keys)`` tuple of theano.config.floatX arrays.
        ``messages`` lists all ``2 ** msg_len`` bit patterns, one per row, in
        itertools.product order; ``keys`` repeats ``fkey`` once per message.
        In both arrays bit 0 is encoded as -1 and bit 1 as +1.
    """
    # Derive the sizes from msg_len and the generated messages instead of
    # repeating the literals 8 and 256, so the two cannot drift apart.
    messages = np.array(list(itertools.product([0, 1], repeat=msg_len)))
    keys = np.tile(fkey, (len(messages), 1))
    floatX = theano.config.floatX
    return (messages * 2 - 1).astype(floatX), (keys * 2 - 1).astype(floatX)
        

# Build the symbolic graphs of the three networks. Alice turns (message, key)
# into a ciphertext, Bob turns (ciphertext, key) back into a message, and Eve
# tries to do the same from the ciphertext alone.
# NOTE: batch_size is hard-coded into every reshape below, so the graphs
# appear to require inputs with exactly batch_size rows.

# Tensor variables for the message and key
msg_in = T.matrix('msg_in')
key = T.matrix('key')

# Alice's input is the concatenation of the message and the key
alice_in = T.concatenate([msg_in, key], axis=1)

# Alice's hidden layer
alice_hid = HiddenLayer(alice_in,
                        input_size=msg_len + key_len,
                        hidden_size=msg_len + key_len,
                        name='alice_to_hid',
                        act_fn='relu')

# Reshape the output of Alice's hidden layer for convolution
# (batch, 1 channel, msg_len + key_len rows, 1 column)
alice_conv_in = alice_hid.output.reshape((batch_size, 1, msg_len + key_len, 1))
# Alice's convolutional layers
alice_conv = StandardConvSetup(alice_conv_in, 'alice')
# Get the output communication
# The last conv layer uses tanh, so each ciphertext entry lies in [-1, 1].
alice_comm = alice_conv.output.reshape((batch_size, msg_len))

# Bob's input is the concatenation of Alice's communication and the key
bob_in = T.concatenate([alice_comm, key], axis=1)
# He decrypts using a hidden layer and a conv net as per Alice
bob_hid = HiddenLayer(bob_in, 
                      input_size=comm_len + key_len,
                      hidden_size=comm_len + key_len,
                      name='bob_to_hid',
                      act_fn='relu')
bob_conv_in = bob_hid.output.reshape((batch_size, 1, comm_len + key_len, 1))
bob_conv = StandardConvSetup(bob_conv_in, 'bob')
bob_msg = bob_conv.output.reshape((batch_size, msg_len))


# Eve sees Alice's communication to Bob, but not the key
# She gets an extra hidden layer to try and learn to decrypt the message
# The first layer widens comm_len inputs to comm_len + key_len units so that
# Eve's conv stack receives the same input length as Alice's and Bob's.
eve_hid1 = HiddenLayer(alice_comm, 
                       input_size=comm_len,
                       hidden_size=comm_len + key_len,
                       name='eve_to_hid1',
                       act_fn='relu')
                          
eve_hid2 = HiddenLayer(eve_hid1, 
                       input_size=comm_len + key_len,
                       hidden_size=comm_len + key_len,
                       name='eve_to_hid2',
                       act_fn='relu')

eve_conv_in = eve_hid2.output.reshape((batch_size, 1, comm_len + key_len, 1))
eve_conv = StandardConvSetup(eve_conv_in, 'eve')
eve_msg = eve_conv.output.reshape((batch_size, msg_len))

# Loss Functions

# Eve's loss is the mean absolute (L1) difference between the true and the
# recovered message
decrypt_err_eve = T.mean(T.abs_(msg_in - eve_msg))

# Bob's decryption error is the same mean absolute difference for his output
decrypt_err_bob = T.mean(T.abs_(msg_in - bob_msg))
# Bob and Alice's loss adds (1 - decrypt_err_eve) ** 2 to that error, which
# is zero when Eve's mean absolute error equals 1.
# --> Bob wants Eve to do only as good as random guessing
# (the messages fed in by gen_msgfk are encoded as -1/+1)
loss_bob = decrypt_err_bob + (1. - decrypt_err_eve) ** 2.


# Training Functions
# Each dict below is keyed by 'bob' or 'eve':
#   params   - shared variables updated when that side trains
#   updates  - Adam update rules for those parameters
#   err_fn   - compiled function returning the decryption error only
#   train_fn - compiled function returning the loss and applying the updates
#   pred_fn  - compiled function returning the recovered message

# Get all the parameters for Bob and Alice, make updates, train and pred funcs
# Alice's parameters are included here: she is trained jointly with Bob.
params   = {'bob' : get_all_params([bob_conv, bob_hid, 
                                    alice_conv, alice_hid])}
updates  = {'bob' : adam(loss_bob, params['bob'])}
err_fn   = {'bob' : theano.function(inputs=[msg_in, key],
                                    outputs=decrypt_err_bob)}
train_fn = {'bob' : theano.function(inputs=[msg_in, key],
                                    outputs=loss_bob,
                                    updates=updates['bob'])}
pred_fn  = {'bob' : theano.function(inputs=[msg_in, key], outputs=bob_msg)}

# Get all the parameters for Eve, make updates, train and pred funcs
# Eve's updates touch only her own layers; Alice's parameters are not in
# params['eve'].
params['eve']   = get_all_params([eve_hid1, eve_hid2, eve_conv])
updates['eve']  = adam(decrypt_err_eve, params['eve'])
err_fn['eve']   = theano.function(inputs=[msg_in, key], 
                                  outputs=decrypt_err_eve)
train_fn['eve'] = theano.function(inputs=[msg_in, key], 
                                  outputs=decrypt_err_eve,
                                  updates=updates['eve'])
pred_fn['eve']  = theano.function(inputs=[msg_in, key], outputs=eve_msg)



# Training

def train(bob_or_eve, results, max_iters, print_every, es=0., es_limit=100):
    """Train either Bob+Alice or Eve for up to ``max_iters`` steps.

    Args:
        bob_or_eve: key into ``train_fn`` / ``err_fn`` ('bob' or 'eve').
        results: decryption errors recorded so far (list or 1-D array);
            it is not modified.
        max_iters: maximum number of training steps.
        print_every: the training loss is printed every ``print_every`` steps.
        es: early-stopping threshold on the training loss; a falsy value
            disables early stopping.
        es_limit: training stops once the loss has been below ``es`` on more
            than ``es_limit`` steps (not necessarily consecutive).

    Returns:
        A 1-D array made of ``results`` followed by exactly ``max_iters`` new
        entries: the decryption error after each step, padded with the last
        recorded error if training stopped early. With ``max_iters == 0``
        ``results`` is returned as an array.
    """
    # Collect the new errors in a list and concatenate once at the end,
    # rather than copying the whole history with np.hstack on every step.
    errors = []
    below_threshold = 0
    for i in range(max_iters):
        # Generate some data
        msg_in_val, key_val = gen_msgfk()
        # Train on this batch and get loss
        loss = train_fn[bob_or_eve](msg_in_val, key_val)
        # Store absolute decryption error of the model on this batch
        errors.append(err_fn[bob_or_eve](msg_in_val, key_val).sum())
        # Print loss now and then
        if i % print_every == 0:
            print ('training loss:', loss)
        # Early stopping if we see a low-enough loss enough times
        if es and loss < es:
            below_threshold += 1
            if below_threshold > es_limit:
                break
    if not errors:
        # No step was run, so there is nothing to append or pad with.
        return np.hstack((results,))
    # Pad so every call contributes exactly max_iters entries; the plotting
    # code slices the history in fixed-size chunks.
    padding = np.repeat(errors[-1], max_iters - len(errors))
    return np.hstack((results, errors, padding))

# Initialise some empty results arrays
results_bob, results_eve = [], []
adversarial_iterations = 60

# Per-round settings. They do not change between rounds, so they are set once
# here; n is also read by the plotting code after the loop.
n = 2000            # training steps per side and per round
print_every = 100   # print the training loss every this many steps

# Perform adversarial training: in every round Bob and Alice train first,
# then Eve trains against the updated Alice.
for i in range(adversarial_iterations):
    print ('training bob and alice, run:', i+1)
    results_bob = train('bob', results_bob, n, print_every, es=0.01)
    print ('training eve, run:', i+1)
    results_eve = train('eve', results_eve, n, print_every, es=0.01)


# Results

# Plot, for each adversarial round (a chunk of n consecutive entries), the
# lowest decryption error recorded in that round - Bob first, then Eve.
for history in (results_bob, results_eve):
    round_starts = range(0, len(history), n)
    plt.plot([np.min(history[start:start + n]) for start in round_starts])
plt.legend(['bob', 'eve'])
plt.xlabel('adversarial iteration')
plt.ylabel('lowest decryption error achieved')
plt.show()


In [None]:
# Extract Alice output
# Compile a function that returns Alice's ciphertext. It gets its own name so
# that the pred_fn dict (keyed by 'bob' / 'eve') is not overwritten.
alice_fn = theano.function(inputs=[msg_in, key], outputs=alice_comm)
msg_in_val, key_val = gen_msgfk() 
ciphertext = alice_fn(msg_in_val, key_val)

# np.float was only an alias of the builtin float and has been removed from
# NumPy; the builtin gives the same float64 result.
ciphertext = np.asarray(ciphertext, dtype=float)
ciphertext

In [None]:
# Binarise Alice's output: entries below zero become 0, entries that are
# zero or above become 1.
cipher = (ciphertext >= 0).astype(int)
cipher

In [None]:
def gen_msg():
    """Return all 8-bit messages as a (256, 8) array of 0/1 values.

    Rows follow itertools.product order: 00000000 first, 11111111 last.
    """
    bit_rows = list(itertools.product((0, 1), repeat=8))
    return np.array(bit_rows)

import pandas as pd
def convert_array_bits_to_string(arr):
    """Return one string per row of ``arr``, made by joining the row's entries.

    ``arr`` must be a 2-D array (anything with ``tolist()`` that yields a
    list of rows); e.g. the row [0, 1, 1] becomes '011'.
    """
    return [''.join(str(bit) for bit in row) for row in arr.tolist()]

# Tabulate every message next to its ciphertext, numbering the rows from 1.
message = gen_msg()

df1 = pd.DataFrame(
    {'message (m)': convert_array_bits_to_string(message)},
    index=pd.RangeIndex(start=1, stop=len(message) + 1),
)

# Add the binarised ciphertext as a second column.
df1['ciphertext (c)'] = convert_array_bits_to_string(cipher)
df1.head(n=20)

In [None]:
# Generate truth table for Boolean functions of the form: L(x) = a0 + a1x1 + a2x2 + ... + a8x8

import sys

import numpy as np
import itertools
# Always print arrays in full. NumPy documents sys.maxsize for this purpose;
# a NaN threshold is rejected with a ValueError by recent NumPy releases.
np.set_printoptions(threshold=sys.maxsize)


# generate all possible values of a and store it in an array a
# (512 rows; column 0 is a0, columns 1..8 are a1..a8)
a = np.array(list(itertools.product([0, 1], repeat=9))).astype(int)

# generate all possible values of x and store it in an array x
# (256 rows; columns 0..7 are x1..x8)
x = np.array(list(itertools.product([0, 1], repeat=8))).astype(int)

# Row r of tt is the truth table of the function with coefficients a[r],
# evaluated at every x in order. XOR-ing the terms a0, a1x1, ..., a8x8 is the
# same as taking their sum modulo 2, so the whole table is one matrix product.
tt = (a[:, :1] + a[:, 1:].dot(x.T)) % 2
tt

In [None]:
# Get the shape of the resulting array tt
# Expected: (512, 256) - one row per coefficient vector in a (2**9) and one
# column per input in x (2**8).
tt.shape

In [None]:
# Convert tt array bits into string
# Each row of tt becomes one string of concatenated bits.
# NOTE: this rebinds tt from an array to a list of strings, and
# convert_array_bits_to_string calls .tolist() on its argument, so this cell
# cannot be run a second time without rebuilding tt first.
tt = convert_array_bits_to_string(tt)
tt

In [None]:
# Transpose the ciphertext array
# After transposing, each row holds one ciphertext bit position across all
# messages, i.e. the truth table of that output bit.
# NOTE: cipher is rebound in place, so re-running this cell flips it back.
cipher = cipher.T
cipher.shape

In [None]:
# Convert ciphertext array bits into string
# Each row of the transposed cipher array becomes one string of bits.
# NOTE: this rebinds cipher from an array to a list of strings.
cipher = convert_array_bits_to_string(cipher)
cipher

In [None]:
# Compute Hamming Distances

def xor(x, y):
    """Return the bitwise XOR of two binary-digit strings as a binary string.

    The result is zero-padded on the left to at least ``len(x)`` digits.
    """
    width = len(x)
    value = int(x, 2) ^ int(y, 2)
    return format(value, '0{}b'.format(width))


# Parse every truth-table string once, rather than once per ciphertext row.
tt_values = [int(f, 2) for f in tt]

# one_count_arr[r][k] is the Hamming distance between row r of cipher and
# entry k of tt: the number of 1 bits in their XOR.
one_count_arr = [
    np.array([bin(int(m, 2) ^ f_value).count('1') for f_value in tt_values])
    for m in cipher
]

one_count_arr

In [None]:
# Confirm the length of list one_count_arr
# There is one entry per row of cipher.
len(one_count_arr)

In [None]:
# Nonlinearity of each cipher row: the smallest Hamming distance found in its
# array of distances in one_count_arr.
NL = [min(distances) for distances in one_count_arr]

NL