diff --git a/src/aleph/network.py b/src/aleph/network.py index 5f0a55b62..ec79f5977 100644 --- a/src/aleph/network.py +++ b/src/aleph/network.py @@ -4,18 +4,15 @@ from typing import Coroutine, Dict, List from urllib.parse import unquote -from aleph_message.models import ItemType from p2pclient import Client as P2PClient from aleph.exceptions import InvalidMessageError from aleph.register_chain import VERIFIER_REGISTER +from aleph.schemas.pending_messages import parse_message from aleph.services.ipfs.pubsub import incoming_channel as incoming_ipfs_channel -from aleph.utils import get_sha256 -from aleph.utils import item_type_from_hash LOGGER = logging.getLogger("NETWORK") -MAX_INLINE_SIZE = 200000 # 200kb max inline content size. INCOMING_MESSAGE_AUTHORIZED_FIELDS = [ "item_hash", @@ -29,8 +26,6 @@ "signature", ] -HOST = None - async def incoming_check(ipfs_pubsub_message: Dict) -> Dict: """Verifies an incoming message is sane, protecting from spam in the @@ -71,57 +66,8 @@ async def check_message( TODO: Implement it fully! Dangerous! """ - if not isinstance(message, dict): - raise InvalidMessageError("Message must be a dict") - - if not message: - raise InvalidMessageError("Message must not be empty") - - for field in INCOMING_MESSAGE_AUTHORIZED_FIELDS: - if field not in message: - raise InvalidMessageError( - f"Missing field '{field}' in message {message['item_hash']}" - ) - - if not isinstance(message["item_hash"], str): - raise InvalidMessageError("Unknown hash %s" % message["item_hash"]) - - if not isinstance(message["chain"], str): - raise InvalidMessageError("Unknown chain %s" % message["chain"]) - - if message.get("channel", None) is not None: - if not isinstance(message.get("channel", None), str): - raise InvalidMessageError("Unknown channel %s" % message["channel"]) - - if not isinstance(message["sender"], str): - raise InvalidMessageError("Unknown sender %s" % message["sender"]) - - if not isinstance(message["signature"], str): - raise InvalidMessageError("Unknown signature %s" % message["signature"]) - if message.get("item_content", None) is not None: - if len(message["item_content"]) > MAX_INLINE_SIZE: - raise InvalidMessageError("Message too long") - await asyncio.sleep(0) - - if message.get("hash_type", "sha256") == "sha256": # leave the door open. - if not trusted: - item_hash = get_sha256(message["item_content"]) - # item_hash = await loop.run_in_executor(None, get_sha256, message['item_content']) - # item_hash = sha256(message['item_content'].encode('utf-8')).hexdigest() - - if message["item_hash"] != item_hash: - raise InvalidMessageError("Bad hash") - else: - raise InvalidMessageError("Unknown hash type %s" % message["hash_type"]) - - message["item_type"] = ItemType.inline.value - - else: - try: - message["item_type"] = item_type_from_hash(message["item_hash"]).value - except ValueError as error: - LOGGER.warning(error) + _ = parse_message(message) if trusted: # only in the case of a message programmatically built here diff --git a/src/aleph/schemas/__init__.py b/src/aleph/schemas/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/aleph/schemas/pending_messages.py b/src/aleph/schemas/pending_messages.py new file mode 100644 index 000000000..06444a006 --- /dev/null +++ b/src/aleph/schemas/pending_messages.py @@ -0,0 +1,176 @@ +""" +Schemas to process raw messages coming from users to the Aleph network. +These schemas are used to parse messages coming from the network into +more practical Python objects. + +While extremely similar to the functionalities of the aleph message module +(of which we reuse some classes), this implementation differs in several +ways: +1. We do not expect the `content` key to be provided. At best, we get + an `item_content` field for inline type messages. Otherwise, + the content has to be fetched (and validated) later from the network. +2. We do not care for confirmations, as the message we are checking is + not even integrated yet. + +TODO: this module should reasonably be part of aleph message, if only + to make the schemas available for the validation of client data + in aleph-client. +""" + + +import json +from hashlib import sha256 +from typing import Any, Literal, Optional + +from aleph_message.models import ( + AggregateContent, + BaseContent, + BaseMessage, + Chain, + ForgetContent, + PostContent, + ProgramContent, + StoreContent, +) +from aleph_message.models import MessageType, ItemType +from pydantic import BaseModel, root_validator, validator + +from aleph.exceptions import InvalidMessageError +from aleph.utils import item_type_from_hash + +MAX_INLINE_SIZE = 200000 # 200kb max inline content size. + + +class BasePendingMessage(BaseModel): + """ + A raw Aleph message, as sent by users to the Aleph network. + """ + + sender: str + chain: Chain + signature: str + type: MessageType + item_content: Optional[str] + item_type: ItemType + item_hash: str + time: float + channel: Optional[str] = None + content: Optional[BaseContent] = None + + @root_validator(pre=True) + def load_content(cls, values): + """ + Preload inline content. We let the CCN populate this field later + on for ipfs and storage item types. + """ + + item_type = values["item_type"] + item_content = values.get("item_content") + if item_type == ItemType.inline: + if item_content is None: + raise ValueError("Item content not specified for inline item type") + + if len(item_content) > MAX_INLINE_SIZE: + raise ValueError("Message too long") + try: + values["content"] = json.loads(item_content) + except json.JSONDecodeError as e: + raise ValueError("Message content is not valid JSON data") from e + else: + if item_content is not None: + raise ValueError(f"{item_type} messages cannot define item_content") + + return values + + @root_validator() + def check_item_type(cls, values): + """ + Checks that the item hash of the message matches the one inferred from the hash. + Only applicable to storage/ipfs item types. + """ + item_type = ItemType(values["item_type"]) + if item_type == ItemType.inline: + return values + + expected_item_type = item_type_from_hash(values["item_hash"]) + if item_type != expected_item_type: + raise ValueError( + f"Expected {expected_item_type} based on hash but item type is {item_type}." + ) + return values + + @validator("item_hash") + def check_item_hash(cls, v, values): + """ + For inline item types, check that the item hash is equal to + the hash of the item content. + """ + + item_type = values["item_type"] + if item_type == ItemType.inline: + item_content: str = values["item_content"] + + computed_hash: str = sha256(item_content.encode()).hexdigest() + if v != computed_hash: + raise ValueError( + "'item_hash' do not match 'sha256(item_content)'" + f", expecting {computed_hash}" + ) + elif item_type == ItemType.ipfs: + # TODO: CHeck that the hash looks like an IPFS multihash + pass + else: + assert item_type == ItemType.storage + return v + + +class PendingAggregateMessage(BasePendingMessage): + type: Literal[MessageType.aggregate] # type: ignore + content: Optional[AggregateContent] = None + + +class PendingForgetMessage(BasePendingMessage): + type: Literal[MessageType.forget] # type: ignore + content: Optional[ForgetContent] = None + + +class PendingPostMessage(BasePendingMessage): + type: Literal[MessageType.post] # type: ignore + content: Optional[PostContent] = None + + +class PendingProgramMessage(BasePendingMessage): + type: Literal[MessageType.program] # type: ignore + content: Optional[ProgramContent] = None + + +class PendingStoreMessage(BasePendingMessage): + type: Literal[MessageType.store] # type: ignore + content: Optional[StoreContent] = None + + +MESSAGE_TYPE_TO_CLASS = { + MessageType.aggregate: PendingAggregateMessage, + MessageType.forget: PendingForgetMessage, + MessageType.post: PendingPostMessage, + MessageType.program: PendingProgramMessage, + MessageType.store: PendingStoreMessage, +} + + +def parse_message(message_dict: Any) -> BasePendingMessage: + if not isinstance(message_dict, dict): + raise InvalidMessageError("Message is not a dictionary") + + raw_message_type = message_dict.get("type") + try: + message_type = MessageType(raw_message_type) + except ValueError as e: + raise InvalidMessageError(f"Invalid message_type: '{raw_message_type}'") from e + + msg_cls = MESSAGE_TYPE_TO_CLASS[message_type] + + try: + return msg_cls(**message_dict) + except ValueError as e: + raise InvalidMessageError("Could not parse message") from e diff --git a/tests/schemas/test_pending_messages.py b/tests/schemas/test_pending_messages.py new file mode 100644 index 000000000..b651682b1 --- /dev/null +++ b/tests/schemas/test_pending_messages.py @@ -0,0 +1,183 @@ +import json +from typing import Dict + +import pytest + +from aleph.exceptions import InvalidMessageError +from aleph.schemas.pending_messages import ( + BasePendingMessage, + PendingAggregateMessage, + PendingForgetMessage, + PendingPostMessage, + PendingProgramMessage, + PendingStoreMessage, + parse_message, +) + + +def check_basic_message_fields(pending_message: BasePendingMessage, message_dict: Dict): + assert pending_message.chain == message_dict["chain"] + assert pending_message.item_hash == message_dict["item_hash"] + assert pending_message.sender == message_dict["sender"] + assert pending_message.type == message_dict["type"] + assert pending_message.channel == message_dict["channel"] + assert pending_message.signature == message_dict["signature"] + assert pending_message.channel == message_dict["channel"] + + +def test_parse_aggregate_inline_message(): + message_dict = { + "chain": "ETH", + "item_hash": "6127637a9415444e62843f62c81a9dda708363b3bb830a5b10fcc212cd586fa9", + "sender": "0x51A58800b26AA1451aaA803d1746687cB88E0501", + "type": "AGGREGATE", + "channel": "UNSLASHED", + "item_content": '{"address":"0x51A58800b26AA1451aaA803d1746687cB88E0501","key":"0x93463a4f3af42de28f6840e59de6111b4192cf8bbinance","content":{"1652716953956":{"version":"x25519-xsalsa20-poly1305","nonce":"eohzj0i+fiaduqOcRnKyHVoTN19Gdv1N","ephemPublicKey":"eAWtQ7A0qA1b/VpnuexR098LFzQhw4/wbneri+XuBgA=","ciphertext":"qsWudj1UrcC5qbdZgzks3h8OF+kOD/CpB5og7zZXNj3zDYoXA+0CLYErX8gsN7yuJQNd+MciEHoxfQZWKtulR9+RMxQrD7DnNUE4y0ick3aFKjXAcJLcbCKOXllo5p9hxq1o/VONJor4uiHc97UhRTK1RXQUNdKz+V+RknAPrlamDcv3LJopm9zdMoxw5hRYIpF3fKH5natLkvc7EzmeQ3Bvo3mUtcyHWZZqmWpmtjVNFf1I+OHgFz4SK4O8nLq2fDPW8EtJBwTTfKeUIhj9B34V4+gl4O+822e4Tnbi4sTFGREc2lSqwDME4u4qyYMaM7omFYcfVvLwBHtIOoSl21xtaPh7g5q8z9lqlWJQTE9ZE2z0kKUuDLPCcrLMm1ooc69pSKPn2W87ycAWyN1v88i5KIU8vIBraWlFY76ROlHScw1L05PKI+1S03demsgNaT+hNTxGwSBOuEq7P25aJjiL3eKXZZ+lZONtSbegxTfFBg==","sha256":"99b9660c915e6ba26fb52cc7ae2735e73b1aeebd167db762e6e4b3d6ec85235f"}},"time":1652716955.776}', + "item_type": "inline", + "signature": "0xb0b97e102f7f75091306ec3ac39639a9560783a1652ed9f1c79138cafc77b9a1519a5cccabf7eb64a8cf4de11394757d95f9d7218833f85010ee847b77716c201c", + "time": 1652716955.776, + } + + message = parse_message(message_dict) + assert isinstance(message, PendingAggregateMessage) + + check_basic_message_fields(message, message_dict) + + assert message.content is not None + content = json.loads(message_dict["item_content"]) + assert message.content.address == content["address"] + assert message.content.time == content["time"] + assert message.content.key == content["key"] + assert message.content.content == content["content"] + + +def test_parse_post_message_storage_content(): + message_dict = { + "chain": "ETH", + "item_hash": "QmWx3j1gSQUrBkYnA8wiuhmE5wGuVNKv5wW6L7RHgLi4H4", + "sender": "0x06DE0C46884EbFF46558Cd1a9e7DA6B1c3E9D0a8", + "type": "POST", + "channel": None, + "item_content": None, + "item_type": "ipfs", + "signature": "0xa1d9fadcf5e6613f6929aa18720c216763a4c04d1462c6e10b81b37d8b2b7fd42618f7889fd2b29d4940d5cb68b6eb24243b51fa932dec6d96de9bbb7e64f91d1c", + "time": 1608297192.085, + } + + message = parse_message(message_dict) + assert isinstance(message, PendingPostMessage) + + check_basic_message_fields(message, message_dict) + assert message.content is None + + +def test_parse_store_message_inline_content(): + message_dict = { + "chain": "NULS2", + "item_hash": "4bbcfe7c4775492c2e602d322d68f558891468927b5e0d6cb89ff880134f323e", + "sender": "NULSd6Hgbhr42Dm5nEgf6foEUT5bgwHesZQJB", + "type": "STORE", + "channel": "MYALEPH", + "item_content": '{"address":"NULSd6Hgbhr42Dm5nEgf6foEUT5bgwHesZQJB","item_type":"ipfs","item_hash":"QmUDS8mpQmpPyptyUEedHxHMkxo7ueRRiAvrpgvJMpjXwW","time":1577325086.513}', + "item_type": "inline", + "signature": "G7/xlWoMjjOr1NBN4SiZ8USYYVM9Q3JHXChR9hPw9/YSItfAplshWysqYDkvmBZiwbICG0IVB3ilMPJ/ZVgPNlk=", + "time": 1608297193.717, + } + + message = parse_message(message_dict) + assert isinstance(message, PendingStoreMessage) + + check_basic_message_fields(message, message_dict) + + assert message.content is not None + content = json.loads(message_dict["item_content"]) + assert message.content.address == content["address"] + assert message.content.time == content["time"] + assert message.content.item_hash == content["item_hash"] + assert message.content.item_type == content["item_type"] + + +def test_parse_store_message_storage_content(): + message_dict = { + "chain": "ETH", + "item_hash": "30cc40533aa3ccf16a7c7c8a40da5633f64a83e4b89dcc7815f3a0af2149e1ac", + "sender": "0x7332eA1229c11C627C10eB24c1A6F77BceD1D5c1", + "type": "STORE", + "channel": "EVIDENZ", + "item_content": None, + "item_type": "storage", + "signature": "23d1d099dd111ae3251efea537f57767cf43b2ae3611bf9051760e0a9bc2bd4429563a130e3e391668086d101f8a197f55377f50b15d4c0303ff957d90a258a31b", + "time": 1616021679.055, + } + + message = parse_message(message_dict) + assert isinstance(message, PendingStoreMessage) + + check_basic_message_fields(message, message_dict) + assert message.content is None + assert message.item_content is None + + +def test_parse_forget_message(): + message_dict = { + "chain": "ETH", + "item_hash": "884dd713e94fa0350239b67e65eecaa54361df8af0e3f6d0e42e0f8de059e15a", + "sender": "0xB68B9D4f3771c246233823ed1D3Add451055F9Ef", + "type": "FORGET", + "channel": "TEST", + "item_content": '{"address":"0xB68B9D4f3771c246233823ed1D3Add451055F9Ef","time":1639058312.376,"hashes":["e3b24727335e34016247c0d37e2b0203bb8c2d76deddafc1700b4cf0e13845c5"],"reason":"None"}', + "item_type": "inline", + "signature": "0x7dc7a45aab12d78367c085799d06ef2e98fce31f76ca06975ce570fe4d92008f66f307bf68ed3ca450d04d4e779776ca13a1e7851cb48915bd390389ae4afd1b1c", + "time": 1639058312.376, + } + + message = parse_message(message_dict) + assert isinstance(message, PendingForgetMessage) + + assert message.content is not None + check_basic_message_fields(message, message_dict) + content = json.loads(message_dict["item_content"]) + assert message.content.address == content["address"] + assert message.content.time == content["time"] + assert message.content.hashes == content["hashes"] + + +def test_parse_program_message(): + message_dict = { + "chain": "ETH", + "item_hash": "2feafebd2dcc023851cbe461ba09000c6ea7ddf2db6dbb31ae8b627556382ba7", + "sender": "0x101d8D16372dBf5f1614adaE95Ee5CCE61998Fc9", + "type": "PROGRAM", + "channel": "TEST", + "item_content": '{"address":"0x101d8D16372dBf5f1614adaE95Ee5CCE61998Fc9","time":1627465647.9127016,"type":"vm-function","allow_amend":false,"code":{"encoding":"zip","entrypoint":"main:app","ref":"3631866c6237ff84c546e43b5679111b419c7044e0c367f357dbc7dd8ad21a5a","use_latest":true},"on":{"http":true},"environment":{"reproducible":false,"internet":true,"aleph_api":true,"shared_cache":false},"resources":{"vcpus":1,"memory":128,"seconds":30},"runtime":{"ref":"bd79839bf96e595a06da5ac0b6ba51dea6f7e2591bb913deccded04d831d29f4","use_latest":true,"comment":"Aleph Alpine Linux with Python 3.8"},"volumes":[]}', + "item_type": "inline", + "signature": "0x167b4558fd2f806bab7ef14d1f92723dd1616d5806075ba95e5ebbe4860a47b2613a2205c507525e8e5f8c7251e1a5c5963a12f7f2343e93c4b9b6e402fbb9bf1b", + "time": 1627465978.121, + } + + message = parse_message(message_dict) + assert isinstance(message, PendingProgramMessage) + + check_basic_message_fields(message, message_dict) + + assert message.content is not None + content = json.loads(message_dict["item_content"]) + assert message.content.address == content["address"] + assert message.content.time == content["time"] + assert message.content.code == content["code"] + assert message.content.type == content["type"] + + +def test_parse_none(): + with pytest.raises(InvalidMessageError): + _ = parse_message(None) + + +def test_parse_empty_dict(): + with pytest.raises(InvalidMessageError): + _ = parse_message({}) + + +def test_parse_storage_with_item_content(): + with pytest.raises(InvalidMessageError): + _ = parse_message({}) diff --git a/tests/test_network.py b/tests/test_network.py index 0cb1a52a6..4e28d1733 100644 --- a/tests/test_network.py +++ b/tests/test_network.py @@ -40,6 +40,7 @@ async def test_valid_message(): async def test_invalid_chain_message(): sample_message = { "item_hash": "QmfDkHXdGND7e8uwJr4yvXSAvbPc8rothM6UN5ABQPsLkF", + "item_type": "ipfs", "chain": "BAR", "channel": "SYSINFO", "sender": "TTanii7eCT93f45g2UpKH81mxpVNcCYw", @@ -55,6 +56,7 @@ async def test_invalid_chain_message(): async def test_invalid_signature_message(): sample_message = { "item_hash": "QmfDkHXdGND7e8uwJr4yvXSAvbPc8rothM6UN5ABQPsLkF", + "item_type": "ipfs", "chain": "NULS", "channel": "SYSINFO", "sender": "TTanii7eCT93f45g2UpKH81mxpVNcCYw",