Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions bsv/aes_gcm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from Cryptodome.Cipher import AES
from Cryptodome.Util import Padding

class AESGCMError(Exception):
pass

def aes_gcm_encrypt(plaintext: bytes, key: bytes, iv: bytes, aad: bytes = b""):
cipher = AES.new(key, AES.MODE_GCM, nonce=iv)
cipher.update(aad)
ciphertext, tag = cipher.encrypt_and_digest(plaintext)
return ciphertext, tag

def aes_gcm_decrypt(ciphertext: bytes, key: bytes, iv: bytes, tag: bytes, aad: bytes = b""):
cipher = AES.new(key, AES.MODE_GCM, nonce=iv)
cipher.update(aad)
try:
plaintext = cipher.decrypt_and_verify(ciphertext, tag)
return plaintext
except ValueError as e:
raise AESGCMError(f"decryption failed: {e}")

# --- GHASH utilities (for test vector compatibility, optional) ---
def xor_bytes(a: bytes, b: bytes) -> bytes:
return bytes(x ^ y for x, y in zip(a, b))

def right_shift(block: bytes) -> bytes:
b = bytearray(block)
carry = 0
for i in range(len(b)):
old_carry = carry
carry = b[i] & 0x01
b[i] >>= 1
if old_carry:
b[i] |= 0x80
return bytes(b)

def check_bit(block: bytes, index: int, bit: int) -> bool:
return ((block[index] >> bit) & 1) == 1

def multiply(block0: bytes, block1: bytes) -> bytes:
v = bytearray(block1)
z = bytearray(16)
r = bytearray([0xe1] + [0x00]*15)
for i in range(16):
for j in range(7, -1, -1):
if check_bit(block0, i, j):
z = bytearray(x ^ y for x, y in zip(z, v))
if check_bit(v, 15, 0):
v = bytearray(x ^ y for x, y in zip(right_shift(v), r))
else:
v = bytearray(right_shift(v))
return bytes(z)

def ghash(input_bytes: bytes, hash_subkey: bytes) -> bytes:
result = bytes(16)
for i in range(0, len(input_bytes), 16):
block = input_bytes[i:i+16]
if len(block) < 16:
block = block + b"\x00" * (16 - len(block))
result = multiply(xor_bytes(result, block), hash_subkey)
return result
378 changes: 378 additions & 0 deletions bsv/auth/clients/authhttp.py

Large diffs are not rendered by default.

186 changes: 96 additions & 90 deletions bsv/auth/master_certificate.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

Base64String = str
CertificateFieldNameUnder50Bytes = str

class MasterCertificate(Certificate):
def __init__(
self,
Expand Down Expand Up @@ -58,93 +57,67 @@ def create_certificate_fields(creator_wallet: Any, certifier_or_subject: Any, fi
return {'certificateFields': certificate_fields, 'masterKeyring': master_keyring}

@staticmethod
def issue_certificate_for_subject(
certifier_wallet: Any,
subject: Any,
fields: Dict[CertificateFieldNameUnder50Bytes, str],
certificate_type: str,
get_revocation_outpoint: Optional[Callable[[str], Any]] = None,
serial_number: Optional[str] = None
) -> 'MasterCertificate':
if serial_number is not None:
final_serial_number = serial_number
else:
final_serial_number = base64.b64encode(os.urandom(32)).decode('utf-8')
field_result = MasterCertificate.create_certificate_fields(certifier_wallet, subject, fields)
certificate_fields = field_result['certificateFields']
master_keyring = field_result['masterKeyring']
if get_revocation_outpoint is not None:
revocation_outpoint = get_revocation_outpoint(final_serial_number)
else:
revocation_outpoint = None
# 1) Certifier public key resolution via wallet interface if available
certifier_pubkey = None
def _resolve_public_key(wallet: Any, fallback: Any = None) -> Any:
"""
Resolve the public key from the wallet. If it fails, return the fallback.
"""
from bsv.keys import PublicKey
pubkey = None
try:
# Prefer WalletInterface.get_public_key with identityKey=True
get_pk_args = {"identityKey": True}
# Some wallet interfaces accept seekPermission; keep it False by default
res = certifier_wallet.get_public_key(None, get_pk_args, "auth-master-cert")
res = wallet.get_public_key(None, get_pk_args, "auth-master-cert")
if isinstance(res, dict):
pk_bytes_or_hex = res.get("publicKey")
if pk_bytes_or_hex:
from bsv.keys import PublicKey
certifier_pubkey = PublicKey(pk_bytes_or_hex)
pubkey = PublicKey(pk_bytes_or_hex)
except Exception:
certifier_pubkey = None

# Fallbacks: try common attributes exposed by simple wallets
if certifier_pubkey is None:
pubkey = None
if pubkey is None:
try:
# e.g. WalletImpl exposes .public_key
certifier_pubkey = getattr(certifier_wallet, "public_key", None)
pubkey = getattr(wallet, "public_key", None)
except Exception:
certifier_pubkey = None
if certifier_pubkey is None:
raise ValueError("Unable to resolve certifier public key from wallet")
pubkey = None
if pubkey is None and fallback is not None:
pubkey = fallback
return pubkey

# 1b) Resolve subject public key
@staticmethod
def _resolve_subject_public_key(subject: Any, certifier_pubkey: Any) -> Any:
from bsv.keys import PublicKey
subject_pubkey = None
# Dict-like counterparty: {"type": <int>, "counterparty": <hex/bytes>}
if isinstance(subject, dict):
try:
stype = subject.get("type")
if stype in (0, 2): # self / anyone
subject_pubkey = certifier_pubkey
else:
cp = subject.get("counterparty")
if cp is not None:
subject_pubkey = PublicKey(cp)
except Exception:
subject_pubkey = None
# Already a PublicKey
if subject_pubkey is None and isinstance(subject, PublicKey):
subject_pubkey = subject
# Bytes/hex string
if subject_pubkey is None and isinstance(subject, (bytes, bytearray, str)):

# If already a PublicKey instance
if isinstance(subject, PublicKey):
return subject

# If provided as bytes/bytearray/hex string
if isinstance(subject, (bytes, bytearray, str)):
try:
subject_pubkey = PublicKey(subject)
return PublicKey(subject)
except Exception:
subject_pubkey = None
# Fallbacks: treat as self if still unresolved
if subject_pubkey is None:
subject_pubkey = certifier_pubkey
return certifier_pubkey

# 2) Construct unsigned MasterCertificate
cert = MasterCertificate(
certificate_type,
final_serial_number,
subject_pubkey,
certifier_pubkey,
revocation_outpoint,
certificate_fields,
signature=None,
master_keyring=master_keyring,
)
# If provided as a dict descriptor
if isinstance(subject, dict):
stype = subject.get("type")
if stype in (0, 2): # self / anyone
return certifier_pubkey
cp = subject.get("counterparty")
if cp is not None:
try:
return PublicKey(cp)
except Exception:
pass
return certifier_pubkey

# Fallback
return certifier_pubkey

# 3) Sign using wallet interface if available; fallback to direct private key
@staticmethod
def _sign_certificate(cert: 'MasterCertificate', certifier_wallet: Any, certificate_type: str, final_serial_number: str) -> Optional[bytes]:
"""
Attach a signature to the certificate. Prefer the wallet interface; otherwise use the private_key attribute.
"""
try:
# Use wallet wire compatible signing first
data_to_sign = cert.to_binary(include_signature=False)
sig_args = {
'encryption_args': {
Expand All @@ -153,7 +126,6 @@ def issue_certificate_for_subject(
'protocol': 'certificate signature',
},
'key_id': f"{certificate_type} {final_serial_number}",
# Anyone
'counterparty': {'type': 2},
},
'data': data_to_sign,
Expand All @@ -164,16 +136,49 @@ def issue_certificate_for_subject(
except Exception:
sig_res = None
if isinstance(sig_res, dict) and sig_res.get('signature'):
cert.signature = sig_res['signature']
return sig_res['signature']
else:
# Fallback: direct private key if exposed
priv = getattr(certifier_wallet, "private_key", None)
if priv is not None:
# sign mutates the certificate; ensure we return bytes for callers
cert.sign(priv)
return cert.signature
except Exception:
# Leave unsigned; caller may sign later using their own mechanism
pass
return None

@staticmethod
def issue_certificate_for_subject(
certifier_wallet: Any,
subject: Any,
fields: Dict[CertificateFieldNameUnder50Bytes, str],
certificate_type: str,
get_revocation_outpoint: Optional[Callable[[str], Any]] = None,
serial_number: Optional[str] = None
) -> 'MasterCertificate':
final_serial_number = serial_number or base64.b64encode(os.urandom(32)).decode('utf-8')
field_result = MasterCertificate.create_certificate_fields(certifier_wallet, subject, fields)
certificate_fields = field_result['certificateFields']
master_keyring = field_result['masterKeyring']
revocation_outpoint = get_revocation_outpoint(final_serial_number) if get_revocation_outpoint else None

certifier_pubkey = MasterCertificate._resolve_public_key(certifier_wallet)
if certifier_pubkey is None:
raise ValueError("Unable to resolve certifier public key from wallet")
subject_pubkey = MasterCertificate._resolve_subject_public_key(subject, certifier_pubkey)

cert = MasterCertificate(
certificate_type,
final_serial_number,
subject_pubkey,
certifier_pubkey,
revocation_outpoint,
certificate_fields,
signature=None,
master_keyring=master_keyring,
)

cert.signature = MasterCertificate._sign_certificate(cert, certifier_wallet, certificate_type, final_serial_number)
return cert

@staticmethod
Expand All @@ -187,8 +192,9 @@ def decrypt_field(
privileged_reason: Optional[str] = None
) -> Dict[str, Any]:
"""
master_keyringからfield_nameの対称鍵をbase64デコード→wallet.decryptで復号→encrypted_field_valueをbase64デコード→対称鍵でAES-GCM復号
戻り値: { 'fieldRevelationKey': bytes, 'decryptedFieldValue': str }
Base64-decode the symmetric key for the given field_name from the master_keyring, decrypt it via wallet.decrypt,
base64-decode the encrypted_field_value, then decrypt it with the symmetric key using AES-GCM.
Returns: { 'fieldRevelationKey': bytes, 'decryptedFieldValue': str }
"""
if field_name not in master_keyring:
raise ValueError(f"Field '{field_name}' not found in master_keyring.")
Expand All @@ -205,10 +211,10 @@ def decrypt_field(
},
"ciphertext": encrypted_key_bytes,
}
# 対称鍵の復号(wallet.decrypt
# Decrypt the symmetric key (wallet.decrypt)
decrypt_result = subject_or_certifier_wallet.decrypt(None, decrypt_args)
if not decrypt_result or 'plaintext' not in decrypt_result:
raise NotImplementedError("wallet.decryptの実装が必要です")
raise NotImplementedError("wallet.decrypt implementation is required")
field_revelation_key = decrypt_result['plaintext']
encrypted_field_bytes = base64.b64decode(encrypted_field_value)
decrypted_field_bytes = EncryptedMessage.aes_gcm_decrypt(field_revelation_key, encrypted_field_bytes)
Expand All @@ -227,8 +233,8 @@ def decrypt_fields(
privileged_reason: Optional[str] = None
) -> Dict[CertificateFieldNameUnder50Bytes, str]:
"""
fieldsの各フィールドに対してdecrypt_fieldを呼び出し、結果を集約
戻り値: { field_name: decrypted_value }
Invoke decrypt_field for each entry in fields and aggregate the results.
Returns: { field_name: decrypted_value }
"""
decrypted_fields: Dict[CertificateFieldNameUnder50Bytes, str] = {}
for field_name, encrypted_field_value in fields.items():
Expand Down Expand Up @@ -257,17 +263,17 @@ def create_keyring_for_verifier(
privileged_reason: Optional[str] = None
) -> Dict[CertificateFieldNameUnder50Bytes, Base64String]:
"""
fields_to_revealで指定された各フィールドについて:
1. master_keyringから対称鍵を復号(decrypt_fieldを利用)
2. subject_wallet.encryptでverifier用に再暗号化(serial_numberをkey_idに含める)
3. 結果をBase64でkeyringに格納
返り値: { field_name: encrypted_key_for_verifier }
For each field specified in fields_to_reveal:
1. Decrypt the symmetric key from the master_keyring (using decrypt_field)
2. Re-encrypt it with subject_wallet.encrypt for the verifier (include serial_number in key_id)
3. Store the result in the keyring as Base64
Returns: { field_name: encrypted_key_for_verifier }
"""
keyring_for_verifier: Dict[CertificateFieldNameUnder50Bytes, Base64String] = {}
for field_name in fields_to_reveal:
if field_name not in fields:
raise ValueError(f"Field '{field_name}' not found in certificate fields.")
# 1. master_keyringから対称鍵を復号
# 1. Decrypt the symmetric key from the master_keyring
decrypt_result = MasterCertificate.decrypt_field(
subject_wallet,
master_keyring,
Expand All @@ -278,7 +284,7 @@ def create_keyring_for_verifier(
privileged_reason
)
field_revelation_key = decrypt_result['fieldRevelationKey']
# 2. subject_wallet.encryptでverifier用に再暗号化
# 2. Re-encrypt for the verifier with subject_wallet.encrypt
protocol_id, key_id = get_certificate_encryption_details(field_name, serial_number)
encrypt_args = {
"encryption_args": {
Expand Down
Loading