In [2]:
import struct

def countbytes(target):
    if isinstance(target, str):
        b = bytes(target,'utf-8')
    elif isinstance(target, int):
        b = target.to_bytes((target.bit_length() + 7) // 8, 'big')
    else:
        b = target
    print(f"original: {target}")
    print(f"byte value : {b}")
    print(f"size : {len(b)} bytes  ")


In [3]:
def intobyte(intvalue):
    return intvalue.to_bytes((intvalue.bit_length() + 7) // 8, 'big')

def bytetoint(bytevalue):
    return int.from_bytes(bytevalue, "big")


In [4]:
import collections
import random
import time
from Crypto.Hash import CMAC
from Crypto.Cipher import AES
import bitstring

EllipticCurve = collections.namedtuple('EllipticCurve', 'name p a b g n h')

#curve for NIST p-256
curve = EllipticCurve(
    'p256',
    # Field characteristic.
    p=0xffffffff00000001000000000000000000000000ffffffffffffffffffffffff,
    # Curve coefficients.
    a=0xffffffff00000001000000000000000000000000fffffffffffffffffffffffc,
    b=0x5ac635d8aa3a93e7b3ebbd55769886bc651d06b0cc53b0f63bce3c3e27d2604b,
    # Base point.
    g=(0x6b17d1f2e12c4247f8bce6e563a440f277037d812deb33a0f4a13945d898c296,
        0x4fe342e2fe1a7f9b8ee7eb4a7c0f9e162bce33576b315ececbb6406837bf51f5),
    # Subgroup order.
    n=0xffffffff00000000ffffffffffffffffbce6faada7179e84f3b9cac2fc632551,
    # Subgroup cofactor.
    h=1,
)

# Modular arithmetic ##########################################################
def inverse_mod(k, p):
    """Returns the inverse of k modulo p.
    This function returns the only integer x such that (x * k) % p == 1.
    k must be non-zero and p must be a prime.
    """
    if k == 0:
        raise ZeroDivisionError('division by zero')

    if k < 0:
        # k ** -1 = p - (-k) ** -1  (mod p)
        return p - inverse_mod(-k, p)

    # Extended Euclidean algorithm.
    s, old_s = 0, 1
    t, old_t = 1, 0
    r, old_r = p, k

    while r != 0:
        quotient = old_r // r
        old_r, r = r, old_r - quotient * r
        old_s, s = s, old_s - quotient * s
        old_t, t = t, old_t - quotient * t

    gcd, x, y = old_r, old_s, old_t

    assert gcd == 1
    assert (k * x) % p == 1

    return x % p


# Functions that work on curve points #########################################

def is_on_curve(point):
    """Returns True if the given point lies on the elliptic curve."""
    if point is None:
        # None represents the point at infinity.
        return True

    x, y = point

    return (y * y - x * x * x - curve.a * x - curve.b) % curve.p == 0


def point_add(point1, point2):
    """Returns the result of point1 + point2 according to the group law."""
    assert is_on_curve(point1)
    assert is_on_curve(point2)

    if point1 is None:
        # 0 + point2 = point2
        return point2
    if point2 is None:
        # point1 + 0 = point1
        return point1

    x1, y1 = point1
    x2, y2 = point2

    if x1 == x2 and y1 != y2:
        # point1 + (-point1) = 0
        return None

    if x1 == x2:
        # This is the case point1 == point2.
        m = (3 * x1 * x1 + curve.a) * inverse_mod(2 * y1, curve.p)
    else:
        # This is the case point1 != point2.
        m = (y1 - y2) * inverse_mod(x1 - x2, curve.p)

    x3 = m * m - x1 - x2
    y3 = y1 + m * (x3 - x1)
    result = (x3 % curve.p, -y3 % curve.p)

    assert is_on_curve(result)

    return result


def scalar_mult(k, point):
    """Returns k * point computed using the double and point_add algorithm."""
    assert is_on_curve(point)

    if k % curve.n == 0 or point is None:
        return None

    if k < 0:
        # k 가 음수일 경우 절대값으로 양수로 만듦 (현재 설정에선 음수가 나올 일은 없음)
        # k * point = -k * (-point)
        return scalar_mult(-k, point_neg(point))

    result = None
    addend = point

    while k:
        if k & 1:
            # Add.
            result = point_add(result, addend)

        # Double.
        addend = point_add(addend, addend)

        k >>= 1

    assert is_on_curve(result)

    return result

# Keypair generation and ECDSA ################################################
def make_keypair():
    """Generates a random private-public key pair."""
    private_key = random.randrange(1, curve.n)
    public_key = scalar_mult(private_key, curve.g)

    return private_key, public_key


In [5]:
# Challenge process
from Crypto import Random
from Crypto.Cipher import AES
#Generate Nonce 
import binascii
import os 
import struct 
import datetime 

BLOCK_SIZE = 16
def pad(s):
    s = s + (BLOCK_SIZE - len(s) % BLOCK_SIZE) * chr(BLOCK_SIZE - len(s) % BLOCK_SIZE).encode()
    # print(f"padding : {countbytes(s)}")
    return s

def unpad(encrypted_string):
    return encrypted_string[0:-(encrypted_string[-1])]


class AESCipher(object):
    def __init__(self, key):
        self.key = key.encode()

    def encrypt(self, raw):
        raw = pad(raw)
        iv = Random.new().read(AES.block_size)
        # print(f"iv : {countbytes(iv)}")
        cipher = AES.new(self.key, AES.MODE_CBC, iv)
        # print(f"cipher len : {len(cipher.encrypt(raw))}")
        return iv + cipher.encrypt(raw)
    
    def decrypt(self, enc):
        iv = enc[:BLOCK_SIZE] 
        cipher = AES.new(self.key, AES.MODE_CBC, iv)
        return unpad(cipher.decrypt(enc[BLOCK_SIZE:]))

def AES_key_padding(z):
    if len(z) > 32 : 
        z = z[:32]
    else:
        for i in range(32-len(z)):
            z = z+"0"
    return z 

# Generate Keys

In [6]:
# 2.1. Gen. Mobile ECDH key pair
phoneSecretKey, phonePublicKey = make_keypair() # phone side 

# 2.2. Gen. Vehicle ECDH key pair
carSecretKey, carPublicKey = make_keypair() # car side 

In [7]:
# 2.3 Send mobile pub key
# 2.4. Send vehicle pub key 

## exchange public keys
    #phone sends public key to car
    #car sends public key to phone

In [8]:
# 2.6. Gen Mobile tk, vk 
phoneTemporaryKey= scalar_mult(phoneSecretKey, carPublicKey)[0] # phone side 
phoneVerificationKey= scalar_mult(phoneSecretKey, carPublicKey)[1] # phone side 

# 2.7. Gen Vehicle tk, vk
carTemporaryKey = scalar_mult(carSecretKey, phonePublicKey)[0] # car side
carVerificationKey = scalar_mult(carSecretKey, phonePublicKey)[1] # car side

### Car --> Phone Challenge

In [9]:
# 3.1 Get vehicle sk from secure storage 
with open("./car_shared_key", "rb") as f:
    carSharedKey = bytetoint(f.read())

# 3.2 Gen. vehicle challenge = nonce_c(4Byte), Timestamp_c (4Byte)
nonce_c = int(binascii.hexlify(os.urandom(4)),16).to_bytes(4,"big")# 4byte random int
ts = struct.pack(">i",int(datetime.datetime.timestamp(datetime.datetime.now())))
challenge = nonce_c+ts


# 3.3 enc_tk(enc_sk(challenge))
AES_key1 = AES_key_padding(str(carSharedKey)) # 1 encryption with shared key 
AES_key2 = AES_key_padding(str(carTemporaryKey)) # 2 encryption with Temporary key 

Car_cipher1 = AESCipher(key=AES_key1) # generate encryption instance for 1st encryption with shared key
Car_cipher2 = AESCipher(key=AES_key2)  # generate encryption instance for 2nd encryption  with Temporary key

plain_challenge_from_car = challenge
encrypted_challenge_from_car1 = Car_cipher1.encrypt(plain_challenge_from_car) # 1st encryption with shared key
encrypted_challenge_from_car2 = Car_cipher2.encrypt(encrypted_challenge_from_car1) # 2nd encryption with Temporary  key

countbytes(encrypted_challenge_from_car2)

# 3.4 Send enc_tk(enc_sk(challenge)) to mobile (64 bytes)

original: b"\x10M'_Y\xc3\x87\xe1+\x85\x98\x1e\x88+Wc\x16\x07F\xf4\xfe\x8cA\xe1 \xecv\xccs8Bs\xc0\xf2\xd1\x98\xa5\xe2-\xb1\xdf\x83\x1a$\n,::\x0e\x08f0Z\xf5\xab\x82\xd4\x14\xc0\xc4\xfd\x10\x1bc"
byte value : b"\x10M'_Y\xc3\x87\xe1+\x85\x98\x1e\x88+Wc\x16\x07F\xf4\xfe\x8cA\xe1 \xecv\xccs8Bs\xc0\xf2\xd1\x98\xa5\xe2-\xb1\xdf\x83\x1a$\n,::\x0e\x08f0Z\xf5\xab\x82\xd4\x14\xc0\xc4\xfd\x10\x1bc"
size : 64 bytes  


## car --> phone Challenge solve &  Phone --> Car Challenge

In [10]:
#3.5 get mobile sk from secure storage
with open("./phone_shared_key", "rb") as f:
    phoneSharedKey = bytetoint(f.read())

# dec_sk(dec_tk(enc_tk)) = challenge(nonce_c, timestamp_c)
AES_key1 = AES_key_padding(str(phoneSharedKey)) # 1 encryption with shared key 
AES_key2 = AES_key_padding(str(phoneTemporaryKey)) # 2 encryption with Temporary key 

Phone_cipher1 = AESCipher(key=AES_key1)  # generate encryption instance for 1st encryption with shared key
Phone_cipher2 = AESCipher(key=AES_key2)  # generate encryption instance for 2nd encryption  with Temporary key

decrypted_challenge_from_car1 = Phone_cipher2.decrypt(encrypted_challenge_from_car2) # 1 decryption with Temporary key
decrypted_challenge_from_car = Phone_cipher1.decrypt(decrypted_challenge_from_car1) # 2 decryption with shared key

# 3.7 Gen. response = (nonce_c,timestamp_c, username)
username="AUTOCRYPT"
padded_username = int(username.encode().hex(),16).to_bytes(64,"big")
plain_response_from_phone = decrypted_challenge_from_car+padded_username

# 3.8 enc_tk(enc_sk(response))
encrypted_response_from_phone1 = Phone_cipher1.encrypt(plain_response_from_phone) # 1 encryption with shared key
encrypted_response_from_phone2 = Phone_cipher2.encrypt(encrypted_response_from_phone1) # 2 encryption with Temporary key
countbytes(encrypted_response_from_phone2)

# 3.9 Send enc_tk(enc_sk(response)) to vehicle (128 bytes)

original: b'\xda\xda\x88`\xf5\x08x\xc7-\xe3Q\x1e\x12\xca\xa6uRXh\nc\x87\xf4\xc1511\xef\xa8\x19v\xf5\xbc]1u4k\x89\x81\xc1\xf5c\x12:K,\x12\xb7\x18\x15\xd7\xf9\x0e\xc1p_\xd3\x91\x9a\xc1f\xe0\xf5\xa3N\x91\x97\xed\xce\xbaO\xea.\x86\x9e1RU\xb7\xe4\x00\xadx0\xff~\xcc\xa3\x83\x16\x8e\xa9\t\xa5X\xf1s\xca\xb5\x9fq\xbe\xe6/\xce\x03q\x91\xd8}\x01\xdb\x97\xd6\xb2(o\xee\xfe]\x1a*\xd9\x06\xef\xfe\xaf'
byte value : b'\xda\xda\x88`\xf5\x08x\xc7-\xe3Q\x1e\x12\xca\xa6uRXh\nc\x87\xf4\xc1511\xef\xa8\x19v\xf5\xbc]1u4k\x89\x81\xc1\xf5c\x12:K,\x12\xb7\x18\x15\xd7\xf9\x0e\xc1p_\xd3\x91\x9a\xc1f\xe0\xf5\xa3N\x91\x97\xed\xce\xbaO\xea.\x86\x9e1RU\xb7\xe4\x00\xadx0\xff~\xcc\xa3\x83\x16\x8e\xa9\t\xa5X\xf1s\xca\xb5\x9fq\xbe\xe6/\xce\x03q\x91\xd8}\x01\xdb\x97\xd6\xb2(o\xee\xfe]\x1a*\xd9\x06\xef\xfe\xaf'
size : 128 bytes  


In [11]:
countbytes(decrypted_challenge_from_car)

original: b'\xaa1_\x0bct\x9bT'
byte value : b'\xaa1_\x0bct\x9bT'
size : 8 bytes  


### Phone --> Car Challenge solve

In [12]:
#3.10 dec_sk(dec_tk(enc_tk)) = nonce_c,timestamp_c, username
with open("./car_shared_key", "rb") as f:
    carSharedKey = bytetoint(f.read())

AES_key1 = AES_key_padding(str(carSharedKey)) # 1 encryption with shared key 
AES_key2 = AES_key_padding(str(carTemporaryKey)) # 2 encryption with Temporary key 

Car_cipher1 = AESCipher(key=AES_key1) # generate encryption instance for 1st encryption with shared key 
Car_cipher2 = AESCipher(key=AES_key2) # generate encryption instance for 2nd encryption  with Temporary key

decrypted_response_from_phone1 = Car_cipher2.decrypt(encrypted_response_from_phone2)  # 1 decryption with Temporary key
decrypted_response_from_phone = Car_cipher1.decrypt(decrypted_response_from_phone1) # 2 decryption with shared key

responsed_nonce_c = decrypted_response_from_phone[0:4] #extract nonce 
responsed_timestamp_c = decrypted_response_from_phone[4:8] # extract timestamp
responsed_username = decrypted_response_from_phone[8:] # extract username
unpad_responsed_username = responsed_username.decode().lstrip("\x00") # remove padding

#3.11 Verify response timestmpa_p < timestamp.now() + timedelta(minutes = 5)       nonce_c == Nonce_c

nowtime = int(datetime.datetime.timestamp(datetime.datetime.now()))
challenge_time = int(responsed_timestamp_c.hex(),16)
elapsed_minute = (nowtime-challenge_time)/60 

if responsed_nonce_c == nonce_c:
    print("nonce is valid")
    if elapsed_minute < 5 : # 5 minutes
        print("timestamp is valid")
    else:
        print("timeout")
else:
    print("nonce invalid")

nonce is valid
timestamp is valid


In [13]:
countbytes(nonce_c)

original: b'\xaa1_\x0b'
byte value : b'\xaa1_\x0b'
size : 4 bytes  


### Vehicle Info --> Phone 

In [14]:
# 4.1. Gen. vehicle info  RPS_DATA(CMD, MODE, Clearance, ETC) = 32 byte
# 通信仕様_RPSSmartphone.xlsx 
# on vehicle side
import bitstring
CMD = bitstring.BitArray(hex="0x11") #1byte 
MODE = bitstring.BitArray(hex="0x00") #1byte 
ENG_Status_Reserved = bitstring.BitArray(bin="0b0")  #1bit00
Door_Lock_Status_Reserved = bitstring.BitArray(bin="0b0")  #1bit00
Advertise_Req = bitstring.BitArray(bin="0b00")  #1bit00
Longitudinal_direction = bitstring.BitArray(bin="0b00")  #1bit00
Lateral_direction = bitstring.BitArray(bin="0b00")  #1bit00
Progress = bitstring.BitArray(hex="0x55") #1byte 0x00~0x64 (100)
clearance1 =bitstring.BitArray(bin="0b01010000") #1byte sonar 1~4 (0,1)
clearance2 = bitstring.BitArray(bin="0b01010000") #1byte sonar 5~8 (0,1)
clearance3 = bitstring.BitArray(bin="0b01010000") #1byte sonar 9~12 (0,1) 
Error_Code = bitstring.BitArray(hex="0x00") #1byte 
working_time = bitstring.BitArray(hex="0x11") #1byte
VID = bitstring.BitArray(hex="0x00000000") #4byte
SNO = bitstring.BitArray(hex="0x00") #1byte
Reserved1 = bitstring.BitArray(bin="0b0000") #4bit
Vibration_Type_Reserve = bitstring.BitArray(bin="0b000") #3bit
Vibration_timming = bitstring.BitArray(bin="0b0") #1bit
Touch_ID = bitstring.BitArray(hex="0x00") #1byte 
Challenge  = bitstring.BitArray(hex="0x0000ffff0000ffff") #4byte 
VAC = bitstring.BitArray(hex="0x31") #1byte 
Reserved2 = bitstring.BitArray(bin="0b000000") #6bit 
CRC10 =  bitstring.BitArray(bin="0b0011110000") #10bit 

#concate all bitarray
RPS_message_example = CMD+MODE+ENG_Status_Reserved+Door_Lock_Status_Reserved+Advertise_Req+Longitudinal_direction+Lateral_direction+Progress+clearance1+clearance2+clearance3+Error_Code+working_time+VID+SNO+Reserved1+Vibration_Type_Reserve+Vibration_timming+Touch_ID+Challenge+VAC+Reserved2+CRC10
RPS_message_example = RPS_message_example.hex.encode()

#4.2. Create MAC =  CMAC(RPS_DATA, tk)
#Generate CMAC ofr RPS message to verify its integrity  
cmacKey = int(carSharedKey).to_bytes(32, byteorder='big') # make shared key to 32byte array to make CMAC
RPS_message_cmac = CMAC.new(cmacKey, ciphermod=AES)
RPS_message_cmac.update(RPS_message_example)

# 4.3 enc_tk(enc_sk(RPS_data))+CMAC
AES_key1 = AES_key_padding(str(carSharedKey)) #1st encryption with Shared password string
AES_key2 = AES_key_padding(str(carTemporaryKey)) #2nd encryption with temporary key string

Car_cipher1 = AESCipher(key=AES_key1) # generate encryption instance for 1st encryption with Temporary key 
Car_cipher2 = AESCipher(key=AES_key2) # generate encryption instance for 2nd encryption  with shared key
encrypted_vehicle_info1 = Car_cipher1.encrypt(RPS_message_example)  # 1st encryption with Temporary key
encrypted_vehicle_info2 = Car_cipher2.encrypt(encrypted_vehicle_info1) # 2nd encryption with shared key

# add CMAC to encrypted message
vehicle_info_message_with_cmac = encrypted_vehicle_info2+RPS_message_cmac.digest() # 112byte + 16byte  = 128 Byte

#4.4 send message to Phone from Car

In [15]:
#4.5 Verify MAC , dec_sk(dec_tk(enc_tk(enc_sk(RPS_data))) ,  compare(Create MAC == received MAC)
# on phone side 
RPS_message_from_car = vehicle_info_message_with_cmac[:112]
RPS_message_cmac_from_car = vehicle_info_message_with_cmac[112:]

AES_key1 = AES_key_padding(str(phoneSharedKey)) #1 encryption with shared key 
AES_key2 = AES_key_padding(str(phoneTemporaryKey)) #2 encryption with temporary key string

Phone_cipher1 = AESCipher(key=AES_key1)  # generate encryption instance for 1st encryption with shared key
Phone_cipher2 = AESCipher(key=AES_key2)  # generate encryption instance for 2nd encryption  with Temporary key


decrypted_vehicle_info1 = Phone_cipher2.decrypt(RPS_message_from_car) # 1 decryption with Temporary key
decrypted_vehicle_info = Phone_cipher1.decrypt(decrypted_vehicle_info1) # 2 decryption with shared key

#check if CMAC is correct 
cmacKey = int(phoneSharedKey).to_bytes(32, byteorder='big') # make shared key to 32byte array to make CMAC
phone_cmac_confirm = CMAC.new(cmacKey, ciphermod=AES)
phone_cmac_confirm.update(decrypted_vehicle_info)

if phone_cmac_confirm.digest() == RPS_message_cmac_from_car:
    print("CMAC is correct")
else:
    print("CMAC is not correct")

#4.6 display Vehilce Info.
print(f"decrypted_vehicle_info : {decrypted_vehicle_info.decode()}")

CMAC is correct
decrypted_vehicle_info : 110000555050500011000000000000000000ffff0000ffff3100f0


In [16]:
#4.7  generate User command (PHONE_DATA)
# on phone side

with open("./phone_shared_key", "rb") as f:
    phoneSharedKey = bytetoint(f.read())

# get user input from smart phone app and generate user_command 
user_command = bitstring.BitArray(os.urandom(32)) #32byte random bitarray

# 4.8 Create MAC CMAC(PHONE_DATA, tk)
cmacKey = int(phoneSharedKey).to_bytes(32, byteorder='big') # make shared key to 32byte array to make CMAC
user_command_cmac = CMAC.new(cmacKey, ciphermod=AES)
user_command_cmac.update(user_command.hex.encode())

# 4.9 Gen. encrypted data = enc_tk(enc_sk(PHONE_DATA))+CMAC
AES_key1 = AES_key_padding(str(phoneSharedKey)) #1 encryption with shared key 
AES_key2 = AES_key_padding(str(phoneTemporaryKey)) #2 encryption with temporary key string

Phone_cipher1 = AESCipher(key=AES_key1)  # generate encryption instance for 1st encryption with shared key
Phone_cipher2 = AESCipher(key=AES_key2)  # generate encryption instance for 2nd encryption  with Temporary key

encrypted_user_command1 = Phone_cipher1.encrypt(user_command.hex.encode()) # 1 encryption with shared key
encrypted_user_command2 = Phone_cipher2.encrypt(encrypted_user_command1) # 2 encryption with Temporary key

#add CMAC to encrypted message (128byte + 16 byte = 144 byte)
phone_user_command_message  = encrypted_user_command2 + user_command_cmac.digest()

#4.10 send message to Car from Phone


In [17]:
# 4.11 dec_tk(dec_sk(enc_tk(enc_sk(PHONE_DATA)))) = PHONE_DATA
# on car side 
with open("./car_shared_key", "rb") as f:
    carSharedKey = bytetoint(f.read())
    
recv_user_command_from_phone = phone_user_command_message

# seperate encrypted usercommand and CMAC
encrypted_user_command = recv_user_command_from_phone[:128]
user_command_cmac = recv_user_command_from_phone[128:]

AES_key1 = AES_key_padding(str(carSharedKey)) #1st encryption with Shared key string
AES_key2 = AES_key_padding(str(carTemporaryKey)) #2nd encryption with temporary key string

Car_cipher1 = AESCipher(key=AES_key1) # generate encryption instance for 1st encryption with shared key
Car_cipher2 = AESCipher(key=AES_key2)  # generate encryption instance for 2nd encryption  with Temporary key

decrypted_user_command1 = Car_cipher2.decrypt(encrypted_user_command) # 1 decryption with Temporary key
decrypted_user_command = Car_cipher1.decrypt(decrypted_user_command1) # 2 decryption with Shared key


#check CMAC
# 4.12. Verify MAC compare(Gen, CMAC(PHONE_DATA, sk) == received MAC)
cmacKey = int(carSharedKey).to_bytes(32, byteorder='big') # make shared key to 32byte array to make CMAC
user_command_check_cmac = CMAC.new(cmacKey, ciphermod=AES)
user_command_check_cmac.update(decrypted_user_command)
if user_command_check_cmac.digest() == user_command_cmac:
    print("CMAC is correct")
else:
    print("CMAC is not correct")

# 4.13. Execute CMD (on car side)

CMAC is correct


In [19]:
countbytes(user_command_check_cmac.digest() )

original: b'\xfd5\xd2\xeeE\x10\x9c?-\x9fT\x88\x96X\r\x14'
byte value : b'\xfd5\xd2\xeeE\x10\x9c?-\x9fT\x88\x96X\r\x14'
size : 16 bytes  
