In [1]:
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import x448
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives.asymmetric import ed448
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

import os
import asyncio

async def send_signed(key: ed448.Ed448PrivateKey, writer: asyncio.StreamWriter, data: bytes) -> None:
    sig = key.sign(data)
    writer.write(sig)
    writer.write(data)
    await writer.drain()

async def read_signed(auth_key: ed448.Ed448PublicKey, reader: asyncio.StreamReader, n: int) -> bytes:
    sig = await reader.read(114)
    data = await reader.read(n)

    auth_key.verify(sig, data)

    return data
    

In [2]:
async def client(s_ip: str, s_port: int, message: bytes):
    private_key_x448 = x448.X448PrivateKey.generate()
    public_key_x448 = private_key_x448.public_key()

    # Ed448 Signing&Verification
    private_key_ed448 = ed448.Ed448PrivateKey.generate()
    public_key_ed448 = private_key_ed448.public_key()

    print('client: Openning connection')
    reader, writer = await asyncio.open_connection(s_ip, s_port)

    # ed448 Exchange
    print('client: sending public auth key')
    await send_signed(private_key_ed448, writer, public_key_ed448.public_bytes(serialization.Encoding.Raw, serialization.PublicFormat.Raw))

    print('client: reading and verifying server auth key')
    other_auth_sig = await reader.read(114)
    other_authkey_bytes = await reader.read(57)

    other_authkey = ed448.Ed448PublicKey.from_public_bytes(other_authkey_bytes)

    other_authkey.verify(other_auth_sig, other_authkey_bytes)

    # x448 Exchange    
    print('client: sending public x448 key')
    await send_signed(private_key_ed448, writer, public_key_x448.public_bytes(serialization.Encoding.Raw, serialization.PublicFormat.Raw))

    print('client: reading server public x448 key')
    other_pkey_bytes = await read_signed(other_authkey, reader, 56)

    other_pkey = x448.X448PublicKey.from_public_bytes(other_pkey_bytes)

    shared_key = private_key_x448.exchange(other_pkey)

    # Perform key derivation.
    key = HKDF(
        algorithm=hashes.SHA256(),
        length=32,
        salt=None,
        info=b'handshake data',
    ).derive(shared_key)

    print('client: key:', key)

    nonce = os.urandom(16)
    algorithm = algorithms.ChaCha20(key, nonce)
    cipher = Cipher(algorithm, mode=None)
    encryptor = cipher.encryptor()
    ct = encryptor.update(message)

    print('client: sending message')
    await send_signed(private_key_ed448, writer, nonce+ct)

    writer.close()

    await writer.wait_closed()

In [3]:
class Server:

    def __init__(self, ip: str, port: int):

        self.ip = ip
        self.port = port

        # X448 key exchange
        self.private_key_x448 = x448.X448PrivateKey.generate()
        self.public_key_x448 = self.private_key_x448.public_key()

        # Ed448 Signing&Verification
        self.private_key_ed448 = ed448.Ed448PrivateKey.generate()
        self.public_key_ed448 = self.private_key_ed448.public_key()

        self.server = asyncio.Server

    async def handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        # ed448 Exchange
        await send_signed(self.private_key_ed448, writer, self.public_key_ed448.public_bytes(serialization.Encoding.Raw, serialization.PublicFormat.Raw))

        other_auth_sig = await reader.read(114)
        other_authkey_bytes = await reader.read(57)

        other_authkey = ed448.Ed448PublicKey.from_public_bytes(other_authkey_bytes)

        other_authkey.verify(other_auth_sig, other_authkey_bytes)

        # x448 Exchange    
        await send_signed(self.private_key_ed448, writer, self.public_key_x448.public_bytes(serialization.Encoding.Raw, serialization.PublicFormat.Raw))

        other_pkey_bytes = await read_signed(other_authkey, reader, 56)

        other_pkey = x448.X448PublicKey.from_public_bytes(other_pkey_bytes)

        shared_key = self.private_key_x448.exchange(other_pkey)

        # Perform key derivation.
        key = HKDF(
            algorithm=hashes.SHA256(),
            length=32,
            salt=None,
            info=b'handshake data',
        ).derive(shared_key)

        # Read message
        ct_data = await read_signed(other_authkey, reader, -1)
        nonce = ct_data[0:16]
        ct_message = ct_data[16:]

        algorithm = algorithms.ChaCha20(key, nonce)
        cipher = Cipher(algorithm, mode=None)
        decryptor = cipher.decryptor()

        plaintext = decryptor.update(ct_message)
        plaintext = plaintext.decode('utf-8')

        print("received: ", plaintext)

        writer.close()
        await writer.wait_closed()
        self.server.close()

    async def start_server(self):
        self.server = await asyncio.start_server(self.handle_connection, self.ip, self.port)
        print('server: started')
        async with self.server:
            await self.server.serve_forever()

In [4]:
server = Server('127.0.0.1', 9876)
server_task = asyncio.get_running_loop().create_task(server.start_server())

server: started
received:  yep


In [5]:
#client_task = asyncio.get_running_loop().create_task(client('127.0.0.1', 9876, b'yep'))
await client('127.0.0.1', 9876, b'yep')

client: Openning connection
client: sending public auth key
client: reading and verifying server auth key
client: sending public x448 key
client: reading server public x448 key
client: key:  b'\xbfF#\xc8\xfe<\x10\xb4\xc5\x9d\x1cv\x8d\xb1*BDO\xb0/\x11\xed"A\xf6\xde\xa4\xa3{\xc4+3'
client: sending message


In [29]:
client_task.cancel()
server_task.cancel()

True