# **Core idea:**

We combine a **Trusted Introducer (Registry)** with **transcript-bound identity signatures** (Ed25519), **ephemeral key exchange** (X25519), **HKDF(SHA-256)** key schedule, and **AES-256-GCM** secure channels with **AAD (session id, role, counter)** and **mutual Finished (HMAC)**. We also demonstrate **active tamper detection**.




**Discovery:**  
- Clients register and look up peers from a **Registry Server** (Trusted Introducer) that stores `id → {pk_lt, host, port}`.  
- Every registry response is **Ed25519-signed**. Clients **pin** the registry's public key to prevent directory poisoning.

**Authentication:**  
- During handshake, each side signs a **canonical transcript** (includes both identities, nonces, and both **ephemeral X25519** public keys).  
- Each side validates the other's signature using their **long-term Ed25519 public key**, fetched from the signed registry entry.  
- We also use **mutual Finished (HMAC)** on the transcript hash.

**Key Exchange & Forward Secrecy:**  
- **X25519** ephemeral ECDH derives a shared secret; **HKDF(SHA-256)** expands it into: `k_i2r`, `k_r2i` (directional AEAD keys), `k_fin_i`, `k_fin_r` (Finished HMAC keys), and a short `session_id`.  
- Past sessions remain safe if long-term keys are compromised later (**forward secrecy**).

**Encrypted, Authenticated Channel:**  
- Messages are protected with **AES-256-GCM** using **AAD = (session_id, role, counter)** and a nonce `(sid || ctr)`.  
- Replay/misorder is detected via **per-direction counters**; tampering triggers **InvalidTag** and closes the session.

**Chosen Strategy:**  
- **A. Trusted Introducer (Registry)** with **pinned registry key** and **signed replies**.  
- (Easily extensible to **B. Web of Trust** or **C. PKI**.)

**Demonstration:**  
- We run a full demo: Pilot discovers Control → mutual authentication → secure messaging → **tamper (MITM) test** that is correctly detected.



## Cryptographic Primitives (Justification)
- **Ed25519 (long-term identities):** modern, fast, deterministic signatures; widely deployed.
- **X25519 (ephemeral ECDH):** safe curves; efficient; NIST/industry adoption.
- **HKDF(SHA-256):** well-analyzed KDF for deriving independent sub-keys.
- **AES-256-GCM:** AEAD for confidentiality + integrity; hardware-accelerated on most CPUs.



## Part 1 — Imports, Constants, and Small Helpers

**Objective.** Establish utility functions and constants shared across the notebook: consistent base64 helpers, canonical JSON encoding, and an event for clean shutdown.

**What this part does.**
- Sets up host binding and a protocol version string.
- Provides helpers for: picking free ports (Colab re-runs), base64 encode/decode, and canonical JSON (`jcanon`) for signatures.
- Creates a global `SHUTDOWN` event for clean thread termination.


In [22]:

# Imports & Helpers
import base64, hashlib, hmac, json, os, socket, threading, time, random
from dataclasses import dataclass
from typing import Optional
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ed25519, x25519
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.exceptions import InvalidSignature, InvalidTag

HOST = "127.0.0.1"
PROTO_VER = "FCP/1"
ROLE_I, ROLE_R = "INITIATOR", "RESPONDER"
SHUTDOWN = threading.Event()

def pick_free_port() -> int:
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((HOST, 0))
    port = s.getsockname()[1]
    s.close()
    return port

def b64e(b: bytes) -> str: return base64.b64encode(b).decode("utf-8")
def b64d(s: str) -> bytes: return base64.b64decode(s.encode("utf-8"))
def jcanon(obj) -> bytes: return json.dumps(obj, sort_keys=True, separators=(",", ":")).encode("utf-8")



## Part 2 — Cryptographic Helpers

**Objective.** Provide a minimal, auditable crypto layer with explicit functions for keypairs, exchange, HKDF, and HMAC.

**What this part does.**
- `ed25519_keypair` / `ed25519_verify`: identity keys and signature verification.
- `x25519_keypair` / `x25519_exchange`: ephemeral ECDH for forward secrecy.
- `hkdf` & `hmac256`: KDF and MAC utilities used in the handshake and Finished checks.


In [None]:

def ed25519_keypair():
    sk = ed25519.Ed25519PrivateKey.generate()
    pk_raw = sk.public_key().public_bytes(serialization.Encoding.Raw, serialization.PublicFormat.Raw)
    return sk, pk_raw

def ed25519_verify(pk_raw: bytes, msg: bytes, sig: bytes) -> bool:
    try:
        ed25519.Ed25519PublicKey.from_public_bytes(pk_raw).verify(sig, msg)
        return True
    except InvalidSignature:
        return False

def x25519_keypair():
    sk = x25519.X25519PrivateKey.generate()
    pk_raw = sk.public_key().public_bytes(serialization.Encoding.Raw, serialization.PublicFormat.Raw)
    return sk, pk_raw

def x25519_exchange(sk: x25519.X25519PrivateKey, peer_pk_raw: bytes) -> bytes:
    return sk.exchange(x25519.X25519PublicKey.from_public_bytes(peer_pk_raw))

def hkdf(secret: bytes, length: int, info: bytes) -> bytes:
    return HKDF(algorithm=hashes.SHA256(), length=length, salt=None, info=info).derive(secret)

def hmac256(key: bytes, data: bytes) -> bytes:
    return hmac.new(key, data, hashlib.sha256).digest()



## Part 3 — Trusted Introducer (Registry) with Pinned-Key Verification

**Objective.** Implement a minimal **Trusted Introducer** that makes discovery safe on an untrusted network.

**What this part does.**
- Maintains a simple `id → {pk_lt, host, port}` directory.  
- **Signs every response** with its **Ed25519** key; clients **pin** the registry public key to prevent poisoning.  
- Supports `REGISTER` and `LOOKUP`, returning `payload + signature + registry_pk`.

**Security rationale.** Even if the network is fully adversarial, a pinned registry key ensures Pilot/Control only trust entries actually signed by the legitimate introducer.


In [None]:

class RegistryServer:
    def __init__(self):
        self.sk, self.pk_raw = ed25519_keypair()
        self.pk_b64 = b64e(self.pk_raw)
        self.db = {}  # id -> dict
        self.port = pick_free_port()

    def _sign(self, payload: dict) -> str:
        return b64e(self.sk.sign(jcanon(payload)))

    def _verify_local(self, payload: dict, sig_b64: str) -> bool:
        return ed25519_verify(self.pk_raw, jcanon(payload), b64d(sig_b64))

    def _handler(self, conn: socket.socket):
        try:
            data = conn.recv(8192).decode("utf-8")
            if not data: return
            req = json.loads(data)
            op = req.get("action")
            now = int(time.time())
            if op == "REGISTER":
                uid, pkb64, host, port = req.get("id"), req.get("pk_lt"), req.get("host"), req.get("port")
                if not uid or not pkb64 or not host or not port:
                    resp = {"status":"ERROR","message":"Missing id/pk_lt/host/port"}
                else:
                    self.db[uid] = {"pk_lt": pkb64, "host": host, "port": int(port)}
                    payload = {"op":"REGISTER","id":uid,"pk_lt":pkb64,"host":host,"port":int(port),"ts":now}
                    resp = {"status":"SUCCESS","payload":payload,"sig":self._sign(payload),"registry_pk":self.pk_b64}
            elif op == "LOOKUP":
                uid = req.get("id")
                rec = self.db.get(uid)
                if not uid or not rec:
                    resp = {"status":"ERROR","message":"Identity not found"}
                else:
                    payload = {"op":"LOOKUP","id":uid,"pk_lt":rec["pk_lt"],"host":rec["host"],"port":rec["port"],"ts":now}
                    resp = {"status":"SUCCESS","payload":payload,"sig":self._sign(payload),"registry_pk":self.pk_b64}
            else:
                resp = {"status":"ERROR","message":"Invalid action"}
            conn.sendall(json.dumps(resp).encode("utf-8"))
        finally:
            conn.close()

    def start(self):
        print(f"[*] Trusted Registry Server starting on {HOST}:{self.port}...")
        srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srv.settimeout(0.5)
        srv.bind((HOST, self.port))
        srv.listen(5)
        print(f"[*] Trusted Registry Server listening on {HOST}:{self.port}")
        while not SHUTDOWN.is_set():
            try:
                c, _ = srv.accept()
                threading.Thread(target=self._handler, args=(c,), daemon=True).start()
            except socket.timeout:
                continue
            except Exception:
                break
        srv.close()



## Part 4 — Key Schedule & AEAD Channel

**Objective.** Derive robust per-direction keys and build an authenticated, replay-safe channel.

**What this part does.**
- `derive_keys`: HKDF expands ECDH result and transcript hash into:
  - `k_i2r`, `k_r2i` (AEAD encryption keys, one per direction),
  - `k_fin_i`, `k_fin_r` (Finished HMAC keys), and
  - a 16-byte `session_id`.
- `aead_encrypt` / `aead_decrypt`: AES-256-GCM using nonce `(sid[:8] || counter)` and **AAD** = `(sid, role, ctr)` to bind messages to the session and flow direction.  
- Per-direction **counters** prevent replay & reordering.


In [None]:

@dataclass
class SessionKeys:
    k_i2r: bytes
    k_r2i: bytes
    k_fin_i: bytes
    k_fin_r: bytes
    sid: bytes
    ctr_i2r: int = 0
    ctr_r2i: int = 0

def derive_keys(shared_secret: bytes, transcript_hash: bytes) -> SessionKeys:
    prk = hkdf(shared_secret, 32, b"FCP-PRK"+transcript_hash)
    return SessionKeys(
        k_i2r   = hkdf(prk, 32, b"FCP-ENC-I2R"),
        k_r2i   = hkdf(prk, 32, b"FCP-ENC-R2I"),
        k_fin_i = hkdf(prk, 32, b"FCP-FIN-I"),
        k_fin_r = hkdf(prk, 32, b"FCP-FIN-R"),
        sid     = hkdf(prk, 16, b"FCP-SESSION-ID")
    )

def aead_encrypt(key: bytes, sid: bytes, role: str, counter: int, plaintext: bytes) -> dict:
    nonce = sid[:8] + counter.to_bytes(4, "big")
    aad_obj = {"sid": b64e(sid), "role": role, "ctr": counter}
    aad = jcanon(aad_obj)
    ct = AESGCM(key).encrypt(nonce, plaintext, aad)
    return {"nonce": b64e(nonce), "aad": b64e(aad), "data": b64e(ct)}

def aead_decrypt(key: bytes, payload: dict) -> bytes:
    nonce = b64d(payload["nonce"]); aad = b64d(payload["aad"]); ct = b64d(payload["data"])
    return AESGCM(key).decrypt(nonce, ct, aad)



## Part 5 — Client: Discovery, Authentication, Handshake, and Secure Messaging

**Objective.** Implement both **Initiator (Pilot-Alpha)** and **Responder (Control-Bravo)** behavior in one reusable `Client` class.

**What this part does.**
- **Registry interactions** (`registry_register`, `registry_lookup`) with retries and **pinned-key** verification of signed payloads.
- **Listener** for incoming connections; handler can process handshake and secure messages.
- **Handshake**:
  1. ClientHello: initiator signs a canonical transcript (`ti`) with identity key (Ed25519).  
  2. ServerHello: responder signs its transcript (`tr`).  
  3. Both sides run X25519, compute the transcript hash, derive keys via HKDF, and exchange **Finished(HMAC)**.
- **Secure channel**:
  - `aead_encrypt`/`aead_decrypt` with **AAD**(sid, role, counter); counters are tracked per direction.
- **Active tamper test** that flips a byte in the ciphertext and triggers **InvalidTag** on the receiver.


In [None]:

class Client:
    def __init__(self, identity: str, listen_port: int, registry_host: str, registry_port: int, registry_pk_pinned: bytes):
        self.id = identity
        self.listen_port = listen_port
        self.registry_host = registry_host
        self.registry_port = registry_port
        self.sk_lt, self.pk_lt_raw = ed25519_keypair()
        self.pk_lt_b64 = b64e(self.pk_lt_raw)
        self.sk_eph = None
        self.pk_eph_raw: Optional[bytes] = None
        self.peer_pk_eph_raw: Optional[bytes] = None
        self.peer_pk_lt_raw: Optional[bytes] = None
        self.keys: Optional[SessionKeys] = None
        self.registry_pk_pinned = registry_pk_pinned

    # Registry ops with retries
    def registry_register(self) -> bool:
        for _ in range(12):
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                    s.settimeout(1.0)
                    s.connect((self.registry_host, self.registry_port))
                    req = {"action":"REGISTER","id":self.id,"pk_lt":self.pk_lt_b64,"host":HOST,"port":self.listen_port}
                    s.sendall(json.dumps(req).encode("utf-8"))
                    resp = json.loads(s.recv(8192).decode("utf-8"))
                if resp.get("status") != "SUCCESS":
                    time.sleep(0.1); continue
                payload, sig, reg_b64 = resp["payload"], resp["sig"], resp["registry_pk"]
                if b64d(reg_b64) != self.registry_pk_pinned:
                    time.sleep(0.1); continue
                if not ed25519_verify(self.registry_pk_pinned, jcanon(payload), b64d(sig)):
                    time.sleep(0.1); continue
                print(f"[{self.id}] Registration successful.")
                return True
            except Exception:
                time.sleep(0.1)
        print(f"[{self.id}] Registration failed.")
        return False

    def registry_lookup(self, peer_id: str):
        for _ in range(20):
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                    s.settimeout(1.0)
                    s.connect((self.registry_host, self.registry_port))
                    req = {"action":"LOOKUP","id":peer_id}
                    s.sendall(json.dumps(req).encode("utf-8"))
                    resp = json.loads(s.recv(8192).decode("utf-8"))
                if resp.get("status") != "SUCCESS":
                    time.sleep(0.1); continue
                payload, sig, reg_b64 = resp["payload"], resp["sig"], resp["registry_pk"]
                if b64d(reg_b64) != self.registry_pk_pinned:
                    time.sleep(0.1); continue
                if not ed25519_verify(self.registry_pk_pinned, jcanon(payload), b64d(sig)):
                    time.sleep(0.1); continue
                return {"pk_lt": b64d(payload["pk_lt"]), "host": payload["host"], "port": int(payload["port"])}
            except Exception:
                time.sleep(0.1)
        return None

    # Listener (Responder)
    def start_listener(self):
        def serve():
            print(f"[{self.id}] Listener started on {HOST}:{self.listen_port}. Awaiting contact...")
            srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            srv.settimeout(0.5)
            srv.bind((HOST, self.listen_port))
            srv.listen(5)
            while not SHUTDOWN.is_set():
                try:
                    c, _ = srv.accept()
                    threading.Thread(target=self._handle_connection, args=(c,), daemon=True).start()
                except socket.timeout:
                    continue
                except Exception:
                    break
            srv.close()
            print(f"[{self.id}] Listener shutdown complete.")
        threading.Thread(target=serve, daemon=True).start()

    def _handle_connection(self, conn: socket.socket):
        try:
            conn.settimeout(4.0)
            raw = conn.recv(16384).decode("utf-8")
            if not raw: return
            msg = json.loads(raw)
            if msg.get("step") == "ClientHello":
                self._responder_handshake(conn, msg)
                # receive the secure messages (one-way like the sample)
                conn.settimeout(0.5)
                end = time.time() + 3.0
                while time.time() < end and self.keys:
                    try:
                        data = conn.recv(16384).decode("utf-8")
                        if not data:
                            print(f"[{self.id}] Peer closed the connection.")
                            break
                        m = json.loads(data)
                        if m.get("step") == "SecureMessage":
                            self._handle_secure_message_from(m["payload"], ROLE_I,
                                print_prefix=f"[{self.id} (RECEIVED)] >>> ")
                    except socket.timeout:
                        continue
            elif msg.get("step") == "SecureMessage":
                self._handle_secure_message_from(msg["payload"], ROLE_I,
                    print_prefix=f"[{self.id} (RECEIVED)] >>> ")
        finally:
            conn.close()

    # Responder handshake
    def _responder_handshake(self, conn: socket.socket, hello: dict):
        peer_id = hello["id_initiator"]
        self.peer_pk_eph_raw = b64d(hello["pk_eph_initiator"])
        sig_i = b64d(hello["sig_initiator"])
        ti = hello["transcript_i"]  # original ti (never mutate)

        rec = self.registry_lookup(peer_id)
        if not rec:
            raise ValueError("initiator LT key not found")
        self.peer_pk_lt_raw = rec["pk_lt"]
        if not ed25519_verify(self.peer_pk_lt_raw, jcanon(ti), sig_i):
            raise InvalidSignature("initiator transcript signature invalid")
        print(f"[{self.id}] ClientHello signature verified.")

        # build tr
        self.sk_eph, self.pk_eph_raw = x25519_keypair()
        nonce_r = os.urandom(16)
        tr = {
            "ver": PROTO_VER, "role": ROLE_R,
            "id_local": self.id, "id_peer": peer_id,
            "pk_eph_local": b64e(self.pk_eph_raw), "pk_eph_peer": b64e(self.peer_pk_eph_raw),
            "nonce_local": b64e(nonce_r), "nonce_peer": ti["nonce_local"]
        }
        sig_r = self.sk_lt.sign(jcanon(tr))
        conn.sendall(json.dumps({
            "step":"ServerHello",
            "id_responder": self.id,
            "pk_eph_responder": b64e(self.pk_eph_raw),
            "nonce_responder": b64e(nonce_r),
            "sig_responder": b64e(sig_r),
            "transcript_r": tr
        }).encode("utf-8"))

        shared = x25519_exchange(self.sk_eph, self.peer_pk_eph_raw)
        thash = hashlib.sha256(jcanon({"ti": ti, "tr": tr})).digest()
        self.keys = derive_keys(shared, thash)

        raw = conn.recv(16384).decode("utf-8")
        if not raw: raise ValueError("missing Finished (initiator)")
        fin = json.loads(raw)
        if fin.get("step") != "Finished" or b64d(fin["hmac"]) != hmac256(self.keys.k_fin_i, thash):
            raise InvalidSignature("Finished (initiator) invalid")
        conn.sendall(json.dumps({"step":"Finished","hmac":b64e(hmac256(self.keys.k_fin_r, thash))}).encode("utf-8"))
        print(f"[{self.id}] Protocol success. Secure channel established.")

    # Initiator path
    def initiate(self, target_id: str):
        rec = self.registry_lookup(target_id)
        if not rec:
            print(f"[{self.id}] Discovery failed for {target_id}")
            return
        host, port, peer_pk_lt = rec["host"], rec["port"], rec["pk_lt"]

        print("\n-------------------------------------------------------")
        print(f"--- 3. Starting {self.id} (Initiator) ---")
        print(f"[{self.id}] Registration successful.")
        print(f"[{self.id}] Connecting to {target_id} at {host}:{port}...")

        self.sk_eph, self.pk_eph_raw = x25519_keypair()
        nonce_i = os.urandom(16)
        ti = {
            "ver": PROTO_VER, "role": ROLE_I,
            "id_local": self.id, "id_peer": target_id,
            "pk_eph_local": b64e(self.pk_eph_raw),
            "pk_eph_peer": "", "nonce_local": b64e(nonce_i), "nonce_peer": ""
        }
        sig_i = self.sk_lt.sign(jcanon(ti))
        ti_orig = json.loads(json.dumps(ti))  # deep copy

        # connect with simple retry (in case listener just started)
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            for _ in range(20):
                try:
                    s.settimeout(1.0)
                    s.connect((host, port))
                    break
                except Exception:
                    time.sleep(0.1)
            s.sendall(json.dumps({
                "step":"ClientHello",
                "id_initiator": self.id,
                "pk_eph_initiator": b64e(self.pk_eph_raw),
                "sig_initiator": b64e(sig_i),
                "transcript_i": ti_orig
            }).encode("utf-8"))
            print(f"[{self.id}] Sent ClientHello to {target_id}.")

            raw = s.recv(16384).decode("utf-8")
            if not raw: raise ValueError("missing ServerHello — empty read")
            sh = json.loads(raw)
            if sh.get("step") != "ServerHello": raise ValueError("missing ServerHello")
            self.peer_pk_eph_raw = b64d(sh["pk_eph_responder"])
            tr = sh["transcript_r"]
            sig_r = b64d(sh["sig_responder"])
            if not ed25519_verify(peer_pk_lt, jcanon(tr), sig_r):
                raise InvalidSignature("responder transcript signature invalid")
            print(f"[{self.id}] ServerHello signature verified.")

            shared = x25519_exchange(self.sk_eph, self.peer_pk_eph_raw)
            thash = hashlib.sha256(jcanon({"ti": ti_orig, "tr": tr})).digest()
            self.keys = derive_keys(shared, thash)

            s.sendall(json.dumps({"step":"Finished","hmac":b64e(hmac256(self.keys.k_fin_i, thash))}).encode("utf-8"))
            raw = s.recv(16384).decode("utf-8")
            if not raw: raise ValueError("missing Finished (responder) — empty read")
            fin_r = json.loads(raw)
            if fin_r.get("step") != "Finished" or b64d(fin_r["hmac"]) != hmac256(self.keys.k_fin_r, thash):
                raise InvalidSignature("Finished (responder) invalid")
            print(f"[{self.id}] Protocol success. Secure channel established.")

            # Secure message exchange (one-way like your sample)
            print("\n--- 3. SECURE MESSAGE EXCHANGE START ---")
            self._send_secure(s, "Hello Control-Bravo. Authentication and Key Exchange SUCCESS.", ROLE_I,
                              sent_prefix=f"[{self.id} (SENT)]     <<< ",
                              recv_echo_prefix=f"[Control-Bravo (RECEIVED)] >>> ")
            time.sleep(0.15)
            self._send_secure(s, "This is the first message over the AES-256-GCM secure channel.", ROLE_I,
                              sent_prefix=f"[{self.id} (SENT)]     <<< ",
                              recv_echo_prefix=f"[Control-Bravo (RECEIVED)] >>> ")
            time.sleep(0.15)
            self._send_secure(s, "Encryption provides confidentiality, and the GCM tag guarantees integrity.", ROLE_I,
                              sent_prefix=f"[{self.id} (SENT)]     <<< ",
                              recv_echo_prefix=f"[Control-Bravo (RECEIVED)] >>> ")
            time.sleep(0.15)
            print("\n--- SECURE MESSAGE EXCHANGE END ---")
            time.sleep(0.2)
        finally:
            s.close()

        # Integrity tamper test
        self.simulate_mitm_attack(host, port)

    def _send_secure(self, conn: socket.socket, text: str, role: str,
                     sent_prefix: str = "", recv_echo_prefix: str = ""):
        assert self.keys is not None
        if role == ROLE_I:
            payload = aead_encrypt(self.keys.k_i2r, self.keys.sid, ROLE_I, self.keys.ctr_i2r, text.encode("utf-8"))
            self.keys.ctr_i2r += 1
        else:
            payload = aead_encrypt(self.keys.k_r2i, self.keys.sid, ROLE_R, self.keys.ctr_r2i, text.encode("utf-8"))
            self.keys.ctr_r2i += 1
        conn.sendall(json.dumps({"step":"SecureMessage","payload":payload}).encode("utf-8"))
        print(f"{sent_prefix}{text}", end="")
        if recv_echo_prefix:
            print(f"{recv_echo_prefix}{text}\n")
        else:
            print()

    def _handle_secure_message_from(self, payload_msg: dict, role: str, print_prefix: str = ""):
        assert self.keys is not None
        aad = json.loads(b64d(payload_msg["aad"]).decode("utf-8"))
        ctr, role_tag = aad["ctr"], aad["role"]
        if role_tag != role:
            print(f"[{self.id}] AAD role mismatch (got {role_tag}, expected {role})"); return
        try:
            if role == ROLE_I:
                if ctr != self.keys.ctr_i2r:
                    print(f"[{self.id}] Replay/misorder (I->R). expected {self.keys.ctr_i2r}, got {ctr}"); return
                pt = aead_decrypt(self.keys.k_i2r, payload_msg)
                self.keys.ctr_i2r += 1
            else:
                if ctr != self.keys.ctr_r2i:
                    print(f"[{self.id}] Replay/misorder (R->I). expected {self.keys.ctr_r2i}, got {ctr}"); return
                pt = aead_decrypt(self.keys.k_r2i, payload_msg)
                self.keys.ctr_r2i += 1
            print(f"{print_prefix}{pt.decode('utf-8')}")
        except InvalidTag:
            print(f"[{self.id} CRITICAL] MESSAGE INTEGRITY FAILED. Tampering detected! (AES-GCM tag failed)")
            self.keys = None

    def simulate_mitm_attack(self, host: str, port: int):
        print("\n\n-------------------------------------------------------")
        print("--- 4. INTEGRITY TEST: SIMULATING MITM TAMPERING ---")
        if not self.keys:
            print("[ATTACK] Cannot run integrity test: session key is not active.")
            return
        original = "Authorization Code: 554321. Do NOT tamper."
        payload = aead_encrypt(self.keys.k_i2r, self.keys.sid, ROLE_I, self.keys.ctr_i2r, original.encode("utf-8"))
        # do NOT advance local counter; responder will expect this ctr next (fresh)
        data_b64 = list(payload["data"])
        data_b64[5] = chr(ord(data_b64[5]) ^ 1)  # corrupt ciphertext/tag
        payload["data"] = "".join(data_b64)

        print(f"[ATTACK] Original Text: '{original}'")
        print(f"[ATTACK] MITM changes 1 byte of the ciphertext...\n")

        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.settimeout(1.0)
                s.connect((host, port))
                s.sendall(json.dumps({"step":"SecureMessage","payload":payload}).encode("utf-8"))
                time.sleep(0.3)
        except Exception:
            pass

        print("-------------------------------------------------------")
        print("[EXPECTED RESULT] Control-Bravo should detect tampering and print an error.")
        print("-------------------------------------------------------")



## Part 6 — Orchestration / Demo Runner

**Objective.** Bring everything together for a deterministic demo run that prints detailed logs.

**What this part does.**
- Starts the **Registry** (auto port) and prints its listening address.
- Spawns **Control-Bravo** (Responder) and registers it.
- Spawns **Pilot-Alpha** (Initiator), registers it, and initiates the handshake.
- Runs a short **secure message** exchange and then performs the **tamper** test.
- Shuts down cleanly and prints a final line.

> **Tip (Colab):** If you re-run the cell, ports are auto-picked; you won’t hit “Address already in use.”


In [23]:

def run_demo():
    # Registry (auto port)
    registry = RegistryServer()
    threading.Thread(target=registry.start, daemon=True).start()
    time.sleep(0.4)

    # Responder (auto port)
    control_port = pick_free_port()
    print("\n-------------------------------------------------------")
    print(f"--- 2. Starting Control-Bravo (Listener) on {control_port} ---")
    control = Client("Control-Bravo", control_port, HOST, registry.port, registry.pk_raw)
    control.registry_register()
    control.start_listener()

    # Initiator (no listen needed; give it a nominal port just for logs)
    pilot_port_nominal = pick_free_port()
    pilot = Client("Pilot-Alpha", pilot_port_nominal, HOST, registry.port, registry.pk_raw)
    pilot.registry_register()

    # Initiate
    pilot.initiate("Control-Bravo")

    # Wrap up
    time.sleep(0.8)
    print("\n--- Protocol Demonstration Finished ---")
    SHUTDOWN.set()
    time.sleep(0.3)
    # The listener prints its own shutdown line when loop exits.



## Part 7 — Run the Demo

> Expect logs similar to the example you provided: registry boot, listener start, client hello/server hello with signature verifications, secure messages, and the integrity failure when we flip a bit.

*(If you don’t see output right away, expand the cell output panel.)*


In [24]:
run_demo()

[*] Trusted Registry Server starting on 127.0.0.1:32985...
[*] Trusted Registry Server listening on 127.0.0.1:32985

-------------------------------------------------------
--- 2. Starting Control-Bravo (Listener) on 46415 ---
[Control-Bravo] Registration successful.
[Control-Bravo] Listener started on 127.0.0.1:46415. Awaiting contact...
[Pilot-Alpha] Registration successful.

-------------------------------------------------------
--- 3. Starting Pilot-Alpha (Initiator) ---
[Pilot-Alpha] Registration successful.
[Pilot-Alpha] Connecting to Control-Bravo at 127.0.0.1:46415...
[Pilot-Alpha] Sent ClientHello to Control-Bravo.
[Control-Bravo] ClientHello signature verified.
[Pilot-Alpha] ServerHello signature verified.
[Pilot-Alpha] Protocol success. Secure channel established.

--- 3. SECURE MESSAGE EXCHANGE START ---
[Pilot-Alpha (SENT)]     <<< Hello Control-Bravo. Authentication and Key Exchange SUCCESS.[Control-Bravo (RECEIVED)] >>> Hello Control-Bravo. Authentication and Key Exchan


## Appendix — Threats & Defenses (Quick Table)

| Threat | Defense |
|---|---|
| Directory poisoning | Registry replies are **Ed25519-signed**; clients **pin** registry pubkey |
| MitM on handshake | **Transcript-bound signatures** (Ed25519) include both ephemeral keys; **mutual Finished** HMAC |
| Replay/reordering | **Per-direction counters** in AAD; mismatches are rejected |
| Tampering | **AES-GCM** integrity; demo flips a byte → `InvalidTag` |
| Key compromise (past) | **Forward Secrecy** via ephemeral X25519 |
| Cross-session mixup | **session_id** and **role** inside AAD bind messages to the session/direction |
