# Smart meters using Semi Quantum Key Distribution

In [6]:
from qiskit import *
import math
import numpy as np
import hashlib
import hmac
import time
import json

In [7]:
simulator = Aer.get_backend('qasm_simulator')

Variable definition

In [8]:
# size of INFO BITS for sqkd
n = 4

# delta parameter for sqkd
delta = 1/8

# (Z-)error threshold for CTRL for sqkd
p_ctrl = 0.5

# (Z-)error threshold for SIFT for sqkd
p_test = 0.5

# path to control centre and nan database
cc_nan_db_path = './database/control_center_nan_database.txt'

# path to control centre and smart meter database
cc_sm_db_path = './database/control_center_smart_meter_database.txt'

## File handling

## Helper functions

Function to add NAN gateway

In [9]:
def addNewNAN(NanName, Inan, SharedKey):
    '''
    NanName(string): Name of NAN gateway
    Inan (string): Identity of NAN
    SharedKey(string): Shared secret key
    '''
    # Check if both parameters are passed
    if Inan == None or SharedKey == None:
        print(f"NAN addition failed:{Inan=}:{SharedKey=}")
        return False
    
    # Check if NAN identity exists
    with open(cc_nan_db_path, "r") as file:
        for row in file.read().split('\n')[:-1]:
            [_Inan, _SharedKey, _NanName] = row.split(':')
            if _Inan == Inan:
                print(f"NAN addition failed:{Inan=} already exists")
    
    # Add new NAN

    with open(cc_nan_db_path, "a") as file:
        file.write(f'{Inan}:{SharedKey}:{NanName}\n')
    return True

Function to add Smart Meter

In [10]:
def addNewSM(SMName, Ism, SharedKey):
    '''
    SMName(string): Name of SM
    Ism (string): Identity of SM
    SharedKey(string): Shared secret key
    '''
    # Check if both parameters are passed
    if Ism == None or SharedKey == None:
        print(f"SM addition failed:{Ism=}:{SharedKey=}")
        return False
    
    # Check if SM identity exists
    with open(cc_sm_db_path, "r") as file:
        for row in file.read().split('\n')[:-1]:
            [_Ism, _Iencsm, _SharedKey, _SMName] = row.split(':')
            if _Ism == Ism:
                print(f"SM addition failed:{Ism=} already exists")
    
    # Generate Iencsm

    biometric = np.random.randint(2, size=len(Ism))

    Iencsm = ""

    for i in range(len(Ism)):
        Iencsm += str(int(biometric[i])^(not int(Ism[i])))

    # Add new SM

    with open(cc_sm_db_path, "a") as file:
        file.write(f'{Ism}:{Iencsm}:{SharedKey}:{SMName}\n')
    
    return True

Function to get data of Smart Meter

In [11]:
def getSM(SMname):
    '''
    SMname(str): name of smart meter
    '''

    with open(cc_sm_db_path, "r") as file:
        for row in file.read().split('\n')[:-1]:
            [_Ism, _Iencsm, _SharedKey, _SMName] = row.split(':')
            if _SMName == SMname:
                return (_Ism, _Iencsm, _SharedKey, _SMName)
    print("Failed to get SM")
    return (None, None, None, None)

Function to get data of NAN gateway

In [12]:
def getNAN(NANname):
    '''
    NANname(str): name of NAN gateway
    '''
    with open(cc_nan_db_path, "r") as file:
        for row in file.read().split('\n')[:-1]:
            [_Inan,  _SharedKey, _NANname] = row.split(':')
            if _NANname == NANname:
                return (_Inan, _SharedKey, _NANname)
    print("Failed to get NAN")
    return (None, None, None)

## Phase 1

In [13]:
# size of INFO BITS for sqkd
n = 4

# delta parameter for sqkd
delta = 1/8

# (Z-)error threshold for CTRL for sqkd
p_ctrl = 0.5

# (Z-)error threshold for SIFT for sqkd
p_test = 0.5

Function for encoding bits in 0/1 or +/- state for sqkd

In [14]:
def encodeSQKD(basis, message_bit, circuit, idx):
	'''
	basis(int): defines what basis is the message encode. 0 (Computational) and 1 (Hadamard)
	message_bit (int):
    0 is encoded as |0> or |+>
    1 is encoded as |1> or |->
    circuit(QuantumCircuit): the circuit used for communication
    idx(int): index of the message_bit
	'''
	if message_bit == 1:
		circuit.x(idx)
	if basis == 1:
		circuit.h(idx)
	return circuit

Function for performing SQKD dnd registering new device

In [15]:
def sqkd(device, devicename):
    '''
    device(str): "SM" or "NAN"
    devicename(str): name of device

    Saves data in database if SQKD is successfull
    
    return bool: True if SQKD is successfull else False
    '''

    # number of bits to be set to device
    N = math.ceil(8*n*(1+delta))

    # generate a message for sqkd (binary array)
    message = np.random.randint(2, size=N)
    print(f'{message=}')

    # generate basis for encoding qubits by control center (quantum)
    cc_basis = np.random.randint(2, size=N)
    print(f'{cc_basis=}')

    # circuit for sending data to device
    circuit = QuantumCircuit(N, N)

    for (idx, (basis, message_bit)) in enumerate(zip(cc_basis, message)):
        circuit = encodeSQKD(basis, message_bit, circuit, idx)

    circuit.barrier()

    # device basis for SIFT or CTRL
    device_decisions = np.random.randint(2, size=N)
    print(f'{device_decisions=}')

    # get the reflected (CTRL) and measured (SIFT) qubits

    ctrl_qubits = []

    for (idx, decesion) in enumerate(device_decisions):
        if decesion == 0:
            ctrl_qubits.append(idx)

    print(f'{ctrl_qubits=}')
    sift_qubits = []

    for (idx, decesion) in enumerate(device_decisions):
        if decesion == 1:
            sift_qubits.append(idx)

    print(f'{sift_qubits=}')

    z_sift = [sift_qubits[i] for i in range(len(sift_qubits)) if cc_basis[sift_qubits[i]] == 0]

    print(f'{z_sift=}')
    
    # SIFT by device

    for idx in sift_qubits:
        circuit.measure(idx, idx)
    
    # measurement of sifted qubits by device

    result_device = execute(circuit, backend=simulator, shots=1).result()
    count_device = result_device.get_counts(circuit)

    device_measured_value = list(count_device.keys())[0][::-1]
    
    circuit.barrier()


    print(f'{device_measured_value=}')

    ######### PENDING ####################
    # reorder the reflected (CTRL) qubits

    ###################################

    # measurement of reflected qubits by control centre

    for idx in ctrl_qubits:
        if cc_basis[idx] == 1:
            circuit.h(idx)
        circuit.measure(idx, idx)

    result_ctrl = execute(circuit, backend=simulator, shots=1).result()
    count_ctrl = result_ctrl.get_counts(circuit)

    ctrl_measured_value = list(count_ctrl.keys())[0][::-1]

    print(f'{ctrl_measured_value=}')

    # calculating error for ctrl

    z_error_ctrl = 0
    x_error_ctrl = 0
    len_z = 0
    len_x = 0

    for idx in ctrl_qubits:
        if cc_basis[idx] == 0:
            if message[idx] != int(ctrl_measured_value[idx]):
                print("z")
                print(idx)
                z_error_ctrl += 1
            len_z += 1
        elif cc_basis[idx] == 1:
            if message[idx] != int(ctrl_measured_value[idx]):
                print("x")
                print(idx)
                x_error_ctrl += 1
            len_x += 1

    print(f'{z_error_ctrl/len_z=}')
    print(f'{x_error_ctrl/len_x=}')

    if z_error_ctrl/len_z < p_ctrl and x_error_ctrl/len_x < p_ctrl:

        # select n random sift bits in z basis as test bits

        test_bits = set()
        while len(test_bits) < n:
            test_bits = set(np.random.choice(z_sift, size=n))
        
        print(f'{test_bits=}')

        # defining remaning string

        v = []

        for idx in z_sift:
            if idx not in test_bits:
                v.append(idx)

        print(f'{v=}')
        # calculating z error for sift

        z_error_test = 0

        for idx in test_bits:
            if int(device_measured_value[idx]) != int(ctrl_measured_value[idx]):
                z_error_test += 1

        print(f'{z_error_test/len(test_bits)=}')

        if z_error_test/len(test_bits) < p_test and len(v) >= n:

            info_bits = v[:n]

            info = ""

            for idx in info_bits:
                info += device_measured_value[idx]

            print(f'{info=}')

            # Apply hash to function to info
            sk = hashlib.sha512(info.encode('utf-8')).hexdigest()

            print(f'{sk=}')

            ######## SAVE IN FILE ###############

            # Generate Identity

            temp_Ix = np.random.randint(2, size=n)

            Ix = ""

            for i in range(len(temp_Ix)):
                Ix += str(temp_Ix[i])

            if device == "SM":
                if addNewSM(devicename, Ix, info):
                    return True
            if device == "NAN":
                if addNewNAN(devicename, Ix, info):
                    return True

    print("SQKD FAILED")
    return False

## Phase 2

Function to generate PreAuthReqSM

In [16]:
def genPreAuthReqSM(SMname):
    '''
    SMname(str): name of smart meter
    '''

    [Ism, Iencsm, SharedKey, _SMName] = getSM(SMname)

    if Ism != None and Iencsm != None and SharedKey != None:
        PreAuthReqSM = {
            "HMAC": hmac.new(SharedKey.encode("utf-8"), Iencsm.encode("utf-8"), hashlib.sha256).hexdigest(),
            "Iencsm": Iencsm,
            "TS": time.time(),
            "SharedKey": SharedKey
        }
        return PreAuthReqSM
    print("Failed to create PreAuthReqSM")
    return None

Function to generate PreAuthReqNAN

In [17]:
def genPreAuthReqNAN(SMname, NANname):
    '''
    SMname(str): name of smart meter
    NANname(str): name of NAN gateway
    '''

    PreAuthReqSM = genPreAuthReqSM(SMname)
    [Inan,  SharedKey, _NANname] = getNAN(NANname)

    if Inan != None and SharedKey != None and PreAuthReqSM != None:
        PreAuthReqNAN = {
            "PreAuthReqSM": PreAuthReqSM,
            "HMAC": hmac.new(SharedKey.encode("utf-8"), Inan.encode("utf-8"), hashlib.sha256).hexdigest(),
            "Inan": Inan,
            "TS": time.time(),
            "SharedKey": SharedKey
        }
        return PreAuthReqNAN
    print("Failed to create PreAuthReqNAN")
    return None

Semi-Quantum Mutual Identity Authentication Using Bell States

$$ |\varphi^{\pm}> = \frac{1}{\sqrt{2}}(|00>\pm|11>) (0, 1)$$
$$ |\phi^{\pm}> = \frac{1}{\sqrt{2}}(|01>\pm|10>) (2, 3)$$

Functon to create bell states for authentication

In [18]:
def createBellStates(state_number, qubit1, qubit2, circuit):
    '''
    state_number (int): bell state based on above cell
    qubit1 (int), qubit2 (int): indices of entangled qubits
    circuit (QuantumCircuit): circuit under consideration

    return QuantumCircuit: updated quantum circuit
    '''
    if state_number == 0:
        pass
    elif state_number == 1:
        circuit.x(qubit1)
    elif state_number == 2:
        circuit.x(qubit2)
    elif state_number == 3:
        circuit.x(qubit1)
        circuit.x(qubit2)
    circuit.h(qubit1)
    circuit.cnot(qubit1, qubit2)
    return circuit

Function to create decoy qubits based on secret key

In [19]:
def createDecoyQubits(key, bit, idx, circuit):
    '''
    key (str): securely shared key
    bit (int): 0 or 1
    idx (int): index of the decoy bit
    circuit (QuantumCircuit): circuit under consideration

    return QuantumCircuit: updated quantum circuit
    '''
    if bit:
        circuit.x(idx)
    if key[2*idx:2*idx+2] in {"10", "11"}:
        circuit.h(idx)
    return circuit

Function for semi quantum authentication

In [1]:
def sq_auth(key):
    '''
    key (string): key of the device to be authenticated

    return (bool bool): (is classical entity legal, is quantum entity legal)
    '''

    print(f'{key=}')

    n_prime = n//2  # since size of the key in paper is 2*n
    # we define n' as 2*n' = n

    # STEP 1: Preparation

    circuit = QuantumCircuit(n_prime*2, n_prime*2)

    # Even bits = S_H
    # Odd bits = S_T

    initial_bell_state_number = list(np.random.randint(0, 4, size=(n_prime)))
    print(f'{initial_bell_state_number=}')

    # n_prime*2 for n bell states

    # generating random n' bell states
    for i in range(n_prime):
        circuit = createBellStates(initial_bell_state_number[i], 2*i, 2*i+1, circuit)

    # STEP 2: Eavesdropping detection

    ## generating n'/2 decoy qubits

    decoy_circuit = QuantumCircuit(n_prime//2, n_prime//2)

    decoy_init = np.random.randint(0, 2, size=(n_prime//2))

    print(f'{decoy_init=}')

    for i in range(n_prime//2):
        decoy_circuit = createDecoyQubits(key, decoy_init[i], i, decoy_circuit)

    ###### Pending ######
    # reordering
    #####################

    for i in range(n_prime//2):
        if key[2*i:2*i+2] in {"10", "11"}:
            decoy_circuit.h(i)
        decoy_circuit.measure(i, i)
    
    decoy_result = execute(decoy_circuit, backend=simulator, shots=1).result()
    decoy_count = decoy_result.get_counts(decoy_circuit)
    decoy_measured_values = [int(b) for b in list(decoy_count.keys())[0][::-1]]

    print(f'{decoy_measured_values=}')

    for i in range(n_prime//2):
        if decoy_measured_values[i] != decoy_init[i]:
            print("AUTH FAILED")
            return (False, False)

    # STEP 3: Measurement

    for i in range(n_prime*2):
        circuit.measure(i, i)

    result = execute(circuit, backend=simulator, shots=1).result()
    count = result.get_counts(circuit)
    measured_values = [int(b) for b in list(count.keys())[0][::-1]]

    # STEP 4: XOR operation

    ## Create RB

    RB = []

    for i in range(1, n, 2):
        RB.append(measured_values[i])
    print(f'{RB=}')
    
    ## Create IA

    RA = []

    for i in range(0, n, 2):
        RA.append(measured_values[i])
    print(f'{RA=}')

    ## Create RA*

    RAstar = []

    for i in range(n_prime):
        if RB[i] == 0:
            if initial_bell_state_number[i] in {0, 1}:
                RAstar.append(0)
            else:
                RAstar.append(1)
        elif RB[i] == 1:
            if initial_bell_state_number[i]  in {0, 1}:
                RAstar.append(1)
            else:
                RAstar.append(0)
    print(f'{RAstar=}')


    ## Create RB*

    RBstar = []

    for i in range(n_prime):
        if RA[i] == 0:
            if initial_bell_state_number[i]  in {0, 1}:
                RBstar.append(0)
            else:
                RBstar.append(1)
        elif RA[i] == 1:
            if initial_bell_state_number[i]  in {0, 1}:
                RBstar.append(1)
            else:
                RBstar.append(0)
    print(f'{RBstar=}')

    ## Create IA, IAstar, IB, IBstar
    IA = []
    IB = []
    IAstar = []
    IBstar = []

    for i in range(n_prime):
        IA.append(RA[i]^int(key[i]))
        IB.append(RB[i]^int(key[i]))
        IAstar.append(RAstar[i]^int(key[i]))
        IBstar.append(RBstar[i]^int(key[i]))

    print(f'{IA=}')
    print(f'{IB=}')
    print(f'{IAstar=}')
    print(f'{IBstar=}')

    # STEP 5: Authentication

    classical_auth, quantum_auth = 1, 1

    for i in range(n_prime):
        if IA[i] != IAstar[i]:
            quantum_auth = 0
        if IB[i] != IBstar[i]:
            classical_auth = 0
    
    print(f'{measured_values=}')

    # print(circuit.draw())
    # print(decoy_circuit.draw())

    print(f'{(classical_auth, quantum_auth)=}')

    return (classical_auth, quantum_auth)

Function for comparing HMAC of NAN

In [None]:
def verifyNANHMAC(hmacValue, NANname):
    '''
    hmacValue (string): value of hmac
    NANname (string): name of NAN to be verified
    '''
    _Inan, _SharedKey, _NANname = getNAN(NANname)
    if _SharedKey != None and _Inan != None:
        return hmac.new(_SharedKey.encode("utf-8"), _Inan.encode("utf-8"), hashlib.sha256).hexdigest() == hmacValue
    print("Verify NAN HMAC Failed")
    return False

Function for comparing HMAC of SM

In [None]:
def verifySMHMAC(hmacValue, SMname):
    '''
    hmacValue (string): value of hmac
    SMname (string): name of SM to be verified
    '''
    _Ism, _Iencsm, _SharedKey, _SMName = getSM(SMname)
    if _SharedKey != None and _Iencsm != None:
        return hmac.new(_SharedKey.encode("utf-8"), _Iencsm.encode("utf-8"), hashlib.sha256).hexdigest() == hmacValue
    print("Verify SM HMAC Failed")
    return False

Function for authenticating SM

In [21]:
def authSM(sm, nan):
    '''
    sm (string): name of smart meter to be authenticated
    nan (string): name of parent NAN gateway
    '''

    # getting PreAuthReq Payloads
    PreAuthReqNAN = genPreAuthReqNAN(sm, nan)
    PreAuthReqSM = PreAuthReqNAN["PreAuthReqSM"]
    
    # verifying HMAC of NAN

    if verifyNANHMAC(PreAuthReqNAN["SharedKey"], nan):

        # authenticating NAN gateway
        cc_auth, nan_auth = sq_auth(PreAuthReqNAN["SharedKey"])
        
        print(f'{cc_auth=}:{nan_auth=}')

        if cc_auth and nan_auth:
            
            # verifying HMAC of sm

            if verifySMHMAC(PreAuthReqSM["SharedKey"], sm):

                # authenticating SM

                cc_auth, sm_auth = sq_auth(PreAuthReqSM["SharedKey"])

                print(f'{cc_auth=}:{sm_auth=}')

                return cc_auth and sm_auth

    print("Auth failed")
    return False

## Phase 4

## Menu based cell

In [34]:
c = None

while c != 0:
    print("0. Exit")
    print("1. Add NAN")
    print("2. Add SM")
    print("3. Auth SM")
    c = int(input("Enter your choice: "))

    if c == 1:
        nanname = input("Enter name of NAN gateway")
        print(sqkd("NAN",nanname))
    if c == 2:
        smname = input("Enter name of Smart Meter")
        print(sqkd("SM",smname))
    if c == 3:
        nanname = input("Enter name of NAN gateway")
        smname = input("Enter name of Smart Meter")
        print(authSM("sm1","nan1"))

0. Exit
1. Add NAN
2. Add SM
3. Auth SM


ValueError: invalid literal for int() with base 10: ''

## Test cells

In [28]:
print(sqkd("SM","sm1"))
print(sqkd("NAN","nan1"))

message=array([0, 1, 1, 1, 0, 0, 1, 0, 0, 1, 0, 1, 0, 1, 1, 0, 0, 1, 1, 1, 1, 1,
       0, 0, 1, 1, 1, 0, 0, 1, 0, 0, 1, 1, 1, 0])
cc_basis=array([1, 1, 1, 0, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 0, 0, 0, 1, 0, 0, 1, 1,
       1, 1, 0, 0, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1])
device_decisions=array([0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 1, 0, 1, 1, 0, 1,
       0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 1, 0, 1, 1])
ctrl_qubits=[0, 9, 10, 17, 20, 22, 30, 31, 33]
sift_qubits=[1, 2, 3, 4, 5, 6, 7, 8, 11, 12, 13, 14, 15, 16, 18, 19, 21, 23, 24, 25, 26, 27, 28, 29, 32, 34, 35]
z_sift=[3, 7, 11, 13, 14, 15, 16, 18, 19, 24, 25]
device_measured_value='010100000001111000110101111010000001'
ctrl_measured_value='001100100101011001111101111000000100'
z_error_ctrl/len_z=0.0
x_error_ctrl/len_x=0.0
test_bits={24, 25, 18, 19}
v=[3, 7, 11, 13, 14, 15, 16]
z_error_test/len(test_bits)=0.0
info='1011'
sk='f97307bf4bdda694ab76060195e90930a91fc333685d50316027969397e387174354a8ce64d3e1443eda41a7d0634cf7f2923171edb6cc0c598b7

In [24]:
# print(json.dumps(genPreAuthReqNAN("sm1","nan1"), indent=10))

In [30]:
# print(authSM("sm1","nan1"))