# Authenticated DHKE (Diffie-Hellman Key Exchange)
Based on the groups accepted for Internet Key Exchange in [RFC 3526](https://datatracker.ietf.org/doc/html/rfc3526).

In [None]:
from abc import ABC
import hashlib
from collections import deque
import datetime
import dataclasses
import secrets
from typing import Dict, Literal

#
# Demo transport: one mailbox per participant, and every mailbox is visible to
# all. That openness is deliberate -- Diffie-Hellman is metaphorically trying
# to achieve secrecy between two people in a crowded room.
#
MESSAGE_BOXES = {
    participant: deque() for participant in ("Alice", "Bob", "Charlize")
}
#
# Byte order (endianness) has to agree on all ends. Computer networks use
# big-endian.
#
NETWORK_BYTE_ORDER = "big"
#
# Several parameter sets may be available; if a handshake fails, fall back to
# this one.
#
FALLBACK_DHPARAM = "Group15"

class DiffieHellmanParameters(ABC):
    """Base class for a Diffie-Hellman parameter set.

    This class only reads the attributes ``name``, ``n_bits``, ``prime`` and
    ``generator``; subclasses are responsible for setting them.
    """

    def __repr__(self):
        return f"<{self.name} ({self.n_bits}-bit prime)>"

    def get_secret_public(self):
        """Draw a fresh private exponent and compute the matching public value.

        Returns:
            A tuple ``(x, pow(generator, x, prime))`` where ``x`` is chosen
            uniformly with ``2 <= x <= prime - 2``.

        Raises:
            ValueError: if ``prime`` is smaller than 5, because the range
                ``[2, prime - 2]`` would then contain fewer than two values.
        """
        if self.prime < 5:
            raise ValueError(
                f"modulus {self.prime} is too small to draw a secret exponent"
            )
        # randbelow(prime - 3) is uniform over [0, prime - 4]; shifting by 2
        # yields [2, prime - 2] without a retry loop. prime - 1 is excluded on
        # purpose: for a prime modulus, generator ** (prime - 1) == 1, so the
        # public value (and every shared secret derived from it) would be 1.
        x = secrets.randbelow(self.prime - 3) + 2
        return x, pow(self.generator, x, self.prime)


class RFC3526Group(DiffieHellmanParameters):
    """A fixed, named parameter set with generator 2.

    Args:
        name: Label used to look the group up and to print it.
        prime: The modulus, either as an ``int`` or as hexadecimal text.
            Whitespace inside the text (spaces, newlines) is ignored, so the
            digits can be written in wrapped blocks.
        n_bits: Bit size of the modulus, stored as given.
    """

    generator = 2  # for reference

    def __init__(self, name: str, prime: int | str, n_bits: int):
        if isinstance(prime, str):
            # Drop all whitespace, then parse what is left as base 16.
            hex_digits = "".join(prime.split())
            modulus = int(hex_digits, 16)
        else:
            modulus = prime
        self.name = name
        self.prime = modulus
        self.n_bits = n_bits


class CustomSpeke(DiffieHellmanParameters):
    """Parameter set whose generator is derived from a hashed password.

    The generator is ``int(hashed_password, 16) ** 2 mod prime``.

    Args:
        name: Label used to look the parameter set up and to print it.
        hashed_password: Hexadecimal digest text of the shared password.
        prime: The modulus.
        n_bits: Bit size of the modulus, stored as given.

    Raises:
        ValueError: if ``hashed_password`` is not valid hexadecimal, or if the
            derived generator is 0 or 1 (which would make every public value
            and shared secret trivial).
    """

    def __init__(self, name: str, hashed_password: str, prime: int, n_bits: int):
        self.name = name
        self.prime = prime
        self.n_bits = n_bits

        generator = pow(
            int(hashed_password, 16),
            2,  # square to avoid the small-subgroup attack
            prime,
        )
        # Raise explicitly instead of using ``assert``: assertions are removed
        # when Python runs with -O, which would let a degenerate generator
        # through unnoticed.
        if generator <= 1:
            raise ValueError(
                f"hashed password yields degenerate generator {generator}; "
                "choose a different password"
            )
        self.generator = generator


#
# Registry of the parameter sets known to this demo, keyed by name.
# HandshakeContext looks a set up here by its common name, and
# DiffieHellmanUser._handle_message checks membership before falling back to
# FALLBACK_DHPARAM. More entries can be added at runtime.
#
# The prime is given as wrapped hexadecimal text; RFC3526Group strips the
# whitespace before parsing it.
# NOTE(review): presumably the 3072-bit MODP group (group 15) from RFC 3526 --
# verify the digits against the RFC before editing them.
#
configs = {
    "Group15": RFC3526Group(
        name="Group15",
        prime="""
          FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
          29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
          EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
          E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
          EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
          C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
          83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
          670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
          E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
          DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
          15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
          ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
          ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
          F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
          BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
          43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF
        """,
        n_bits=3072,
    )
}


def send_message(recipient, message, sender=None):
    """Append *message* to the mailbox of *recipient*.

    Raw ``bytes`` are wrapped in a ``UserMessage`` attributed to *sender*;
    anything else is delivered unchanged.

    Raises:
        KeyError: if *recipient* has no mailbox in ``MESSAGE_BOXES``.
    """
    outgoing = (
        UserMessage(sender=sender, content=message)
        if isinstance(message, bytes)
        else message
    )
    MESSAGE_BOXES[recipient].append(outgoing)


def fetch_messages(recipient):
    """Yield the messages waiting for *recipient*, oldest first.

    Each message is removed from the mailbox as it is yielded. Iteration ends
    once the mailbox is empty.
    """
    mailbox = MESSAGE_BOXES[recipient]
    while mailbox:
        yield mailbox.popleft()


class HandshakeContext:
    """One side's state for a key exchange with a single peer.

    On creation a secret exponent and its public value are drawn from the
    parameter set registered in ``configs`` under *common_name*. ``shared_key``
    stays ``None`` until :meth:`pair` succeeds.

    Raises:
        KeyError: if *common_name* is not registered in ``configs``.
    """

    def __init__(self, common_name):
        self.common_name = common_name
        # Fail right here, naming the missing group, instead of letting a
        # ``None`` lookup result blow up later with an unrelated AttributeError.
        try:
            parameters = configs[common_name]
        except KeyError:
            raise KeyError(
                f"Unknown Diffie-Hellman parameter set: {common_name!r}"
            ) from None
        self.parameters = parameters
        self.secret, self.public = parameters.get_secret_public()
        self.shared_key = None

    def __repr__(self):
        return f"<HandshakeContext {self.get_state()}>"

    def get_state(self) -> Literal["paired"] | Literal["waiting"] | None:
        """Return ``"paired"`` once a key exists, ``"waiting"`` before that."""
        # Compare against None explicitly: the question is whether a key has
        # been derived, not whether its bytes happen to be truthy.
        if self.shared_key is not None:
            return "paired"
        if self.public:
            return "waiting"
        return None

    def pair(self, remote_pairing) -> bool:
        """Derive the shared key from the peer's handshake payload.

        Args:
            remote_pairing: Mapping with the keys ``"name"`` (the peer's
                parameter-set name) and ``"public_exponent"`` (the peer's
                public value).

        Returns:
            ``False`` if the peer used a different parameter set (no key is
            derived); ``True`` once ``shared_key`` has been set.
        """
        if remote_pairing["name"] != self.common_name:
            return False
        # NOTE(review): the peer's public value is used as received; it is not
        # range-checked here.
        shared_secret = pow(
            remote_pairing["public_exponent"], self.secret, self.parameters.prime
        )
        # Round the byte length up. Plain ``n_bits // 8`` is one byte short
        # whenever n_bits is not a multiple of 8, and to_bytes() then raises
        # OverflowError for secrets that use the top bits.
        key_length = (self.parameters.n_bits + 7) // 8
        self.shared_key = shared_secret.to_bytes(key_length, NETWORK_BYTE_ORDER)
        return True


@dataclasses.dataclass
class UserMessage:
    """An application message as it is stored in a mailbox."""

    # Name of the user the message is attributed to.
    sender: str
    # Raw message bytes.
    content: bytes
    # Filled in with datetime.datetime.now() when not supplied.
    timestamp: datetime.datetime = dataclasses.field(default_factory=datetime.datetime.now)


@dataclasses.dataclass
class ControlMessage:
    """A handshake message as it is stored in a mailbox."""

    # Name of the user who sent the message.
    sender: str
    # Handshake step; DiffieHellmanUser handles "begin" and "reply".
    verb: str
    # Handshake data; DiffieHellmanUser.initiate sends the keys "name" and
    # "public_exponent".
    payload: Dict
    # Filled in with datetime.datetime.now() when not supplied.
    timestamp: datetime.datetime = dataclasses.field(default_factory=datetime.datetime.now)


class DiffieHellmanUser:
    def __init__(self, name):
        self.name: str = name
        # An internal lookup of handshake states:
        self._handshakes: Dict[str, HandshakeContext] = {}

    def __repr__(self):
        return f"<DiffieHellmanUser {self.name}>"

    def receive_messages(self):
        messages = list(fetch_messages(self.name))
        for message in messages:
            self._handle_message(message)

    def send_encrypted(self, recipient: str, content: str):
        handshake = self._handshakes[recipient]
        send_message(
            recipient,
            message=bytes(
                a ^ b
                for a, b in zip(
                    handshake.shared_key,
                    # padding is important for security
                    content.encode().ljust(
                        len(handshake.shared_key), b"\x00"
                    ),
                )
            ),
            sender=self.name,
        )

    def _handle_message(self, message: ControlMessage | UserMessage):
        match message:
            case UserMessage():
                handshake = self._handshakes[message.sender]
                content = bytes(
                    a ^ b for a, b in zip(message.content, handshake.shared_key)
                )
                print(
                    message.sender,
                    "->",
                    self.name,
                    message.timestamp,
                    content.rstrip(b"\x00"),  # see padding above
                )
            case ControlMessage():
                if message.verb == "begin":
                    common_name = message.payload["name"]
                    if common_name not in configs:
                        common_name = FALLBACK_DHPARAM
                    self.initiate(
                        message.sender,
                        verb="reply",
                        common_name=common_name,
                    )
                if message.verb in {"begin", "reply"}:
                    handshake = self._handshakes[message.sender]
                    if handshake.pair(remote_pairing=message.payload):
                        print(f"New connection from {message.sender}")
                    else:
                        self.initiate(message.sender, verb="begin", common_name=FALLBACK_DHPARAM)
                else:
                    raise RuntimeError(f"Unexpected {message}")

    def initiate(self, recipient, verb, common_name=FALLBACK_DHPARAM):
        handshake = self._handshakes[recipient] = HandshakeContext(
            common_name=common_name
        )
        send_message(
            recipient,
            message=ControlMessage(
                sender=self.name,
                verb=verb,
                payload={
                    "name": handshake.common_name,
                    "public_exponent": handshake.public,
                },
            ),
        )


In [None]:
if __name__ == "__main__":
    # Walk through the demo using the functions defined in this script.

    # (1) Create a DiffieHellmanUser named Alice
    alice = DiffieHellmanUser("Alice")

    # (2) Create a DiffieHellmanUser named Bob
    bob = DiffieHellmanUser("Bob")

    # (3) Use DiffieHellmanUser.initiate to authenticate Alice and Bob.
    # Alice announces her public value; Bob answers with his own and derives
    # the key; Alice then reads the answer and derives the same key.
    alice.initiate("Bob", verb="begin")
    bob.receive_messages()
    alice.receive_messages()

    # (4) Use DiffieHellmanUser.send_encrypted to send a message
    # from Alice to Bob
    alice.send_encrypted("Bob", "Hello Bob, this is Alice.")
    bob.receive_messages()

    # (5) Use DiffieHellmanUser.send_encrypted to send a response
    # from Bob to Alice
    bob.send_encrypted("Alice", "Hello Alice, message received.")
    alice.receive_messages()

    # (6) Create a DiffieHellmanUser named Charlize
    charlize = DiffieHellmanUser("Charlize")

    # (7) Create a non-standard group selection for the connection between
    # Alice and Charlize by adding a CustomSpeke() object to configs[] that
    # can be used later for messaging.
    #  - hashed_password is a byte string processed through hashlib.sha256
    #    and saved as a hexdigest()
    #  - prime is 0x7F followed by 62 more F digits (64 hex digits in total),
    #    written below in groups of eight for readability
    #  - n_bits=256
    configs["AliceCharlizeSpeke"] = CustomSpeke(
        name="AliceCharlizeSpeke",
        hashed_password=hashlib.sha256(b"correct horse battery staple").hexdigest(),
        prime=0x7FFFFFFF_FFFFFFFF_FFFFFFFF_FFFFFFFF_FFFFFFFF_FFFFFFFF_FFFFFFFF_FFFFFFFF,
        n_bits=256,
    )

    # (8) Initiate a handshake between Alice and Charlize
    alice.initiate("Charlize", verb="begin", common_name="AliceCharlizeSpeke")
    charlize.receive_messages()
    alice.receive_messages()

    # (9) Use DiffieHellmanUser.send_encrypted to send a message
    # from Alice to Charlize
    alice.send_encrypted("Charlize", "Hello Charlize, welcome to the room.")

    # (10) Receive the message as Charlize
    charlize.receive_messages()
