# Ensuring secure connectivity in smart vehicular to grid technology: An elliptic curve-based authentication key agreement framework


The protocol enables secure **mutual authentication and session key establishment** between an **Electric Vehicle (EV)**, a **Charging Station (CS)**, and a **Service Provider (SP)** using **elliptic curve cryptography**, hashing, and AES-GCM encryption. It ensures confidentiality, integrity, and resistance to impersonation by exchanging cryptographic proofs and fresh random values. The final shared session key allows secure communication and transactions between EV and CS via the SP.

#### Referenced Research Paper:  
[Ensuring secure connectivity in smart vehicular to grid technology: An elliptic curve-based authentication key agreement framework](https://doi.org/10.1016/j.segan.2025.101696)


In [22]:
import secrets
import time
import os
from hashlib import sha256
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from tinyec import registry
from tinyec.ec import Point
import tracemalloc


# Helper functions

def H(data: bytes) -> bytes:
    """Return the raw 32-byte SHA-256 digest of ``data``."""
    hasher = sha256()
    hasher.update(data)
    return hasher.digest()

def xor_bytes(a: bytes, b: bytes) -> bytes:
    """XOR two byte strings position by position.

    The result is as long as the SHORTER input: surplus bytes of the longer
    argument are silently ignored. Several callers in this file pass
    operands of different lengths and rely on that truncation.
    """
    mixed = bytearray()
    for left, right in zip(a, b):
        mixed.append(left ^ right)
    return bytes(mixed)

def aes_gcm_encrypt(key: bytes, plaintext: bytes) -> bytes:
    """Encrypt ``plaintext`` with AES-GCM under ``key``.

    A fresh random 12-byte nonce is drawn on every call. The returned blob
    is laid out as ``nonce || tag || ciphertext`` so that it can be split
    again by ``aes_gcm_decrypt``.
    """
    iv = os.urandom(12)
    cipher = Cipher(algorithms.AES(key), modes.GCM(iv), backend=default_backend())
    enc = cipher.encryptor()
    body = enc.update(plaintext)
    body += enc.finalize()
    # The tag only becomes available after finalize().
    return iv + enc.tag + body

def aes_gcm_decrypt(key: bytes, ciphertext: bytes) -> bytes:
    """Decrypt a blob produced by ``aes_gcm_encrypt``.

    The input is split as 12-byte nonce, 16-byte tag, then the encrypted
    body. ``finalize()`` performs the tag check, so a wrong key or a
    modified blob makes this function raise instead of returning data.
    """
    iv, auth_tag, body = ciphertext[:12], ciphertext[12:28], ciphertext[28:]
    cipher = Cipher(algorithms.AES(key), modes.GCM(iv, auth_tag), backend=default_backend())
    dec = cipher.decryptor()
    recovered = dec.update(body)
    recovered += dec.finalize()
    return recovered


# Service Provider

class ServiceProvider:
    """Service Provider (SP) role of the EV / CS / SP key-agreement demo.

    The SP owns the curve parameters, keeps the registries of electric
    vehicles and charging stations, and relays the authentication messages
    (M1 -> M2 towards the CS, M3 -> M4 back to the EV).

    Every message handler prints a diagnostic and returns ``None`` when it
    rejects a message. Decryption failures are not caught: whatever
    ``aes_gcm_decrypt`` raises on a bad key or tampered blob propagates to
    the caller.
    """

    def __init__(self, curve_name='secp256r1'):
        """Load the named tinyec curve and draw the SP's secret scalars."""
        self.curve = registry.get_curve(curve_name)
        self.q = self.curve.field.n
        self.g = self.curve.g

        # SP key pair.
        self.x = secrets.randbelow(self.q - 1) + 1
        self.PK_SP = self.x * self.g

        # Second key pair; y*g is what the EV and CS code use as the CS
        # public key, and y itself is hashed into the CS credential beta.
        self.y = secrets.randbelow(self.q - 1) + 1
        self.PK_EV_shared = self.y * self.g

        self.hash_function = sha256

        self.registered_users = {}
        self.registered_CS = {}

    def register_EV(self, msg_RU1):
        """Handle registration message M_RU1 and answer with M_RU2.

        Args:
            msg_RU1: dict with ``"ER1"`` (AES-GCM blob), ``"TR1"`` (integer
                Unix timestamp) and ``"PK_U_bytes"`` (EV public key bytes).

        Returns:
            ``{"ER2": ..., "TR3": ...}`` on success, or ``None`` when the
            timestamp is stale, the decrypted payload does not have the
            expected length, or no password was stored for the EV ID.
        """
        ER1 = msg_RU1["ER1"]
        TR1 = msg_RU1["TR1"]

        TR2 = int(time.time())
        deltaT = 60

        if abs(TR2 - TR1) > deltaT:
            print("[SP] TR1 timestamp expired.")
            return None

        PK_U_bytes = msg_RU1["PK_U_bytes"]

        session_key = H(PK_U_bytes + TR1.to_bytes(8, 'big'))
        decrypted = aes_gcm_decrypt(session_key, ER1)

        # Payload layout: PW_U (32) || zero-padded ID (32) || a (32).
        PW_U_len = 32
        ID_len = 32
        a_len = 32

        # Slicing a short or long payload would silently mis-parse the
        # fields, so reject anything that is not exactly the fixed layout.
        if len(decrypted) != PW_U_len + ID_len + a_len:
            print("[SP] Malformed registration payload.")
            return None

        PW_U = decrypted[:PW_U_len]
        ID_EV_bytes = decrypted[PW_U_len:PW_U_len + ID_len]
        a_bytes = decrypted[PW_U_len + ID_len:]

        ID_EV = ID_EV_bytes.decode().strip('\x00')

        PW_EV = self.registered_users.get(ID_EV, {}).get("PW_EV")
        if PW_EV is None:
            print(f"[SP] EV {ID_EV} not registered yet. Registration aborted.")
            return None

        A_U = H(PW_U + ID_EV_bytes + PW_EV + a_bytes)

        TR3 = int(time.time())

        session_key2 = H(PK_U_bytes + PW_U + TR3.to_bytes(8, 'big'))

        payload = A_U + TR3.to_bytes(8, 'big')
        ER2 = aes_gcm_encrypt(session_key2, payload)

        msg_RU2 = {
            "ER2": ER2,
            "TR3": TR3,
        }

        print(f"[SP] Registration message M_RU2 prepared for {ID_EV}.")

        return msg_RU2

    def store_EV_password(self, ID_EV, PW_EV):
        """Create (or overwrite) the registry entry for ``ID_EV``."""
        self.registered_users[ID_EV] = {
            "PW_EV": PW_EV
        }

    def get_public_parameters(self):
        """Return the curve parameters and public points as plain values."""
        return {
            "curve": self.curve.name,
            "p": self.curve.field.p,
            "q": self.q,
            "generator_g": (self.g.x, self.g.y),
            "PK_SP": (self.PK_SP.x, self.PK_SP.y),
            "PK_EV_shared": (self.PK_EV_shared.x, self.PK_EV_shared.y),
            "hash_function": "SHA-256"
        }

    def register_CS(self, ID_CS):
        """Register a charging station and hand back its credentials.

        Draws a random 16-hex-character ``CCS`` and computes
        ``beta = H(CCS || ID_CS || y)``.

        Returns:
            dict with ``"CCS"`` (str) and ``"beta"`` (bytes); the same pair
            is stored in ``self.registered_CS[ID_CS]``.
        """
        CCS = secrets.token_hex(8)
        CCS_bytes = CCS.encode()
        ID_CS_bytes = ID_CS.encode()
        y_bytes = self.y.to_bytes(32, 'big')

        beta = H(CCS_bytes + ID_CS_bytes + y_bytes)

        self.registered_CS[ID_CS] = {
            "CCS": CCS,
            "beta": beta
        }

        print(f"[SP] Registered CS {ID_CS} with CCS={CCS} and β={beta.hex()}")

        return {
            "CCS": CCS,
            "beta": beta
        }

    def process_M1_and_generate_M2(self, M1):
        """Verify the EV's login message M1 and build M2 for the CS.

        Args:
            M1: dict with ``"E1"``, ``"TU1"``, ``"PK_EV_bytes"`` and ``"x"``.

        Returns:
            The M2 dict, or ``None`` when M1 is stale, no EV is registered,
            the EV public key is malformed or off the curve, the encrypted
            timestamp disagrees with the outer one, or H1 does not verify.
        """
        E1 = M1["E1"]
        TU1 = M1["TU1"]
        PK_EV_bytes = M1["PK_EV_bytes"]

        deltaT = 60
        TS1 = int(time.time())
        if abs(TS1 - TU1) > deltaT:
            print("[SP] M1 timestamp expired.")
            return None

        # M1 carries no identity, so the first registered EV is assumed.
        # An empty registry used to surface as an IndexError here.
        if not self.registered_users:
            print("[SP] No EV registered. M1 rejected.")
            return None
        IDEV = next(iter(self.registered_users))

        # The key is two 32-byte big-endian coordinates; check that it is
        # well-formed and actually a point on the curve before using it.
        if len(PK_EV_bytes) != 64:
            print("[SP] Malformed EV public key.")
            return None
        PK_EV_x = int.from_bytes(PK_EV_bytes[:32], 'big')
        PK_EV_y = int.from_bytes(PK_EV_bytes[32:], 'big')
        PK_EV_point = Point(self.curve, PK_EV_x, PK_EV_y)
        if not PK_EV_point.on_curve:
            print("[SP] EV public key is not on the curve.")
            return None

        x = M1["x"]
        K1 = H(IDEV.encode() + PK_EV_bytes + x.to_bytes(32, 'big'))

        plaintext = aes_gcm_decrypt(K1, E1)

        # E1 layout: H1 (32) || x (32) || TU1 (8) || IDU1 (rest).
        H1 = plaintext[:32]
        x_value = int.from_bytes(plaintext[32:64], 'big')
        TU1_recv = int.from_bytes(plaintext[64:72], 'big')
        IDU1 = plaintext[72:]

        # The freshness check above used the unauthenticated outer TU1;
        # make sure the authenticated copy inside E1 says the same thing.
        if TU1_recv != TU1:
            print("[SP] M1 timestamp mismatch.")
            return None

        PKSP_bytes = (
            self.PK_SP.x.to_bytes(32, 'big') +
            self.PK_SP.y.to_bytes(32, 'big')
        )

        Hval = H(PK_EV_bytes + PKSP_bytes + x_value.to_bytes(32, 'big'))
        IDU_star = xor_bytes(IDU1, Hval)

        xg = x_value * self.g
        H_star1 = H(IDEV.encode() + xg.x.to_bytes(32, 'big') + PK_EV_bytes + TU1_recv.to_bytes(8, 'big'))

        if H_star1 != H1:
            print("[SP] H1 verification failed.")
            return None

        # Per-session nonce (distinct from the long-term self.y).
        y = secrets.randbelow(self.q - 1) + 1
        shared_point = (x_value * y) * self.g
        shared_x_bytes = shared_point.x.to_bytes(32, 'big')

        H2 = H(PK_EV_bytes + PKSP_bytes + shared_x_bytes + TU1_recv.to_bytes(8, 'big'))

        KS2 = H(PK_EV_bytes + PKSP_bytes + y.to_bytes(32, 'big'))

        TS2 = int(time.time())

        # E2 layout: x (32) || y (32) || H2 (32) || TU1 (8) || TS2 (8) || IDU*.
        payload = (
            x_value.to_bytes(32, 'big')
            + y.to_bytes(32, 'big')
            + H2
            + TU1_recv.to_bytes(8, 'big')
            + TS2.to_bytes(8, 'big')
            + IDU_star
        )
        E2 = aes_gcm_encrypt(KS2, payload)

        M2 = {
            "E2": E2,
            "TS2": TS2,
            "PK_EV_bytes": PK_EV_bytes,
            "y": y,
            "x": x_value,
            "IDU_star": IDU_star,
            "H2": H2
        }

        print("[SP] M2 constructed and sent to CS.")
        return M2

    def process_M3_and_generate_M4(self, M3, context):
        """Verify the CS's message M3 and build M4 for the EV.

        Args:
            M3: dict with ``"E3"`` and ``"TC2"``.
            context: session state supplied by the caller; this method reads
                the keys ``"PK_EV_bytes"``, ``"y"``, ``"x"``, ``"H2"``,
                ``"H_star1"``, ``"K1"`` and ``"TU1"``.

        Returns:
            The M4 dict, or ``None`` when M3 is stale, the encrypted
            timestamp disagrees with the outer one, or H3 does not equal H2.
        """
        E3 = M3["E3"]
        TC2 = M3["TC2"]

        deltaT = 60
        TS3 = int(time.time())
        if abs(TS3 - TC2) > deltaT:
            print("[SP] M3 timestamp expired.")
            return None

        PKSP_bytes = (
            self.PK_SP.x.to_bytes(32, 'big') +
            self.PK_SP.y.to_bytes(32, 'big')
        )
        KS2 = H(context["PK_EV_bytes"] + PKSP_bytes + context["y"].to_bytes(32, 'big'))

        plaintext = aes_gcm_decrypt(KS2, E3)

        # E3 layout: H3 (32) || MACV (32) || z (32) || C*_CS (16) || TC2 (8).
        H3 = plaintext[:32]
        MACV = plaintext[32:64]
        z = int.from_bytes(plaintext[64:96], 'big')
        C_star_CS = plaintext[96:112]  # CCS as masked by the CS; forwarded as-is
        TC2_recv = int.from_bytes(plaintext[112:120], 'big')

        # Tie the authenticated timestamp to the one the freshness check used.
        if TC2_recv != TC2:
            print("[SP] M3 timestamp mismatch.")
            return None

        if H3 != context["H2"]:
            print("[SP] H3 verification failed (does not match H2).")
            return None

        xg = context["x"] * self.g
        zg = z * self.g

        H4 = H(
            context["H_star1"] +
            context["PK_EV_bytes"] +
            PKSP_bytes +
            xg.x.to_bytes(32, 'big') +
            zg.x.to_bytes(32, 'big') +
            TS3.to_bytes(8, 'big') +
            context["TU1"].to_bytes(8, 'big')
        )

        TS4 = int(time.time())

        # E4 layout: MACV (32) || y (32) || z (32) || filler (8) || TC2 (8)
        #            || C*_CS (16) || H4 (32) || TS4 (8).
        payload = (
            MACV
            + context["y"].to_bytes(32, 'big')
            + z.to_bytes(32, 'big')
            + b"dummy_b".ljust(8, b'\x00')
            + TC2_recv.to_bytes(8, 'big')
            + C_star_CS
            + H4
            + TS4.to_bytes(8, 'big')
        )
        E4 = aes_gcm_encrypt(context["K1"], payload)

        M4 = {
            "E4": E4,
            "TS4": TS4,
            "TS3": TS3,
            "PK_SP_x": self.PK_SP.x,
            "PK_SP_y": self.PK_SP.y,
        }

        print("[SP] M4 constructed and sent to EV.")
        return M4



In [23]:
# Electric Vehicle

class ElectricVehicle:
    """Electric Vehicle (EV) role of the EV / CS / SP key-agreement demo.

    The EV registers with the service provider, can change its password
    locally, and runs the login/authentication exchange that ends with a
    session key shared with the charging station.
    """

    def __init__(self, ID_EV, PW_EV, SP: ServiceProvider):
        """Bind the EV to ``SP`` and draw its key pair on the SP's curve.

        Args:
            ID_EV: identity string of the vehicle.
            PW_EV: password as ``bytes`` (it is hashed and XORed directly).
            SP: the service provider this EV talks to.
        """
        self.ID_EV = ID_EV
        self.PW_EV = PW_EV
        self.SP = SP
        self.curve = SP.curve
        self.q = SP.q
        self.g = SP.g

        self.a = secrets.randbelow(self.q - 1) + 1
        self.PK_U = self.a * self.g

        # Filled in by register(); None means "not registered yet".
        self.PW_U = None
        self.A_U = None
        self.A_U_prime = None

    def register(self):
        """Run the two-message registration exchange with the SP.

        Returns:
            ``True`` when the SP accepted the request and its reply was
            fresh; ``False`` otherwise. On success ``PW_U``, ``A_U`` and
            ``A_U_prime`` are stored on the instance.
        """
        ID_bytes = self.ID_EV.encode().ljust(32, b'\x00')
        a_bytes = self.a.to_bytes(32, 'big')

        hash_pw = H(self.PW_EV + ID_bytes + a_bytes)
        PW_U = xor_bytes(self.PW_EV, hash_pw)

        TR1 = int(time.time())
        PK_U_bytes = self.PK_U.x.to_bytes(32, 'big') + self.PK_U.y.to_bytes(32, 'big')
        session_key = H(PK_U_bytes + TR1.to_bytes(8, 'big'))

        payload = PW_U + ID_bytes + a_bytes
        ER1 = aes_gcm_encrypt(session_key, payload)

        msg_RU1 = {
            "ER1": ER1,
            "TR1": TR1,
            "PK_U_bytes": PK_U_bytes,
        }

        print("[EV] M_RU1 prepared and sent.")
        msg_RU2 = self.SP.register_EV(msg_RU1)
        if msg_RU2 is None:
            print("[EV] Registration failed.")
            return False

        TR3 = msg_RU2["TR3"]
        TR4 = int(time.time())
        if abs(TR4 - TR3) > 60:
            print("[EV] Timestamp TR3 expired.")
            return False

        session_key2 = H(PK_U_bytes + PW_U + TR3.to_bytes(8, 'big'))
        decrypted = aes_gcm_decrypt(session_key2, msg_RU2["ER2"])
        A_U = decrypted[:32]

        print(f"[EV] Registration successful. A_U: {A_U.hex()}")

        self.PW_U = PW_U
        self.A_U = A_U

        # Local password verifier. xor_bytes truncates to the 32-byte hash,
        # so only the PW_U part of the second operand contributes.
        self.A_U_prime = xor_bytes(H(ID_bytes), PW_U + ID_bytes)

        return True

    def change_password(self, PW_EV_old, PW_EV_new):
        """Replace the password after verifying the old one locally.

        The stored verifier ``A_U_prime`` is recomputed with the same
        formula that is used to check it, so a later password change (with
        the new password as "old") verifies correctly.

        Returns:
            ``True`` when ``PW_EV_old`` matched and the password was
            changed, ``False`` otherwise (including before registration).
        """
        print(f"\n[EV] Attempting password change...")

        ID_bytes = self.ID_EV.encode().ljust(32, b'\x00')
        a_bytes = self.a.to_bytes(32, 'big')

        hash_old = H(PW_EV_old + ID_bytes + a_bytes)
        PW_U_prime = xor_bytes(PW_EV_old, hash_old)

        A_U_prime_input = xor_bytes(H(ID_bytes), PW_U_prime + ID_bytes)

        if A_U_prime_input != self.A_U_prime:
            print("[EV] Password verification failed. Cannot change password.")
            return False

        print("[EV] Password verified successfully.")

        hash_new = H(PW_EV_new + ID_bytes + a_bytes)
        PW_U_new = xor_bytes(PW_EV_new, hash_new)

        # Must mirror the verification formula above; storing any other
        # construction would make every subsequent change attempt fail.
        A_U_new = xor_bytes(H(ID_bytes), PW_U_new + ID_bytes)

        self.PW_EV = PW_EV_new
        self.PW_U = PW_U_new
        self.A_U_prime = A_U_new

        print("[EV] Password changed successfully.")
        print(f"  New A_U′: {A_U_new.hex()}")

        return True

    def login_and_authenticate(self, CS):
        """Run the M1..M4 exchange through the SP and ``CS``.

        Args:
            CS: object exposing ``process_M2_and_generate_M3(M2)``.

        Returns:
            The 32-byte session key on success, or ``None`` when any party
            rejects a message, M4 is stale, or the H4 / MAC checks fail.
        """
        PK_EV_bytes = (
            self.PK_U.x.to_bytes(32, 'big') +
            self.PK_U.y.to_bytes(32, 'big')
        )

        x = secrets.randbelow(self.q - 1) + 1
        xg = x * self.g

        K1 = H(self.ID_EV.encode() + PK_EV_bytes + x.to_bytes(32, 'big'))

        TU1 = int(time.time())

        H1 = H(self.ID_EV.encode() + xg.x.to_bytes(32, 'big') + PK_EV_bytes + TU1.to_bytes(8, 'big'))

        IDU1 = xor_bytes(
            self.ID_EV.encode().ljust(32, b'\x00'),
            H(PK_EV_bytes + x.to_bytes(32, 'big'))
        )

        # E1 layout: H1 (32) || x (32) || TU1 (8) || IDU1 (32).
        plaintext = H1 + x.to_bytes(32, 'big') + TU1.to_bytes(8, 'big') + IDU1
        E1 = aes_gcm_encrypt(K1, plaintext)

        M1 = {
            "E1": E1,
            "TU1": TU1,
            "PK_EV_bytes": PK_EV_bytes,
            "x": x,
        }

        # Each hop returns None on rejection; stop instead of indexing None.
        M2 = self.SP.process_M1_and_generate_M2(M1)
        if M2 is None:
            print("[EV] Authentication aborted: SP rejected M1.")
            return None

        M3 = CS.process_M2_and_generate_M3(M2)
        if M3 is None:
            print("[EV] Authentication aborted: CS rejected M2.")
            return None

        context = {
            "PK_EV_bytes": PK_EV_bytes,
            "y": M2["y"],
            "x": M2["x"],
            "ID_EV": self.ID_EV,
            "H2": M2["H2"],
            "H_star1": H1,
            "K1": K1,
            "TU1": TU1
        }

        M4 = self.SP.process_M3_and_generate_M4(M3, context)
        if M4 is None:
            print("[EV] Authentication aborted: SP rejected M3.")
            return None

        E4 = M4["E4"]
        TS4 = M4["TS4"]

        # Same 60-second freshness window the other parties apply.
        if abs(int(time.time()) - TS4) > 60:
            print("[EV] M4 timestamp expired.")
            return None

        plaintext = aes_gcm_decrypt(K1, E4)

        # E4 layout: MACV (32) || y (32) || z (32) || filler (8) || TC2 (8)
        #            || C*_CS (16) || H4 (32) || TS4 (8).
        MACV = plaintext[0:32]
        z_recv = int.from_bytes(plaintext[64:96], 'big')
        TC2 = int.from_bytes(plaintext[104:112], 'big')
        C_star_CS = plaintext[112:128]
        H4 = plaintext[128:160]
        TS4_recv = int.from_bytes(plaintext[160:168], 'big')

        # The authenticated timestamp must match the outer one checked above.
        if TS4_recv != TS4:
            print("[EV] M4 timestamp mismatch.")
            return None

        zg = z_recv * self.g
        xg = context["x"] * self.g
        TS3 = M4["TS3"]

        PKSP_bytes = M4["PK_SP_x"].to_bytes(32, 'big') + M4["PK_SP_y"].to_bytes(32, 'big')

        H_star4 = H(
            context["H_star1"] +
            context["PK_EV_bytes"] +
            PKSP_bytes +
            xg.x.to_bytes(32, 'big') +
            zg.x.to_bytes(32, 'big') +
            TS3.to_bytes(8, 'big') +
            context["TU1"].to_bytes(8, 'big')
        )

        if H_star4 != H4:
            print("[EV] H4 verification failed.")
            return None

        # Use the point the SP publishes (y*g) rather than recomputing it
        # from the SP's private scalar.
        PK_CS_x = self.SP.PK_EV_shared.x

        # Recover CCS from C_star_CS
        CCS = xor_bytes(
            C_star_CS,
            H(
                PK_CS_x.to_bytes(32, 'big') +
                PK_EV_bytes +
                context["y"].to_bytes(32, 'big')
            )
        )

        # Compute IDU_star to match SP's computation
        Hval = H(PK_EV_bytes + PKSP_bytes + context["x"].to_bytes(32, 'big'))
        context["IDU_star"] = xor_bytes(IDU1, Hval)

        MACU = H(
            context["IDU_star"] +
            PK_EV_bytes +
            PK_CS_x.to_bytes(32, 'big') +
            CCS +
            xg.x.to_bytes(32, 'big') +
            zg.x.to_bytes(32, 'big') +
            TC2.to_bytes(8, 'big')
        )

        if MACU != MACV:
            print("[EV] MAC verification failed.")
            return None

        # Session key: hash exactly the fields the charging station hashes
        # in ChargingStation.process_M2_and_generate_M3 (IDU*, ID_EV, PK_EV,
        # PK_CS.x, x-coordinate of (x*z)*g, CCS, MACV, TC2). Hashing any
        # other combination yields a key the CS cannot reproduce.
        shared_scalar = (x * z_recv) % self.q
        shared_point = shared_scalar * self.g

        SKEV = H(
            context["IDU_star"] +
            self.ID_EV.encode() +
            PK_EV_bytes +
            PK_CS_x.to_bytes(32, 'big') +
            shared_point.x.to_bytes(32, 'big') +
            CCS +
            MACV +
            TC2.to_bytes(8, 'big')
        )

        print("[EV] Authentication successful. SK_EV:", SKEV.hex())
        return SKEV


# Charging Station

class ChargingStation:
    """Charging Station (CS) role of the EV / CS / SP key-agreement demo.

    After registering with the service provider, the CS verifies M2,
    contributes its own nonce ``z``, derives its session key and answers
    with M3.

    Attributes set by this class:
        registration_data: ``None`` until ``register()`` succeeds, then a
            dict with ``"CCS"``, ``"beta"`` and ``"PK_CS"``.
        SK_CS: ``None`` until an M2 has been processed successfully, then
            the 32-byte session key derived for that run.
    """

    def __init__(self, ID_CS, SP: ServiceProvider):
        """Remember the station identity and the SP it belongs to."""
        self.ID_CS = ID_CS
        self.SP = SP
        self.registration_data = None
        self.SK_CS = None

    def register(self):
        """Register with the SP and store the returned credentials.

        Returns:
            ``True`` (the SP call has no failure path).
        """
        print(f"[CS] Sending ID_CS = {self.ID_CS} to SP.")

        response = self.SP.register_CS(self.ID_CS)
        CCS = response["CCS"]
        beta = response["beta"]

        # y*g as published by the SP; no need to touch its private scalar.
        PK_CS = self.SP.PK_EV_shared

        self.registration_data = {
            "CCS": CCS,
            "beta": beta,
            "PK_CS": PK_CS
        }

        print(f"[CS] Registration successful.")

        return True

    def process_M2_and_generate_M3(self, M2):
        """Verify M2 from the SP, derive the session key and build M3.

        Args:
            M2: dict with ``"E2"``, ``"TS2"``, ``"y"`` and
                ``"PK_EV_bytes"``, or ``None`` if the SP rejected M1.

        Returns:
            ``{"E3": ..., "TC2": ...}`` on success, or ``None`` when M2 is
            missing or stale, the station is not registered, the encrypted
            timestamp disagrees with the outer one, or H2 does not verify.
            On success the derived key is kept in ``self.SK_CS``.
        """
        if M2 is None:
            print("[CS] No M2 received.")
            return None

        if self.registration_data is None:
            print("[CS] Charging station is not registered.")
            return None

        E2 = M2["E2"]
        TS2 = M2["TS2"]
        deltaT = 60
        TC1 = int(time.time())

        if abs(TC1 - TS2) > deltaT:
            print("[CS] M2 timestamp expired.")
            return None

        y = M2["y"]
        PK_EV_bytes = M2["PK_EV_bytes"]

        PKSP_bytes = self.SP.PK_SP.x.to_bytes(32, 'big') + self.SP.PK_SP.y.to_bytes(32, 'big')
        KS2 = H(PK_EV_bytes + PKSP_bytes + y.to_bytes(32, 'big'))

        plaintext = aes_gcm_decrypt(KS2, E2)

        # E2 layout: x (32) || y (32) || H2 (32) || TU1 (8) || TS2 (8) || IDU*.
        x = int.from_bytes(plaintext[0:32], 'big')
        y_val = int.from_bytes(plaintext[32:64], 'big')
        H2 = plaintext[64:96]
        TU1 = int.from_bytes(plaintext[96:104], 'big')
        TS2_recv = int.from_bytes(plaintext[104:112], 'big')
        IDU_star = plaintext[112:]

        # The freshness check used the unauthenticated outer TS2; the
        # authenticated copy inside E2 has to agree with it.
        if TS2_recv != TS2:
            print("[CS] M2 timestamp mismatch.")
            return None

        shared_point = (x * y_val) * self.SP.g
        shared_x_bytes = shared_point.x.to_bytes(32, 'big')

        H_star2 = H(
            PK_EV_bytes
            + PKSP_bytes
            + shared_x_bytes
            + TU1.to_bytes(8, 'big')
        )

        if H_star2 != H2:
            print("[CS] H2 verification failed.")
            return None

        z = secrets.randbelow(self.SP.q - 1) + 1
        zg = z * self.SP.g
        xg = x * self.SP.g

        CCS_bytes = self.registration_data["CCS"].encode()
        PK_CS_x_bytes = self.registration_data["PK_CS"].x.to_bytes(32, 'big')
        TC2 = int(time.time())

        MACV = H(
            IDU_star +
            PK_EV_bytes +
            PK_CS_x_bytes +
            CCS_bytes +
            xg.x.to_bytes(32, 'big') +
            zg.x.to_bytes(32, 'big') +
            TC2.to_bytes(8, 'big')
        )

        shared_point = (x * z) * self.SP.g
        xz_bytes = shared_point.x.to_bytes(32, 'big')

        # M2 carries no plain EV identity, so the first registered EV is
        # assumed (same simplification the SP makes).
        ID_EV_bytes = next(iter(self.SP.registered_users)).encode()

        # Keep the session key: it used to be computed and then discarded,
        # leaving the station with nothing to communicate with.
        self.SK_CS = H(
            IDU_star +
            ID_EV_bytes +
            PK_EV_bytes +
            PK_CS_x_bytes +
            xz_bytes +
            CCS_bytes +
            MACV +
            TC2.to_bytes(8, 'big')
        )

        # Mask CCS; xor_bytes truncates the 32-byte hash to len(CCS_bytes).
        C_star_CS = xor_bytes(
            CCS_bytes,
            H(
                PK_CS_x_bytes
                + PK_EV_bytes
                + y_val.to_bytes(32, 'big')
            )
        )

        # E3 layout: H2* (32) || MACV (32) || z (32) || C*_CS || TC2 (8).
        plaintext = H_star2 + MACV + z.to_bytes(32, 'big') + C_star_CS + TC2.to_bytes(8, 'big')

        E3 = aes_gcm_encrypt(KS2, plaintext)

        M3 = {
            "E3": E3,
            "TC2": TC2
        }

        print("[CS] M3 constructed and sent to SP.")
        return M3


In [24]:
# End-to-end demo and benchmark: sets up the three parties, runs EV
# registration, the login/authentication exchange and one password change,
# then reports wall-clock time and traced memory for the whole run.
print("\n\n==== BENCHMARKING ====\n")

# Everything between start() and get_traced_memory() below is measured,
# including the print calls.
tracemalloc.start()
t0 = time.perf_counter()

# 1. Initialize Service Provider (SP)
SP = ServiceProvider()

# 2. Register EV with the SP
ID_EV = "EV12345"
PW_EV = secrets.token_bytes(32)   # Random 256-bit password

# The SP must know the password before EV.register() runs; register_EV
# aborts for IDs that have no stored password.
SP.store_EV_password(ID_EV, PW_EV)

print("\n=== SP REGISTERED EV ===")
print(f"ID_EV = {ID_EV}")
# NOTE(review): prints the password in clear text; acceptable for a demo only.
print(f"PW_EV = {PW_EV.hex()}")

# 3. Register Charging Station
ID_CS = "CS001"
CS = ChargingStation(ID_CS, SP)
CS.register()


# 4. EV runs registration protocol
EV = ElectricVehicle(ID_EV, PW_EV, SP)
registration_success = EV.register()

if not registration_success:
    print("[MAIN] EV registration failed. Exiting.")
    exit()


# 5. EV authenticates and logs in
# Returns the session key bytes, or None if a verification step failed.
session_key_EV = EV.login_and_authenticate(CS)
if session_key_EV:
    print("\n !! EV successfully authenticated!")
    print(f"Session Key (SK_EV): {session_key_EV.hex()}")
else:
    print("\n❌ EV authentication failed.")


# 6. Password change, using the registration password as the old one
PW_old = PW_EV
PW_new = secrets.token_bytes(32)

print("\n===== Testing password change =====")

result = EV.change_password(PW_old, PW_new)

if result:
    print("[System] Password change successful.")
else:
    print("[System] Password change failed.")


# 7. Stop the clock and read the tracemalloc counters (values are in bytes)
t1 = time.perf_counter()
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()

print(f"\n==== BENCHMARK RESULTS ====")
print(f"Total execution time: {t1 - t0:.4f} seconds")
print(f"Peak memory usage: {peak / 1024:.2f} KB")
print(f"Current memory usage after run: {current / 1024:.2f} KB")



==== BENCHMARKING ====


=== SP REGISTERED EV ===
ID_EV = EV12345
PW_EV = 3e4fb9a684fe3e82077b921fc26e09fe6e944507edfc4a489ab7cb9489a5190e
[CS] Sending ID_CS = CS001 to SP.
[SP] Registered CS CS001 with CCS=b5f48138ac8b3921 and β=04462c3b557fd7e332d6c437a39db0feda8b01c9ac967bd2f0ccca02d53583db
[CS] Registration successful.
[EV] M_RU1 prepared and sent.
[SP] Registration message M_RU2 prepared for EV12345.
[EV] Registration successful. A_U: e7a1830e6ca86c2207e5298952237941cfd9b067b4924279d3f6e5140ccdef02
[SP] M2 constructed and sent to CS.
[CS] M3 constructed and sent to SP.
[SP] M4 constructed and sent to EV.
[EV] Authentication successful. SK_EV: ea76766066aec3a994a71b901f1cab1879c5ca920e4cc0373d6462df33e77894

 !! EV successfully authenticated!
Session Key (SK_EV): ea76766066aec3a994a71b901f1cab1879c5ca920e4cc0373d6462df33e77894

===== Testing password change =====

[EV] Attempting password change...
[EV] Password verified successfully.
[EV] Password changed successfully.
  New A_U