Skip to content

Commit

Permalink
expand EIP712AuthorityCertificate; more tests (#1589)
Browse files Browse the repository at this point in the history
expand EIP712AuthorityCertificate; more tests
  • Loading branch information
oberstet committed Aug 11, 2022
1 parent 7d775af commit d337467
Show file tree
Hide file tree
Showing 7 changed files with 453 additions and 127 deletions.
2 changes: 1 addition & 1 deletion autobahn/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@
#
###############################################################################

__version__ = '22.7.1'
__version__ = '22.8.1.dev1'

__build__ = '00000000-0000000'
8 changes: 4 additions & 4 deletions autobahn/xbr/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,12 @@
from autobahn.xbr._schema import FbsSchema, FbsObject, FbsType, FbsRPCCall, FbsEnum, FbsService, FbsEnumValue, \
FbsAttribute, FbsField, FbsRepository # noqa
from autobahn.xbr._wallet import stretch_argon2_secret, expand_argon2_secret, pkm_from_argon2_secret # noqa

HAS_XBR = True

from autobahn.xbr._frealm import FederatedRealm, Seeder # noqa
from autobahn.xbr._secmod import EthereumKey # noqa
from autobahn.xbr._secmod import EthereumKey, SecurityModuleMemory # noqa
from autobahn.xbr._userkey import UserKey # noqa

HAS_XBR = True

if not hasattr(abi, 'collapse_type'):

def collapse_type(base, sub, arrlist):
Expand Down Expand Up @@ -414,6 +413,7 @@ def account_from_ethkey(ethkey: bytes) -> eth_account.account.Account:
'FederatedRealm',
'Seeder',
'EthereumKey',
'SecurityModuleMemory',
)

except (ImportError, FileNotFoundError) as e:
Expand Down
210 changes: 184 additions & 26 deletions autobahn/xbr/_eip712_authority_certificate.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,18 @@
# THE SOFTWARE.
#
###############################################################################

import os.path
import pprint
from binascii import a2b_hex
from typing import Dict, Any, Optional, List

import web3
import cbor2

from py_eth_sig_utils.eip712 import encode_typed_data

from autobahn.wamp.message import _URI_PAT_REALM_NAME_ETH
from autobahn.xbr._secmod import EthereumKey

from ._eip712_base import sign, recover, is_chain_id, is_address, is_block_number, is_signature, is_eth_privkey
from ._eip712_certificate import EIP712Certificate
Expand Down Expand Up @@ -198,6 +206,7 @@ class EIP712AuthorityCertificate(EIP712Certificate):
CAPABILITY_CONSUMER = 32

__slots__ = (
# EIP712 attributes
'chainId',
'verifyingContract',
'validFrom',
Expand All @@ -206,16 +215,75 @@ class EIP712AuthorityCertificate(EIP712Certificate):
'realm',
'capabilities',
'meta',

# additional attributes
'signatures',
'hash',
)

def __init__(self, chainId: int, verifyingContract: bytes, validFrom: int, issuer: bytes, subject: bytes,
realm: bytes, capabilities: int, meta: str):
realm: bytes, capabilities: int, meta: str,
signatures: Optional[List[bytes]] = None):
super().__init__(chainId, verifyingContract, validFrom)
self.issuer = issuer
self.subject = subject
self.realm = realm
self.capabilities = capabilities
self.meta = meta
self.signatures = signatures
eip712 = create_eip712_authority_certificate(chainId,
verifyingContract,
validFrom,
issuer,
subject,
realm,
capabilities,
meta)
self.hash = encode_typed_data(eip712)

def __eq__(self, other: Any) -> bool:
if not isinstance(other, self.__class__):
return False
if not EIP712AuthorityCertificate.__eq__(self, other):
return False
if other.chainId != self.chainId:
return False
if other.verifyingContract != self.verifyingContract:
return False
if other.validFrom != self.validFrom:
return False
if other.issuer != self.issuer:
return False
if other.subject != self.subject:
return False
if other.realm != self.realm:
return False
if other.capabilities != self.capabilities:
return False
if other.meta != self.meta:
return False
if other.signatures != self.signatures:
return False
if other.hash != self.hash:
return False
return True

def __ne__(self, other: Any) -> bool:
return not self.__eq__(other)

def __str__(self) -> str:
return pprint.pformat(self.marshal())

def sign(self, key: EthereumKey, binary: bool = False) -> bytes:
eip712 = create_eip712_authority_certificate(self.chainId,
self.verifyingContract,
self.validFrom,
self.issuer,
self.subject,
self.realm,
self.capabilities,
self.meta)
return key.sign_typed_data(eip712, binary=binary)

def recover(self, signature: bytes) -> bytes:
return recover_eip712_authority_certificate(self.chainId,
Expand All @@ -228,15 +296,45 @@ def recover(self, signature: bytes) -> bytes:
self.meta,
signature)

def marshal(self, binary: bool = False) -> Dict[str, Any]:
obj = create_eip712_authority_certificate(chainId=self.chainId,
verifyingContract=self.verifyingContract,
validFrom=self.validFrom,
issuer=self.issuer,
subject=self.subject,
realm=self.realm,
capabilities=self.capabilities,
meta=self.meta)
if not binary:
obj['message']['verifyingContract'] = web3.Web3.toChecksumAddress(obj['message']['verifyingContract']) if obj['message']['verifyingContract'] else None
obj['message']['issuer'] = web3.Web3.toChecksumAddress(obj['message']['issuer']) if obj['message']['issuer'] else None
obj['message']['subject'] = web3.Web3.toChecksumAddress(obj['message']['subject']) if obj['message']['subject'] else None
obj['message']['realm'] = web3.Web3.toChecksumAddress(obj['message']['realm']) if obj['message']['realm'] else None
return obj

@staticmethod
def parse(data) -> 'EIP712AuthorityCertificate':
def parse(obj, binary: bool = False) -> 'EIP712AuthorityCertificate':
if type(obj) != dict:
raise ValueError('invalid type {} for object in EIP712AuthorityCertificate.parse'.format(type(obj)))

primaryType = obj.get('primaryType', None)
if primaryType != 'EIP712AuthorityCertificate':
raise ValueError('invalid primaryType "{}" - expected "EIP712AuthorityCertificate"'.format(primaryType))

# FIXME: check EIP712 types, domain

data = obj.get('message', None)
if type(data) != dict:
raise ValueError('invalid type {} for EIP712AuthorityCertificate'.format(type(data)))
for k in data:
if k not in ['chainId', 'verifyingContract', 'validFrom', 'issuer', 'subject',
if k not in ['type', 'chainId', 'verifyingContract', 'validFrom', 'issuer', 'subject',
'realm', 'capabilities', 'meta']:
raise ValueError('invalid attribute "{}" in EIP712AuthorityCertificate'.format(k))

_type = data.get('type', None)
if _type and _type != 'EIP712AuthorityCertificate':
raise ValueError('unexpected type "{}" in EIP712AuthorityCertificate'.format(_type))

chainId = data.get('chainId', None)
if chainId is None:
raise ValueError('missing chainId in EIP712AuthorityCertificate')
Expand All @@ -246,13 +344,20 @@ def parse(data) -> 'EIP712AuthorityCertificate':
verifyingContract = data.get('verifyingContract', None)
if verifyingContract is None:
raise ValueError('missing verifyingContract in EIP712AuthorityCertificate')
if type(verifyingContract) != str:
raise ValueError(
'invalid type {} for verifyingContract in EIP712AuthorityCertificate'.format(type(verifyingContract)))
if not _URI_PAT_REALM_NAME_ETH.match(verifyingContract):
raise ValueError(
'invalid value "{}" for verifyingContract in EIP712AuthorityCertificate'.format(verifyingContract))
verifyingContract = a2b_hex(verifyingContract[2:])
if binary:
if type(verifyingContract) != bytes:
raise ValueError(
'invalid type {} for verifyingContract in EIP712AuthorityCertificate'.format(type(verifyingContract)))
if len(verifyingContract) != 20:
raise ValueError('invalid value length {} of verifyingContract'.format(len(verifyingContract)))
else:
if type(verifyingContract) != str:
raise ValueError(
'invalid type {} for verifyingContract in EIP712AuthorityCertificate'.format(type(verifyingContract)))
if not _URI_PAT_REALM_NAME_ETH.match(verifyingContract):
raise ValueError(
'invalid value "{}" for verifyingContract in EIP712AuthorityCertificate'.format(verifyingContract))
verifyingContract = a2b_hex(verifyingContract[2:])

validFrom = data.get('validFrom', None)
if validFrom is None:
Expand All @@ -263,29 +368,50 @@ def parse(data) -> 'EIP712AuthorityCertificate':
issuer = data.get('issuer', None)
if issuer is None:
raise ValueError('missing issuer in EIP712AuthorityCertificate')
if type(issuer) != str:
raise ValueError('invalid type {} for issuer in EIP712AuthorityCertificate'.format(type(issuer)))
if not _URI_PAT_REALM_NAME_ETH.match(issuer):
raise ValueError('invalid value "{}" for issuer in EIP712AuthorityCertificate'.format(issuer))
issuer = a2b_hex(issuer[2:])
if binary:
if type(issuer) != bytes:
raise ValueError(
'invalid type {} for issuer in EIP712AuthorityCertificate'.format(type(issuer)))
if len(issuer) != 20:
raise ValueError('invalid value length {} of issuer'.format(len(issuer)))
else:
if type(issuer) != str:
raise ValueError('invalid type {} for issuer in EIP712AuthorityCertificate'.format(type(issuer)))
if not _URI_PAT_REALM_NAME_ETH.match(issuer):
raise ValueError('invalid value "{}" for issuer in EIP712AuthorityCertificate'.format(issuer))
issuer = a2b_hex(issuer[2:])

subject = data.get('subject', None)
if subject is None:
raise ValueError('missing subject in EIP712AuthorityCertificate')
if type(subject) != str:
raise ValueError('invalid type {} for subject in EIP712AuthorityCertificate'.format(type(subject)))
if not _URI_PAT_REALM_NAME_ETH.match(subject):
raise ValueError('invalid value "{}" for subject in EIP712AuthorityCertificate'.format(subject))
subject = a2b_hex(subject[2:])
if binary:
if type(subject) != bytes:
raise ValueError(
'invalid type {} for subject in EIP712AuthorityCertificate'.format(type(subject)))
if len(subject) != 20:
raise ValueError('invalid value length {} of verifyingContract'.format(len(subject)))
else:
if type(subject) != str:
raise ValueError('invalid type {} for subject in EIP712AuthorityCertificate'.format(type(subject)))
if not _URI_PAT_REALM_NAME_ETH.match(subject):
raise ValueError('invalid value "{}" for subject in EIP712AuthorityCertificate'.format(subject))
subject = a2b_hex(subject[2:])

realm = data.get('realm', None)
if realm is None:
raise ValueError('missing realm in EIP712AuthorityCertificate')
if type(realm) != str:
raise ValueError('invalid type {} for realm in EIP712AuthorityCertificate'.format(type(realm)))
if not _URI_PAT_REALM_NAME_ETH.match(realm):
raise ValueError('invalid value "{}" for realm in EIP712AuthorityCertificate'.format(realm))
realm = a2b_hex(realm[2:])
if binary:
if type(realm) != bytes:
raise ValueError(
'invalid type {} for realm in EIP712AuthorityCertificate'.format(type(realm)))
if len(realm) != 20:
raise ValueError('invalid value length {} of realm'.format(len(realm)))
else:
if type(realm) != str:
raise ValueError('invalid type {} for realm in EIP712AuthorityCertificate'.format(type(realm)))
if not _URI_PAT_REALM_NAME_ETH.match(realm):
raise ValueError('invalid value "{}" for realm in EIP712AuthorityCertificate'.format(realm))
realm = a2b_hex(realm[2:])

capabilities = data.get('capabilities', None)
if capabilities is None:
Expand All @@ -302,3 +428,35 @@ def parse(data) -> 'EIP712AuthorityCertificate':
obj = EIP712AuthorityCertificate(chainId=chainId, verifyingContract=verifyingContract, validFrom=validFrom,
issuer=issuer, subject=subject, realm=realm, capabilities=capabilities, meta=meta)
return obj

def save(self, filename: str) -> int:
"""
Save certificate to file. File format (serialized as CBOR):
[cert_hash: bytes, cert_eip712: Dict[str, Any], cert_signatures: List[bytes]]
:param filename:
:return:
"""
cert_obj = [self.hash, self.marshal(binary=True), self.signatures or []]
with open(filename, 'wb') as f:
data = cbor2.dumps(cert_obj)
f.write(data)
return len(data)

@staticmethod
def load(filename) -> 'EIP712AuthorityCertificate':
"""
Load certificate from file.
:param filename:
:return:
"""
if not os.path.isfile(filename):
raise RuntimeError('cannot create EIP712AuthorityCertificate from filename "{}": not a file'.format(filename))
with open(filename, 'rb') as f:
cert_hash, cert_eip712, cert_signatures = cbor2.loads(f.read())
cert = EIP712AuthorityCertificate.parse(cert_eip712, binary=True)
assert cert_hash == cert.hash
cert.signatures = cert_signatures
return cert
17 changes: 17 additions & 0 deletions autobahn/xbr/_eip712_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,23 @@
_EIP712_SIG_LEN = 32 + 32 + 1


def _hash(data) -> bytes:
"""
keccak256(abi.encode(
EIP712_MEMBER_REGISTER_TYPEHASH,
obj.chainId,
obj.verifyingContract,
obj.member,
obj.registered,
keccak256(bytes(obj.eula)),
keccak256(bytes(obj.profile))
));
:param data:
:return:
"""


def sign(eth_privkey: bytes, data: Dict[str, Any]) -> bytes:
"""
Sign the given data using the given Ethereum private key.
Expand Down

0 comments on commit d337467

Please sign in to comment.