# AD18 Quantum Symmetric Cryptosystem
This notebook implements the symmetric quantum cryptosystem proposed by Amerimehr and Dehkordi in 2018. The system combines classical encryption, authenticated hashing, and error correction with secret basis information to provide confidentiality, integrity, non-repudiation, and high security against undetected eavesdropping. The cryptosystem can be briefly described as follows:

### Encryption: 
* Step 1: Alice computes $x := mG \oplus k$, obtaining the message bits, and prepares a train of qubits according to $x$, called $Q_x$. She uses random bases to prepare the qubits.
* Step 2: Alice computes $h := h_k(m)$ and prepares a train of qubits according to $h$, called $Q_h$. The basis string $\{B_1, B_2, \ldots, B_r\}$ that Alice uses to prepare $Q_h$ is determined by $g(x)$: $B_j = B_Z$ if the $j$-th bit of $g(x)$ is zero, and $B_j = B_X$ otherwise.
* Step 3: Alice mixes $Q_x$ and $Q_h$ according to $f(k)$, obtaining $Q_y$, and then sends $Q_y$ to Bob.
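
The classical part of the encryption steps above can be sketched in a few lines. This is only an illustration under stated assumptions: the toy 4×7 generator matrix, the key bits, and the SHAKE-128-based `g` are hypothetical stand-ins chosen for the example, not the constructions used in the paper or in the code below.

```python
import hashlib

def encode_gf2(m, G):
    """Multiply the bit vector m by the generator matrix G over GF(2)."""
    return [sum(mi & gij for mi, gij in zip(m, column)) % 2 for column in zip(*G)]

def xor_bits(a, b):
    """Bitwise XOR of two equal-length bit lists."""
    return [p ^ q for p, q in zip(a, b)]

def g(x, r):
    """Hypothetical g(x): expand the bit list x into r pseudo-random basis bits."""
    digest = hashlib.shake_128(bytes(x)).digest((r + 7) // 8)
    bits = [(byte >> (7 - i)) & 1 for byte in digest for i in range(8)]
    return bits[:r]

# Toy systematic 4x7 generator matrix (illustrative choice only)
G = [
    [1, 0, 0, 0, 1, 1, 0],
    [0, 1, 0, 0, 1, 0, 1],
    [0, 0, 1, 0, 0, 1, 1],
    [0, 0, 0, 1, 1, 1, 1],
]
m = [1, 0, 1, 1]              # message bits
k = [0, 1, 1, 0, 1, 0, 0]     # key bits, same length as a codeword

x = xor_bits(encode_gf2(m, G), k)                   # Step 1: x := mG XOR k
bases = ['Z' if b == 0 else 'X' for b in g(x, 8)]   # Step 2: bases for Q_h
print(x)
print(bases)
```

In this sketch the receiver can strip the key with the same `xor_bits` call, since XOR is its own inverse.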

### Decryption:
* Step 1: Bob receives $Q_y$ and extracts $Q_x$ and $Q_h$ according to $f(k)$. He then measures all qubits of $Q_x$ in the $B_Y$ basis, obtaining $x'$, and stores $Q_h$.
* Step 2: Bob computes $m' := x' \oplus k$ and runs a decoding algorithm to obtain the message $m$. (Note that $m' = mG \oplus e$, where $e$ is an error vector arising from Bob's measurement.) If the error rate is below the threshold, he obtains $m$; otherwise, he rejects the communication.
* Step 3: Bob first measures the qubits of $Q_h$ in the correct bases according to $g(x)$, obtaining $h'$. He then computes $h$. If $h' = h$, Bob concludes that the message $m$ is valid and that Alice is conclusively legitimate. Otherwise, Bob does not validate $m$. (If the quantum channel is noisy, $d(h, h') \le \varepsilon r$ implies Alice's legitimacy, where $\varepsilon$ is the QBER of the channel.)
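
The acceptance rule in Step 3 can be illustrated with a small sketch. It assumes the noisy-channel condition is a Hamming-distance threshold: the hash is accepted when the distance between the computed and measured hash bits is at most the QBER times the hash length $r$. The function names and the sample bit strings are hypothetical.

```python
def hamming_distance(a, b):
    """Number of positions at which two equal-length bit lists differ."""
    if len(a) != len(b):
        raise ValueError('Bit lists must have the same length')
    return sum(p != q for p, q in zip(a, b))

def accept_hash(h_computed, h_measured, qber):
    """Accept when the distance is at most qber * r, with r the hash length."""
    r = len(h_computed)
    return hamming_distance(h_computed, h_measured) <= qber * r

h = [1, 0, 1, 1, 0, 0, 1, 0] * 2     # 16 hash bits computed by Bob
h_noisy = list(h)
h_noisy[3] ^= 1                      # one bit flipped by channel noise

print(accept_hash(h, h, 0.0))        # noiseless channel, exact match required
print(accept_hash(h, h_noisy, 0.10)) # 1 error allowed out of 16 at 10% QBER
print(accept_hash(h, h_noisy, 0.05)) # threshold 0.8, so a single error is rejected
```

With a QBER of zero the rule reduces to the exact comparison $h' = h$ used on a noiseless channel.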

**Source**: A. Amerimehr and M. H. Dehkordi. 2018. Quantum Symmetric Cryptosystem Based on Algebraic Codes. IEEE Communications Letters 22, 9 (September 2018), 1746–1749. DOI:https://doi.org/10.1109/LCOMM.2018.2844245

To test this cryptosystem, run the second and third cells below to import the dependencies and define the required functions. Then create a preshared key with `preshared_key = bb84_qkd(128, 16)` and define a message with `message = b'yourMessageHere'`. The system can then be executed with `ad_2018(message, preshared_key)`. For additional debugging output as the process runs, set `debug = True` first. For convenience, this code has been placed in the cell below and can be run after executing the imports and function definitions.

In [14]:
# IMPORTANT! Run me after running the two cells below!!!
preshared_key = bb84_qkd(128, 16)
message = b'cybr8950'
debug = False
success = ad_2018(message, preshared_key)
print('Did transmission succeed? %s' % success)

Did transmission succeed? True


In [1]:
# Run me first!
import numpy as np
from numpy import array, inf
from numpy.random import randint, randn, seed
from math import pi
from qiskit import(QuantumCircuit, execute, Aer, IBMQ, assemble, transpile)
from qiskit.visualization import plot_histogram, plot_bloch_multivector
from qiskit.providers.ibmq.job import job_monitor
from commpy.channelcoding.convcode import Trellis, conv_encode, viterbi_decode
import hashlib, hmac
import secrets
from Crypto.Cipher import Salsa20

print("Imports Successful")
# Uncomment if we need to use the actual quantum backends
# provider = IBMQ.load_account()
# provider.backends()

Imports Successful


In [2]:
# Run me second!
# Helper functions for BB84 QKD - partially copied from https://qiskit.org/textbook/ch-algorithms/quantum-key-distribution.html
def encode_message(bits, bases):
    message = []
    for i in range(len(bases)):
        qc = QuantumCircuit(1,1)
        if bases[i] == 0: # Prepare qubit in Z-basis
            if bits[i] == 0:
                pass 
            else:
                qc.x(0)
        else: # Prepare qubit in X-basis
            if bits[i] == 0:
                qc.h(0)
            else:
                qc.x(0)
                qc.h(0)
        qc.barrier()
        message.append(qc)
    return message

def measure_message(message, bases):
    # Reuse a single simulator instance for every qubit
    qasm_sim = Aer.get_backend('qasm_simulator')
    measurements = []
    for q in range(len(bases)):
        if bases[q] == 0: # measuring in Z-basis
            message[q].measure(0,0)
        if bases[q] == 1: # measuring in X-basis
            message[q].h(0)
            message[q].measure(0,0)
        # One shot with memory=True returns the single measured bit
        qobj = assemble(message[q], shots=1, memory=True)
        result = qasm_sim.run(qobj).result()
        measured_bit = int(result.get_memory()[0])
        measurements.append(measured_bit)
    return measurements

def remove_garbage(a_bases, b_bases, bits):
    good_bits = []
    for q in range(len(a_bases)):
        if a_bases[q] == b_bases[q]:
            # If both used the same basis, add
            # this to the list of 'good' bits
            good_bits.append(bits[q])
    return good_bits

def sample_bits(bits, selection):
    sample = []
    for i in selection:
        # use np.mod to make sure the
        # bit we sample is always in 
        # the list range
        i = np.mod(i, len(bits))
        # pop(i) removes the element of the
        # list at index 'i'
        sample.append(bits.pop(i))
    return sample

# Original helper functions
def bits_to_byte_array(bits):
    if ((len(bits) % 8) != 0):
        raise RuntimeError('Bits length must be a multiple of 8')
    byte_str = "".join(str(bit) for bit in bits)
    return int(byte_str, 2).to_bytes(len(byte_str) // 8, byteorder='big')

def byte_array_to_bits(byte_arr):
    bits = []
    for i in byte_arr:
        binary = bin(i)[2:].zfill(8)
        for b in binary:
            bits.append(int(b))
    
    return bits

# Simplified BB84 key exchange - call one method and it produces a byte key of the desired length
# Example: exchanged_key = bb84_qkd(128, 16)
def bb84_qkd(desired_key_length, eavesdropper_detection_sample_size):
    if ((desired_key_length % 8) != 0):
        raise RuntimeError('Key length must be a multiple of 8')
        
    n = (desired_key_length * 2) + (eavesdropper_detection_sample_size * 6) 
    bob_key = []
    
    while (len(bob_key) < desired_key_length):
        alice_bits = randint(2, size=n)
        alice_bases = randint(2, size=n)

        # Pretend the message is transmitted here
        message = encode_message(alice_bits, alice_bases)

        bob_bases = randint(2, size=n)
        bob_results = measure_message(message, bob_bases)

        # Pretend Alice and Bob's bases were shared here
        alice_key = remove_garbage(alice_bases, bob_bases, alice_bits)
        bob_key = remove_garbage(alice_bases, bob_bases, bob_results)

        # Pretend the selection of random sample bits was shared here
        bit_selection = randint(n, size=eavesdropper_detection_sample_size)
        
        bob_sample = sample_bits(bob_key, bit_selection)
        alice_sample = sample_bits(alice_key, bit_selection)

        if (bob_sample != alice_sample):
            # Carry the explanation in the exception itself so callers can see it
            raise RuntimeError("Eavesdropper detected! Reattempt key distribution on a different quantum channel.")
    return bits_to_byte_array(bob_key[0:desired_key_length])


# New methods created for AD18 implementation begin here
def calculate_hash_bases(key, length):
    # Secure option: Produce <length> bits of data using the key and a fixed value with some cipher.
    # Simple option: Just use the key directly and loop through it until length is reached using key bits as 0 = +, 1 = X.
    hash_bases = []
    bit_key = byte_array_to_bits(key)
    for i in range(length):
        hash_bases.append(int(bit_key[i % len(bit_key)]))
    return hash_bases

def mix_message_and_hash(key, message, hashed):
    # Simplest option: Just concatenate the two, hash + message or alternate or something
    # Paper option: Uses only one hash bit and calculates sum(ki * 2^i mod n) for 0..n to determine its position
    # Secure option: Produce values using a secure cipher and the key, then mix the message and hash accordingly
    mixed = []
    for x in hashed:
        mixed.append(x)
    for x in message:
        mixed.append(x)
    return mixed

def unmix_message_and_hash(key, mixed, hash_length):
    return (mixed[0:hash_length], mixed[hash_length:])

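# Bob uses a special measurement function intended to measure in the omega basis.
# Note: rz rotates about the Z axis, so it only changes relative phase and leaves the
# outcome probabilities of the computational-basis measurement below unchanged.
def bob_measure_message(message):
    # Reuse a single simulator instance for every qubit
    qasm_sim = Aer.get_backend('qasm_simulator')
    measurements = []
    for q in range(len(message)):
        message[q].rz(-pi/8,0)
        message[q].measure(0,0)
        # One shot with memory=True returns the single measured bit
        qobj = assemble(message[q], shots=1, memory=True)
        result = qasm_sim.run(qobj).result()
        measured_bit = int(result.get_memory()[0])
        measurements.append(measured_bit)
    return measurements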

# Properties for convolutional ECC - Reference https://commpy.readthedocs.io/en/latest/channelcoding.html
x = 3
memory = array([x])
g_matrix = array([[1, 3, 5, 7, 9, 11, 1, 3, 5, 7, 9, 11]])
ecc_trellis = Trellis(memory, g_matrix, code_type='default')
debug = False

def convolutional_ecc_encode(message):
    msg_bits = byte_array_to_bits(message)
    coded_bits = conv_encode(msg_bits, ecc_trellis)
    return coded_bits
                        
def convolutional_ecc_decode(coded_bits):                    
    decoded_bits = viterbi_decode(coded_bits.astype(float), ecc_trellis)
    decoded_msg = bits_to_byte_array(decoded_bits[:-x])
    return decoded_msg
    
    
def ad_2018(message, preshared_key):
    # Preparation
    # Salsa20 is a stream cipher which will basically take the short preshared key and expand it to a long one-time-pad equivalent for XORing
    cipher = Salsa20.new(key=preshared_key) 
    
    # This 8-byte nonce must be shared with Bob
    nonce = cipher.nonce
    encrypted = cipher.encrypt(message) 

    # Alice adds ECC to her message using a convolutional code
    alice_message = convolutional_ecc_encode(encrypted)

    # Alice prepares a keyed hash of her message
    hash_key = secrets.token_bytes(16) # Or bb84_qkd(128,16) or other protocol
    hash_function = hmac.new(hash_key, message, digestmod='md5') # Not a secure hash function but used here as it's a short hash
    alice_hash = hash_function.digest()

    if debug: 
        print("Alice message: %s" % alice_message)
        print("ECC Coded length: %i" % len(alice_message))
        print("Alice hash: %s" % alice_hash)

    # Sender preparation
    # Alice mixes the bits of her message and hash according to some function f(k) based on the preshared key
    # then transmits message bits in random basis and hash bits in a basis given by g(k) based on the preshared key
    alice_message_bases = randint(2, size=len(alice_message))
    alice_hash_bases = calculate_hash_bases(preshared_key, len(alice_hash) * 8)

    alice_transmission_bases = mix_message_and_hash(preshared_key, alice_message_bases, alice_hash_bases)
    alice_transmission_bits = mix_message_and_hash(preshared_key, alice_message, byte_array_to_bits(alice_hash))
    
    if debug:
        print("Alice transmission bases: %s" % alice_transmission_bases)
        print("Alice transmission bits: %s" % alice_transmission_bits)

    encoded_message = encode_message(alice_transmission_bits, alice_transmission_bases)
    
    # Transmission does not require any special code in this protocol
    
    # Recipient processing
    unmixed = unmix_message_and_hash(preshared_key, encoded_message, len(alice_hash)*8)
    hash_qubits = unmixed[0]
    message_qubits = unmixed[1]
    bob_received_message = bob_measure_message(message_qubits)

    bob_hash_bases = calculate_hash_bases(preshared_key, len(alice_hash) * 8)
    bob_received_hash = bits_to_byte_array(measure_message(hash_qubits, bob_hash_bases))

    boblen = len(bob_received_message)
    for i in range(boblen-x, boblen):
        bob_received_message[i] = 0
    if debug: print(array(bob_received_message))

    ecc_decoded = convolutional_ecc_decode(array(bob_received_message))
    cipher = Salsa20.new(preshared_key, nonce=nonce)
    decrypted = cipher.decrypt(ecc_decoded)
    
    if debug: 
        print("ECC Decoded value: %s" % ecc_decoded)
        print("Decrypted value: %s" % decrypted)

    hash_function = hmac.new(hash_key, decrypted, digestmod='md5')
    bob_hash = hash_function.digest()

    match = hmac.compare_digest(bob_hash, bob_received_hash)
    if debug:
        print("Bob calculated hash: %s" % bob_hash)
        print("Bob received hash: %s" % bob_received_hash)
        print("Is message verified? %s" % match)
        
    return match

print('Functions and shared data defined successfully')

Functions and shared data defined successfully
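
`mix_message_and_hash` above simply concatenates the hash and the message. A key-dependent mixing, in the spirit of the "secure option" mentioned in its comments, might look like the sketch below. The function names are hypothetical, and Python's `random.Random` is used only to keep the example short; it is not a cryptographically secure generator, so a real implementation would need a keyed cipher or similar in its place.

```python
import hashlib
import random

def mixing_positions(key, total_length, hash_length):
    """Derive the slots that hold hash elements from the shared key."""
    seed = int.from_bytes(hashlib.sha256(key).digest(), 'big')
    rng = random.Random(seed)  # deterministic for a given key, NOT secure
    return set(rng.sample(range(total_length), hash_length))

def mix_with_key(key, message, hashed):
    """Interleave hash elements into the message at key-dependent slots."""
    total = len(message) + len(hashed)
    hash_slots = mixing_positions(key, total, len(hashed))
    msg_iter, hash_iter = iter(message), iter(hashed)
    return [next(hash_iter) if i in hash_slots else next(msg_iter)
            for i in range(total)]

def unmix_with_key(key, mixed, hash_length):
    """Invert mix_with_key, returning (hash elements, message elements)."""
    hash_slots = mixing_positions(key, len(mixed), hash_length)
    hashed = [v for i, v in enumerate(mixed) if i in hash_slots]
    message = [v for i, v in enumerate(mixed) if i not in hash_slots]
    return (hashed, message)

demo_key = b'0123456789abcdef'
demo_message = list(range(100, 120))
demo_hash = list(range(8))
demo_mixed = mix_with_key(demo_key, demo_message, demo_hash)
print(unmix_with_key(demo_key, demo_mixed, len(demo_hash)) == (demo_hash, demo_message))
```

The return order of `unmix_with_key` matches `unmix_message_and_hash`, and the functions work on any list, so the same approach could be applied to bit lists, basis lists, or lists of qubit circuits.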


# Analysis of AD18
We define a simple function that runs our AD18 implementation a given number of times and reports how many transmissions succeeded. By modifying parameters between runs, we can evaluate the overall success rate for different messages and ECC configurations.

In [100]:
def analyze_ad2018(count, message):
    preshared_key = bb84_qkd(128, 16)
    success = 0
    for i in range(count):
        if (ad_2018(message, preshared_key)):
            success += 1
            # print('success')
            
    print('%i/%i success rate' % (success, count))

In [78]:
# A smaller ECC trellis with 4 elements in the matrix produces a 25% chance of success on a 3 character message
# 247/1000 success rate
x = 3
memory = array([x])
g_matrix = array([[7, 5, 3, 1]])
ecc_trellis = Trellis(memory, g_matrix, code_type='default')

analyze_ad2018(1000, b'hey')

247/1000 success rate


In [79]:
# Increasing the matrix to 5 elements increases success by roughly 9 percentage points on this message
# 334/1000 success rate
x = 3
memory = array([x])
g_matrix = array([[5, 7, 1, 3, 5]])
ecc_trellis = Trellis(memory, g_matrix, code_type='default')

analyze_ad2018(1000, b'hey')

334/1000 success rate


In [80]:
# Keeping the same matrix but increasing the memory has no significant impact
# 234/1000 success rate
x = 5
memory = array([x])
g_matrix = array([[7, 5, 3, 1]])
ecc_trellis = Trellis(memory, g_matrix, code_type='default')

analyze_ad2018(1000, b'hey')

234/1000 success rate


In [81]:
# Changing elements of the matrix also produces no significant impact
x = 3
memory = array([x])
g_matrix = array([[7, 5, 3, 2]])
ecc_trellis = Trellis(memory, g_matrix, code_type='default')

analyze_ad2018(1000, b'hey')

251/1000 success rate
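
Whether two measured success rates differ by more than sampling noise can be checked with a two-proportion z-test. The sketch below is an illustration that is not part of the original analysis; the helper name `two_proportion_z` is hypothetical, and the counts are taken from the runs above.

```python
from math import sqrt

def two_proportion_z(successes_a, successes_b, trials):
    """z statistic for two success counts measured over the same number of trials."""
    p_a = successes_a / trials
    p_b = successes_b / trials
    pooled = (successes_a + successes_b) / (2 * trials)
    std_err = sqrt(pooled * (1 - pooled) * (2 / trials))
    return (p_b - p_a) / std_err

# 4-element matrices with different entries: 247 vs 251 successes
z_changed_elements = two_proportion_z(247, 251, 1000)
# 4-element vs 5-element matrix: 247 vs 334 successes
z_larger_matrix = two_proportion_z(247, 334, 1000)

print("changed elements: z = %.2f" % z_changed_elements)
print("larger matrix:    z = %.2f" % z_larger_matrix)
```

A magnitude below about 1.96 is consistent with no difference at the usual 5% level. The first comparison falls well below that value and the second well above it, which supports reading the element change as noise and the matrix-size change as a real effect.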


In [82]:
# Continuing to expand the matrix increases the success rate
# 477/1000 success rate
x = 3
memory = array([x])
g_matrix = array([[5, 7, 1, 3, 5, 1]])
ecc_trellis = Trellis(memory, g_matrix, code_type='default')

analyze_ad2018(1000, b'hey')

477/1000 success rate


In [84]:
x = 3
memory = array([x])
g_matrix = array([[5, 7, 1, 3, 5, 1, 3, 7, 6, 2]])
ecc_trellis = Trellis(memory, g_matrix, code_type='default')

analyze_ad2018(1000, b'hey')

815/1000 success rate


In [85]:
# With larger matrices we see a reasonable 80%+ success rate
# 823/1000 success rate
x = 5
memory = array([x])
g_matrix = array([[5, 7, 1, 3, 5, 1, 3, 7, 6, 2]])
ecc_trellis = Trellis(memory, g_matrix, code_type='default')

analyze_ad2018(1000, b'hey')

823/1000 success rate


In [86]:
# Changing the elements may have some impact
# 844/1000 success rate
x = 3
memory = array([x])
g_matrix = array([[5, 7, 1, 3, 5, 1, 3, 7, 3, 1]])
ecc_trellis = Trellis(memory, g_matrix, code_type='default')

analyze_ad2018(1000, b'hey')

844/1000 success rate


In [94]:
# Testing with the much longer Hello World reduces success but still has a >50% success rate
# 582/1000 success rate
x = 3
memory = array([x])
g_matrix = array([[5, 7, 1, 3, 5, 1, 3, 7, 3, 1, 9, 5]])
ecc_trellis = Trellis(memory, g_matrix, code_type='default')

analyze_ad2018(1000, b'hello world')

582/1000 success rate


In [104]:
# With this G-Matrix we see a 90% success rate for 3 characters
# 900/1000 success rate
x = 3
memory = array([x])
g_matrix = array([[1, 3, 5, 7, 9, 11, 1, 3, 5, 7, 9, 11]])
ecc_trellis = Trellis(memory, g_matrix, code_type='default')

analyze_ad2018(1000, b'hey')

900/1000 success rate


In [97]:
# And an 84% success rate for 5 characters
x = 3
memory = array([x])
g_matrix = array([[1, 3, 5, 7, 9, 11, 1, 3, 5, 7, 9, 11]])
ecc_trellis = Trellis(memory, g_matrix, code_type='default')

analyze_ad2018(1000, b'hello')

843/1000 success rate


In [103]:
# Takes time to run, but by doing so we can see a decrease of roughly 3 percentage points in success rate per character transmitted
x = 3
memory = array([x])
g_matrix = array([[1, 3, 5, 7, 9, 11, 1, 3, 5, 7, 9, 11]])
ecc_trellis = Trellis(memory, g_matrix, code_type='default')

analyze_ad2018(1000, b'1')
analyze_ad2018(1000, b'12')
analyze_ad2018(1000, b'123')
analyze_ad2018(1000, b'1234')
analyze_ad2018(1000, b'12345')
analyze_ad2018(1000, b'123456')
analyze_ad2018(1000, b'1234567')
analyze_ad2018(1000, b'12345678')

955/1000 success rate
926/1000 success rate
902/1000 success rate
853/1000 success rate
821/1000 success rate
800/1000 success rate
780/1000 success rate
756/1000 success rate
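
The per-character decrease can be estimated from the eight results above with an ordinary least-squares line fit. This is a minimal sketch; the helper name `least_squares_slope` is hypothetical, and the rates are the printed success counts expressed as percentages.

```python
def least_squares_slope(xs, ys):
    """Slope of the least-squares line through the points (xs[i], ys[i])."""
    mean_x = sum(xs) / len(xs)
    mean_y = sum(ys) / len(ys)
    sxy = sum((x_i - mean_x) * (y_i - mean_y) for x_i, y_i in zip(xs, ys))
    sxx = sum((x_i - mean_x) ** 2 for x_i in xs)
    return sxy / sxx

message_lengths = [1, 2, 3, 4, 5, 6, 7, 8]
success_percent = [95.5, 92.6, 90.2, 85.3, 82.1, 80.0, 78.0, 75.6]

slope = least_squares_slope(message_lengths, success_percent)
print("%.2f percentage points per character" % slope)
```

The fitted slope comes out a little under 3 percentage points lost per additional character over this range of message lengths.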


In [106]:
# In contrast to the above, we can see a dramatic decrease in successful transmission with the shorter convolutional code trellis
x = 3
memory = array([x])
g_matrix = array([[5, 7]])
ecc_trellis = Trellis(memory, g_matrix, code_type='default')

analyze_ad2018(1000, b'1')
analyze_ad2018(1000, b'12')
analyze_ad2018(1000, b'123')

262/1000 success rate
58/1000 success rate
9/1000 success rate
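
The success rates above ultimately depend on the raw bit-error rate that the ECC has to absorb. As a rough, idealized estimate (noiseless qubits, message qubits prepared in the Z or X basis with equal probability, as with `alice_message_bases`), the sketch below compares a fixed Z-basis measurement with a measurement along the axis halfway between Z and X. The helper names are hypothetical, and the model ignores everything except the basis mismatch.

```python
from math import sin, pi

def error_probability(state_angle, measurement_angle):
    """Probability of reading the wrong bit when a state at Bloch polar angle
    state_angle (in the XZ plane) is measured along measurement_angle."""
    return sin((state_angle - measurement_angle) / 2) ** 2

# Bit-0 states: |0> sits at angle 0 and |+> at angle pi/2.
# The bit-1 states are the antipodal points and give the same error probabilities.
Z_STATE = 0.0
X_STATE = pi / 2

def average_error(measurement_angle):
    """Average error over qubits prepared half in the Z basis, half in the X basis."""
    return 0.5 * (error_probability(Z_STATE, measurement_angle)
                  + error_probability(X_STATE, measurement_angle))

z_basis_error = average_error(0.0)          # always measure along Z
intermediate_error = average_error(pi / 4)  # measure halfway between Z and X

print("Z-basis measurement:            %.3f" % z_basis_error)
print("Intermediate-basis measurement: %.3f" % intermediate_error)
```

Under these assumptions a fixed Z-basis measurement misreads one bit in four on average, while the intermediate axis misreads about 14.6% of bits, so the choice of measurement basis directly sets how much redundancy the convolutional code needs.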
