Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/adcp/signing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,17 @@
verify_detached_jws,
verify_jws_document,
)
from adcp.signing.keygen import generate_signing_keypair
from adcp.signing.keygen import generate_signing_keypair, pem_to_adcp_jwk
from adcp.signing.middleware import (
unauthorized_response_headers,
verify_flask_request,
verify_starlette_request,
)
from adcp.signing.provider import (
InMemorySigningProvider,
SigningAlgorithm,
SigningProvider,
)
from adcp.signing.replay import InMemoryReplayStore, ReplayStore
from adcp.signing.revocation import RevocationChecker, RevocationList
from adcp.signing.revocation_fetcher import (
Expand All @@ -205,6 +210,7 @@
)
from adcp.signing.signer import (
SignedHeaders,
async_sign_request,
sign_request,
)
from adcp.signing.verifier import (
Expand Down Expand Up @@ -259,6 +265,7 @@ def __init__(self, *args: object, **kwargs: object) -> None:
"DEFAULT_TAG",
"FetchResult",
"InMemoryReplayStore",
"InMemorySigningProvider",
"IpPinnedTransport",
"JwksResolver",
"JwsError",
Expand Down Expand Up @@ -299,8 +306,10 @@ def __init__(self, *args: object, **kwargs: object) -> None:
"SignatureInputLabel",
"SignatureVerificationError",
"SignedHeaders",
"SigningAlgorithm",
"SigningConfig",
"SigningDecision",
"SigningProvider",
"StaticJwksResolver",
"VerifiedSigner",
"VerifierCapability",
Expand All @@ -310,6 +319,7 @@ def __init__(self, *args: object, **kwargs: object) -> None:
"as_async_resolver",
"async_default_jwks_fetcher",
"async_default_revocation_list_fetcher",
"async_sign_request",
"averify_detached_jws",
"averify_jws_document",
"b64url_decode",
Expand All @@ -330,6 +340,7 @@ def __init__(self, *args: object, **kwargs: object) -> None:
"load_private_key_pem",
"operation_needs_signing",
"parse_signature_input_header",
"pem_to_adcp_jwk",
"private_key_from_jwk",
"public_key_from_jwk",
"resolve_and_validate_host",
Expand Down
144 changes: 119 additions & 25 deletions src/adcp/signing/keygen.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec, ed25519

from adcp.signing.crypto import ALG_ED25519, ALG_ES256, b64url_encode
from adcp.signing.crypto import ALG_ED25519, ALG_ES256, b64url_encode, load_private_key_pem


def _encryption_algorithm(
Expand All @@ -67,21 +67,14 @@ def _encryption_algorithm(
_ADCP_USE_VALUES = ("request-signing", "webhook-signing")


def generate_ed25519(
kid: str, passphrase: bytes | None = None, adcp_use: str = "request-signing"
) -> tuple[bytes, dict[str, Any]]:
private = ed25519.Ed25519PrivateKey.generate()
pem = private.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=_encryption_algorithm(passphrase),
)
public = private.public_key()
x = public.public_bytes(
def _public_jwk_ed25519(
public_key: ed25519.Ed25519PublicKey, *, kid: str, adcp_use: str
) -> dict[str, Any]:
x = public_key.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw,
)
jwk = {
return {
"kty": "OKP",
"crv": "Ed25519",
"alg": "EdDSA",
Expand All @@ -91,20 +84,13 @@ def generate_ed25519(
"kid": kid,
"x": b64url_encode(x),
}
return pem, jwk


def generate_es256(
kid: str, passphrase: bytes | None = None, adcp_use: str = "request-signing"
) -> tuple[bytes, dict[str, Any]]:
private = ec.generate_private_key(ec.SECP256R1())
pem = private.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=_encryption_algorithm(passphrase),
)
numbers = private.public_key().public_numbers()
jwk = {
def _public_jwk_es256(
public_key: ec.EllipticCurvePublicKey, *, kid: str, adcp_use: str
) -> dict[str, Any]:
numbers = public_key.public_numbers()
return {
"kty": "EC",
"crv": "P-256",
"alg": "ES256",
Expand All @@ -115,9 +101,117 @@ def generate_es256(
"x": b64url_encode(numbers.x.to_bytes(32, "big")),
"y": b64url_encode(numbers.y.to_bytes(32, "big")),
}


def generate_ed25519(
kid: str, passphrase: bytes | None = None, adcp_use: str = "request-signing"
) -> tuple[bytes, dict[str, Any]]:
private = ed25519.Ed25519PrivateKey.generate()
pem = private.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=_encryption_algorithm(passphrase),
)
jwk = _public_jwk_ed25519(private.public_key(), kid=kid, adcp_use=adcp_use)
return pem, jwk


def generate_es256(
kid: str, passphrase: bytes | None = None, adcp_use: str = "request-signing"
) -> tuple[bytes, dict[str, Any]]:
private = ec.generate_private_key(ec.SECP256R1())
pem = private.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=_encryption_algorithm(passphrase),
)
jwk = _public_jwk_es256(private.public_key(), kid=kid, adcp_use=adcp_use)
return pem, jwk


def pem_to_adcp_jwk(
pem: bytes,
*,
kid: str,
purpose: Literal["request-signing", "webhook-signing"],
password: bytes | None = None,
) -> dict[str, Any]:
"""Derive the public JWK for an existing AdCP signing PEM.

Companion to :func:`generate_signing_keypair` for the case where
the key was minted elsewhere — typically in a managed key store
(KMS / HSM / Vault) that exports the public half as a PEM. The
output JWK is byte-shape-identical to what
:func:`generate_signing_keypair` would have produced for the same
key material, so it is safe to publish at the agent's
``jwks_uri`` directly.

Why a helper at all? Three fields in the JWK are easy to mis-emit
by hand and every wrong value yields a verifier rejection at the
first signed request:

* ``alg`` — MUST be ``"EdDSA"`` for Ed25519, ``"ES256"`` for P-256
(NOT the RFC 9421 ``alg`` casing used in ``Signature-Input``).
* ``adcp_use`` — required by AdCP #2423; verifiers reject keys
lacking it. MUST match the signing surface (``"request-signing"``
vs. ``"webhook-signing"``).
* ``key_ops`` — MUST be ``["verify"]`` (the public half cannot
sign).

:param pem: PEM-encoded private key (PKCS#8). Pass the PEM only
when the private half is at hand — for KMS deployments where
the private material never leaves the managed store, pass an
SPKI public-key PEM (``-----BEGIN PUBLIC KEY-----``) instead;
the loader handles both forms.
:param kid: JWK ``kid`` to embed. MUST match the value the signer
will advertise via :meth:`SigningProvider.key_id`.
:param purpose: Which AdCP signing profile this key is for. Sets
``adcp_use``. Generate distinct keys per purpose — sharing
material across request-signing and webhook-signing is a spec
violation, not just a convention.
:param password: Passphrase if ``pem`` is an encrypted private
key.

:returns: Public JWK ready to publish in the agent's ``jwks_uri``.
The private scalar (``d``) is NEVER included in the output.

:raises ValueError: ``purpose`` is not in
``("request-signing", "webhook-signing")``; the PEM is not
Ed25519 or ECDSA-P-256; the EC curve is not P-256.
"""
if purpose not in _ADCP_USE_VALUES:
raise ValueError(f"purpose must be one of {_ADCP_USE_VALUES}, got {purpose!r}")
if not kid:
raise ValueError("kid must be a non-empty string")

# SPKI public-key PEMs use the exact header `-----BEGIN PUBLIC KEY-----`;
# private-key PEMs use a different header. Match the full sentinel rather
# than a substring so a future PEM type whose header contains the words
# "PUBLIC" + "KEY" (e.g., a hypothetical encrypted-public-key form)
# doesn't silently dispatch to the wrong loader.
if b"-----BEGIN PUBLIC KEY-----" in pem[:128]:
loaded = serialization.load_pem_public_key(pem)
if not isinstance(loaded, (ed25519.Ed25519PublicKey, ec.EllipticCurvePublicKey)):
raise ValueError(
f"unsupported public key type {type(loaded).__name__} — "
f"AdCP signing accepts Ed25519 or ECDSA-P-256 only"
)
if isinstance(loaded, ec.EllipticCurvePublicKey) and not isinstance(
loaded.curve, ec.SECP256R1
):
raise ValueError(
f"EC public key curve {loaded.curve.name} is not supported — only "
f"P-256 (SECP256R1) is allowed"
)
public_key: ed25519.Ed25519PublicKey | ec.EllipticCurvePublicKey = loaded
else:
public_key = load_private_key_pem(pem, password=password).public_key()

if isinstance(public_key, ed25519.Ed25519PublicKey):
return _public_jwk_ed25519(public_key, kid=kid, adcp_use=purpose)
return _public_jwk_es256(public_key, kid=kid, adcp_use=purpose)


def _default_kid(alg: str) -> str:
"""Default ``kid`` — opaque, collision-resistant.

Expand Down
Loading
Loading