# IKEv2 responder test

In [None]:
!apt update && apt install -y libpcap-dev
%pip install --pre scapy[basic]
%pip install pycryptodomex

In [None]:
import socket
# Resolve this machine's own hostname to an IP address. The secrets-loading
# cell compares this address with the IDs listed in the secrets file.
hostip = socket.gethostbyname(socket.gethostname())
print("IP address of host:", hostip)

In [None]:
%load_ext autoreload
%autoreload

from scapy.all import *
from scapy.contrib.ikev2 import *
from scapy.utils import inet_aton
import binascii
import os
import ikev2_lib
import socket

IKEv2 specification: https://tools.ietf.org/pdf/rfc7296.pdf

In [None]:
# Endpoints of the IKE exchange. This notebook plays the responder role:
# RESPONDER_* is the local side (bound and sniffed on), INITIATOR_* is the
# peer that the responses are addressed to.
INITIATOR_IP = '172.17.0.3'
INITIATOR_PORT = 500
RESPONDER_IP = '172.17.0.2'
RESPONDER_PORT = 500

### Create dummy UDP listener to prevent ICMP port unreachable messages

In [None]:
# The socket is only bound here; the cells in this notebook capture with
# sniff() and send with sr1() rather than reading from or writing to it.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((RESPONDER_IP, RESPONDER_PORT))

### Load secrets from file ###

In [None]:
SECRETS_FILE = "ipsec.psk-server.secrets"
shared_secret = b""
# Each non-empty, non-comment line is expected to look like
#     <id> : <type> <secret>
# The secret of the entry whose <id> equals this host's IP address is used;
# if several entries match, the last one wins. When nothing matches,
# shared_secret stays empty.
with open(SECRETS_FILE, 'rb') as fh:
    for line in fh:
        line = line.strip()
        if not line or line.startswith(b'#'):
            continue
        # Split on the first ':' only, so a secret that itself contains ':'
        # is kept intact instead of breaking the two-value unpacking.
        id_, secret = map(bytes.strip, line.split(b':', 1))
        # Split on the first run of whitespace only: tolerates tabs or
        # repeated spaces between type and value, and spaces in the value.
        secret_type, secret_value = map(bytes.strip, secret.split(None, 1))
        if id_ == hostip.encode():
            shared_secret = secret_value
print(f"Shared secret read from file is: [{shared_secret}]")

### Initialise local parameters and secrets

In [None]:
# Locally configured transform IDs; the meaning of each number is given in
# the inline note next to it.
INTEGRITY_ALGO_ID = 12 # 2 = AUTH_HMAC_SHA1_96, 12 = AUTH_HMAC_SHA2_256_128
ENCRYPTION_ALGO_ID = 12 # 12 = ENCR_AES_CBC
PRF_ALGO_ID = 5 # 5 = PRF_HMAC_SHA2_256
DH_GROUP_ID = 14 # 14 = 2048-bit MODP group

# Disabled: the algorithm descriptors are looked up from the initiator's
# proposal in a later cell instead of from the constants above.
#integrity_algo = ikev2_lib.INTEGRITY[INTEGRITY_ALGO_ID]
#encrytion_algo = ikev2_lib.ENCRYPTION[ENCRYPTION_ALGO_ID]
#prf_algo = ikev2_lib.PRF[PRF_ALGO_ID]

# Responder nonce (Nr): 32 fresh random bytes per run.
nonce_r = os.urandom(32)
# Responder SPI: a fixed 8-byte value, identical on every run.
spi_r = b'Ysblokje'



### Wait for IKE_SA_INIT from initiator ###

In [None]:
# Capture a single packet (count=1) addressed to the responder's IP and port;
# it is treated as the initiator's IKE_SA_INIT request by the cells below.
capture = sniff(filter=f"dst host {RESPONDER_IP} and dst port {RESPONDER_PORT}", count=1)
capture.summary()

In [None]:
# Keep the captured packet from its IP layer onwards and dump it for inspection.
ike_sa_init_i_pkt = capture[0][IP]
ike_sa_init_i_pkt.show()

### Strip IP and UDP header ###

In [None]:
# Select the IKEv2 layer; its raw bytes are reused later as part of the
# initiator's signed octets.
ike_sa_init_i = ike_sa_init_i_pkt[IKEv2]

### Get proposal and transforms from IKE_SA_INIT initiator ### 

In [None]:
def transform_ids_from_proposal(proposal: IKEv2_payload_Proposal) -> dict:
    """Collect the transform IDs offered in an IKEv2 proposal.

    Walks the chain of transform layers held in ``proposal.trans`` and maps
    each transform type name (looked up in ``ikev2_lib.TRANSFORM_TYPES``) to
    that layer's transform ID. When a type occurs more than once, the entry
    of the last layer of that type wins.

    Args:
        proposal: Proposal payload whose ``trans`` field holds the transforms.

    Returns:
        Dict mapping transform type name to transform ID.

    Raises:
        KeyError: If a layer's transform type is not in
            ``ikev2_lib.TRANSFORM_TYPES``.
    """
    transforms = proposal.trans
    transform_ids = {}
    counter = 0
    layer = transforms.getlayer(counter)
    # Identity test on purpose: ``== None`` would be routed through the
    # layer object's own equality operator.
    while layer is not None:
        type_name = ikev2_lib.TRANSFORM_TYPES[layer.transform_type]
        transform_ids[type_name] = layer.transform_id
        counter += 1
        layer = transforms.getlayer(counter)
    return transform_ids


In [None]:
# Get proposal from Initiator IKE_SA_INIT 
proposal = ike_sa_init_i[IKEv2_payload_Proposal]

# Map transform type name -> transform ID as offered by the initiator.
transform_ids = transform_ids_from_proposal(proposal)

# Look up the descriptor of each offered algorithm in ikev2_lib's tables;
# later cells read "key_size", "block_size" and "hash_size" from them.
integrity_algo = ikev2_lib.INTEGRITY[transform_ids['INTEG']]
encrytion_algo = ikev2_lib.ENCRYPTION[transform_ids['ENCR']]
prf_algo = ikev2_lib.PRF[transform_ids['PRF']]

### Calculate CSPs ###

In [None]:
dh = ikev2_lib.DiffieHellman()

# dh_a: local (responder) public value, serialised as 256 big-endian bytes;
#       it is sent in the KE payload of the IKE_SA_INIT response.
# dh_b: the initiator's public value, parsed from its KE payload.
dh_a = dh.generate_public().to_bytes(length=256, byteorder='big') 
dh_b = int.from_bytes(ike_sa_init_i[IKEv2_payload_KE].load, byteorder='big')

# Shared secret, again fixed to 256 bytes so leading zero bytes are kept.
dhs = dh.generate_shared(dh_b).to_bytes(length=256, byteorder='big')
print(f"shared Diffie Hellman secret => {len(dhs)} bytes")
hexdump(dhs)

In [None]:
# Initiator nonce (Ni) and initiator SPI, taken from the received IKE_SA_INIT.
nonce_i = ike_sa_init_i[IKEv2_payload_Nonce].load
spi_i = ike_sa_init_i.init_SPI

print(f"SPIi: {spi_i}")
print(f"SPIr: {spi_r}")

# Nonces are printed hex-encoded.
print(f"Ni: {binascii.b2a_hex(nonce_i).decode()}")
print(f"Nr: {binascii.b2a_hex(nonce_r).decode()}")

### Calculate SKEYSEED and derive keys ([RFC7296 2.14](https://datatracker.ietf.org/doc/html/rfc7296#section-2.14))

In [None]:
# Use the PRF offered in the initiator's proposal, so the PRF function agrees
# with the key size that is read from prf_algo when the SK_* keys are derived.
prf = ikev2_lib.get_prf(transform_ids['PRF'])
if not prf:
    # Fail here rather than leaving skeyseed undefined for the later cells.
    raise ValueError(f"Unsupported PRF transform ID: {transform_ids['PRF']}")

# SKEYSEED = prf(Ni | Nr, <Diffie-Hellman shared secret>)
skeyseed = prf(nonce_i + nonce_r, dhs)

print(f"SKEYSEED => {len(skeyseed)} bytes")
hexdump(skeyseed)

Keys needed: SK_d | SK_ai | SK_ar | SK_ei | SK_er | SK_pi | SK_pr

In [None]:
# Derive the seven SK_* keys. The seed data is Ni | Nr | SPIi | SPIr and the
# key sizes come from the PRF, integrity and encryption algorithm descriptors.
sk_d, sk_ai, sk_ar, sk_ei, sk_er, sk_pi, sk_pr = ikev2_lib.generate_sks(
    prf,
    skeyseed,
    nonce_i + nonce_r + spi_i + spi_r,
    prf_algo["key_size"],
    integrity_algo["key_size"],
    encrytion_algo["key_size"],
)

# Report the length and a hex dump of every derived key, in derivation order.
for key_label, key_bytes in (
    ("Sk_d", sk_d),
    ("Sk_ai", sk_ai),
    ("Sk_ar", sk_ar),
    ("Sk_ei", sk_ei),
    ("Sk_er", sk_er),
    ("Sk_pi", sk_pi),
    ("Sk_pr", sk_pr),
):
    print(f"{key_label} secret => {len(key_bytes)} bytes")
    hexdump(key_bytes)

### Build IKE_SA_INIT response

In [None]:

# Response header: echoes the initiator SPI and adds the responder SPI.
hdr = IKEv2(init_SPI = spi_i, resp_SPI=spi_r, next_payload = 'SA', exch_type = 'IKE_SA_INIT', flags='Response')
# The initiator's proposal is returned unchanged as the selected SA.
sa = IKEv2_payload_SA(next_payload = 'KE', prop=proposal)
# KE carries the responder's public DH value; the group is fixed to 2048-bit MODP.
ke = IKEv2_payload_KE(next_payload = 'Nonce', group = '2048MODPgr', load = dh_a)
# Last payload in the chain, hence next_payload 'None'.
nonce = IKEv2_payload_Nonce(next_payload = 'None', load = nonce_r)

# Kept separately from the IP/UDP wrapper: its raw bytes are needed again
# for the responder's signed octets.
ike_sa_init_r = hdr/sa/ke/nonce

ike_sa_init_r_pkt = IP(dst = INITIATOR_IP)/UDP(dport = INITIATOR_PORT, sport = RESPONDER_PORT)/ike_sa_init_r

ike_sa_init_r_pkt.show()


### Send IKE_SA_INIT response and receive IKE_AUTH

In [None]:
# Send the IKE_SA_INIT response and take the reply returned by sr1() as the
# initiator's IKE_AUTH request.
ans = sr1(ike_sa_init_r_pkt)
#ans.show()

# Keep only the IKEv2 layer of the reply.
ike_auth_i = ans[IKEv2]
ike_auth_i.show()

### Verify checksum and decrypt payload ([RFC7296 3.14](https://datatracker.ietf.org/doc/html/rfc7296#section-3.14))

In [None]:
# Sizes come from the algorithms offered by the initiator, not from fixed values.
cipher_block_size = encrytion_algo["block_size"]  # also used as the IV length
integrity_hash_size = integrity_algo["hash_size"]  # length of the trailing checksum

# The whole IKE message except its trailing checksum is authenticated.
auth_data = raw(ike_auth_i)[:-integrity_hash_size]
# Body of the Encrypted payload as sliced below: IV | ciphertext | checksum.
sk_body = ike_auth_i[IKEv2_payload_Encrypted].load
iv = sk_body[:cipher_block_size]
encrypted = sk_body[cipher_block_size:-integrity_hash_size]
checksum = sk_body[-integrity_hash_size:]

print(f"IV: {binascii.b2a_hex(iv).decode()}")
# The request was sent by the initiator, so it is checked with SK_ai.
checksum_ok = ikev2_lib.verify_integrity(sk_ai, auth_data, checksum, integrity_algo_id=transform_ids['INTEG'])
print(f"Checksum in packet: \t{binascii.b2a_hex(checksum).decode()}\tOK? {checksum_ok}\n")

# Stop before decrypting anything that failed the integrity check.
if not checksum_ok:
    raise ValueError("Integrity check of the IKE_AUTH request failed; not decrypting it")

# Initiator-to-responder traffic is encrypted with SK_ei.
plain = ikev2_lib.decrypt_message(sk_ei, encrypted, iv)
ike_auth_i_payload = IKEv2_payload_IDi(plain) # Cast decrypted payload to IKEv2 payload(s), for now assumes IDi payload

In [None]:
# idi_prime is the IDi payload minus the fixed payload header (= 4 bytes)
# The payload's own length field is the upper slice bound, so bytes that
# follow the IDi payload in the decrypted data are left out.
idi_payload_len = ike_auth_i_payload[IKEv2_payload_IDi].length
idi_prime = raw(ike_auth_i_payload[IKEv2_payload_IDi])[4:idi_payload_len]

### Authentication of IKE SA ([RFC7296 2.15](https://datatracker.ietf.org/doc/html/rfc7296#section-2.15)) ###

```
InitiatorSignedOctets = RealMessage1 | NonceRData | MACedIDForI
...
MACedIDForI = prf(SK_pi, RestOfInitIDPayload)
```

In [None]:
# MACedIDForI = prf(SK_pi, RestOfInitIDPayload)
macedIDforI = prf(sk_pi, idi_prime)
# InitiatorSignedOctets = RealMessage1 | NonceRData | MACedIDForI, where
# RealMessage1 is the received IKE_SA_INIT request from the IKEv2 header on.
octets = raw(ike_sa_init_i) + nonce_r + macedIDforI

```
For the initiator:
      AUTH = prf( prf(Shared Secret, "Key Pad for IKEv2"),
                       <InitiatorSignedOctets>)
```

In [None]:
import hmac

IKEV2_KEYPAD = b"Key Pad for IKEv2"
# Inner PRF: the shared secret keyed over the fixed key pad string. The result
# is reused when the responder's own AUTH value is computed.
prf_secret_keypad = prf(shared_secret, IKEV2_KEYPAD)
# Expected AUTH value over the initiator's signed octets.
auth_value = prf(prf_secret_keypad, octets)
print(f"AUTH = prf(prf(secret, keypad), octets) => {len(auth_value)} bytes")
hexdump(auth_value)
print()
received_auth = ike_auth_i_payload[IKEv2_payload_AUTH].load
# Constant-time comparison, so the check does not reveal through timing how
# many leading bytes of the received value were correct.
if hmac.compare_digest(auth_value, received_auth):
    print("Authentication of initiator successful...")
else:
    print("Authentication of initiator failed!")
    

### Build IKE_AUTH response ###

```
                                <--  HDR, SK {IDr, [CERT,] AUTH,
                                         SAr2, TSi, TSr}

```

In [None]:
# Copy proposal and traffic selectors from initiator IKE_AUTH
proposal2 = ike_auth_i_payload[IKEv2_payload_Proposal]
sa2 = IKEv2_payload_SA(next_payload='TSi', prop=proposal2)

# Each traffic selector payload is re-parsed from exactly `length` bytes of
# the received one, so the copy ends at that payload's own length.
initiator_tsi = ike_auth_i_payload[IKEv2_payload_TSi]
tsi = IKEv2_payload_TSi(raw(initiator_tsi)[:initiator_tsi.length])

initiator_tsr = ike_auth_i_payload[IKEv2_payload_TSr]
tsr = IKEv2_payload_TSr(raw(initiator_tsr)[:initiator_tsr.length])
# Fixup copied traffic selectors
# In the response the chain is ... SA -> TSi -> TSr -> (end).
tsi.next_payload = 'TSr' 
tsr.next_payload = 'None'

#### IDr payload ####

In [None]:
# The responder identifies itself by its IPv4 address in packed binary form.
idr_load = inet_aton(RESPONDER_IP)
idr = IKEv2_payload_IDr(next_payload='AUTH', IDtype="IPv4_addr", load=idr_load)
# IDr payload without its first 4 bytes, mirroring how idi_prime drops the
# fixed payload header of the IDi payload.
idr_prime = raw(idr)[4:]

#### AUTH payload ([RFC7296 2.15](https://datatracker.ietf.org/doc/html/rfc7296#section-2.15)) ####

```
ResponderSignedOctets = RealMessage2 | NonceIData | MACedIDForR
...
MACedIDForR = prf(SK_pr, RestOfRespIDPayload)
```

In [None]:
# MACedIDForR = prf(SK_pr, RestOfRespIDPayload)
macedIDforR = prf(sk_pr, idr_prime)
# ResponderSignedOctets = RealMessage2 | NonceIData | MACedIDForR, where
# RealMessage2 is this notebook's own IKE_SA_INIT response (IKEv2 layer only).
# NOTE: this rebinds `octets`, which previously held the initiator's octets.
octets = raw(ike_sa_init_r) + nonce_i + macedIDforR
# Same keyed pad as used when checking the initiator's AUTH value.
auth_load = prf(prf_secret_keypad, octets)
print(f"AUTH = prf(prf(secret, keypad), octets) => {len(auth_load)} bytes")
hexdump(auth_load)
auth = IKEv2_payload_AUTH(next_payload = 'SA', auth_type = "Shared Key Message Integrity Code", load=auth_load)

#### Assemble and prepare encrypted payload ####

In [None]:
# Copy message ID from initiator IKE_AUTH
msg_id = ike_auth_i[IKEv2].id

# Assemble payload to be encrypted
sk_load = idr/auth/sa2/tsi/tsr
# Generate IV
iv = ikev2_lib.generate_iv()
# Encrypt payload
# Responder-to-initiator traffic is encrypted with SK_er.
sk_load_encr = ikev2_lib.encrypt_message(sk_er, raw(sk_load), iv)
# Construct Encrypted payload
# The checksum is not known yet, so zero bytes of the same size stand in for
# it; presumably this makes the serialised packet already have its final
# length before the checksum is calculated.
sk_load_complete = iv + sk_load_encr + bytes(integrity_hash_size)
# Assemble packet
hdr = IKEv2(init_SPI = spi_i, resp_SPI=spi_r, id=msg_id, next_payload = 'Encrypted', exch_type = 'IKE_AUTH', flags='Response')
sk = IKEv2_payload_Encrypted(next_payload = 'IDr', load=sk_load_complete)

ike_auth_r = hdr/sk
# Authenticate packet and append 'checksum'
# The placeholder bytes are cut off again; the checksum covers everything
# before them and is keyed with SK_ar (responder side).
auth_data = raw(ike_auth_r)[:-integrity_hash_size]
checksum = ikev2_lib.calculate_integrity(sk_ar, auth_data, integrity_algo_id=transform_ids['INTEG'])
# Swap the placeholder for the real checksum.
sk_load_complete = iv + sk_load_encr + checksum
ike_auth_r[IKEv2_payload_Encrypted].load = sk_load_complete

ike_auth_r_pkt = IP(dst = INITIATOR_IP)/UDP(dport = INITIATOR_PORT, sport = RESPONDER_PORT)/ike_auth_r

In [None]:
# Dump the assembled IKE_AUTH response for inspection before it is sent.
ike_auth_r_pkt.show()

In [None]:
# Send the IKE_AUTH response and show the IKEv2 layer of whatever the
# initiator sends next.
ans = sr1(ike_auth_r_pkt)
#ans.show()

ike_xx_i = ans[IKEv2]
ike_xx_i.show()