In [1]:
import subprocess
from subprocess import PIPE, DEVNULL
from random import randrange
import numpy as np
import pandas as pd
from multiprocess import Pool
from math import prod as mul
from itertools import product


In [2]:
s_block = [8, 0, 12, 4, 9, 6, 7, 11, 2, 3, 1, 15, 5, 14, 10, 13]

s_block_rev = [1, 10, 8, 9, 3, 12, 5, 6, 0, 4, 14, 7, 2, 13, 15, 11]

list_size = 1 << 16


In [3]:
class heys_cipher:
	def substitution(self, number, need_cipher = True):
		new_number = 0

		operation = s_block if need_cipher == True else s_block_rev

		for i in range(4):
			new_number |= operation[(number >> i * 4) & 15] << i * 4

		return new_number
	
	def mix_words(self, number):
		new_number = 0

		for j in range(4):
			for i in range(4):
				new_number |= (number >> (4 * j + i) & 1) << (4 * i + j)

		return new_number

	def encrypt(self, number, key = 0):
		new_number = number ^ key
		new_number = self.substitution(new_number)
		new_number = self.mix_words(new_number)

		return new_number

	def decrypt(self, number, key = 0):
		new_number = self.mix_words(new_number)
		new_number = self.substitution(new_number, False)
		new_number = number ^ key

		return new_number


In [4]:
heys_instance = heys_cipher()

def encrypt(number, key):
	round_keys = []

	for i in range(7):
		round_keys.append((key >> 16 * i) & 65535)

	encrypted_number = number

	for i in range(6):
		encrypted_number = heys_instance.encrypt(encrypted_number, round_keys[i])

	encrypted_number ^= round_keys[6]

	return encrypted_number


In [5]:
diff_s_block_table = np.zeros((16, 16))

for alpha in range(16):
	for beta in range(16):
		for x in range(16):
			diff_s_block_table[alpha][beta] += heys_instance.substitution(x ^ alpha) == heys_instance.substitution(x) ^ beta

diff_s_block_table /= 16

pd.DataFrame(diff_s_block_table)


Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15
0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,0.0,0.125,0.0,0.0,0.0,0.0,0.0,0.125,0.25,0.0,0.0,0.125,0.125,0.0,0.125,0.125
2,0.0,0.0,0.0,0.25,0.25,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.125,0.125,0.125,0.125
3,0.0,0.125,0.25,0.0,0.125,0.0,0.0,0.0,0.125,0.0,0.0,0.0,0.25,0.125,0.0,0.0
4,0.0,0.125,0.125,0.0,0.0,0.0,0.125,0.125,0.0,0.0,0.0,0.25,0.0,0.125,0.0,0.125
5,0.0,0.0,0.0,0.125,0.0,0.125,0.125,0.125,0.0,0.125,0.0,0.0,0.25,0.0,0.125,0.0
6,0.0,0.125,0.125,0.0,0.125,0.125,0.0,0.0,0.125,0.0,0.0,0.125,0.0,0.0,0.125,0.125
7,0.0,0.0,0.0,0.125,0.0,0.0,0.0,0.125,0.0,0.125,0.25,0.0,0.0,0.125,0.0,0.25
8,0.0,0.0,0.0,0.125,0.0,0.0,0.125,0.0,0.125,0.0,0.125,0.125,0.125,0.25,0.0,0.0
9,0.0,0.125,0.125,0.25,0.0,0.125,0.0,0.125,0.0,0.0,0.125,0.125,0.0,0.0,0.0,0.0


In [7]:
def compute_temporary_list(alpha):
	alpha_array = [(alpha >> 4 * i) & 15 for i in range(4)]

	not_null_betas = [
		[
			beta for beta in range(16) if diff_s_block_table[alpha_array[i]][beta] != 0
		] for i in range(4)
	]

	lin_list = {}

	for (i, beta_array) in enumerate(product(*not_null_betas)):
		beta = sum([beta_array[i] << i * 4 for i in range(4)])

		lin_list[beta] = mul([diff_s_block_table[alpha_array[i]][beta_array[i]] for i in range(4)])

	return lin_list


In [8]:
def init_worker(shared_diff_table):
	global diff_table

	diff_table = shared_diff_table


In [12]:
p = 0.0008

def differential_search(alpha):
	global diff_table

	list_prev = {alpha: 1}

	for i in range (1, 6):
		list_curr = {}

		for beta in list_prev:
			p_i = list_prev[beta]

			if beta not in diff_table:
				diff_table[beta] = compute_temporary_list(beta)

			for gamma in diff_table[beta]:
				additional_probability = p_i * diff_table[beta][gamma]	

				list_curr[gamma] = additional_probability if gamma not in list_curr else list_curr[gamma] + additional_probability

		list_new_curr = {}

		for gamma in list_curr.keys():
			if list_curr[gamma] > p:
				list_new_curr[heys_instance.mix_words(gamma)] = list_curr[gamma]

		list_prev = list_new_curr

	return (alpha, list_prev)


In [13]:
diff_table = {}

with Pool(initializer = init_worker, initargs = (diff_table, )) as pool:
	candidates = list(
		pool.map(
			differential_search,
			[(j << 4 * i) for i in range(4) for j in range(1, 16)]
		)
	)

list(candidates)


[(1, {34824: 0.0009918212890625}),
 (2, {}),
 (3,
  {8: 0.0009002685546875,
   32768: 0.000980377197265625,
   32904: 0.0016937255859375,
   34816: 0.00096893310546875,
   34952: 0.00090789794921875,
   2176: 0.00090789794921875,
   34824: 0.001514434814453125}),
 (4, {32904: 0.00098419189453125, 34824: 0.000820159912109375}),
 (5, {}),
 (6,
  {32768: 0.0008697509765625,
   32904: 0.001117706298828125,
   34824: 0.00112152099609375}),
 (7, {}),
 (8, {}),
 (9, {32904: 0.001049041748046875, 34824: 0.000988006591796875}),
 (10, {}),
 (11, {32768: 0.0008544921875}),
 (12, {}),
 (13,
  {32768: 0.0009307861328125,
   34816: 0.000858306884765625,
   34824: 0.000988006591796875}),
 (14, {34824: 0.000858306884765625}),
 (15,
  {32768: 0.00084686279296875,
   32904: 0.00109100341796875,
   34824: 0.0009918212890625}),
 (16, {}),
 (32, {}),
 (48,
  {4113: 0.0010547637939453125,
   8226: 0.001239776611328125,
   8706: 0.0009374618530273438}),
 (64, {}),
 (80, {}),
 (96, {8226: 0.000803947448730468

In [14]:
differentials = []

for alpha, betas in list(candidates):
	for beta in betas.keys():
		differentials.append(((alpha, beta), betas[beta]))

sorted_differentials = sorted(differentials, key = lambda x: x[1], reverse = True)

sorted_differentials


[((3, 32904), 0.0016937255859375),
 ((3, 34824), 0.001514434814453125),
 ((48, 8226), 0.001239776611328125),
 ((6, 34824), 0.00112152099609375),
 ((6, 32904), 0.001117706298828125),
 ((15, 32904), 0.00109100341796875),
 ((48, 4113), 0.0010547637939453125),
 ((9, 32904), 0.001049041748046875),
 ((1, 34824), 0.0009918212890625),
 ((15, 34824), 0.0009918212890625),
 ((9, 34824), 0.000988006591796875),
 ((13, 34824), 0.000988006591796875),
 ((4, 32904), 0.00098419189453125),
 ((3, 32768), 0.000980377197265625),
 ((3, 34816), 0.00096893310546875),
 ((768, 16452), 0.0009377002716064453),
 ((48, 8706), 0.0009374618530273438),
 ((13, 32768), 0.0009307861328125),
 ((3, 34952), 0.00090789794921875),
 ((3, 2176), 0.00090789794921875),
 ((768, 32904), 0.0009059906005859375),
 ((3, 8), 0.0009002685546875),
 ((6, 32768), 0.0008697509765625),
 ((13, 34816), 0.000858306884765625),
 ((14, 34824), 0.000858306884765625),
 ((11, 32768), 0.0008544921875),
 ((15, 32768), 0.00084686279296875),
 ((4, 34824), 

In [15]:
def write_texts(file, texts):
	with open(file, 'wb') as f:
		for text in texts:
			f.write(text.to_bytes(2, 'little'))


In [16]:
def read_texts(file):
	texts = []

	with open(file, 'rb') as f:
		line = f.read()
		concatenated_text = int.from_bytes(line, byteorder='little')

		for i in range(len(line) // 2):
			texts.append((concatenated_text >> (i * 16)) & (list_size - 1))

	return texts


In [17]:
def get_input_file(index, version):
	return f'./inputs/input_{index}_{version}.bin'

def get_output_file(index, version):
	return f'./outputs/output_{index}_{version}.bin'


In [18]:
def create_statistical_materials(alpha, count, index):
	plain_texts = [randrange(1, 65535) for i in range(count)]
	write_texts(get_input_file(index, 'orig'), plain_texts)
	
	mutated_texts = [text ^ alpha for text in plain_texts]
	write_texts(get_input_file(index, 'mut'), mutated_texts)
	
	subprocess.Popen(f'wine ./heys.exe e 02 {get_input_file(index, "orig")} {get_output_file(index, "orig")}', stdin = PIPE, stdout=DEVNULL, shell = True).communicate(timeout = 5)
	subprocess.Popen(f'wine ./heys.exe e 02 {get_input_file(index, "mut")} {get_output_file(index, "mut")}', stdin = PIPE, stdout=DEVNULL, shell = True).communicate(timeout = 5)

	cipher_texts = read_texts(get_output_file(index, 'orig'))
	mutated_cipher_texts = read_texts(get_output_file(index, 'mut'))

	return cipher_texts, mutated_cipher_texts


In [19]:
def calc_beta(c_1, c_2, k):
	y_1 = heys_instance.substitution(heys_instance.mix_words(c_1 ^ k), False)
	y_2 = heys_instance.substitution(heys_instance.mix_words(c_2 ^ k), False)

	return y_1 ^ y_2


In [20]:
def check_key(k, cipher_texts, mutated_cipher_texts, beta):
	success_count = 0

	for c_1, c_2 in zip(cipher_texts, mutated_cipher_texts):
		possible_beta = calc_beta(c_1, c_2, k)

		if beta == possible_beta:
			success_count += 1

	return success_count


In [26]:
%%time

keys = range(list_size)

for index, (differential, probability) in enumerate(sorted_differentials):
	alpha, beta = differential

	cipher_texts, mutated_cipher_texts = create_statistical_materials(alpha, 6500, index)

	with Pool() as pool:
		keys_frequencies = list(pool.map(lambda key: check_key(key, cipher_texts, mutated_cipher_texts, beta), keys))

	max_frequency = max(keys_frequencies)

	new_keys = []

	for key_index, frequency in enumerate(keys_frequencies):
		if frequency == max_frequency:
			new_keys.append(keys[key_index])

	print('Iteration', index)
	print('Max frequency', max_frequency)
	print('Keys with max frequency', new_keys)
	print('---------------------------')

	keys = new_keys

	if (len(keys) == 1):
		break


Iteration 0
Max frequency 3
Keys with max frequency [314, 318, 378, 382, 442, 446, 506, 510, 1338, 1342, 1402, 1406, 1466, 1470, 1530, 1534, 16698, 16702, 16762, 16766, 16826, 16830, 16890, 16894, 17722, 17726, 17786, 17790, 17850, 17854, 17914, 17918, 33202, 33206, 33266, 33270, 34226, 34230, 34290, 34294, 49586, 49590, 49650, 49654, 50610, 50614, 50674, 50678]
---------------------------
Iteration 1
Max frequency 1
Keys with max frequency [446, 510, 1470, 16826, 16894, 17914]
---------------------------
Iteration 2
Max frequency 2
Keys with max frequency [446, 510, 1470, 16826, 16894, 17914]
---------------------------
Iteration 3
Max frequency 1
Keys with max frequency [446]
---------------------------
CPU times: user 706 ms, sys: 175 ms, total: 881 ms
Wall time: 7min 32s
