# EXERCISE 6

In [678]:
import re

from cryptography.hazmat.primitives import hashes, hmac
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.ciphers.aead import AESCCM, AESGCM
from cryptography.hazmat.primitives.keywrap import aes_key_unwrap

Validation code

In [679]:
def validate_solution(solution, start, end, expected_length=None):
    match = re.match(fr'^{start}.*{end}$', solution)
    print('Solution:', solution)
    if expected_length and (len(solution) // 2) != expected_length:
         print('\033[0;31mIncorrect Solution.')
    elif match:
        print('\033[92mCorrect Solution.')
    else:
        print('\033[0;31mIncorrect Solution.')
    print('\033[0m')

def check_equals(expected, solution):
    print('Expected:', expected)
    print('Solution:', solution)
    if expected == solution:
        print('\033[92mCorrect Solution.')
    else:
        print('\033[0;31mIncorrect Solution.')
    print('\033[0m')


Definition of the tshark parser utilities

In [680]:
from collections import namedtuple
from dataclasses import dataclass

# Definition of handshake representation
Handshake_Block = namedtuple('HandShake_Block', ('a_mac', 's_mac' ,'nonce', 'mic', 'mess', 'data', 'bytes'))

# definition of block representation
Block = namedtuple('Block', ('frame_control', 'add_1', 'add_2', 'add_3', 'qs_control', 'ccmp_par', 'ccmp_key_index', 'data' ,'bytes'))

TLS_HANDSHAKE_MAP = {
    1 : 'client_hello',
    2 : 'server_hello',
    11 : 'server_cert',
    14 : 'server_hello_done',
    16 : 'client_key_exchange'
}

@dataclass
class TLS_HANDSHAKE:
    client_hello: bytes
    server_hello: bytes
    server_cert: bytes
    server_hello_done: bytes
    client_key_exchange: bytes

parser aux function

In [681]:
def parse_block_line(line): 
    parsed_line = ''.join(line.split()[1:])
    # check if the line is a tshark anotation
    if re.match(r'.+:', parsed_line):
        return ''
    return parsed_line

def get_epol_type(block):
    type = block[59]
    return type

def parse_handshake_block(block):
    a_mac = block[30:36]
    s_mac = block[36:42]
    nonce = block[75:107]
    mic = block[139:155]
    mess = block[58:]
    data = None if len(block) < 160 else block[157:]
    return Handshake_Block(a_mac, s_mac, nonce, mic, mess, data, block)

def parse_block(block):
    frame_control = block[0:2]
    add_1 = block[4:10]
    add_2 = block[10:16]
    add_3 = block[16:22]
    ccmp_par = (block[24:27] + chr(0x00).encode() + block[28:32])[::-1]
    key_index = 0 if block[27] == 32 else 1
    data = block[32:]
    return Block(frame_control, add_1, add_2, add_3, None, ccmp_par, key_index, data, block)

def parse_block_qos(block):
    frame_control = block[0:2]
    add_1 = block[4:10]
    add_2 = block[10:16]
    add_3 = block[16:22]
    qs_control = block[24:26]
    ccmp_par = (block[26:29] + chr(0x00).encode() + block[30:34])[::-1]
    key_index = 0 if block[29] == 32 else 1
    data = block[34:]
    return Block(frame_control, add_1, add_2, add_3, qs_control, ccmp_par, key_index, data, block)

# TLS PART
def tls_sub_blocks(block):
    sub_blocks = []
    # check if has lenght filed (most significative bit of the EAP-TLS flags field)
    has_lenght = (block[67] >> 7) == 1
    # check if is fragmented
    if has_lenght:
        length = int.from_bytes(block[68 : 72])
        index = 72 if len(block[72:]) <= length else len(block) - length
    else :
        index = 68
    # Find the handshake interest sub-blocks
    while index < len(block):
        content_type = block[index]
        length = int.from_bytes(block[index + 3 : index + 5])
        if content_type == 22 or content_type==23:
            hs_type = block[index + 5]
            #sub_block = block[index + 5: index + 5 + length + 1]
            sub_block = block[index + 5: index + 5 + length]
            sub_blocks.append((hs_type, sub_block))
        index = index + 5 + length
    return sub_blocks
        
def find_tls_handshake_blocks(blocks):
    i = 1
    packets = []
    for block in blocks:
        # check if there is a Trasnpor security layer in the block
        packets += tls_sub_blocks(block) 
        i+=1
    # extract tls handshake interest blocks
    tls_hs_packets = {}
    for packet in packets:
        hs_type, block = packet
        if hs_type in TLS_HANDSHAKE_MAP:
            hs_type_key = TLS_HANDSHAKE_MAP[hs_type]
            tls_hs_packets[hs_type_key] = block
    tls_handshake = TLS_HANDSHAKE(**tls_hs_packets)
    return tls_handshake

In [682]:
import re

class TsharkParser:

    @staticmethod
    def parse_blocks(blocks):
        parsed_blocks = []
        b_n = 1
        for block in blocks:
            block_lines = ''.join([parse_block_line(line) for line in block.split('\n')])
            parsed_blocks.append(bytes.fromhex(block_lines))
        return parsed_blocks
            
    @staticmethod
    def parse_output(output):
        output = output.decode()
        packets_blocks = re.split(r"(?:\r?\n){2,}" ,output.strip())
        return TsharkParser.parse_blocks(packets_blocks)
    
    @staticmethod
    def parse_tls_handshake(blocks):
        # find the tls_handshake blocks
        tls_hs_blocks = find_tls_handshake_blocks(blocks)
        # filter by the interested fields
        return tls_hs_blocks
    
    @staticmethod
    def parse_tls_block(block):
        return tls_sub_blocks(block)


In [683]:
import subprocess

class Tshark:
    def __init__(self, capture_path) -> None:
        self.capture_path = capture_path
    
    def apply_filter(self, filter):
        command = f'tshark -r {self.capture_path} -Y {filter} -x --hexdump noascii'
        output = subprocess.run(command.split(), stdout=subprocess.PIPE).stdout
        return TsharkParser.parse_output(output)
    
    def find_4_handshake_sample(self):
        blocks = self.apply_filter('eapol')
        # find the blocks with eaplo type key
        blocks = list(filter(lambda b: get_epol_type(b) == 3, blocks))
        handshake_blocks = [parse_handshake_block(block) for block in blocks]
        return handshake_blocks
    
    def find_package_by_frame_number(self, frame_number):
        block = self.apply_filter(f'frame.number=={frame_number}')[0]
        # find the 802.11 package
        header_length =  int.from_bytes(block[2:4], 'little')
        block_802_11 = block[header_length:]
        # check if has QOS
        subtype = block_802_11[:1]
        subtype = (int.from_bytes(subtype, 'little') >> 4) & 8
        if subtype == 8: #  suptype == 8 the package contains Q0S field
            return parse_block_qos(block_802_11)
        return parse_block(block_802_11)
    
    def find_tls_handshake_sample(self):
        blocks = self.apply_filter('tls')
        return TsharkParser.parse_tls_handshake(blocks)
    
    def find_eap_sample(self):
        blocks = self.apply_filter('tls.record.content_type==23')
        return TsharkParser.parse_tls_block(blocks[0])[0][1]

Cryptography Aux functions

In [684]:
def HMAC(key, algo, input):
    h = hmac.HMAC(key, algo)
    h.update(input)
    return h.finalize()

Definition of the proccess constants 

In [685]:
SSID = 'TIC Project'
SERVER_PRIVATE_KEY_FILE = '../captures/server.p12'
PASSWORD = 'whatever'

# Question 1

In [686]:
# init Tshark utility
tshark = Tshark('../captures/tradio_pap_wpa_enterprise.pcapng')

Extract TLS handshake sample

In [687]:
tls_handshake_sample = tshark.find_tls_handshake_sample()

Decipher premaster key aux function definition

In [688]:
from cryptography.hazmat.primitives.serialization.pkcs12 import load_key_and_certificates
from cryptography.hazmat.primitives.asymmetric import padding

def decipher_premaster_key(encrypted_pmk, certificate_path, password):
    with open(certificate_path, "rb") as key_file:
        # Read the PKCS certificate to get the private key
        private_key, _, _ = load_key_and_certificates(
            key_file.read(),
            password=password,
        )
    # premaster key decryption
    plaintext = private_key.decrypt(
        encrypted_pmk,
        padding.PKCS1v15()
    )
    return plaintext

In [689]:
client_key_exchange = tls_handshake_sample.client_key_exchange
# get encripted_pmk from the client_key_exchange block
encripted_pmk = client_key_exchange[6:]
# derive PMK
PMK = decipher_premaster_key(encripted_pmk, SERVER_PRIVATE_KEY_FILE, PASSWORD.encode())
validate_solution(PMK.hex(), start='0303', end='fd89b0ab', expected_length=48)

Solution: 0303a76728c2620c3081be7604a55b6516ea7306c06ab58cca75be0b5100af1b8b83ecbb8da8f34f17dde6dcfd89b0ab
[92mCorrect Solution.
[0m


# Question 2

Pseudo Random Function Definition

In [690]:
def PRF(key, label, seed, algo, n):
    seed = label + seed
    A_0 = seed
    B_0 = bytes()
    A = [A_0]
    for i in range(1, n + 1):
        A += [HMAC(key, algo=algo, input=A[i-1])]
    B = b''
    for i in range(n):
        B += HMAC(key, algo=algo, input=A[i+1] + seed)
    return B

Generate seed aux function definition

In [691]:
def SHA384(data):
    hs = hashes.Hash(hashes.SHA384())
    hs.update(data)
    return hs.finalize()

Get Master Key

In [692]:
client_hello = tls_handshake_sample.client_hello
server_hello = tls_handshake_sample.server_hello
server_cert = tls_handshake_sample.server_cert
server_hello_done = tls_handshake_sample.server_hello_done
client_key_exchange = tls_handshake_sample.client_key_exchange

# generate seed
s = SHA384(client_hello + server_hello + server_cert + server_hello_done + client_key_exchange)
MASTER_KEY = PRF(PMK, "extended master secret".encode(), seed=s, algo=hashes.SHA384(), n=1)[0:48]

Validate solution

In [693]:
validate_solution(MASTER_KEY.hex(), 'a5a6', 'b4a6')

Solution: a5a68cb7782b1bfe075cf9bd60fb8dff7dde1a746f5b22b5d9922070cb166f7bc5097a5dcb3d34e03098c8ec4bbdb4a6
[92mCorrect Solution.
[0m


# Question 3

Calculate Key block

In [694]:
server_hello_random = server_hello[6:38]
client_hello_random = client_hello[6:38]

s =  server_hello_random + client_hello_random
KEY_BLOCK = PRF(MASTER_KEY, label='key expansion'.encode(), seed=s, algo=hashes.SHA384(), n=2)[0:72]

Extract keys from the Key Block

In [695]:
CLIENT_WRITE_KEY = KEY_BLOCK[0:32]
SERVER_WRITE_KEY = KEY_BLOCK[32:64]
CLIENT_WRITE_IV = KEY_BLOCK[64:68]
SERVER_WRITE_IV = KEY_BLOCK[68:72]

Validate solution

In [696]:
validate_solution(CLIENT_WRITE_KEY.hex(), '16de', 'dda6')

Solution: 16de7e1e8d4375e5cede2fd2171793ffc69cef11332cf5fcb8e3f8803b30dda6
[92mCorrect Solution.
[0m


# Question 4

Find EAP data

In [697]:
data = tshark.find_eap_sample()

In [698]:
def AESCGM_decrypt(key, in_data, nonce, associated_data):
    aesgcm = AESGCM(key)
    return aesgcm.decrypt(nonce, in_data, associated_data)

In [699]:
tag = data[-16:]
client_nonce = CLIENT_WRITE_IV + data[0:8]

seq_num = bytes.fromhex('0000000000000001')
content_type = bytes.fromhex('17')
version = bytes.fromhex('0303')
ciphertext_length = int.to_bytes(len(data) - 24, length=2)
ADD = seq_num + content_type + version + ciphertext_length

c = AESCGM_decrypt(in_data=data[8:], nonce=client_nonce, key=CLIENT_WRITE_KEY, associated_data=ADD)


Using the wireshark, we can see that descrypted block contains the credentials over the DIAMETER protocol. Taking acount on this, we going to parse the block to extract the credentias. 

In [700]:
def parse_diameter_block(block):
    credentials = {}
    offset = 0
    while True:
        type = int.from_bytes(block[offset : offset + 4])
        start_data = offset + 8
        size = block[start_data:].index(0) 
        end_data = size + start_data
        data = block[start_data : end_data].decode()
        if type == 1:
            end_package = start_data + 8 * ((size// 8) + 1) # size of data + padding up to a multiple of 8 octets
            credentials['username'] = data
        elif type == 2:
            end_package = start_data + 16 * ((size// 16) + 1) # size of data + padding up to a multiple of 8 octets
            credentials['password'] = block[start_data: end_data].decode()
        # Jump to the next AVP
        offset = end_package
        if end_package >= len(block):
            break
    return credentials

Extract the credentials

In [701]:
credentials = parse_diameter_block(c)
print('Credentials:')
for key, value in credentials.items():
    print(f'\t{key} : {value}')

Credentials:
	username : user2
	password : user2


# Question 5

Compute key block

In [702]:
s = client_hello_random + server_hello_random
key_block = PRF(MASTER_KEY, label="ttls keying material".encode(), seed=s, algo=hashes.SHA384(), n=2)
len(key_block)

96

Compute MSK

In [703]:
MSK = key_block[0:64]

Compute PTK

In [704]:
def SHA1_HMAC(key, input, length):
    r=bytes()
    for i in range(1 + length // 20):
        r+= HMAC(key, hashes.SHA1(), input + chr(i).encode())
    return r[:length]

In [705]:
# find a handshake sample
handshake_sample =  tshark.find_4_handshake_sample()

# extract the first two packages  
packet_1 = handshake_sample[0]
packet_2 = handshake_sample[1]

# extract the required fields to generate the block
a_mac = packet_1.a_mac
s_mac = packet_1.s_mac
a_nonce = packet_1.nonce
s_nonce = packet_2.nonce

# ordered the fields (less first)
mac_1, mac_2 = min(a_mac, s_mac) , max(a_mac, s_mac)
nonce_1, nonce_2 = min(a_nonce, s_nonce), max(a_nonce, s_nonce)

# generate data blocks
data = 'Pairwise key expansion'.encode() + chr(0x00).encode() + mac_1 + mac_2 + nonce_1 + nonce_2

PTK = SHA1_HMAC(key=MSK[0:32], input=data, length=48)

Solution validation

In [706]:
print('MSK')
validate_solution(MSK.hex(), '85e8', '6e3b', 64)
print('PTK')
validate_solution(PTK.hex(), 'b3c6', '8ae4', 48)

MSK
Solution: 85e852ebe46df2c663495897d383ea61a2eaec1ce8a4f4bb927f0f4a6f2c93d2e8ecf740309050c6efaaffe67483297e2f3faa87816ea7a60b13545b45e16e3b
[92mCorrect Solution.
[0m
PTK
Solution: b3c6efd19215419142ab25148819e4b6544ba9bc1b08363e66032228f5793258c05ca392cb34ff1b022fbfdcf68b8ae4
[92mCorrect Solution.
[0m


# Question 6

Extract aux keys from PTK

In [707]:
KCK = PTK[:16]
KEK = PTK[16:32]
TK = PTK[32:48]

In [708]:
def compute_mic(KCK, mess):
    data = mess[:81]
    for _ in range(16):
        data+=chr(0x00).encode()
    data += mess[81+16:]
    return HMAC(KCK, hashes.SHA1(), data)[:16]

To prove the authenticity of handshake messages 2, 3 and 4, the MIC is computed with the KCK and the 801.x authentication part and verified to be equal to the MIC of the packet.

In [709]:
for i in range(2, 5):
    packet = handshake_sample[i-1]
    mess = packet.mess
    mic = compute_mic(KCK, mess)
    check_equals(mess[81:81+16].hex(), mic.hex())

Expected: e78f5cf94a22afab0f9dea051bf3322f
Solution: e78f5cf94a22afab0f9dea051bf3322f
[92mCorrect Solution.
[0m
Expected: 5898a562fc962f3240e2da645044566c
Solution: c96c0eeee26d6ab2538b98f327a2290c
[0;31mIncorrect Solution.
[0m
Expected: 6982cd2bd03c5551600e1351f3e4c6f5
Solution: 6982cd2bd03c5551600e1351f3e4c6f5
[92mCorrect Solution.
[0m


# Question 7

In [710]:
multicast_packet = tshark.find_package_by_frame_number(131)

Since the packet is a multicast packet we going to obtain the GTK (In case it is necessary)

In [711]:
# compute GTK
packet_3 = handshake_sample[2]
GTK = aes_key_unwrap(KEK, packet_3.data)
packet_3.data.hex()

InvalidUnwrap: 

In [None]:
def AESCCM_decrypt(key, tag_length, in_data, nonce, associated_data):
    aesccm = AESCCM(key, tag_length)
    return aesccm.decrypt(nonce, in_data, associated_data)

Once the GTK is obtained, we are going to extract the rest of the values that are necessary to decrypt the message.

In [None]:
frame_control = multicast_packet.frame_control
add_1 = multicast_packet.add_1
add_2 = multicast_packet.add_2
add_3 = multicast_packet.add_3
ccmp_par = multicast_packet.ccmp_par

# compute the nonce
nonce_multicast = chr(0x00).encode() + add_2 + ccmp_par[0:2] + ccmp_par[4:8]
# compute the ADD
add_multicast = frame_control + add_1 + add_2 + add_3 + 2 * chr(0x00).encode()

With the values calculated, is time to descrypt the message. First, it is necesary chosise the key to use (TK or GTK). To do this, the value of the key index field extracted from the CCMP parameters of the block must be checked. In case of value 0 the TK is used and in case of value 1 the GTK will be used.

In [None]:
data = multicast_packet.data
# choise the key in function of the packet ccmp key index
key = TK if multicast_packet.ccmp_key_index == 0 else GTK

plain_text = AESCCM_decrypt(key, tag_length=8, in_data=data, nonce=nonce_multicast, associated_data=add_multicast)
plain_text

b'\xaa\xaa\x03\x00\x00\x00\x08\x00F\xc0\x00(\x00\x00@\x00\x01\x02@\xed\xc0\xa8\x02d\xe0\x00\x00\x16\x94\x04\x00\x00"\x00\xf9\x01\x00\x00\x00\x01\x04\x00\x00\x00\xe0\x00\x00\xfc'

As the cryptography process finishes without errors, the packet has been decrypted. The data encrypted are packet IPv4 with the  Internet Group Management Protocol(IGMP)