# Tiny-TLS 1.3 Toy Implementation for COSI107a, Spring 2025

This document contains the implementation of a toy version of TLS 1.3, to be used as material for Brandeis' COSI107a course. The goal is to implement a minimalist version of TLS 1.3 that can communicate with a server using the protocol.

This is a work in progress, and it is expected that changes will be made to the protocol as we move forward


## 1. Import the necessary libraries ✅

Since we're working with TLS in Python, it is helpful to use Python libraries that allow us to pack our data into binary form. TLS protocol specifies exact byte length and format, and we'll be doing a lot of conversion between numbers and bytes. We'll use the Python Struct library for this purpose

We'll also be importing the x25519 curve from the Cryptography library to generate the key used in our messages



In [245]:
import struct

from cryptography.hazmat.primitives.asymmetric import x25519
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.kdf import hkdf
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.hmac import HMAC
from cryptography.hazmat.backends import default_backend

import socket
import time
from urllib.parse import urlparse


import os #for random nonce generation

# 2a. Define some helper functions 🟨

TLS represents all of its messages in byte form. As such, some elements (such as the content length) must be converted from integer to big edian, specifically with 2 bytes. We also need to be able to join the distinct elements in a message into a single byte slice


In [246]:
def u16_to_byte(x: int) -> bytes:
    return struct.pack('>H', x) # Use the struct package to pack a number into big-edian 2 bytes

def concatenate(*bufs: bytes) -> bytes: # Concatenate multiple byte slices into one singular byte slice
    return b''.join(bufs)

def print_bytes_as_hex(b: bytes) -> None: # This is to print the bytes as hex strings. Easier to double check this way
    hex_string = ' '.join([f'{x:02x}' for x in b])
    print(hex_string)

# 2b. Define our private key and public key ✅

We can then generate the public key and private key using the x25519 curve from Cryptography


In [247]:
def key_pair() -> bytes:
    private_key = x25519.X25519PrivateKey.generate()
    public_key = private_key.public_key()
    return private_key, public_key

private_key, public_key = key_pair()

print(private_key.private_bytes(
    encoding=serialization.Encoding.Raw,
    format=serialization.PrivateFormat.Raw,
    encryption_algorithm=serialization.NoEncryption()
    ))
print(public_key.public_bytes(
    encoding=serialization.Encoding.Raw,
    format=serialization.PublicFormat.Raw
    )) 


b'\x008\x9b\xb4\xcd3\xe0\xba\x85\xbf\xca,\xb4\xfd\x8a\xd2\xda\xe7zv\xb7\xf1\xf6!\x02\x8f\xa9GJ\xe2!R'
b'\x07\x83i\x99\xe3e"\x1a\xe6\xd3Sf\xb9:\xae\xa6d5R8\xf1f\x1c\x97\x1d\xc0\x89\x1d*F\xa7r'


Notice how each time we run the code, the public_key is different. This is intended behavior, as it prevents attackers from being able to predict our key pairs

# 3. The Extension Blueprint: 🟨

Extensions play a large part in shaping a TLS message. From negotiating supported key group to exchanging keys, all of these are achieved using extensions. Luckily for us, these different extensions have a common blueprint



In [248]:
def extension(id: int, content: bytes) -> bytes:
    return concatenate(
        u16_to_byte(id),                         #The ID of the extension. (e.g 0x0a = Supported Group)
        u16_to_byte(len(content)),               #Length of content
        content                                  #The actual content itself
    )

print_bytes_as_hex(extension(0x0a, bytes([0x00, 0x1d]))) #Example Supported Group extension


00 0a 00 02 00 1d


# 4. The ClientHello 🟨

With those building blocks, we can now write our first TLS message: The ClientHello. The ClientHello is always the first message to be sent in a TLS handshake, indicating that the client wants to connect with the server

The ClientHello has these components, in this order:
1. ProtocolVersion (Negotiate which version of TLS we're using)
2. Random Nonce (32 bit, for key generation)
3. Legacy Session ID (For our purposes, we won't be using sessions)
4. Cipher Suites (contains a suite of cipher - how to actually encrypt the key once we have it)
5. Legacy Compression Method (For TLS 1.3, this is null)
6. Extensions 

The code for it is as follows:

In [249]:
def client_hello() -> bytes:
    client_random = os.urandom(32)
    def key_share(pubkey: bytes) -> bytes:      # Encode our public key to be sent over the message
        return concatenate(
            u16_to_byte(len(pubkey) + 4),       # +4 represents the 4 extra byte before the pubkey (2 bytes for the x25519, 2 bytes for len of pubkey)
            u16_to_byte(0x1d),                  # 0x1d is the value for x25519 key
            u16_to_byte(len(pubkey)),           
            pubkey
        )
    
    def DNI(domain: str) -> bytes:
        return concatenate(
            u16_to_byte(len(bytes(domain,'utf-8')) + 3),
            bytes([0x00]),
            u16_to_byte(len(bytes(domain,'utf-8'))),
            bytes(domain, 'utf-8')
        )
    
    def extensions() -> bytes: #This intializes the extensions we need in our message
        return concatenate(
            extension(0x00, DNI('www.cloudflare.com')),
            extension(0x0a, bytes([0x00, 0x02, 0x00, 0x1d])), #Supported Group extensions. Currently only contains the x25519 curve
            extension(0x0d, bytes([0x00, 0x12, 0x04, 0x03, 0x08, 0x04, 0x04, 0x01, 0x05, 0x03, 0x08, 0x05, 0x05, 0x01, 0x08, 0x06, 0x06, 0x01, 0x02, 0x01])),
            extension(0x33, key_share(public_key.public_bytes(
                                encoding=serialization.Encoding.Raw,
                                format=serialization.PublicFormat.Raw
                            ))), #Key Share. Contains the public key generated from the x25519 curve
            extension(0x2b, bytes([0x02, 0x03, 0x04])) #TLS Version. This is how we negotiate TLS 1.3
        )
    
    def handshake() -> bytes: #This constitutes our actual ClientHello message
        return concatenate(
            bytes([0x03, 0x03]),                          # This value is for TLS 1.2. TLS 1.3 must disguise itself as TLS 1.2 to be received, after which it negotiates into TLS 1.3 through the TLS 1.3 extension
            client_random,                             # Random Nonce for key
            bytes([0x00]),                                # Session ID. Empty for our purposes
            bytes([0x00, 0x02, 0x13, 0x02]),              # Cipher Suite. We have a single cipher for our cipher suite (SHA384)
                                                          # I'm aware that there are 2 SHA384 ciphers: AES and CHACHA. I've included one here. Not sure
                                                          # if we need the other one or not
            bytes([0x01, 0x00]),                          # Compression Method. Empty for our purposes
            u16_to_byte(len(extensions())),
            extensions()
        )
    
    return concatenate(                                 #Include record layers for TLS 1.3 to complete message
        bytes([0x16, 0x03, 0x01]),
        u16_to_byte(len(handshake()) + 4),
        bytes([0x01,0x00]),
        u16_to_byte(len(handshake())),
        handshake()
    ), client_random

client_hello_msg, client_random = client_hello()
print_bytes_as_hex(client_hello_msg)
client_hello_no_record = client_hello_msg[5:] #This is to calculate the transcript hash
print_bytes_as_hex(client_hello_no_record)


## To check the validity of the ClientHello, copy paste the ClientHello into the openSSL_debug.py script and run it. Server should give back a response

16 03 01 00 9b 01 00 00 97 03 03 27 39 3d ac d6 47 ce 9b 9b a3 47 ee b3 52 85 64 8a 7e 9a 4a 34 5d 7b 11 48 62 8f 31 9f b4 ce b6 00 00 02 13 02 01 00 00 6c 00 00 00 17 00 15 00 00 12 77 77 77 2e 63 6c 6f 75 64 66 6c 61 72 65 2e 63 6f 6d 00 0a 00 04 00 02 00 1d 00 0d 00 14 00 12 04 03 08 04 04 01 05 03 08 05 05 01 08 06 06 01 02 01 00 33 00 26 00 24 00 1d 00 20 07 83 69 99 e3 65 22 1a e6 d3 53 66 b9 3a ae a6 64 35 52 38 f1 66 1c 97 1d c0 89 1d 2a 46 a7 72 00 2b 00 03 02 03 04
01 00 00 97 03 03 27 39 3d ac d6 47 ce 9b 9b a3 47 ee b3 52 85 64 8a 7e 9a 4a 34 5d 7b 11 48 62 8f 31 9f b4 ce b6 00 00 02 13 02 01 00 00 6c 00 00 00 17 00 15 00 00 12 77 77 77 2e 63 6c 6f 75 64 66 6c 61 72 65 2e 63 6f 6d 00 0a 00 04 00 02 00 1d 00 0d 00 14 00 12 04 03 08 04 04 01 05 03 08 05 05 01 08 06 06 01 02 01 00 33 00 26 00 24 00 1d 00 20 07 83 69 99 e3 65 22 1a e6 d3 53 66 b9 3a ae a6 64 35 52 38 f1 66 1c 97 1d c0 89 1d 2a 46 a7 72 00 2b 00 03 02 03 04


# 5. Sending the ClientHello: 🟨
Since we now have the ClientHello, the next step would be to send it over to a public domain and see if the server responds. For this part, we'll be using www.cloudbase.com, as it supports TLS 1.3, and the cipher suite matches with our clientHello.


In [250]:
server_hello_no_record = b""
encrypted_payload = b""



def parse_tls_record(data):
    #Parse a TLS record and return its components
    if len(data) < 5:  # Minimum TLS record length
        return None, None, None, data
    
    content_type = data[0] #Parse the record header to see which type of record this is.
    if (data[1] == 3) and (data[2] == 3):
        version = "1.2/1.3"
    length = int.from_bytes(data[3:5], byteorder='big')
    
    if len(data) < length + 5:
        return None, None, None, data
        
    if content_type == 0x17:
        payload = data[:5+length]  # Include the header for application data. We need the header for the decryption algorithm
        remaining_data = data[5+length:]
    else:
        payload = data[5:5+length]  # Original behavior for other types
        remaining_data = data[5+length:]
    
    content_type_names = { #Dictionary indicating the types of TLS records we may receive
        0x14: "Change Cipher Spec",
        0x15: "Alert",
        0x16: "Handshake",
        0x17: "Application Data"
    }
    
    type_name = content_type_names.get(content_type, f"Unknown ({content_type})")
    
    return type_name, version, payload, remaining_data

def parse_handshake_message(data): #This is exclusive for Handshake messages. We'll primarily use this to identify serverHellos.
    """Parse a TLS handshake message."""
    if len(data) < 4:
        return None, None, None
        
    msg_type = data[0]
    length = int.from_bytes(data[1:4], byteorder='big')
    
    handshake_types = {
        0x00: "Hello Request",
        0x01: "Client Hello",
        0x02: "Server Hello",
        0x0b: "Certificate",
        0x0c: "Server Key Exchange",
        0x0d: "Certificate Request",
        0x0e: "Server Hello Done",
        0x0f: "Certificate Verify",
        0x10: "Client Key Exchange",
        0x14: "Finished",
        0x08: "Encrypted Extensions",  # Added TLS 1.3 message types
        0x13: "Certificate Status"
    }
    
    type_name = handshake_types.get(msg_type, f"Unknown ({msg_type})")
    if len(data) >= 4+length:
        payload = data[:4+length]
    
    return type_name, length, payload

def format_bytes(data):
    """Format bytes as a hexadecimal string with ASCII representation."""
    hex_dump = ' '.join(f'{b:02x}' for b in data)
    return f"Hex: {hex_dump}"

def receive_all_data(sock, timeout=5):
    """Receive all data from the socket until timeout or connection close."""
    buffer = b""
    sock.settimeout(timeout)
    start_time = time.time()
    
    while True:
        try:
            chunk = sock.recv(16384)
            if not chunk:  # Connection closed by server
                break
            buffer += chunk
            
            # Check timeout
            if time.time() - start_time > timeout:
                break
                
        except socket.timeout:
            break
            
    return buffer
# Parse the URL to get the hostname
parsed_url = urlparse('https://www.cloudflare.com/')
host = parsed_url.hostname
port = 443
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(5)
print(f"Attempting to connect to {host}:{port}")
s.connect((host, port))
print("Connected to server.")

# Your existing client_hello
try:
    
    bytes_sent = s.sendall(client_hello_msg)
    print(f"Sent ClientHello ({len(client_hello_msg)} bytes)")
    
    print("\nWaiting for server response...")
    try:
        buffer = receive_all_data(s)
        print(f"\nReceived {len(buffer)} new bytes")
        print(print_bytes_as_hex(buffer))


        
        # Process all complete TLS records in the buffer
        while buffer:
            record_type, version, payload, remaining = parse_tls_record(buffer)
            if record_type is None:  # Incomplete record
                break
                
            buffer = remaining
            print("\n" + "="*50)
            print(f"Record Type: {record_type}")
            print(f"TLS Version: {version}")
            
            if record_type == "Handshake":
                msg_type, msg_length, msg_payload = parse_handshake_message(payload)
                if msg_type:
                    print(f"Handshake Type: {msg_type}")
                    print(f"Message Length: {msg_length}")
                    if msg_payload:
                        print("\nMessage Payload:")
                        print(format_bytes(msg_payload))
                        server_hello_no_record = msg_payload
            else:
                print("\nRecord Payload:")
                if record_type == "Application Data":
                    encrypted_payload = payload
                print(format_bytes(payload))
            
    except socket.timeout:
        print("Timeout while waiting for server response")
            
except ConnectionRefusedError:
    print("Connection refused. Is the OpenSSL server running?")
except socket.timeout:
    print("Connection attempt timed out")
except Exception as e:
    print(f"Error: {e}")

print("\nExtracted Data:")
if server_hello_no_record:
    print("\nServer Hello (without record header):")
    print(print_bytes_as_hex(server_hello_no_record))
if encrypted_payload:
    print("\nEncrypted Payload (with record header):")
    print(print_bytes_as_hex(encrypted_payload))

Attempting to connect to www.cloudflare.com:443
Connected to server.
Sent ClientHello (160 bytes)

Waiting for server response...

Received 2820 new bytes
16 03 03 00 5a 02 00 00 56 03 03 ee a5 fc 0a 6d da f4 0b ca 67 5c 07 1c 67 66 63 b0 5e c1 51 b5 70 37 82 d1 9a f9 7c 08 d5 a1 2b 00 13 02 00 00 2e 00 33 00 24 00 1d 00 20 a5 bb b0 d3 4e 8b e5 e7 92 8a 53 49 d8 4b d5 1b d5 22 67 2d f7 c9 53 b8 59 88 83 d3 db ff 1c 6c 00 2b 00 02 03 04 14 03 03 00 01 01 17 03 03 0a 9a d8 fc 1a 6f a4 60 55 a2 3e 66 da fe e0 65 5d 76 09 55 a4 26 11 76 fb 4d c2 98 05 8b 35 f7 4c 65 86 ba d4 e2 b5 a4 63 bf e2 bb b3 27 2c dd 2c 76 f1 57 49 a1 2f ac 70 de 4f d4 e6 6f 08 d5 cd 39 46 08 da e1 57 de 9f 1a a8 b1 bc 08 59 69 36 62 92 f0 bd 50 33 f3 7b 04 a4 54 f9 19 2b b0 5f 36 27 d2 b2 54 bc e3 b0 88 e6 0b cf 50 25 ba c3 2b 65 19 7c 4b 6a 36 5b 65 3f d2 f5 ae 7c e1 9e 65 f5 64 40 9e d2 81 7a 5c dc 98 a2 08 b4 19 96 2e 50 70 b6 72 64 b9 b9 44 12 5b 38 18 2b ae 4f 60 8e a2 43 d1 3f 6c d1 bc fb 24 59 e4 38 d6 72 4b

# 6. Parsing the ServerHello ✅

Once the ClientHello is sent, the server responds with its own message. This message contains the ServerHello, the CipherChangeSpec, and the encrypted payload. Assuming that our ClientHello message is configured correctly, the server will respond with its chosen cipher suite and its own key. 

We are particularly interested in the server's key and server random, which we use to establish cryptographic parameters. We can then extract these properties and start the key calculation process

# 6a. Creating a Parser class ✅

The ServerHello message isn't always of the same size. Certain elements like the sessionID, as well as the content of extensions, may have variable length depending on the message itself. That's why we can't just extract the information based on indexes alone. We need a parser that would keep track of what element we're at and how many bytes we have to skip forward


In [251]:

class Parser:
    def __init__(self, data: bytes) -> None:
        self.data = data
        self.cursor = 0
    
    def skip(self, position: int) -> None:
        self.cursor += position

    def read(self, position: int) -> bytes:
        result = self.data[self.cursor : self.cursor + position]
        self.cursor += position
        return result 
    
    def read_uint8_prefixed(self) -> bytes:                           #Most extensions with variable lengths have bytes that denote their length. We can use this to skip forward appropriately 
        length = self.data[self.cursor]
        print(length)
        self.cursor += 1
        result = self.data[self.cursor : self.cursor + length]
        self.cursor += length
        return result

    def read_uint16_prefixed(self) -> bytes:
        length = int.from_bytes(self.data[self.cursor:self.cursor + 2], 'big') #Since some lengths are represented with 2 bytes, we need to convert them 
        self.cursor += 2
        result = self.data[self.cursor:self.cursor + length]
        self.cursor += length
        return result


# 6b. Parsing the ServerHello ✅

With our Parser class, we can now parse the ServerHello to extract the ServerRandom and the public key. Keep in mind that we are only dealing with 1 cipher suite, 1 cryptographic and as such expects only 1 public key. A full version of TLS 1.3 will be much more complex

In [252]:
def server_hello_parser(msg: bytes):
    parser = Parser(msg)
    parser.skip(4)                                  #Skip HandShake Header
    parser.skip(2)                                  #Skip Server Version
    server_random = parser.read(32)                 #Get the serverRandom
    parser.read_uint8_prefixed()                    #Skip SessionID
    parser.skip(2)                                  #Skip Cipher Suite
    parser.skip(1)                                  #Skip Compression Method
    public_key = None
    extensions = parser.read_uint16_prefixed()
    extension_reader = Parser(extensions)
    while(extension_reader.cursor < len(extensions)):
        extension_type = extension_reader.read(2)
        extension_data = extension_reader.read_uint16_prefixed()
        if (extension_type == b'\x00\x33'):
            data = Parser(extension_data)
            data.skip(2)
            public_key = data.read_uint16_prefixed()
    return server_random, public_key


#Pre-generated server_hello_msg, used for debugging purposes
#server_hello_no_record = bytes.fromhex("02 00 00 76 03 03 70 71 72 73 74 75 76 77 78 79 7a 7b 7c 7d 7e 7f 80 81 82 83 84 85 86 87 88 89 8a 8b 8c 8d 8e 8f 20 e0 e1 e2 e3 e4 e5 e6 e7 e8 e9 ea eb ec ed ee ef f0 f1 f2 f3 f4 f5 f6 f7 f8 f9 fa fb fc fd fe ff 13 02 00 00 2e 00 2b 00 02 03 04 00 33 00 24 00 1d 00 20 9f d7 ad 6d cf f4 29 8d d3 f9 6d 5b 1b 2a f9 10 a0 53 5b 14 88 d7 f8 fa bb 34 9a 98 28 80 b6 15")
#Run the script with the pre-generated server hello and compare with https://tls13.xargs.org/#server-handshake-keys-calc to see if the script is working
#Current serverHello generated from www.cloudbase.com. Right now I'm copy-pasting from the terminal when I run the debug.py script. Also parser is ignoring the record header.
server_random, public_key = server_hello_parser(server_hello_no_record)
print_bytes_as_hex(server_random)
print_bytes_as_hex(public_key)


           

        
        
        


0
ee a5 fc 0a 6d da f4 0b ca 67 5c 07 1c 67 66 63 b0 5e c1 51 b5 70 37 82 d1 9a f9 7c 08 d5 a1 2b
a5 bb b0 d3 4e 8b e5 e7 92 8a 53 49 d8 4b d5 1b d5 22 67 2d f7 c9 53 b8 59 88 83 d3 db ff 1c 6c


# 7. Key Derivation ✅

Now that we have our private key and the server has sent back their public key, we can now encrypt our data to the server via key calculations. We can break down the key calculations into a few steps

# 7a. Transcript Hash ✅

Transcript Hash refers to the hash of the ClientHello and the ServerHello messages. The idea is to associate these messages with the keys we are about to derive: These keys work with these messages. That way, even if an attacker somehow got hold of our keys, it would not work with whatever messages they try to send



In [253]:
def transcript_hash(client: bytes, server: bytes) -> bytes:
    digest = hashes.Hash(hashes.SHA384()) # We'll be using SHA384 to create the hash for our messages.
    digest.update(client)     # Note that both the ClientHello and ServerHello is used here
    digest.update(server)
    transcript_hash = digest.finalize()
    return transcript_hash

print_bytes_as_hex(client_hello_no_record)       #This is the clientHello message we generated from our code
print_bytes_as_hex(server_hello_no_record)       #Right now, this server_hello is pre-generated for testing purposes. We'll be parsing an actual server hello when the entire thing is ready

# Testing value for transcript hash calculations
# client_hello_no_record = bytes.fromhex("01 00 00 f4 03 03 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 10 11 12 13 14 15 16 17 18 19 1a 1b 1c 1d 1e 1f 20 e0 e1 e2 e3 e4 e5 e6 e7 e8 e9 ea eb ec ed ee ef f0 f1 f2 f3 f4 f5 f6 f7 f8 f9 fa fb fc fd fe ff 00 08 13 02 13 03 13 01 00 ff 01 00 00 a3 00 00 00 18 00 16 00 00 13 65 78 61 6d 70 6c 65 2e 75 6c 66 68 65 69 6d 2e 6e 65 74 00 0b 00 04 03 00 01 02 00 0a 00 16 00 14 00 1d 00 17 00 1e 00 19 00 18 01 00 01 01 01 02 01 03 01 04 00 23 00 00 00 16 00 00 00 17 00 00 00 0d 00 1e 00 1c 04 03 05 03 06 03 08 07 08 08 08 09 08 0a 08 0b 08 04 08 05 08 06 04 01 05 01 06 01 00 2b 00 03 02 03 04 00 2d 00 02 01 01 00 33 00 26 00 24 00 1d 00 20 35 80 72 d6 36 58 80 d1 ae ea 32 9a df 91 21 38 38 51 ed 21 a2 8e 3b 75 e9 65 d0 d2 cd 16 62 54")
# server_hello_no_record = bytes.fromhex("02 00 00 76 03 03 70 71 72 73 74 75 76 77 78 79 7a 7b 7c 7d 7e 7f 80 81 82 83 84 85 86 87 88 89 8a 8b 8c 8d 8e 8f 20 e0 e1 e2 e3 e4 e5 e6 e7 e8 e9 ea eb ec ed ee ef f0 f1 f2 f3 f4 f5 f6 f7 f8 f9 fa fb fc fd fe ff 13 02 00 00 2e 00 2b 00 02 03 04 00 33 00 24 00 1d 00 20 9f d7 ad 6d cf f4 29 8d d3 f9 6d 5b 1b 2a f9 10 a0 53 5b 14 88 d7 f8 fa bb 34 9a 98 28 80 b6 15")


hello_hash = transcript_hash(client_hello_no_record, server_hello_no_record)
print_bytes_as_hex(hello_hash)

01 00 00 97 03 03 27 39 3d ac d6 47 ce 9b 9b a3 47 ee b3 52 85 64 8a 7e 9a 4a 34 5d 7b 11 48 62 8f 31 9f b4 ce b6 00 00 02 13 02 01 00 00 6c 00 00 00 17 00 15 00 00 12 77 77 77 2e 63 6c 6f 75 64 66 6c 61 72 65 2e 63 6f 6d 00 0a 00 04 00 02 00 1d 00 0d 00 14 00 12 04 03 08 04 04 01 05 03 08 05 05 01 08 06 06 01 02 01 00 33 00 26 00 24 00 1d 00 20 07 83 69 99 e3 65 22 1a e6 d3 53 66 b9 3a ae a6 64 35 52 38 f1 66 1c 97 1d c0 89 1d 2a 46 a7 72 00 2b 00 03 02 03 04
02 00 00 56 03 03 ee a5 fc 0a 6d da f4 0b ca 67 5c 07 1c 67 66 63 b0 5e c1 51 b5 70 37 82 d1 9a f9 7c 08 d5 a1 2b 00 13 02 00 00 2e 00 33 00 24 00 1d 00 20 a5 bb b0 d3 4e 8b e5 e7 92 8a 53 49 d8 4b d5 1b d5 22 67 2d f7 c9 53 b8 59 88 83 d3 db ff 1c 6c 00 2b 00 02 03 04
84 cf b7 9b 47 52 d8 aa f5 73 14 2c 21 59 44 cb 9a c8 b7 9c a8 a4 d8 3e 33 3c 09 16 17 a4 11 18 e7 1a e7 a3 22 25 97 5a 62 9f 6e 6a 32 3b 23 27


# 7b. Shared secret ✅
The idea of the key exchange is that given the private key and the public key of the other party, both client and server can perform calculations to arrive at the same number. Essentially, client private x server public = server private x client public. This is called the shared secret. We'll perform the calculations on our side while the server does it on theirs.


In [254]:
# Our public key right now is raw bytes. Our private key, on the other hand, is a x25519PrivateKey object. The most convenient way to calculate the 
# shared secret is to turn the public key into a x25519PublicKey object and leverage the built in methods to generate the shared secret


#Test key exchange
# test_client_private_key = x25519.X25519PrivateKey.from_private_bytes(bytes.fromhex("202122232425262728292a2b2c2d2e2f303132333435363738393a3b3c3d3e3f"))
# test_client_public_key = test_client_private_key.public_key()
# print_bytes_as_hex(test_client_public_key.public_bytes(
#     encoding=serialization.Encoding.Raw,
#     format=serialization.PublicFormat.Raw
#     ))

# test_server_private_key = x25519.X25519PrivateKey.from_private_bytes(bytes.fromhex("909192939495969798999a9b9c9d9e9fa0a1a2a3a4a5a6a7a8a9aaabacadaeaf"))
# test_server_public_key = test_server_private_key.public_key()
# print_bytes_as_hex(test_server_public_key.public_bytes(
#     encoding=serialization.Encoding.Raw,
#     format=serialization.PublicFormat.Raw
#     ))

# test_shared_secret = test_client_private_key.exchange(test_server_public_key)
# print_bytes_as_hex(test_shared_secret)


server_public_key = x25519.X25519PublicKey.from_public_bytes(public_key) 
# private_key = x25519.X25519PrivateKey.from_private_bytes(bytes.fromhex("202122232425262728292a2b2c2d2e2f303132333435363738393a3b3c3d3e3f"))
shared_secret = private_key.exchange(server_public_key)
print_bytes_as_hex(shared_secret)







0a 3a 2d 7d 63 f5 4e 83 09 dd 17 60 e5 e1 c3 56 68 f0 20 f8 fb 8e df a0 1b 4b d1 e2 c1 87 6a 37


# 7c. Early Secret, Derived Secret, Handshake Secret ✅
We then go on to generate the early secret, the derived secret and finally the handshake secret. 

- Early secret is for 0-RTT data (essentially data sent with pre-shared keys). Since we're not using PSKs, this is a bunch of 0 bytes.
- Derived secret is mainly used as salt, mixed in with the early secret to get the handshake key. This way, even if an attacker has the early secret, it's hard to determine the handshake key

Each of these secrets have a "label" associated with them to identify their use, usually with the format "tls13 + ID of the component". 

* Note 1: We can pass in the label directly into the HKDF info parameter, or we can make a method that will append the TLS13 infront and add the necessary bytes for length, after which it is passed into the infor parameter. To avoid hardcoding, I went with the second options. If this example works, we can go back and try hardcoding the labels to see if it affects anything


In [255]:
## REMEMBER TO CHANGE IT BACK TO SHA256 AFTER TESTING. Algorithm goes back to SHA256, Length goes back to 32.


# UNCOMMENT THIS FOR TESTING. SHOULD WORK FOR SHA384 IMPLEMENTATIONS:

# shared_secret = bytes.fromhex('df4a291baa1eb7cfa6934b29b474baad2697e29f1f920dcc77c8a0a088447624')
# hello_hash = bytes.fromhex('e05f64fcd082bdb0dce473adf669c2769f257a1c75a51b7887468b5e0e7a7de4f4d34555112077f16e079019d5a845bd')




# TLS specs make use of 2 methods: HKDF-Extract and HKDF-Expand. HKDF-Extract can be found in hkdf.HKDF()_extract(). HKDF-Expand is hkdf.HKDFExpand()
# Define our own hkdf_expand_label. This is basically calling HKDF to derive secrets, with an added step of configuring the labels associated with the component.

def hkdf_expand_label(secret: bytes, label: str, context: bytes, length: int) -> bytes:
    # Construct the HkdfLabel as specified in TLS 1.3
    label = b"tls13 " + label.encode('utf-8')
    hkdf_label = (
        struct.pack("!H", length) +  # 2 bytes for length
        struct.pack("!B", len(label)) + label +  # 1 byte for label length + label
        struct.pack("!B", len(context)) + context  # 1 byte for context length + context
    )
    
    # Use HKDF-Expand
    encryption = hkdf.HKDFExpand(
        algorithm=hashes.SHA384(),
        length=length,
        info=hkdf_label,
    )
    
    return encryption.derive(secret)


early_secret = hkdf.HKDF(
    algorithm=hashes.SHA384(), #We negotiated SHA256 in our cipher suite
    length=48,
    salt=b'\x00',
    info=b'\x00'
)._extract(b'\x00' * 48) #The first input key is simply 00

empty_hash = hashes.Hash(hashes.SHA384())
empty_hash.update(b"")  # Empty string
empty_hash = empty_hash.finalize()

# We then derive another secret called the derived secret from the early secret
derived_secret = hkdf_expand_label(
    secret = early_secret,
    label="derived",
    context=empty_hash,
    length=48
)

# The derived secret is mixed into the early secret to get the handshake secret
handshake_secret = hkdf.HKDF(
    algorithm=hashes.SHA384(),
    length=48,
    salt=derived_secret,  # derived_secret is used as salt
    info=None
)._extract(shared_secret)

# 7d. Traffic Secrets and Key derivation ✅

Now that we have our handshake secret, we can now move on to derive the traffic secrets, and subsequently the keys and IVs for both the client and the server

In [256]:
# Deriving the secrets and subsequent keys + IVs.
client_secret = hkdf_expand_label(
    secret = handshake_secret, 
    label = "c hs traffic", 
    context = hello_hash, 
    length = 48
)

server_secret = hkdf_expand_label(
    secret = handshake_secret,
    label = "s hs traffic",
    context = hello_hash,
    length = 48
)


client_handshake_key = hkdf_expand_label(
    secret = client_secret,
    label = "key",
    context = b"",
    length = 32
)

server_handshake_key = hkdf_expand_label(
    secret = server_secret,
    label = "key",
    context = b"",
    length = 32
)

client_handshake_iv = hkdf_expand_label(
    secret = client_secret,
    label = "iv",
    context = b"",
    length = 12
)

server_handshake_iv = hkdf_expand_label(
    secret = server_secret,
    label = "iv",
    context = b"",
    length = 12
)




print("handshake secret: " + handshake_secret.hex())
print("client secret: " + client_secret.hex())
print("server secret: " + server_secret.hex())
print("client handshake key: " + client_handshake_key.hex())
print("client handshake iv: "+ client_handshake_iv.hex())
print("server handshake key: " + server_handshake_key.hex())
print("server handshake iv: " + server_handshake_iv.hex())


handshake secret: dc3cab30347db5d4f391c6460d4ceb576d6aacc9e5ac50754722f72bce7d30bb07e63143fa6d07fc07c61905d4fe4e9d
client secret: 9c8d4bfc445895673137d595fe6bfc907cf0ab4b8ef096ccf5d978e0ddbd5f6f6f56d9f04f35342f47537494c46e4368
server secret: 05784f3f6d273041922fbb861189e3a5864a2ce1e09bbca74876420a3c7cfd74b37a3504c746ce06821a21293a25ad44
client handshake key: 8d2a3dd3d8bbcfb8071a85411f5740e0793e8cf2016edcc603311079cd4f30ab
client handshake iv: 95ab5c503c379f52c0ec4293
server handshake key: 95c2513984fde991996b559e225cb71940c25c2765341850a249eab03b64de67
server handshake iv: 1a6d170715b63e8ff92f0799


## 8. Decryption ✅

Now that we have the keys and IVs, the next step would be to decrypt the encrypted records. When sending the ClientHello, the server sends back a massive binary dump, including the ServerHello, the change cipher specs, and the actual payload, which is encrypted. We will use the keys and ivs generated to decrypt these records.

Normally, you'll see a bunch of records, each with its own content, from the server encrypted extension, certificates, etc. However, Cloudbase has aggregated all of these into a single record. What we're really interested in is the server's finished message.

In [257]:
def decrypt(key, iv, wrapper):
    # Split the wrapper into additional authenticated data (AAD) and ciphertext
    additional = wrapper[:5]
    ciphertext = wrapper[5:]
    
    # Initialize AES-GCM with the provided key
    aesgcm = AESGCM(key)
    
    # Decrypt the ciphertext
    try:
        plaintext = aesgcm.decrypt(iv, ciphertext, additional)
    except Exception as e:
        raise ValueError(f"Decryption failed: {e}")
    
    return plaintext

def find_finished_message(payload: bytes) -> int:
   # Look for Finished message header: 14 00 00 30 (for SHA384)
   finished_header = bytes([0x14, 0x00, 0x00, 0x30])
   return payload.find(finished_header)



# Right now I'm doing a lot of copy pasting. I run the openSSL_debug script to retrieve the server's response, and then replace the serverHello, as well as the encrypted record under this line. Managed to decrypt the thing.

payload = decrypt(server_handshake_key, server_handshake_iv, encrypted_payload)
print_bytes_as_hex(payload)

# Get index where Finished message starts
finished_index = find_finished_message(payload)
print(finished_index)

# Get everything before the Finished message
server_handshake_no_finished = payload[:finished_index] if finished_index != -1 else payload
server_handshake_finished_message = payload[finished_index:-1]

print("Server handshake: ")
print_bytes_as_hex(server_handshake_no_finished)
print("Finished message: ")
print_bytes_as_hex(server_handshake_finished_message)


#Test payload:
#server_handshake_key = bytes.fromhex("9f13575ce3f8cfc1df64a77ceaffe89700b492ad31b4fab01c4792be1b266b7f")
#server_handshake_iv = bytes.fromhex("9563bc8b590f671f488d2da3")
#encrypted_payload = bytes.fromhex("17 03 03 00 17 6b e0 2f 9d a7 c2 dc 9d de f5 6f 24 68 b9 0a df a2 51 01 ab 03 44 ae")
#payload = decrypt(server_handshake_key, server_handshake_iv, encrypted_payload)
#print("Test payload: " + payload.hex())





08 00 00 06 00 04 00 00 00 00 0b 00 09 f8 00 00 09 f4 00 03 c4 30 82 03 c0 30 82 03 66 a0 03 02 01 02 02 10 12 c6 55 b2 e7 0b ce 7c 13 1a 7c e5 a2 e0 e7 00 30 0a 06 08 2a 86 48 ce 3d 04 03 02 30 3b 31 0b 30 09 06 03 55 04 06 13 02 55 53 31 1e 30 1c 06 03 55 04 0a 13 15 47 6f 6f 67 6c 65 20 54 72 75 73 74 20 53 65 72 76 69 63 65 73 31 0c 30 0a 06 03 55 04 03 13 03 57 45 31 30 1e 17 0d 32 35 30 33 30 31 31 37 34 34 32 30 5a 17 0d 32 35 30 35 33 30 31 38 34 34 31 38 5a 30 1d 31 1b 30 19 06 03 55 04 03 13 12 77 77 77 2e 63 6c 6f 75 64 66 6c 61 72 65 2e 63 6f 6d 30 59 30 13 06 07 2a 86 48 ce 3d 02 01 06 08 2a 86 48 ce 3d 03 01 07 03 42 00 04 50 1f b6 ee 9c c2 99 12 65 31 48 92 cb ec fa 69 ea d3 d4 4c 52 03 d6 cd d9 60 3d 2a 1c b0 04 a0 84 ff 88 84 fc dd 83 41 74 5e 1c ad 8b bd 2f e3 10 30 44 25 30 e3 f6 23 af 73 c6 2e 82 9f 33 5c a3 82 02 68 30 82 02 64 30 0e 06 03 55 1d 0f 01 01 ff 04 04 03 02 07 80 30 13 06 03 55 1d 25 04 0c 30 0a 06 08 2b 06 01 05 05 07 03 01 30 0c 06 03 55 1d 13 01 01 f

# 9. Application keys:
The application keys are nearly the last step when it comes to finishing the handshake. The handshake keys were there decrypt the server's response, containing encrypted extensions, certificates, etc. Once we decrypt the encryption, we then use them to generate another set of keys, called application keys. This will be the main keys we use to communicate with the server. The code themselves are basically the same as the ones used to generate handshake hash

# 9a. Application Key Hash:
Generate a hash of all the messages up until this point: (Client Hello, Server Hello, [unwrapped] Encrypted Extensions, [unwrapped] Server Certificate, [unwrapped] Server Certificate Verify, [unwrapped] Server Finished). This will be used to generate the application key

In [258]:
## ------- Potential bug in application_key_hash generation ------------

def application_key_hash(client: bytes, server: bytes, payload: bytes) -> bytes:
    digest = hashes.Hash(hashes.SHA384()) # We'll be using SHA384 to create the hash for our messages.
    digest.update(client)     # Note that both the ClientHello and ServerHello is used here
    digest.update(server)
    digest.update(payload)
    application_key_hash = digest.finalize()
    return application_key_hash

application_context = application_key_hash(client_hello_no_record, server_hello_no_record, server_handshake_no_finished)
print_bytes_as_hex(application_context)

## -------- Potential bug in application_key_hash generation --------------


f2 6f 36 fa 94 66 3a 96 53 3d ad 0c 27 1a b7 0b e3 5f 8f d0 fb bc e2 b9 68 af c7 41 ba 0f 43 b0 99 ba cf 53 43 38 18 ad 50 f6 94 20 1b 31 01 31


# 9b. Application key generation ✅
Same procedure with the handshake keys

In [259]:
# TestValues. Uncomment this and compare with https://tls13.xargs.org/#server-handshake-keys-calc
# handshake_secret = bytes.fromhex("bdbbe8757494bef20de932598294ea65b5e6bf6dc5c02a960a2de2eaa9b07c929078d2caa0936231c38d1725f179d299")
# application_context = bytes.fromhex("fa6800169a6baac19159524fa7b9721b41be3c9db6f3f93fa5ff7e3db3ece204d2b456c51046e40ec5312c55a86126f5")
# client_secret = bytes.fromhex("db89d2d6df0e84fed74a2288f8fd4d0959f790ff23946cdf4c26d85e51bebd42ae184501972f8d30c4a3e4a3693d0ef0")

zeros = bytes([0] * 32)
derived_secret = hkdf_expand_label(
    secret = handshake_secret,
    label="derived",
    context=empty_hash,
    length=48
)
master_secret = hkdf.HKDF(
    algorithm=hashes.SHA384(),
    length=48,
    salt=derived_secret,  # derived_secret is used as salt
    info=None
)._extract(b'\x00' * 48)


client_app_secret = hkdf_expand_label(
    secret = master_secret,
    label="c ap traffic",
    context = application_context,
    length = 48
)

server_app_secret = hkdf_expand_label(
    secret = master_secret,
    label = "s ap traffic",
    context = application_context,
    length = 48
)


client_app_key = hkdf_expand_label(
    secret = client_app_secret,
    label = "key",
    context = b"",
    length = 32
)

server_app_key = hkdf_expand_label(
    secret = server_app_secret,
    label = "key",
    context = b"",
    length = 32
)

client_app_iv = hkdf_expand_label(
    secret = client_app_secret,
    label = "iv",
    context = b"",
    length = 12
)

server_app_iv = hkdf_expand_label(
    secret = server_app_secret,
    label = "iv",
    context = b"",
    length = 12
)

print("client secret: " + client_secret.hex())
print("server secret: " + server_secret.hex())
print("client app key: " + client_app_key.hex())
print("client app iv: "+ client_app_iv.hex())
print("server app key: " + server_app_key.hex())
print("server app iv: " + server_app_iv.hex())

client secret: 9c8d4bfc445895673137d595fe6bfc907cf0ab4b8ef096ccf5d978e0ddbd5f6f6f56d9f04f35342f47537494c46e4368
server secret: 05784f3f6d273041922fbb861189e3a5864a2ce1e09bbca74876420a3c7cfd74b37a3504c746ce06821a21293a25ad44
client app key: 974f386980eca90566babb2d3b89e37c13882518758a361e8e71321bacc400bf
client app iv: 04fd85fcf911cef11a2fb5b4
server app key: 2a0c388431a3a2b474b92dd6f61662716a3eef8f0711df56cecb855e88ec7ed8
server app iv: 9f14dba06437401dd869474b


# 10. Generate the finished message
The last step would be to generate the Finished message and send it to the server. After that, we're finished with the handshake, and can start sending over HTTP requests

In [260]:
def calculate_verify_data(client_handshake_secret: bytes, transcript_hash: bytes) -> bytes:
    # First derive the finished key using HKDF-Expand-Label
    finished_key = hkdf_expand_label(
        secret=client_handshake_secret, 
        label="finished", 
        context=b"", 
        length=48  # SHA-384 output size
    )
    
    # Calculate the transcript hash using SHA-384
    
    # Create HMAC of the transcript hash using finished_key
    h = HMAC(finished_key, hashes.SHA384())
    h.update(transcript_hash)
    return h.finalize()

def create_client_finished_message(client_handshake_key: bytes, client_handshake_iv: bytes, client_handshake_secret: bytes, 
                                   transcript_hash: bytes) -> bytes:
    # Get the verify_data using the function above
    verify_data = calculate_verify_data(client_handshake_secret, transcript_hash)
    
    # Create the Finished message structure
    msg = bytes([0x14, 0x00, 0x00]) + len(verify_data).to_bytes(1, 'big')  # Type and length
    msg += verify_data                      # The HMAC
    msg += bytes([0x16])                    # Handshake message type
    
    # Record layer header as AAD
    print_bytes_as_hex(msg)
    msg_len = len(msg) + 16  # AES-GCM adds a 16-byte authentication tag
    additional = bytes([0x17, 0x03, 0x03]) + msg_len.to_bytes(2, 'big')
    
    # Encrypt using AES-GCM
    aesgcm = AESGCM(client_handshake_key)
    encrypted = aesgcm.encrypt(client_handshake_iv, msg, additional)
    
    # Return the full TLS record
    return additional + encrypted


finished_message = create_client_finished_message(
    client_handshake_key=client_handshake_key,
    client_handshake_iv=client_handshake_iv,
    client_handshake_secret= client_secret,
    transcript_hash = application_context
    )
print_bytes_as_hex(finished_message)


14 00 00 30 92 19 5a 7d cb 8c d1 08 2c ec cd 9d ad 4a e8 3f c0 87 be 89 95 24 d5 ce 9c d1 1d cf c3 fd 0e f1 f8 08 b7 37 6e 12 dd 19 d0 d8 c0 ae 4c fa 43 94 16
17 03 03 00 45 80 4e 49 da aa d5 27 49 1c d1 08 0b 56 0c d3 69 a8 ae 64 e2 71 3c f4 cc e5 7a cb d4 35 95 bf 0f 75 28 dc ab c4 70 33 d5 79 17 0c ab 7a 99 56 13 e3 0e 4b 0d 93 5e 68 18 53 ff 43 11 8c b0 94 aa 9d 63 6c 20 52


In [261]:
s.sendall(finished_message)
print(f"Sent Client Finished message ({len(finished_message)} bytes)")

# Receive response after Finished
response = receive_all_data(s)
print(f"Received response after Finished: {len(response)} bytes")
print_bytes_as_hex(response)





Sent Client Finished message (74 bytes)
Received response after Finished: 24 bytes
17 03 03 00 13 64 0e 89 77 76 24 5b 19 de f8 6c 0d 10 d0 f2 8d d7 84 dc


In [262]:
additional = response[:5]  # 17 03 03 00 13
ciphertext = response[5:]  # 81 b7 90...
sequence_number = 0
nonce = bytes([x ^ y for x, y in zip(
    server_app_iv,
    sequence_number.to_bytes(12, byteorder='big')
)])
aesgcm = AESGCM(server_app_key)
plaintext = aesgcm.decrypt(nonce, ciphertext, additional)

InvalidTag: 

# 11. Decrypting the handshake response:
