### Lab 7: Cifrado de extremo a extremo (E2EE)

#### Integrantes:
##### Sergio Ehlen
##### Gabriela González 
##### Cristóbal Moraga

#### TEL252 - Criptografía y Seguridad en la Información
##### 2do Semestre, 2025
##### Docente: Daniel Espinoza
##### Fecha de entrega: 27 de Noviembre, 08:00 horas

_Nota_: El uso de herramientas de Inteligencia Artificial (IA) está permitido únicamente si se justifica adecuadamente y se evidencia comprensión del contenido. En caso de detectarse el uso de IA sin justificación, o la inclusión de conceptos no abordados en la asignatura sin una explicación clara, el laboratorio será evaluado con nota mínima.

##### **Actividad 1** (100%) 

En este laboratorio usted y su grupo tendrá que desarrollar una API básica con Flask y Python. Para ello, puede apoyarse en las APIs presentadas en los laboratorios 2 y 5, donde se tiene un script `server.py` que define las rutas y corre el servidor, además de un archivo `crypto.py` que describe las funcionalidades criptográficas. No es necesario crear un docker, y puede modificar la estructura a voluntad.

Lo que se busca en el último laboratorio de la asignatura es que ustedes puedan **integrar** primitivas criptográficas para crear una API básica funcional que implemente cifrado de extremo a extremo (E2EE), es decir, que el cliente y el servidor puedan comunicarse de forma segura. Para distinguir entre grupos, cada uno tiene que elegir un contexto entre los siguientes:

1. Redes de Sensores (IoT)
2. Chat (_WhatsApp_ por ejemplo)
3. Gestor de contraseñas
4. Pasarela de pago
5. Voto electrónico

Puede proponer otros temas si prefiere. El **registro de los grupos** debe realizarse a más tardar el **Jueves 23 de Octubre, a las 23:59 horas**.




En este laboratorio no se les va a indicar cómo realizar la API. Cada grupo debe tomar sus propias **decisiones de diseño que deben tener sentido con el contexto elegido**. Se vuelve a recalcar que la API puede ser simple, por lo que aspectos como bases de datos, configuraciones, etc, se pueden simplificar a conveniencia, lo importante es que en este Lab aprendan a **integrar**. Para esta evaluación, el rol del profesor será el de una guía.

La entrega del Lab 7 se divide en dos partes:

1. API (50%): Corresponde a los archivos necesarios para que la API sea funcional. Recordar que **debe** ser una API en Python con Flask.
2. Diagrama (50%): Cada grupo debe realizar un diagrama en formato libre (puede ser UML, _Flowchart_, _Timing_, etc) que debe describir cómo funciona la comunicación con la API. En el diagrama **se deben especificar los algoritmos utilizados para cada proceso, sus parámetros y la matemática involucrada en ellos**. Dicho de otra manera, el diagrama debe ser autocontenido, es decir, mediante el diagrama debe ser suficiente para entender la API al completo.

Durante el Jueves 27 de Noviembre, todos los grupos deberán llevar impreso su diagrama para pegarlo en la pizarra de la sala. Los dos bloques serán utilizados para que los grupos puedan analizar los diagramas de sus otros compañeros y pegar críticas constructivas y sugerencias a través de adhesivos _Post-it_. **La exposición es el requisito para que su diagrama sea corregido por el profesor**.

# Problema seleccionado: CHAT ENCRIPTADO E2E

## Introducción

### Objetivo del Sistema

Implementar un servicio de mensajería cifrada de extremo a extremo donde:

- **El servidor NO puede leer los mensajes** (solo actúa como relay de paquetes cifrados)
- **La autenticación es robusta** con factor dual (password + TOTP)
- **Todas las primitivas son aprobadas por el currículo** de TEL252
- **La integridad y autenticidad** están garantizadas mediante MACs y AEAD


### Primitivas Criptográficas Utilizadas

| Primitiva | Uso en el Sistema | Clase TEL252 | RFC/Estándar |
|-----------|-------------------|--------------|--------------|
| **HMAC-SHA256** | Autenticación de contraseñas | Clase 11 (MACs) | RFC 2104 |
| **TOTP (RFC 6238)** | Segundo factor de autenticación | Clase 11 (aplicación práctica) | RFC 6238 |
| **RSA-2048** | Generación de pares de llaves por dispositivo | Clase 4 (RSA) | PKCS#1 v2.2 |
| **RSA-OAEP** | Key wrapping (envolvimiento de llaves de sesión AES) | Clase 8 (RSA-KEM) | RFC 8017 |
| **AES-256-GCM** | Cifrado autenticado de mensajes (AEAD) | Clase 3 (AES) + Clase 11 (AEAD) | NIST SP 800-38D |
| **HMAC-SHA256 (tokens)** | Firma de tokens de sesión (similar a JWT) | Clase 11 (MACs) | RFC 2104 |

## Ejemplo de ejecución: Comunicación de pares

Los bloques de código a continuación demuestran el funcionamiento de un caso de uso simple del sistema sin la interfaz gráfica. Cabe notar que el servidor ocupa una base de datos relacional mysql, por lo que el código ejecutado a continuación tiene persistencia. Para evitar registros repetidos al volver a ejecutar las celdas, se utilizan numeros aleatorios en las credenciales de los usuarios de ejemplo.

Para ejecutar los siguientes bloques de código, levantar primero el servidor con 'python iniciar_servidor.py' sin el flag --tls. Cabe notar que este es un modo de operación demostrativo, para poder realizar las query de una forma simple y visualizable similar a laboratorios anteriores, la cual demuestra la capacidad de comunicación segura fin a fin entre usuarios, pero que no asegura la comunicación segura entre usuario y servidor. Este seguridad es proveida usando TLS, por lo que se debe activar este flag y usar el cliente web para tener el stack criptográfico completo.

In [188]:
#Imports
import requests
import crypto
import base64
import json
from random import randint
from typing import Any, Dict, Optional
from datetime import datetime

#Servidor
HOST = "http://127.0.0.1:5000"

#Las siguientes funciones se usan para hacer los query post,get y delete integrando los token de manera simple
def api_post(path: str, payload: Dict[str, Any], token: Optional[str] = None) -> Dict[str, Any]:
    headers = {"Content-Type": "application/json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    response = requests.post(f"{HOST}{path}", headers=headers, data=json.dumps(payload), timeout=10)
    if response.status_code >= 400:
        try:
            error = response.json().get("error")
        except ValueError:
            error = response.text
        raise SystemExit(f"API error {response.status_code}: {error}")
    return response.json()
def api_get(path: str, token: Optional[str] = None) -> Dict[str, Any]:
    headers = {}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    response = requests.get(f"{HOST}{path}", headers=headers, timeout=10)
    if response.status_code >= 400:
        try:
            error = response.json().get("error")
        except ValueError:
            error = response.text
        raise SystemExit(f"API error {response.status_code}: {error}")
    return response.json()
def api_delete(path: str, token: str) -> None:
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.delete(f"{HOST}{path}", headers=headers, timeout=10)
    if response.status_code >= 400:
        try:
            error = response.json().get("error")
        except ValueError:
            error = response.text
        raise SystemExit(f"API error {response.status_code}: {error}")

### Registro de usuario

In [177]:
#Crear credenciales de PabloXXX
rand = randint(100,999)

correo = f"Pablo.{rand}@usm.cl"
uname = f"Pablo{rand}"
pword = f"pword{rand}"
print(f"Registrando a {uname} con correo {uname} y password {pword}")

#Registrar a PabloXXX como un nuevo usuario
res = requests.post(f"{HOST}/api/register",json={
    "identifier":correo,
    "display_name":uname,
    "password":pword,
})
print(res.json())
totp_secret = res.json()["totp_secret"]
print(f"TOTP Secret: {totp_secret}")

Registrando a Pablo577 con correo Pablo577 y password pword577
{'display_name': 'Pablo577', 'id': 41, 'identifier': 'Pablo.577@usm.cl', 'totp_secret': 'SFYZSCBNF4JDWIWJXCPRIYQJ5NE56HXW'}
TOTP Secret: SFYZSCBNF4JDWIWJXCPRIYQJ5NE56HXW


### Login / generacion de TOTP

In [178]:
#Iniciar sesion para PabloXXX
res = requests.post(f"{HOST}/api/login",json={
    "identifier":correo,
    "password":pword,
    "totp_code":crypto.generate_totp(totp_secret),
})
print(res.json())
token = res.json()["token"]

{'token': 'VEVMMjUyLUhNQUM=.eyJleHAiOjE3NjQxNzI3NjQsImlhdCI6MTc2NDE2OTE2NCwidWlkIjo0MX0=.xnhiwnbZNFetIWv2KMSjVy-F-THXDjhWlu81xM6Opmg=', 'user': {'display_name': 'Pablo577', 'id': 41, 'identifier': 'Pablo.577@usm.cl'}}


### Registro de dispositivo / llave pública

In [179]:
#Generar par de llaves asimetricas RSA
private_key_pem, public_key_pem = crypto.generate_rsa_keypair()

#Registrar dispositivo
res = api_post("/api/devices",payload={
    "device_name":f"{uname}-laptop",
    "public_key_pem":public_key_pem,
    },token=token)
res


{'device_id': 20}

### Repetir proceso para segundo usuario

In [180]:
#Regitrar a JuanXXX
rand2 = randint(100,999)
correo2 = f"Juan.{rand2}@usm.cl"
uname2 = f"Juan{rand2}"
pword2 = f"pword{rand2}"
print(f"Registrando a {uname2} con correo {uname2} y password {pword2}")
res = requests.post(f"{HOST}/api/register",json={
    "identifier":correo2,
    "display_name":uname2,
    "password":pword2,
})
print(res.json())
totp_secret2 = res.json()["totp_secret"]
print(f"TOTP Secret: {totp_secret2}")

Registrando a Juan218 con correo Juan218 y password pword218
{'display_name': 'Juan218', 'id': 42, 'identifier': 'Juan.218@usm.cl', 'totp_secret': 'CM5XPJUXDMYY3I4OUNI4ZSZUVBHO2LGU'}
TOTP Secret: CM5XPJUXDMYY3I4OUNI4ZSZUVBHO2LGU


In [181]:
#Iniciar sesion para JuanXXX
res = requests.post(f"{HOST}/api/login",json={
    "identifier":correo2,
    "password":pword2,
    "totp_code":crypto.generate_totp(totp_secret2),
})
print(res.json())
token2 = res.json()["token"]

{'token': 'VEVMMjUyLUhNQUM=.eyJleHAiOjE3NjQxNzI3NzEsImlhdCI6MTc2NDE2OTE3MSwidWlkIjo0Mn0=.qjqW4E-oigw5Drl_l3ScJVdSIa5WZToOW5HPffiy240=', 'user': {'display_name': 'Juan218', 'id': 42, 'identifier': 'Juan.218@usm.cl'}}


In [182]:
#Generar par de llaves asimetricas
private_key_pem2, public_key_pem2 = crypto.generate_rsa_keypair()

#Registrar dispositivo
res = api_post("/api/devices",payload={
    "device_name":f"{uname2}-laptop",
    "public_key_pem":public_key_pem2,
    },token=token2)
res


{'device_id': 21}

### Mostrar usuarios registrados

In [183]:
res = api_get("/api/users",token=token)
res

{'contacts': [{'display_name': 'Juan218',
   'id': 42,
   'identifier': 'Juan.218@usm.cl',
   'public_key_pem': '-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAxhvLiyC+LfzSxe82dVWU\nFQSLNqH7paZ4YHUr5/FGOBRMpF0R//UHbEtmFEUqB+ojNzriUPQ9zXYGNxneBWQ+\nddJPdDFemHKalcOatCnX/++hqzsF1VJaKxRfbpYEb8u7Nr3tEhMZCR0U8hL2cQbO\nktCkoKtL40rn6bDT/qIOkCVRmzMHO7Hzbe3fsTDq0OBC2zYC1YT/3gyOnGxTPAnb\nJTDjoZK8PnMQfV+0hyc7PnCTS+tvebLGBQt7R7pE3wGlbcuoFa+Bs1rp2r2Cwgey\ncu7K+RgxdDqOECrk4+dMisVu4TuIX6yMKSoTJUG7cJlvCci6BYUVFH5bUCNN7Fdm\nSQIDAQAB\n-----END PUBLIC KEY-----'},
  {'display_name': 'Juan428',
   'id': 40,
   'identifier': 'Juan.428@usm.cl',
   'public_key_pem': '-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA3wsrsBGcGwycLaQtt5Ch\nHlLygXnRY3K/RJGWFCK7C5vUkRCCiBJX/LfPgKOvlFczj3T8Hxw1FKV8MKm1q1DN\nkMQdm+T5WrJnyE+J+xgOJW/XPhu9NJlkkN9KKVKocQuuaK8ylErVqBDrPHqYMG+f\nBtbOM4ot2fR63Wmc9zZq44LUsd4PyrEBkDrXGF2TZtkkxmtB1+NB+/JIUY+/2SYs\nORSfT85aOuQnMYaJNucow02jdTwnofuk

### Enviar nuevo mensaje

In [185]:
#PabloXXX quiere enviar mensaje a JuanXXX
mensaje = f"Buenas tardes don Juan, envio este mensaje a las {datetime.now().time()}"
print(f"Enviar a Juan: {mensaje}")

#Consultar datos de Juan al servidor
datos_juan = api_get(f"/api/users/{correo2}",token=token)

#Recuperar su llave pública
juan_pkey = datos_juan["public_key_pem"]

#Encriptar y generar MAC con AES-256-GCM
session_key_b64, nonce_b64, ciphertext_b64, tag_b64 = crypto.encrypt_payload(
    mensaje.encode("utf-8"),
    associated_data=f"sender={correo}".encode("utf-8"),
)
print(f"Mensaje encriptado con AES: {ciphertext_b64}")

#Encapsular llave simétrica AES con llave privada de Pablo usando RSA-OEP
session_key_bytes = base64.urlsafe_b64decode(session_key_b64.encode("utf-8"))
print(f"LLave de sesion (simetrica privada entre usuarios): {session_key_b64}")
encrypted_session_key = crypto.encrypt_session_key_with_rsa(juan_pkey, session_key_bytes)
print(f"Llave encapsulada: {encrypted_session_key}")

#Enviar
res = api_post("/api/messages", payload = {
    "recipient_identifier": correo2,
    "session_key_encrypted": encrypted_session_key,
    "nonce_b64": nonce_b64,
    "ciphertext_b64": ciphertext_b64,
    "tag_b64": tag_b64,
    "associated_data_b64": base64.urlsafe_b64encode(
        f"sender={correo}".encode("utf-8")
    ).decode("utf-8"),
}, token=token)

#res = {message_id:<numero>} significa que se registro correctamente
print(res)

Enviar a Juan: Buenas tardes don Juan, envio este mensaje a las 12:00:20.380213
Mensaje encriptado con AES: -uGy9KUK72lr7w11oqaFtOEpf39XRaEK5f8TNmWEsQ8qSMywuTggdB0Rkzx_nzSq-IQga1sixi2nwmKfRWxFlg==
LLave de sesion (simetrica privada entre usuarios): yhPjZVzyXTFJtOZA84JNCNI7trq9_IH8TcfnphsgrOg=
Llave encapsulada: xKAA-NtNZyoo0pmSColhGQG_uxR9E3Bs-h8ab76mePSmUqcz9HO_kwuuN_LlZZzyXPmFaAFbWirFH8P4CqqRfriNgFOPPnFPHq-rbW1KSX_yqPM8-ENy9cjaPte41DNGz-4q_RKQC7d3YZXkK38-NH63qv3pFRcRnQt5CXrOosoqETfsVNuNKqg-x8pZgfP4HR9GqR4gUSInyRst-EdSjFG6iTd1qUxHwlLSBaWhg_fsXpgklShgEuUbM3phtLMpN6LvoCq1gS396J9PEZQmshmvxOM8UAYr9-WNJC_59v37phP2bRzJboj542A4a9To16nDPwU7B-0iZ8qse8t35w==
{'message_id': 32}


### Leer y desencriptar mensaje recibido

In [186]:
#Revisar mensajes de JuanXXX
mensajes = api_get("/api/messages",token=token2)
#Mensajes se guardan encriptados en el servidor
mensajes

{'messages': [{'associated_data_b64': 'c2VuZGVyPVBhYmxvLjU3N0B1c20uY2w=',
   'ciphertext_b64': '-uGy9KUK72lr7w11oqaFtOEpf39XRaEK5f8TNmWEsQ8qSMywuTggdB0Rkzx_nzSq-IQga1sixi2nwmKfRWxFlg==',
   'created_at': '2025-11-26 15:00:20',
   'id': 32,
   'nonce_b64': 'fNe0Bu-6GCPVDPzB',
   'sender_display_name': 'Pablo577',
   'sender_identifier': 'Pablo.577@usm.cl',
   'session_key_encrypted': 'xKAA-NtNZyoo0pmSColhGQG_uxR9E3Bs-h8ab76mePSmUqcz9HO_kwuuN_LlZZzyXPmFaAFbWirFH8P4CqqRfriNgFOPPnFPHq-rbW1KSX_yqPM8-ENy9cjaPte41DNGz-4q_RKQC7d3YZXkK38-NH63qv3pFRcRnQt5CXrOosoqETfsVNuNKqg-x8pZgfP4HR9GqR4gUSInyRst-EdSjFG6iTd1qUxHwlLSBaWhg_fsXpgklShgEuUbM3phtLMpN6LvoCq1gS396J9PEZQmshmvxOM8UAYr9-WNJC_59v37phP2bRzJboj542A4a9To16nDPwU7B-0iZ8qse8t35w==',
   'tag_b64': '5yhQ4RVLG3KsK_LD1F9PuQ=='}]}

In [187]:
#Por si hay más de un mensaje
for val in mensajes["messages"]:
    #Desencapsular llave simétrica usando llave privada RSA
    session_key_bytes2 = crypto.decrypt_session_key_with_rsa(
        private_key_pem=private_key_pem2,
        encrypted_b64=val["session_key_encrypted"]
    )
    session_key_b642 = base64.urlsafe_b64encode(session_key_bytes2).decode("utf-8")
    print(f"LLave de sesion (simetrica privada entre usuarios): {session_key_b642}")
    
    #Recuperar associated data
    associated_bytes = base64.urlsafe_b64decode(val["associated_data_b64"].encode("utf-8")) 
    
    #Desencriptar con AES-GCM y verificar integridad del mensaje
    print(f"Mensaje encriptado: {val["ciphertext_b64"]}")
    mensaje_recuperado = crypto.decrypt_payload(
        session_key_b64=session_key_b642,
        nonce_b64=val["nonce_b64"],
        ciphertext_b64=val["ciphertext_b64"],
        tag_b64=val["tag_b64"],
        associated_data=associated_bytes,
    )
    print(f"Mensaje desencriptado: {mensaje_recuperado}")


LLave de sesion (simetrica privada entre usuarios): yhPjZVzyXTFJtOZA84JNCNI7trq9_IH8TcfnphsgrOg=
Mensaje encriptado: -uGy9KUK72lr7w11oqaFtOEpf39XRaEK5f8TNmWEsQ8qSMywuTggdB0Rkzx_nzSq-IQga1sixi2nwmKfRWxFlg==
Mensaje desencriptado: b'Buenas tardes don Juan, envio este mensaje a las 12:00:20.380213'


## Código del servidor

### server.py

In [None]:
"""Flask API exposing the TEL252 end-to-end encrypted chat service."""

from __future__ import annotations

import functools
import sqlite3
from pathlib import Path
from typing import Callable, Optional

from flask import Flask, jsonify, request, send_from_directory
import urllib.parse

from . import crypto
from .config import load_password_secret, load_session_secret
from .database import Database


def create_app(database: Optional[Database] = None) -> Flask:
    app = Flask(__name__)

    db = database or Database()
    session_secret = load_session_secret()
    password_secret = load_password_secret()
    web_client_dir = Path(__file__).resolve().parent / "web_client"

    def _auth_header_token() -> Optional[str]:
        header = request.headers.get("Authorization", "")
        if header.lower().startswith("bearer "):
            return header.split(" ", 1)[1].strip()
        return None

    def _require_auth() -> tuple:
        token = _auth_header_token()
        if not token:
            return None, ("Missing bearer token", 401)

        session_data = crypto.verify_session_token(token, session_secret)
        if not session_data:
            return None, ("Invalid or expired session", 401)

        user = db.get_user_by_id(session_data["uid"])
        if not user:
            return None, ("Unknown session user", 401)

        return (token, session_data, user), None

    def login_required(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            auth_context, error = _require_auth()
            if error:
                message, status = error
                return jsonify({"error": message}), status
            token, session_data, user = auth_context
            return func(*args, auth_token=token, session=session_data, user=user, **kwargs)

        return wrapper

    # ---------------------------------------------------------------
    # Health check
    # ---------------------------------------------------------------

    @app.get("/healthz")
    def healthcheck():
        return jsonify({"status": "ok"})

    @app.get("/")
    def root():
        if web_client_dir.exists():
            return jsonify({"message": "TEL252 chat API", "ui": "/ui/"})
        return jsonify({"message": "TEL252 chat API"})

    @app.get("/ui/")
    def ui_entrypoint():
        if not web_client_dir.exists():
            return jsonify({"error": "UI assets not found"}), 404
        return send_from_directory(web_client_dir, "index.html")

    @app.get("/ui/<path:filename>")
    def ui_assets(filename: str):
        if not web_client_dir.exists():
            return jsonify({"error": "UI assets not found"}), 404
        return send_from_directory(web_client_dir, filename)

    # ---------------------------------------------------------------
    # Authentication
    # ---------------------------------------------------------------

    @app.post("/api/register")
    def register():
        payload = request.get_json() or {}

        identifier = (payload.get("identifier") or "").strip()
        display_name = (payload.get("display_name") or "").strip()
        password = payload.get("password") or ""

        if not identifier or not display_name or not password:
            return (
                jsonify({"error": "identifier, display_name and password are required"}),
                400,
            )

        if db.get_user_by_identifier(identifier) is not None:
            return jsonify({"error": "identifier already registered"}), 409

        password_hash = crypto.hash_password(password, password_secret)
        totp_secret = crypto.generate_totp_secret()
        user_id = db.create_user(identifier, display_name, password_hash, totp_secret)

        return (
            jsonify(
                {
                    "id": user_id,
                    "identifier": identifier,
                    "display_name": display_name,
                    "totp_secret": totp_secret,
                }
            ),
            201,
        )

    @app.post("/api/login")
    def login():
        payload = request.get_json() or {}

        identifier = (payload.get("identifier") or "").strip()
        password = payload.get("password") or ""
        totp_code = (payload.get("totp_code") or "").strip()

        if not identifier or not password or not totp_code:
            return jsonify({"error": "identifier, password and totp_code are required"}), 400

        user = db.get_user_by_identifier(identifier)
        if not user:
            return jsonify({"error": "invalid credentials"}), 401

        if not crypto.verify_password(password, user["password_hash"], password_secret):
            return jsonify({"error": "invalid credentials"}), 401

        if not crypto.verify_totp(user["totp_secret"], totp_code):
            return jsonify({"error": "invalid code"}), 401

        token = crypto.create_session_token(user["id"], session_secret)
        return jsonify(
            {
                "token": token,
                "user": {
                    "id": user["id"],
                    "identifier": user["identifier"],
                    "display_name": user["display_name"],
                },
            }
        )

    @app.get("/api/totp/setup")
    @login_required
    def totp_setup(*, user, **_):
        """Return the TOTP secret and an otpauth URL for QR code generation."""
        secret = user["totp_secret"]
        identifier = user["identifier"]
        issuer = "TEL252"
        
        # otpauth://totp/Issuer:Account?secret=...&issuer=...
        label = f"{issuer}:{identifier}"
        params = {
            "secret": secret,
            "issuer": issuer,
            "algorithm": "SHA1",
            "digits": 6,
            "period": 30,
        }
        otpauth_url = f"otpauth://totp/{urllib.parse.quote(label)}?{urllib.parse.urlencode(params)}"
        
        return jsonify({
            "secret": secret,
            "otpauth_url": otpauth_url
        })

    # ---------------------------------------------------------------
    # Device / key management
    # ---------------------------------------------------------------

    @app.post("/api/devices")
    @login_required
    def register_device(*, user, **_):
        payload = request.get_json() or {}
        device_name = (payload.get("device_name") or "").strip()
        public_key_pem = payload.get("public_key_pem") or ""

        if not device_name or not public_key_pem:
            return jsonify({"error": "device_name and public_key_pem are required"}), 400

        if "BEGIN PUBLIC KEY" not in public_key_pem:
            return jsonify({"error": "public_key_pem must be a PEM encoded key"}), 400

        try:
            device_id = db.register_device(user["id"], device_name, public_key_pem)
        except sqlite3.IntegrityError:
            return jsonify({"error": "device name already registered"}), 409

        return jsonify({"device_id": device_id}), 201

    @app.get("/api/devices")
    @login_required
    def list_devices(*, user, **_):
        records = db.list_devices(user["id"])
        return jsonify(
            {
                "devices": [
                    {
                        "id": row["id"],
                        "device_name": row["device_name"],
                        "public_key_pem": row["public_key_pem"],
                        "created_at": row["created_at"],
                    }
                    for row in records
                ]
            }
        )

    # ---------------------------------------------------------------
    # Directory
    # ---------------------------------------------------------------

    @app.get("/api/users")
    @login_required
    def directory(*, user, **_):
        contacts = []
        for row in db.list_users(exclude_user_id=user["id"]):
            public_key = db.get_primary_public_key(row["id"])
            contacts.append(
                {
                    "id": row["id"],
                    "identifier": row["identifier"],
                    "display_name": row["display_name"],
                    "public_key_pem": public_key,
                }
            )
        return jsonify({"contacts": contacts})

    @app.get("/api/users/<identifier>")
    @login_required
    def user_details(identifier: str, **_):
        row = db.get_user_by_identifier(identifier)
        if not row:
            return jsonify({"error": "user not found"}), 404

        public_key = db.get_primary_public_key(row["id"])
        return jsonify(
            {
                "id": row["id"],
                "identifier": row["identifier"],
                "display_name": row["display_name"],
                "public_key_pem": public_key,
            }
        )

    # ---------------------------------------------------------------
    # Messaging
    # ---------------------------------------------------------------

    def _build_encrypted_message(sender_row, payload: dict) -> Optional[crypto.EncryptedMessage]:
        required_fields = (
            "recipient_identifier",
            "session_key_encrypted",
            "nonce_b64",
            "ciphertext_b64",
            "tag_b64",
        )
        for field in required_fields:
            if not payload.get(field):
                return None

        recipient_row = db.get_user_by_identifier(payload["recipient_identifier"])
        if not recipient_row:
            return None

        return crypto.EncryptedMessage(
            sender_id=sender_row["id"],
            recipient_id=recipient_row["id"],
            session_key_encrypted=payload["session_key_encrypted"],
            nonce_b64=payload["nonce_b64"],
            ciphertext_b64=payload["ciphertext_b64"],
            tag_b64=payload["tag_b64"],
            associated_data_b64=payload.get("associated_data_b64", ""),
        )

    @app.post("/api/messages")
    @login_required
    def send_message(*, user, **_):
        payload = request.get_json() or {}
        message = _build_encrypted_message(user, payload)
        if not message:
            return jsonify({"error": "invalid message payload"}), 400

        message_id = db.store_message(message)
        return jsonify({"message_id": message_id}), 201

    @app.get("/api/messages")
    @login_required
    def inbox(*, user, **_):
        records = db.fetch_messages_for_user(user["id"])
        return jsonify(
            {
                "messages": [
                    {
                        "id": row["id"],
                        "sender_identifier": row["sender_identifier"],
                        "sender_display_name": row["sender_display_name"],
                        "session_key_encrypted": row["session_key_encrypted"],
                        "nonce_b64": row["nonce_b64"],
                        "ciphertext_b64": row["ciphertext_b64"],
                        "tag_b64": row["tag_b64"],
                        "associated_data_b64": row["associated_data_b64"],
                        "created_at": row["created_at"],
                    }
                    for row in records
                ]
            }
        )

    @app.delete("/api/messages/<int:message_id>")
    @login_required
    def delete_message(message_id: int, *, user, **_):
        db.delete_message(message_id, user["id"])
        return ("", 204)

    return app


if __name__ == "__main__":
    api = create_app()
    api.run(host="0.0.0.0", port=5000, debug=True)


### crypto.py

In [None]:
"""Cryptographic primitives for the TEL252 end-to-end encrypted chat demo.

This module centralises every cryptographic building block used by the project.
All functions are deliberately thin wrappers with extensive documentation so the
cryptographic reasoning behind each call is explicit for educational purposes.
"""

from __future__ import annotations

import base64
import binascii
import hashlib
import hmac
import json
import struct
import time
from dataclasses import dataclass
from typing import Optional, Tuple

from Crypto.Cipher import AES
from Crypto.Cipher import PKCS1_OAEP
from Crypto.PublicKey import RSA
from Crypto.Random import get_random_bytes
from Crypto.Hash import SHA256

# ---------------------------------------------------------------------------
# Password authentication (HMAC-SHA256)
# ---------------------------------------------------------------------------

PASSWORD_MAC_ALGO = hashlib.sha256


def hash_password(password: str, secret: bytes) -> str:
    """Produce an HMAC-SHA256 digest for ``password`` using ``secret``.

    This mirrors the MAC construction revisited in Symmetric Crypto III
    (Clase 11). Instead of PBKDF2, we rely on a high-entropy secret key that
    lives solely on the server. Students can inspect how HMAC authenticates the
    password material with a single hashing pass.
    """

    mac = hmac.new(secret, password.encode("utf-8"), PASSWORD_MAC_ALGO).digest()
    return base64.urlsafe_b64encode(mac).decode("utf-8")


def verify_password(password: str, digest_b64: str, secret: bytes) -> bool:
    """Check ``password`` against the stored HMAC digest."""

    expected = base64.urlsafe_b64decode(digest_b64)
    candidate = hmac.new(secret, password.encode("utf-8"), PASSWORD_MAC_ALGO).digest()
    return hmac.compare_digest(candidate, expected)


# ---------------------------------------------------------------------------
# Time-based One-Time Password (TOTP) according to RFC 6238
# ---------------------------------------------------------------------------

TOTP_DIGITS = 6
TOTP_TIME_STEP = 30  # seconds


def generate_totp_secret() -> str:
    """Create a new 160-bit secret encoded in base32 for authenticator apps."""

    return base64.b32encode(get_random_bytes(20)).decode("utf-8").strip("=")


def _totp_counter(timestamp: Optional[int] = None, step: int = TOTP_TIME_STEP) -> int:
    """Compute the moving factor (time counter) used in TOTP."""

    if timestamp is None:
        timestamp = int(time.time())
    return timestamp // step


def generate_totp(secret: str, timestamp: Optional[int] = None) -> str:
    """Generate an RFC 6238 compliant TOTP code for the provided secret."""

    key = base64.b32decode(secret + "=" * ((8 - len(secret) % 8) % 8))
    counter = _totp_counter(timestamp)
    msg = struct.pack(">Q", counter)
    h = hmac.new(key, msg, hashlib.sha1).digest()
    offset = h[-1] & 0x0F
    code_int = (
        ((h[offset] & 0x7F) << 24)
        | ((h[offset + 1] & 0xFF) << 16)
        | ((h[offset + 2] & 0xFF) << 8)
        | (h[offset + 3] & 0xFF)
    )
    return str(code_int % (10 ** TOTP_DIGITS)).zfill(TOTP_DIGITS)


def verify_totp(secret: str, code: str, window: int = 1) -> bool:
    """Validate a TOTP code, allowing a time drift defined by ``window``.

    The server accepts codes from the current time-step plus/minus ``window``
    steps to mitigate clock drift between client and server devices.
    """

    now = int(time.time())
    for offset in range(-window, window + 1):
        timestamp = now + offset * TOTP_TIME_STEP
        if hmac.compare_digest(generate_totp(secret, timestamp), code):
            return True
    return False


# ---------------------------------------------------------------------------
# RSA helpers
# ---------------------------------------------------------------------------

RSA_KEY_SIZE = 2048


def generate_rsa_keypair(bits: int = RSA_KEY_SIZE) -> Tuple[str, str]:
    """Generate an RSA keypair (PEM encoded) using PyCryptodome.

    Returns the private and public key PEM strings. These keys are used by the
    clients to exchange ephemeral AES session keys securely (via RSA-OAEP).
    """

    key = RSA.generate(bits)
    private_pem = key.export_key(format="PEM").decode("utf-8")
    public_pem = key.publickey().export_key(format="PEM").decode("utf-8")
    return private_pem, public_pem


def encrypt_session_key_with_rsa(public_key_pem: str, session_key: bytes) -> str:
    """Encrypt a random AES session key using RSA-OAEP.

    The ciphertext is returned base64-encoded so that it can be safely
    transported over JSON without binary issues.
    """

    public_key = RSA.import_key(public_key_pem)
    cipher_rsa = PKCS1_OAEP.new(public_key, hashAlgo=SHA256)
    encrypted = cipher_rsa.encrypt(session_key)
    return base64.urlsafe_b64encode(encrypted).decode("utf-8")


def decrypt_session_key_with_rsa(private_key_pem: str, encrypted_b64: str) -> bytes:
    """Inverse operation of :func:`encrypt_session_key_with_rsa`."""

    private_key = RSA.import_key(private_key_pem)
    cipher_rsa = PKCS1_OAEP.new(private_key, hashAlgo=SHA256)
    encrypted = base64.urlsafe_b64decode(encrypted_b64)
    return cipher_rsa.decrypt(encrypted)


# ---------------------------------------------------------------------------
# Symmetric encryption (AES-256-GCM)
# ---------------------------------------------------------------------------

AES_KEY_SIZE = 32  # 256-bit symmetric key
GCM_NONCE_SIZE = 12  # 96-bit nonce recommended by NIST


def encrypt_payload(plaintext: bytes, associated_data: bytes = b"") -> Tuple[str, str, str, str]:
    """Encrypt ``plaintext`` with AES-256-GCM and return base64 artefacts.

    The function internally generates a fresh random session key which must be
    distributed using RSA. Returning both the session key and ciphertext keeps
    the server agnostic about the actual message contents.
    """

    session_key = get_random_bytes(AES_KEY_SIZE)
    nonce = get_random_bytes(GCM_NONCE_SIZE)

    cipher = AES.new(session_key, AES.MODE_GCM, nonce=nonce)
    cipher.update(associated_data)
    ciphertext, tag = cipher.encrypt_and_digest(plaintext)

    return (
        base64.urlsafe_b64encode(session_key).decode("utf-8"),
        base64.urlsafe_b64encode(nonce).decode("utf-8"),
        base64.urlsafe_b64encode(ciphertext).decode("utf-8"),
        base64.urlsafe_b64encode(tag).decode("utf-8"),
    )


def decrypt_payload(
    session_key_b64: str,
    nonce_b64: str,
    ciphertext_b64: str,
    tag_b64: str,
    associated_data: bytes = b"",
) -> bytes:
    """Decrypt AES-GCM artefacts created by :func:`encrypt_payload`."""

    session_key = base64.urlsafe_b64decode(session_key_b64)
    nonce = base64.urlsafe_b64decode(nonce_b64)
    ciphertext = base64.urlsafe_b64decode(ciphertext_b64)
    tag = base64.urlsafe_b64decode(tag_b64)

    cipher = AES.new(session_key, AES.MODE_GCM, nonce=nonce)
    cipher.update(associated_data)
    return cipher.decrypt_and_verify(ciphertext, tag)


# ---------------------------------------------------------------------------
# Session tokens (HMAC-SHA256 protected JSON)
# ---------------------------------------------------------------------------

SESSION_TOKEN_TTL = 60 * 60  # 1 hour


def _sign(session_data: dict, secret: bytes) -> str:
    payload = json.dumps(session_data, separators=(",", ":"), sort_keys=True).encode("utf-8")
    mac = hmac.new(secret, payload, hashlib.sha256).digest()
    return base64.urlsafe_b64encode(mac).decode("utf-8"), payload


def create_session_token(user_id: int, secret: bytes, ttl: int = SESSION_TOKEN_TTL) -> str:
    """Create a tamper-evident session token signed with HMAC-SHA256.

    The token structure is ``header.payload.signature`` all base64 encoded.
    This is intentionally similar to JWTs but implemented manually for
    educational transparency.
    """

    issued_at = int(time.time())
    session_data = {"uid": user_id, "iat": issued_at, "exp": issued_at + ttl}
    signature, payload = _sign(session_data, secret)
    header = base64.urlsafe_b64encode(b"TEL252-HMAC").decode("utf-8")
    return ".".join([
        header,
        base64.urlsafe_b64encode(payload).decode("utf-8"),
        signature,
    ])


def verify_session_token(token: str, secret: bytes) -> Optional[dict]:
    """Verify integrity and expiration of a session token."""

    try:
        header_b64, payload_b64, signature = token.split(".")
        if header_b64 != base64.urlsafe_b64encode(b"TEL252-HMAC").decode("utf-8"):
            return None
        payload = base64.urlsafe_b64decode(payload_b64)
        expected_sig = hmac.new(secret, payload, hashlib.sha256).digest()
        if not hmac.compare_digest(base64.urlsafe_b64decode(signature), expected_sig):
            return None
        session_data = json.loads(payload.decode("utf-8"))
        if session_data.get("exp", 0) < time.time():
            return None
        return session_data
    except (ValueError, json.JSONDecodeError, binascii.Error):
        return None


# ---------------------------------------------------------------------------
# Utility dataclasses
# ---------------------------------------------------------------------------

@dataclass(frozen=True)
class EncryptedMessage:
    """Convenience container representing the artefacts stored for each message."""

    sender_id: int
    recipient_id: int
    session_key_encrypted: str
    nonce_b64: str
    ciphertext_b64: str
    tag_b64: str
    associated_data_b64: str = ""


@dataclass(frozen=True)
class PublicIdentity:
    """Expose user metadata alongside their RSA public key."""

    identifier: str
    display_name: str
    public_key_pem: str


# Anexo: Repositorio de github

El código completo del proyecto está disponible en el siguiente repositorio de GitHub:

[https://github.com/crismoraga/e2ee_chat](https://github.com/crismoraga/e2ee_chat)

# Anexo: Diagrama de flujo de la comunicación E2EE

![Diagrama de Arquitectura](./docs/diagrama_arquitectura-3.svg)