In [None]:
!pip install keras

In [None]:
import tensorflow as tf
from tensorflow import keras

# The Speck cipher and data generation algorithms #
Taken from https://github.com/agohr/deep_speck/blob/master/speck.py

In [None]:
import numpy as np
from os import urandom

def WORD_SIZE():
    return(16);

def ALPHA():
    return(7);

def BETA():
    return(2);

MASK_VAL = 2 ** WORD_SIZE() - 1;

def shuffle_together(l):
    state = np.random.get_state();
    for x in l:
        np.random.set_state(state);
        np.random.shuffle(x);

def rol(x,k):
    return(((x << k) & MASK_VAL) | (x >> (WORD_SIZE() - k)));

def ror(x,k):
    return((x >> k) | ((x << (WORD_SIZE() - k)) & MASK_VAL));

def enc_one_round(p, k):
    c0, c1 = p[0], p[1];
    c0 = ror(c0, ALPHA());
    c0 = (c0 + c1) & MASK_VAL;
    c0 = c0 ^ k;
    c1 = rol(c1, BETA());
    c1 = c1 ^ c0;
    return(c0,c1);

def dec_one_round(c,k):
    c0, c1 = c[0], c[1];
    c1 = c1 ^ c0;
    c1 = ror(c1, BETA());
    c0 = c0 ^ k;
    c0 = (c0 - c1) & MASK_VAL;
    c0 = rol(c0, ALPHA());
    return(c0, c1);

def expand_key(k, t):
    ks = [0 for i in range(t)];
    ks[0] = k[len(k)-1];
    l = list(reversed(k[:len(k)-1]));
    for i in range(t-1):
        l[i%3], ks[i+1] = enc_one_round((l[i%3], ks[i]), i);
    return(ks);

def encrypt(p, ks):
    x, y = p[0], p[1];
    for k in ks:
        x,y = enc_one_round((x,y), k);
    return(x, y);

def decrypt(c, ks):
    x, y = c[0], c[1];
    for k in reversed(ks):
        x, y = dec_one_round((x,y), k);
    return(x,y);

def check_testvector():
  key = (0x1918,0x1110,0x0908,0x0100)
  pt = (0x6574, 0x694c)
  ks = expand_key(key, 22)
  ct = encrypt(pt, ks)
  if (ct == (0xa868, 0x42f2)):
    print("Testvector verified.")
    return(True);
  else:
    print("Testvector not verified.")
    return(False);

#convert_to_binary takes as input an array of ciphertext pairs
#where the first row of the array contains the lefthand side of the ciphertexts,
#the second row contains the righthand side of the ciphertexts,
#the third row contains the lefthand side of the second ciphertexts,
#and so on
#it returns an array of bit vectors containing the same data
def convert_to_binary(arr):
  X = np.zeros((4 * WORD_SIZE(),len(arr[0])),dtype=np.uint8);
  for i in range(4 * WORD_SIZE()):
    index = i // WORD_SIZE();
    offset = WORD_SIZE() - (i % WORD_SIZE()) - 1;
    X[i] = (arr[index] >> offset) & 1;
  X = X.transpose();
  return(X);

#takes a text file that contains encrypted block0, block1, true diff prob, real or random
#data samples are line separated, the above items whitespace-separated
#returns train data, ground truth, optimal ddt prediction
def readcsv(datei):
    data = np.genfromtxt(datei, delimiter=' ', converters={x: lambda s: int(s,16) for x in range(2)});
    X0 = [data[i][0] for i in range(len(data))];
    X1 = [data[i][1] for i in range(len(data))];
    Y = [data[i][3] for i in range(len(data))];
    Z = [data[i][2] for i in range(len(data))];
    ct0a = [X0[i] >> 16 for i in range(len(data))];
    ct1a = [X0[i] & MASK_VAL for i in range(len(data))];
    ct0b = [X1[i] >> 16 for i in range(len(data))];
    ct1b = [X1[i] & MASK_VAL for i in range(len(data))];
    ct0a = np.array(ct0a, dtype=np.uint16); ct1a = np.array(ct1a,dtype=np.uint16);
    ct0b = np.array(ct0b, dtype=np.uint16); ct1b = np.array(ct1b, dtype=np.uint16);
    
    #X = [[X0[i] >> 16, X0[i] & 0xffff, X1[i] >> 16, X1[i] & 0xffff] for i in range(len(data))];
    X = convert_to_binary([ct0a, ct1a, ct0b, ct1b]); 
    Y = np.array(Y, dtype=np.uint8); Z = np.array(Z);
    return(X,Y,Z);

#baseline training data generator
def make_train_data(n, nr, diff=(0x0040,0)):
  Y = np.frombuffer(urandom(n), dtype=np.uint8); Y = Y & 1;
  keys = np.frombuffer(urandom(8*n),dtype=np.uint16).reshape(4,-1);
  plain0l = np.frombuffer(urandom(2*n),dtype=np.uint16);
  plain0r = np.frombuffer(urandom(2*n),dtype=np.uint16);
  plain1l = plain0l ^ diff[0]; plain1r = plain0r ^ diff[1];
  num_rand_samples = np.sum(Y==0);
  plain1l[Y==0] = np.frombuffer(urandom(2*num_rand_samples),dtype=np.uint16);
  plain1r[Y==0] = np.frombuffer(urandom(2*num_rand_samples),dtype=np.uint16);
  ks = expand_key(keys, nr);
  ctdata0l, ctdata0r = encrypt((plain0l, plain0r), ks);
  ctdata1l, ctdata1r = encrypt((plain1l, plain1r), ks);
  X = convert_to_binary([ctdata0l, ctdata0r, ctdata1l, ctdata1r]);
  return(X,Y);

#real differences data generator
def real_differences_data(n, nr, diff=(0x0040,0)):
  #generate labels
  Y = np.frombuffer(urandom(n), dtype=np.uint8); Y = Y & 1;
  #generate keys
  keys = np.frombuffer(urandom(8*n),dtype=np.uint16).reshape(4,-1);
  #generate plaintexts
  plain0l = np.frombuffer(urandom(2*n),dtype=np.uint16);
  plain0r = np.frombuffer(urandom(2*n),dtype=np.uint16);
  #apply input difference
  plain1l = plain0l ^ diff[0]; plain1r = plain0r ^ diff[1];
  num_rand_samples = np.sum(Y==0);
  #expand keys and encrypt
  ks = expand_key(keys, nr);
  ctdata0l, ctdata0r = encrypt((plain0l, plain0r), ks);
  ctdata1l, ctdata1r = encrypt((plain1l, plain1r), ks);
  #generate blinding values
  k0 = np.frombuffer(urandom(2*num_rand_samples),dtype=np.uint16);
  k1 = np.frombuffer(urandom(2*num_rand_samples),dtype=np.uint16);
  #apply blinding to the samples labelled as random
  ctdata0l[Y==0] = ctdata0l[Y==0] ^ k0; ctdata0r[Y==0] = ctdata0r[Y==0] ^ k1;
  ctdata1l[Y==0] = ctdata1l[Y==0] ^ k0; ctdata1r[Y==0] = ctdata1r[Y==0] ^ k1;
  #convert to input data for neural networks
  X = convert_to_binary([ctdata0l, ctdata0r, ctdata1l, ctdata1r]);
  return(X,Y);


# Evaluate the results #
Taken from https://github.com/agohr/deep_speck/blob/master/eval.py and slightly adapted.

In [None]:
def evaluate(net,X,Y):
    
    Z = net.predict(X,batch_size=10000).flatten();
    Zbin = (Z > 0.5);
    
    #Compute the acc, tpr, tnr
    n = len(Z); 
    n0 = np.sum(Y==0); 
    n1 = np.sum(Y==1);
    
    acc = np.sum(Zbin == Y) / n;
    tpr = np.sum(Zbin[Y==1]) / n1;
    tnr = np.sum(Zbin[Y==0] == 0) / n0;
    
    return(acc, tpr, tnr);

# Conduct multiple evaluations #

In [None]:
def multiple_evaluations(model, repetitions, num_rounds):
 
  #The accs, tprs, tnrs for all evaluation repetition
  accs = [];
  tprs = [];
  tnrs = [];
    
  #Evaluate multiple times and average the results 
  for i in range(0, repetitions):
    X_eval, Y_eval = make_train_data(10**6, num_rounds);

    (acc, tpr, tnr) = evaluate(model, X_eval, Y_eval);
    accs.append(acc);
    tprs.append(tpr);
    tnrs.append(tnr);

  print("Acc: " + str(np.mean(accs)) + str(" +- ") + str(np.std(accs)) + str("\t") + 
        "Tpr:" + str(np.mean(tprs)) + str(" +- ") + str(np.std(tprs)) + str("\t") +
        "Tnr:" + str(np.mean(tnrs)) + str(" +- ") + str(np.std(tnrs)) + str("\t"));
        
  return(np.mean(accs), np.mean(tprs), np.mean(tnrs));

# The reduced network #
Obtained from the depth-1 distinguisher implemented in https://github.com/agohr/deep_speck/blob/master/train_nets.py.

In [None]:
from keras.callbacks import ModelCheckpoint, LearningRateScheduler
from keras.models import Model
from keras.optimizers import Adam
from keras.layers import Dense, Conv1D, Input, Reshape, Permute, Add, Flatten, BatchNormalization, Activation, MaxPooling1D, Concatenate,Dropout, AveragePooling1D, GlobalAveragePooling1D, GlobalMaxPooling1D
from keras.regularizers import l2, l1, l1_l2


def cyclic_lr(num_epochs, high_lr, low_lr):
  res = lambda i: low_lr + ((num_epochs-1) - i % num_epochs)/(num_epochs-1) * (high_lr - low_lr);
  return(res);

#Batch size
bs = 5000;

def build_reduced_network():

  #Input and preprocessing layers
  inp = Input(shape=(64,));
  x = Reshape((4, 16))(inp);
  x = Permute((2,1))(x);
  
  #Block 1: 7 filters were pruned 
  conv1 = Conv1D(filters=25, kernel_size=1, padding='same')(x); #Also refered to as C1
  conv1 = BatchNormalization()(conv1);
  conv1 = Activation('relu')(conv1);

  #Block 2-i where:
  #From its first convolutional layer: 21 filters were pruned
  conv2 = Conv1D(filters=11, kernel_size=3, padding='same')(conv1); #Also refered to as C2
  conv2 = BatchNormalization()(conv2);
  conv2 = Activation('relu')(conv2);

  #From its second convolutional layer: 25 filters were pruned
  conv3 = Conv1D(filters=7, kernel_size=3, padding='same')(conv2); #Also refered to as C3
  conv3 = BatchNormalization()(conv3);
  conv3 = Activation('relu')(conv3);

  #The residual connection was removed 
    
  flat = Flatten()(conv3);

  #Block 3 where:
  #From its first dense layer: 46 neurons were pruned   
  dense1 = Dense(18)(flat); #Also refered to as D1
  dense1 = BatchNormalization()(dense1);
  dense1 = Activation('relu')(dense1);
    
  #From its second dense layer: 36 neurons were pruned  
  dense2 = Dense(28)(dense1); #Also refered to as D2
  dense2 = BatchNormalization()(dense2);
  dense2 = Activation('relu')(dense2);

  out = Dense(1, activation='sigmoid')(dense2);
    
  model = Model(inputs=inp, outputs=out);

  return model;



def model_builder():

  model = build_reduced_network();

  model.compile(
          optimizer='adam',
          loss='binary_crossentropy',
          metrics=['acc']);
    
  return model;
  


def train_speck_distinguisher(model, num_epochs, num_rounds, X_train, Y_train, X_eval, Y_eval):
    
    stop_early = tf.keras.callbacks.EarlyStopping(monitor='val_acc', patience=3, restore_best_weights= True);
    lr = LearningRateScheduler(cyclic_lr(10,0.002, 0.0001));
    
    model.fit(X_train, Y_train, batch_size= bs, epochs=num_epochs,validation_data=(X_eval, Y_eval), callbacks=[lr, stop_early])
    
    return model;


Let's look at a prediction:

In [None]:
#Select the number of rounds 
num_rounds=5;

#Generate training and test data - ciphertext pairs 
X_train, Y_train = make_train_data(10**7,num_rounds);
X_eval, Y_eval = make_train_data(10**6, num_rounds);
    
#Construct and train the reduced network
model = model_builder();
trained_model = train_speck_distinguisher(model, 30, num_rounds, X_train, Y_train, X_eval, Y_eval);

In [None]:
#Class 1 = fixed difference
#Class 0 = random difference

#If prediction > 0.5 -> assigned to class 1
#Else -> assigned to class 0 

X, Y = make_train_data(1, num_rounds)
Y_pred = trained_model.predict(X)

print("True class: "+str(Y[0]) +"\t\t"+ "Prediction: "+ str(Y_pred[0][0]))

# Repeat: Train the model and evaluate the  results #

In [None]:
def repeat_experiment(num_reps, num_rounds):

  #Store the accs, tprs, tnrs for all experimenent repetitons
  accs= [];
  tprs =[];
  tnrs=[];
    
  #Repeat the experiment multiple times
  for i in range(0, num_reps):
    
    
    X_train, Y_train = make_train_data(10**7,num_rounds);
    X_eval, Y_eval = make_train_data(10**6, num_rounds);
    
    model = model_builder();
    trained_model = train_speck_distinguisher(model, 30, num_rounds, X_train, Y_train, X_eval, Y_eval);
    
    (acc, tpr, tnr) = multiple_evaluations(trained_model, 5, num_rounds);
    accs.append(acc);
    tprs.append(tpr);
    tnrs.append(tnr);

  print("Round: "+str(num_rounds));
  print("Acc: " + str(np.mean(accs)) + str(" +- ") + str(np.std(accs)) + str("\t") + 
        "Tpr:" + str(np.mean(tprs)) + str(" +- ") + str(np.std(tprs)) + str("\t") +
        "Tnr:" + str(np.mean(tnrs)) + str(" +- ") + str(np.std(tnrs)) + str("\t"));
  
  return (accs, tprs, tnrs);



# Run the experiment #

In [None]:
#num_rounds = 5, 6, 7, 8;
#num_reps = 5;

repeat_experiment(num_reps= 5, num_rounds= 5);