Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions src/adcp/signing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,36 @@
* :class:`SigningConfig` — bundle key material for auto-signing via
``ADCPClient(signing=...)``

**Provisioning** (new keypairs):

* :func:`generate_signing_keypair` — programmatic counterpart to the
``adcp-keygen`` CLI. Returns ``(pem_bytes, public_jwk)`` so tests,
provisioning scripts, and any non-shell context can mint keys
without spawning a subprocess. Both paths share the same spine — a
PEM generated here is indistinguishable from one the CLI wrote.

.. code-block:: python

import os

from adcp.signing import generate_signing_keypair

# CLI equivalence:
# adcp-keygen --alg ed25519 --purpose webhook-signing
pem, public_jwk = generate_signing_keypair(
alg="ed25519", purpose="webhook-signing"
)

# Mode 0600, O_EXCL so an existing file is never overwritten.
# Path.write_bytes inherits the process umask (often 0644 =
# world-readable) — don't use it for private-key material.
fd = os.open("webhook-key.pem", os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
try:
os.write(fd, pem)
finally:
os.close(fd)
publish_to_jwks_uri(public_jwk)

**Sellers** (verifying incoming requests):

* :func:`verify_starlette_request` / :func:`verify_flask_request` —
Expand Down Expand Up @@ -145,6 +175,7 @@
verify_detached_jws,
verify_jws_document,
)
from adcp.signing.keygen import generate_signing_keypair
from adcp.signing.middleware import (
unauthorized_response_headers,
verify_flask_request,
Expand Down Expand Up @@ -286,6 +317,7 @@ def __init__(self, *args: object, **kwargs: object) -> None:
"default_revocation_list_fetcher",
"extract_signature_bytes",
"format_signature_header",
"generate_signing_keypair",
"load_private_key_pem",
"operation_needs_signing",
"parse_signature_input_header",
Expand Down
134 changes: 122 additions & 12 deletions src/adcp/signing/keygen.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,40 @@
"""Key generation helpers for AdCP request signing.

Writes a PEM private key and prints the matching JWK (public half) with
`adcp_use: "request-signing"` — paste that JWK into your agent's JWKS at the
URL advertised in brand.json.
Two equivalent entry points — pick whichever fits your calling context:

Usage:
- :func:`generate_signing_keypair` — programmatic API returning
``(pem_bytes, public_jwk)``. Use from tests, provisioning scripts, and
any non-shell code where spawning a subprocess is the wrong shape.
- ``adcp-keygen`` CLI (:func:`main`) — wraps the same helper plus
file-writing and stdout printing, so CI / shell pipelines stay
ergonomic.

Both paths produce the same PEM and JWK. Publish the JWK (public half)
at your agent's ``jwks_uri`` — the ``adcp_use`` claim (``request-signing``
or ``webhook-signing``) pins the key to one signing profile; verifiers
reject keys used in the wrong surface.

Usage (CLI):
adcp-keygen --alg ed25519 --out private-key.pem
adcp-keygen --alg es256 --out private-key.pem
adcp-keygen --alg ed25519 --encrypt --out private-key.pem

Usage (programmatic):
from adcp.signing import generate_signing_keypair

pem_bytes, public_jwk = generate_signing_keypair(
alg="ed25519", purpose="webhook-signing"
)

# Write mode-0600 atomically. DON'T use Path.write_bytes — it
# inherits the process umask (typically 0644 = world-readable).
import os
fd = os.open("key.pem", os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
try:
os.write(fd, pem_bytes)
finally:
os.close(fd)
publish_to_jwks_uri(public_jwk)
"""

from __future__ import annotations
Expand All @@ -16,11 +43,12 @@
import getpass
import json
import os
import secrets
import sys
import unicodedata
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from typing import Any, Literal

from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec, ed25519
Expand Down Expand Up @@ -90,6 +118,88 @@ def generate_es256(
return pem, jwk


def _default_kid(alg: str) -> str:
"""Default ``kid`` — opaque, collision-resistant.

Combines alg + UTC date + 4 random hex chars so two calls in the
same UTC day produce distinct kids. Format is an implementation
detail; downstream tooling MUST NOT parse it. Callers managing
rotation SHOULD pass an explicit ``kid`` they control.

Same-day collisions without the random suffix silently break
verification: two JWKs advertised under the same kid, verifiers
cache the first one and reject signatures made with the second as
``REQUEST_SIGNATURE_INVALID``. The suffix prevents that at no
readability cost.
"""
date = datetime.now(timezone.utc).strftime("%Y%m%d")
return f"adcp-{alg}-{date}-{secrets.token_hex(2)}"


def generate_signing_keypair(
*,
alg: Literal["ed25519", "es256"] = "ed25519",
kid: str | None = None,
purpose: Literal["request-signing", "webhook-signing"] = "request-signing",
passphrase: bytes | None = None,
) -> tuple[bytes, dict[str, Any]]:
"""Generate an AdCP signing keypair.

Programmatic companion to the ``adcp-keygen`` CLI — call this from
tests, provisioning scripts, or any non-shell context where spawning
a subprocess is the wrong shape.

:param alg: Signature algorithm. ``"ed25519"`` (default; tiny keys,
recommended) or ``"es256"`` (ECDSA over P-256, broader ecosystem
support).
:param kid: Key ID to embed in the JWK. When omitted, the SDK mints
an opaque default combining alg + UTC date + 4 random hex chars
— suitable for first-time provisioning only. **Callers managing
rotation MUST supply their own ``kid``.** The default is
collision-resistant within a single process but does not
guarantee uniqueness across processes; rotation tooling needs
its own identifier scheme to track retirement / revocation.
:param purpose: Which AdCP signing profile this key is for. Sets the
JWK ``adcp_use`` claim. **Request-signing and webhook-signing
keys MUST be distinct** — a signature from one surface cannot
replay as the other, and every conformant verifier enforces the
claim. Generate separate keys per purpose.
:param passphrase: When provided, the PEM is encrypted with
``BestAvailableEncryption``. Typical only for dev-laptop keys;
automated deployments usually leave the PEM unencrypted and
rely on filesystem perms (the CLI writes mode 0600).

**Passphrase lifecycle.** CPython cannot zero ``bytes``. Once
passed here, the buffer is consumed by ``cryptography`` and
then released to GC; there's no ``zeroize`` step. Callers
handling long-lived credentials should source the passphrase
from a secret manager per call rather than hold a Python
literal in process memory.

:returns: ``(pem_bytes, public_jwk)``. The PEM is PKCS#8
(optionally encrypted); the JWK is the public half, ready to
publish at your ``jwks_uri``. The private scalar is NOT in the
JWK — only in the PEM.

:raises ValueError: ``alg`` or ``purpose`` is unsupported.

Example:
>>> pem, jwk = generate_signing_keypair(
... alg="ed25519", purpose="webhook-signing"
... )
>>> jwk["adcp_use"]
'webhook-signing'
"""
if alg not in ("ed25519", "es256"):
raise ValueError(f"alg must be 'ed25519' or 'es256', got {alg!r}")
if purpose not in _ADCP_USE_VALUES:
raise ValueError(f"purpose must be one of {_ADCP_USE_VALUES}, got {purpose!r}")
resolved_kid = kid or _default_kid(alg)
if alg == "ed25519":
return generate_ed25519(resolved_kid, passphrase=passphrase, adcp_use=purpose)
return generate_es256(resolved_kid, passphrase=passphrase, adcp_use=purpose)


def _prompt_passphrase_bytes() -> bytes:
first = getpass.getpass("Passphrase: ")
if not first:
Expand Down Expand Up @@ -164,13 +274,13 @@ def main(argv: list[str] | None = None) -> int:

passphrase = _prompt_passphrase_bytes() if args.encrypt else None

kid = args.kid or f"adcp-{args.alg}-{datetime.now(timezone.utc).strftime('%Y%m%d')}"
if args.alg == "ed25519":
pem, jwk = generate_ed25519(kid, passphrase=passphrase, adcp_use=args.purpose)
alg_rfc = ALG_ED25519
else:
pem, jwk = generate_es256(kid, passphrase=passphrase, adcp_use=args.purpose)
alg_rfc = ALG_ES256
pem, jwk = generate_signing_keypair(
alg=args.alg,
kid=args.kid,
purpose=args.purpose,
passphrase=passphrase,
)
alg_rfc = ALG_ED25519 if args.alg == "ed25519" else ALG_ES256

# `--force` clobbers in two steps (non-atomic on overwrite), but the
# happy-path create is atomic via O_EXCL | mode=0o600 so there is no window
Expand Down
Loading
Loading