Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 3 additions & 92 deletions didcomm_messaging/crypto/__init__.py
Original file line number Diff line number Diff line change
@@ -1,94 +1,5 @@
"""Key Management Service (CryptoService) interface for DIDComm Messaging."""
"""DIDComm Messaging Cryptography and Secrets Interfaces."""

from .base import CryptoService, SecretsManager, PublicKey, SecretKey, P, S

from abc import ABC, abstractmethod
from typing import Generic, Optional, Sequence, TypeVar, Union

from pydid import VerificationMethod

from .jwe import JweEnvelope


class CryptoServiceError(Exception):
"""Represents an error from a CryptoService."""


class PublicKey(ABC):
"""Key representation for CryptoService."""

@classmethod
@abstractmethod
def from_verification_method(cls, vm: VerificationMethod) -> "PublicKey":
"""Create a Key instance from a DID Document Verification Method."""

@property
@abstractmethod
def kid(self) -> str:
"""Get the key ID."""

@property
@abstractmethod
def multikey(self) -> str:
"""Get the key in multikey format."""


class SecretKey(ABC):
"""Secret Key Type."""

@property
@abstractmethod
def kid(self) -> str:
"""Get the key ID."""


P = TypeVar("P", bound=PublicKey)
S = TypeVar("S", bound=SecretKey)


class CryptoService(ABC, Generic[P, S]):
"""Key Management Service (CryptoService) interface for DIDComm Messaging."""

@abstractmethod
async def ecdh_es_encrypt(self, to_keys: Sequence[P], message: bytes) -> bytes:
"""Encode a message into DIDComm v2 anonymous encryption."""

@abstractmethod
async def ecdh_es_decrypt(
self, wrapper: Union[JweEnvelope, str, bytes], recip_key: S
) -> bytes:
"""Decode a message from DIDComm v2 anonymous encryption."""

@abstractmethod
async def ecdh_1pu_encrypt(
self,
to_keys: Sequence[P],
sender_key: S,
message: bytes,
) -> bytes:
"""Encode a message into DIDComm v2 authenticated encryption."""

@abstractmethod
async def ecdh_1pu_decrypt(
self,
wrapper: Union[JweEnvelope, str, bytes],
recip_key: S,
sender_key: P,
) -> bytes:
"""Decode a message from DIDComm v2 authenticated encryption."""

@classmethod
@abstractmethod
def verification_method_to_public_key(cls, vm: VerificationMethod) -> P:
"""Convert a verification method to a public key."""


class SecretsManager(ABC, Generic[S]):
"""Secrets Resolver interface.

Thie secrets resolver may be used to supplement the CryptoService backend to provide
greater flexibility.
"""

@abstractmethod
async def get_secret_by_kid(self, kid: str) -> Optional[S]:
"""Get a secret key by its ID."""
__all__ = ["CryptoService", "SecretsManager", "PublicKey", "SecretKey", "P", "S"]
1 change: 1 addition & 0 deletions didcomm_messaging/crypto/backend/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Cryptography and Secrets Management backends."""
Original file line number Diff line number Diff line change
@@ -1,23 +1,19 @@
"""Askar backend for DIDComm Messaging."""
from collections import OrderedDict
import hashlib
import json
from typing import Optional, Sequence, Union
import hashlib

from pydid import VerificationMethod
from didcomm_messaging.crypto import SecretsManager
from ..jwe import (
JweBuilder,
JweEnvelope,
JweRecipient,
b64url,
)
from didcomm_messaging.crypto import (

from didcomm_messaging.crypto.base import (
CryptoService,
CryptoServiceError,
PublicKey,
SecretKey,
SecretsManager,
)
from didcomm_messaging.crypto.jwe import JweBuilder, JweEnvelope, JweRecipient, b64url
from didcomm_messaging.multiformats import multibase, multicodec

try:
Expand Down Expand Up @@ -75,44 +71,6 @@ def multikey_to_key(cls, multikey: str) -> Key:
except AskarError as err:
raise ValueError("Invalid key") from err

@classmethod
def _expected_alg_and_material_to_key(
cls,
alg: KeyAlg,
public_key_multibase: Optional[str] = None,
public_key_base58: Optional[str] = None,
) -> Key:
"""Convert an Ed25519 key to an Askar Key instance."""
if public_key_multibase and public_key_base58:
raise ValueError(
"Only one of public_key_multibase or public_key_base58 must be given"
)
if not public_key_multibase and not public_key_base58:
raise ValueError(
"One of public_key_multibase or public_key_base58 must be given)"
)

if public_key_multibase:
decoded = multibase.decode(public_key_multibase)
if len(decoded) == 32:
# No multicodec prefix
try:
key = Key.from_public_bytes(alg, decoded)
except AskarError as err:
raise ValueError("Invalid key") from err
return key
else:
key = cls.multikey_to_key(public_key_multibase)
if key.algorithm != alg:
raise ValueError("Type and algorithm mismatch")
return key

if public_key_base58:
decoded = multibase.decode("z" + public_key_base58)
return Key.from_public_bytes(alg, decoded)

raise ValueError("Failed to parse key")

@classmethod
def from_verification_method(cls, vm: VerificationMethod) -> "AskarKey":
"""Create a Key instance from a DID Document Verification Method."""
Expand All @@ -133,11 +91,8 @@ def from_verification_method(cls, vm: VerificationMethod) -> "AskarKey":
if not alg:
raise ValueError("Unsupported verification method type: {vm_type}")

base58 = vm.public_key_base58
multi = vm.public_key_multibase
key = cls._expected_alg_and_material_to_key(
alg, public_key_base58=base58, public_key_multibase=multi
)
key_bytes = cls.key_bytes_from_verification_method(vm)
key = Key.from_public_bytes(alg, key_bytes)
return cls(key, kid)

@property
Expand Down Expand Up @@ -187,26 +142,37 @@ async def ecdh_es_encrypt(
except AskarError:
raise CryptoServiceError("Error creating content encryption key")

apv = []
for recip_key in to_keys:
apv.append(recip_key.kid)
apv.sort()
apv = hashlib.sha256((".".join(apv)).encode()).digest()

for recip_key in to_keys:
try:
epk = Key.generate(recip_key.key.algorithm, ephemeral=True)
except AskarError:
raise CryptoServiceError("Error creating ephemeral key")
enc_key = ecdh.EcdhEs(alg_id, None, None).sender_wrap_key( # type: ignore
enc_key = ecdh.EcdhEs(alg_id, None, apv).sender_wrap_key( # type: ignore
wrap_alg, epk, recip_key.key, cek
)
builder.add_recipient(
JweRecipient(
encrypted_key=enc_key.ciphertext,
header={"kid": recip_key.kid, "epk": epk.get_jwk_public()},
header={
"kid": recip_key.kid,
"epk": json.loads(epk.get_jwk_public()),
},
)
)

builder.set_protected(
OrderedDict(
[
("typ", "application/didcomm-encrypted+json"),
("alg", alg_id),
("enc", enc_id),
("apv", b64url(apv)),
]
)
)
Expand All @@ -220,14 +186,13 @@ async def ecdh_es_encrypt(

async def ecdh_es_decrypt(
self,
wrapper: Union[JweEnvelope, str, bytes],
enc_message: Union[str, bytes],
recip_key: AskarSecretKey,
) -> bytes:
"""Decode a message from DIDComm v2 anonymous encryption."""
if isinstance(wrapper, bytes):
wrapper = wrapper.decode("utf-8")
if not isinstance(wrapper, JweEnvelope):
wrapper = JweEnvelope.from_json(wrapper)
if isinstance(enc_message, bytes):
wrapper = enc_message.decode("utf-8")
wrapper = JweEnvelope.from_json(enc_message)

alg_id = wrapper.protected.get("alg")

Expand Down Expand Up @@ -263,12 +228,10 @@ async def ecdh_es_decrypt(
except AskarError:
raise CryptoServiceError("Error loading ephemeral key")

apu = recip.header.get("apu")
apv = recip.header.get("apv")
# apu and apv are allowed to be None

try:
cek = ecdh.EcdhEs(alg_id, apu, apv).receiver_unwrap_key( # type: ignore
cek = ecdh.EcdhEs(
alg_id, None, wrapper.apv_bytes
).receiver_unwrap_key( # type: ignore
wrap_alg,
enc_alg,
epk,
Expand Down Expand Up @@ -318,7 +281,7 @@ async def ecdh_1pu_encrypt(
except AskarError:
raise CryptoServiceError("Error creating ephemeral key")

apu = b64url(sender_key.kid)
apu = sender_key.kid
apv = []
for recip_key in to_keys:
if agree_alg:
Expand All @@ -328,15 +291,15 @@ async def ecdh_1pu_encrypt(
agree_alg = recip_key.key.algorithm
apv.append(recip_key.kid)
apv.sort()
apv = b64url(hashlib.sha256((".".join(apv)).encode()).digest())
apv = hashlib.sha256((".".join(apv)).encode()).digest()

builder.set_protected(
OrderedDict(
[
("alg", alg_id),
("enc", enc_id),
("apu", apu),
("apv", apv),
("apu", b64url(apu)),
("apv", b64url(apv)),
("epk", json.loads(epk.get_jwk_public())),
("skid", sender_key.kid),
]
Expand All @@ -362,15 +325,14 @@ async def ecdh_1pu_encrypt(

async def ecdh_1pu_decrypt(
self,
wrapper: Union[JweEnvelope, str, bytes],
enc_message: Union[str, bytes],
recip_key: AskarSecretKey,
sender_key: AskarKey,
):
"""Decode a message from DIDComm v2 authenticated encryption."""
if isinstance(wrapper, bytes):
wrapper = wrapper.decode("utf-8")
if not isinstance(wrapper, JweEnvelope):
wrapper = JweEnvelope.from_json(wrapper)
if isinstance(enc_message, bytes):
wrapper = enc_message.decode("utf-8")
wrapper = JweEnvelope.from_json(enc_message)

alg_id = wrapper.protected.get("alg")
if alg_id and alg_id in ("ECDH-1PU+A128KW", "ECDH-1PU+A256KW"):
Expand All @@ -397,12 +359,10 @@ async def ecdh_1pu_decrypt(
except AskarError:
raise CryptoServiceError("Error loading ephemeral key")

apu = wrapper.protected.get("apu")
apv = wrapper.protected.get("apv")
# apu and apv are allowed to be None

try:
cek = ecdh.Ecdh1PU(alg_id, apu, apv).receiver_unwrap_key( # type: ignore
cek = ecdh.Ecdh1PU(
alg_id, wrapper.apu_bytes, wrapper.apv_bytes
).receiver_unwrap_key( # type: ignore
wrap_alg,
enc_alg,
epk,
Expand Down
Loading