From 6ff38302a6ed6bd4a393019ad8762308666ae829 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Tue, 17 May 2022 19:19:07 +0200 Subject: [PATCH] [Messages] Use pending message model everywhere Propagated the use of the `BasePendingMessage` subclasses in all functions related to message processing. Split the message validation function, `check_message`, into two parts: * `parse_message` checks that the message is semantically valid, i.e. that all the required fields are present, are of the correct type and have sensible values. * `verify_signature` checks the signature of the message using the public key of the sender. We now call `parse_message` as early as possible in the process. This simplifies assumptions about the presence/absence of specific fields and simplifies the codebase. One of the current issues is that the content of the message can only be loaded conditionally (if the message has inline content). Otherwise, we must load the content from the network. As this operation goes beyond simple validation, we perform it later in the flow, leading to a situation where the content of a message is not guaranteed to be available at a given point depending on the `item_type` field of the message. Modified tests to use the raw message classes. Removed a few tests that tested corner cases linked to incomplete message dictionaries. 
--- src/aleph/chains/common.py | 76 ++++++------ src/aleph/handlers/forget.py | 13 +-- src/aleph/handlers/storage.py | 5 +- src/aleph/jobs/process_pending_messages.py | 5 +- src/aleph/jobs/process_pending_txs.py | 14 +-- src/aleph/network.py | 71 ++++-------- src/aleph/permissions.py | 15 +-- src/aleph/services/ipfs/pubsub.py | 4 +- src/aleph/services/p2p/protocol.py | 4 +- src/aleph/storage.py | 24 ++-- tests/chains/test_common.py | 10 +- tests/chains/test_confirmation.py | 35 +++--- .../test_check_sender_authorization.py | 69 +++++------ .../storage/forget/test_forget_multi_users.py | 23 ++-- tests/storage/test_get_content.py | 79 ++++--------- tests/storage/test_store_message.py | 28 +++-- tests/test_network.py | 108 +++++++----------- 17 files changed, 249 insertions(+), 334 deletions(-) diff --git a/src/aleph/chains/common.py b/src/aleph/chains/common.py index 1f5885c0e..d8b7c7c73 100644 --- a/src/aleph/chains/common.py +++ b/src/aleph/chains/common.py @@ -5,7 +5,6 @@ from enum import IntEnum from typing import Dict, Optional, Tuple, List -from aleph_message.models import MessageType from bson import ObjectId from pymongo import UpdateOne @@ -23,11 +22,11 @@ from aleph.model.filepin import PermanentPin from aleph.model.messages import CappedMessage, Message from aleph.model.pending import PendingMessage, PendingTX -from aleph.network import check_message as check_message_fn +from aleph.network import verify_signature from aleph.permissions import check_sender_authorization from aleph.storage import get_json, pin_hash, add_json, get_message_content from .tx_context import TxContext -from ..schemas.pending_messages import BasePendingMessage +from ..schemas.pending_messages import BasePendingMessage, PendingForgetMessage, PendingStoreMessage LOGGER = logging.getLogger("chains.common") @@ -53,12 +52,17 @@ async def mark_confirmed_data(chain_name, tx_hash, height): } -async def delayed_incoming(message, chain_name=None, tx_hash=None, height=None): +async def 
delayed_incoming( + message: BasePendingMessage, + chain_name: Optional[str] = None, + tx_hash: Optional[str] = None, + height: Optional[int] = None, +): if message is None: return await PendingMessage.collection.insert_one( { - "message": message, + "message": message.dict(exclude={"content"}), "source": dict( chain_name=chain_name, tx_hash=tx_hash, @@ -76,7 +80,7 @@ class IncomingStatus(IntEnum): async def mark_message_for_retry( - message: Dict, + message: BasePendingMessage, chain_name: Optional[str], tx_hash: Optional[str], height: Optional[int], @@ -84,10 +88,12 @@ async def mark_message_for_retry( retrying: bool, existing_id, ): + message_dict = message.dict(exclude={"content"}) + if not retrying: await PendingMessage.collection.insert_one( { - "message": message, + "message": message_dict, "source": dict( chain_name=chain_name, tx_hash=tx_hash, @@ -105,7 +111,7 @@ async def mark_message_for_retry( async def incoming( - message: Dict, + message: BasePendingMessage, chain_name: Optional[str] = None, tx_hash: Optional[str] = None, height: Optional[int] = None, @@ -120,8 +126,8 @@ async def incoming( if existing in database, created if not. 
""" - item_hash = message["item_hash"] - sender = message["sender"] + item_hash = message.item_hash + sender = message.sender ids_key = (item_hash, sender, chain_name) if chain_name and tx_hash and height and seen_ids is not None: @@ -131,9 +137,9 @@ async def incoming( filters = { "item_hash": item_hash, - "chain": message["chain"], - "sender": message["sender"], - "type": message["type"], + "chain": message.chain, + "sender": message.sender, + "type": message.type, } existing = await Message.collection.find_one( filters, @@ -141,40 +147,28 @@ async def incoming( ) if check_message: - if existing is None or (existing["signature"] != message["signature"]): + if existing is None or (existing["signature"] != message.signature): # check/sanitize the message if needed try: - message = await check_message_fn( - message, from_chain=(chain_name is not None) - ) + await verify_signature(message) except InvalidMessageError: return IncomingStatus.FAILED_PERMANENTLY, [] - if message is None: - return IncomingStatus.MESSAGE_HANDLED, [] - if retrying: LOGGER.debug("(Re)trying %s." % item_hash) else: LOGGER.info("Incoming %s." % item_hash) - # we set the incoming chain as default for signature - message["chain"] = message.get("chain", chain_name) - - # if existing is None: - # # TODO: verify if search key is ok. do we need an unique key for messages? 
- # existing = await Message.collection.find_one( - # filters, projection={'confirmed': 1, 'confirmations': 1, 'time': 1}) + updates: Dict[str, Dict] if chain_name and tx_hash and height: # We are getting a confirmation here new_values = await mark_confirmed_data(chain_name, tx_hash, height) - updates = { "$set": { "confirmed": True, }, - "$min": {"time": message["time"]}, + "$min": {"time": message.time}, "$addToSet": {"confirmations": new_values["confirmations"][0]}, } else: @@ -182,10 +176,10 @@ async def incoming( "$max": { "confirmed": False, }, - "$min": {"time": message["time"]}, + "$min": {"time": message.time}, + "$set": {}, } - # new_values = {'confirmed': False} # this should be our default. should_commit = False if existing: if seen_ids is not None and height is not None: @@ -237,19 +231,19 @@ async def incoming( json_content = content.value if json_content.get("address", None) is None: - json_content["address"] = message["sender"] + json_content["address"] = message.sender if json_content.get("time", None) is None: - json_content["time"] = message["time"] + json_content["time"] = message.time # warning: those handlers can modify message and content in place # and return a status. None has to be retried, -1 is discarded, True is # handled and kept. # TODO: change this, it's messy. try: - if message["type"] == MessageType.store: + if isinstance(message, PendingStoreMessage): handling_result = await handle_new_storage(message, json_content) - elif message["type"] == MessageType.forget: + elif isinstance(message, PendingForgetMessage): # Handling it here means that there we ensure that the message # has been forgotten before it is saved on the node. # We may want the opposite instead: ensure that the message has @@ -298,14 +292,14 @@ async def incoming( seen_ids[ids_key] = height LOGGER.debug("New message to store for %s." 
% item_hash) - # message.update(new_values) + updates["$set"] = { "content": json_content, "size": len(content.raw_value), - "item_content": message.get("item_content"), - "item_type": message.get("item_type"), - "channel": message.get("channel"), - "signature": message.get("signature"), + "item_content": message.item_content, + "item_type": message.item_type.value, + "channel": message.channel, + "signature": message.signature, **updates.get("$set", {}), } should_commit = True @@ -324,7 +318,7 @@ async def incoming( return IncomingStatus.MESSAGE_HANDLED, [] -async def process_one_message(message: Dict, *args, **kwargs): +async def process_one_message(message: BasePendingMessage, *args, **kwargs): """ Helper function to process a message on the spot. """ diff --git a/src/aleph/handlers/forget.py b/src/aleph/handlers/forget.py index 5daa9faaa..31e5ef627 100644 --- a/src/aleph/handlers/forget.py +++ b/src/aleph/handlers/forget.py @@ -6,12 +6,12 @@ from aioipfs.api import RepoAPI from aioipfs.exceptions import NotPinnedError -from aleph_message.models import ForgetMessage, MessageType -from aleph_message.models import ItemType +from aleph_message.models import ItemType, MessageType, ForgetContent from aleph.model.filepin import PermanentPin from aleph.model.hashes import delete_value from aleph.model.messages import Message +from aleph.schemas.pending_messages import PendingForgetMessage from aleph.services.ipfs.common import get_ipfs_api from aleph.utils import item_type_from_hash @@ -118,7 +118,7 @@ async def garbage_collect(storage_hash: str, storage_type: ItemType): async def is_allowed_to_forget( - target_info: TargetMessageInfo, by: ForgetMessage + target_info: TargetMessageInfo, by: PendingForgetMessage ) -> bool: """Check if a forget message is allowed to 'forget' the target message given its hash.""" # Both senders are identical: @@ -136,7 +136,7 @@ async def is_allowed_to_forget( async def forget_if_allowed( - target_info: TargetMessageInfo, forget_message: 
ForgetMessage + target_info: TargetMessageInfo, forget_message: PendingForgetMessage ) -> None: """Forget a message. @@ -210,10 +210,9 @@ async def get_target_message_info(target_hash: str) -> Optional[TargetMessageInf return TargetMessageInfo.from_db_object(message_dict) -async def handle_forget_message(message: Dict, content: Dict): +async def handle_forget_message(forget_message: PendingForgetMessage, content: Dict): # Parsing and validation - forget_message = ForgetMessage(**message, content=content) - logger.debug(f"Handling forget message {forget_message.item_hash}") + forget_message.content = ForgetContent(**content) for target_hash in forget_message.content.hashes: target_info = await get_target_message_info(target_hash) diff --git a/src/aleph/handlers/storage.py b/src/aleph/handlers/storage.py index 89f938aa7..bf07af6be 100644 --- a/src/aleph/handlers/storage.py +++ b/src/aleph/handlers/storage.py @@ -19,6 +19,7 @@ from aleph.config import get_config from aleph.exceptions import AlephStorageException, UnknownHashError +from aleph.schemas.pending_messages import PendingStoreMessage from aleph.services.ipfs.common import get_ipfs_api from aleph.storage import get_hash_content from aleph.utils import item_type_from_hash @@ -26,7 +27,7 @@ LOGGER = logging.getLogger("HANDLERS.STORAGE") -async def handle_new_storage(message: Dict, content: Dict): +async def handle_new_storage(message: PendingStoreMessage, content: Dict): config = get_config() if not config.storage.store_files.value: return True # Ignore @@ -34,7 +35,7 @@ async def handle_new_storage(message: Dict, content: Dict): # TODO: ideally the content should be transformed earlier, but this requires more clean up # (ex: no more in place modification of content, simplification of the flow) try: - store_message = StoreMessage(**message, content=content) + store_message = StoreMessage(**message.dict(exclude={"content"}), content=content) except ValidationError as e: print(e) return -1 # Invalid store message, 
discard diff --git a/src/aleph/jobs/process_pending_messages.py b/src/aleph/jobs/process_pending_messages.py index 3fbfd779d..d2640865d 100644 --- a/src/aleph/jobs/process_pending_messages.py +++ b/src/aleph/jobs/process_pending_messages.py @@ -19,6 +19,7 @@ from aleph.model.pending import PendingMessage from aleph.services.p2p import singleton from .job_utils import prepare_loop, process_job_results +from ..schemas.pending_messages import parse_message LOGGER = getLogger("jobs.pending_messages") @@ -48,9 +49,11 @@ async def handle_pending_message( seen_ids: Dict[Tuple, int], ) -> List[DbBulkOperation]: + message = parse_message(pending["message"]) + async with sem: status, operations = await incoming( - pending["message"], + message=message, chain_name=pending["source"].get("chain_name"), tx_hash=pending["source"].get("tx_hash"), height=pending["source"].get("height"), diff --git a/src/aleph/jobs/process_pending_txs.py b/src/aleph/jobs/process_pending_txs.py index 7aefa1797..588e8baa7 100644 --- a/src/aleph/jobs/process_pending_txs.py +++ b/src/aleph/jobs/process_pending_txs.py @@ -18,9 +18,9 @@ from aleph.logging import setup_logging from aleph.model.db_bulk_operation import DbBulkOperation from aleph.model.pending import PendingMessage, PendingTX -from aleph.network import check_message from aleph.services.p2p import singleton from .job_utils import prepare_loop, process_job_results +from ..schemas.pending_messages import parse_message LOGGER = logging.getLogger("jobs.pending_txs") @@ -37,24 +37,24 @@ async def handle_pending_tx( pending_tx["content"], tx_context, seen_ids=seen_ids ) if messages: - for i, message in enumerate(messages): - message["time"] = tx_context.time + (i / 1000) # force order + for i, message_dict in enumerate(messages): try: - message = await check_message( - message, trusted=True - ) # we don't check signatures yet. + # we don't check signatures yet. 
+ message = parse_message(message_dict) except InvalidMessageError as error: LOGGER.warning(error) continue + message.time = tx_context.time + (i / 1000) # force order + # we add it to the message queue... bad idea? should we process it asap? db_operations.append( DbBulkOperation( collection=PendingMessage, operation=InsertOne( { - "message": message, + "message": message.dict(exclude={"content"}), "source": dict( chain_name=tx_context.chain_name, tx_hash=tx_context.tx_hash, diff --git a/src/aleph/network.py b/src/aleph/network.py index 7074d5b6f..05ab8c514 100644 --- a/src/aleph/network.py +++ b/src/aleph/network.py @@ -1,4 +1,3 @@ -import asyncio import json import logging from typing import Coroutine, Dict, List @@ -8,7 +7,7 @@ from aleph.exceptions import InvalidMessageError from aleph.register_chain import VERIFIER_REGISTER -from aleph.schemas.pending_messages import parse_message +from aleph.schemas.pending_messages import BasePendingMessage, parse_message from aleph.services.ipfs.pubsub import incoming_channel as incoming_ipfs_channel LOGGER = logging.getLogger("NETWORK") @@ -27,64 +26,40 @@ ] -async def incoming_check(ipfs_pubsub_message: Dict) -> Dict: - """Verifies an incoming message is sane, protecting from spam in the - meantime. +async def get_pubsub_message(ipfs_pubsub_message: Dict) -> BasePendingMessage: + """ + Extracts an Aleph message out of a pubsub message. - TODO: actually implement this, no check done here yet. IMPORTANT. + Note: this function validates the format of the message, but does not + perform extra validation (ex: signature checks). """ + message_data = ipfs_pubsub_message.get("data", b"").decode("utf-8") try: - message_data = ipfs_pubsub_message.get("data", b"").decode("utf-8") - message = json.loads(unquote(message_data)) - LOGGER.debug("New message! 
%r" % message) - - if message is None: - raise InvalidMessageError("Message may not be None") - - message = await check_message(message, from_network=True) - return message + message_dict = json.loads(unquote(message_data)) except json.JSONDecodeError: raise InvalidMessageError( "Data is not JSON: {}".format(ipfs_pubsub_message.get("data", "")) ) - -async def check_message( - message_dict: Dict, - from_chain: bool = False, - from_network: bool = False, - trusted: bool = False, -) -> Dict: - """This function should check the incoming message and verify any - extraneous or dangerous information for the rest of the process. - It also checks the data hash if it's not done by an external provider (ipfs) - and the data length. - Example of dangerous data: fake confirmations, fake tx_hash, bad times... - - If a item_content is there, set the item_type to inline, else to ipfs (default). - - TODO: Implement it fully! Dangerous! - """ + LOGGER.debug("New message! %r" % message_dict) message = parse_message(message_dict) + return message - if trusted: - # only in the case of a message programmatically built here - # from legacy native chain signing for example (signing offloaded) - return message_dict - else: - chain = message.chain - signer = VERIFIER_REGISTER.get(chain, None) - if signer is None: - raise InvalidMessageError("Unknown chain for validation %r" % chain) - try: - if await signer(message): - return message_dict - else: - raise InvalidMessageError("The signature of the message is invalid") - except ValueError: - raise InvalidMessageError("Signature validation error") + +async def verify_signature(message: BasePendingMessage) -> None: + chain = message.chain + signer = VERIFIER_REGISTER.get(chain, None) + if signer is None: + raise InvalidMessageError("Unknown chain for validation %r" % chain) + try: + if await signer(message): + return + else: + raise InvalidMessageError("The signature of the message is invalid") + except ValueError: + raise 
InvalidMessageError("Signature validation error") def listener_tasks(config, p2p_client: P2PClient) -> List[Coroutine]: diff --git a/src/aleph/permissions.py b/src/aleph/permissions.py index 3989b5db3..b0023ba6d 100644 --- a/src/aleph/permissions.py +++ b/src/aleph/permissions.py @@ -1,15 +1,16 @@ from typing import Dict from aleph.model.messages import get_computed_address_aggregates +from aleph.schemas.pending_messages import BasePendingMessage -async def check_sender_authorization(message: Dict, content: Dict) -> bool: +async def check_sender_authorization(message: BasePendingMessage, content: Dict) -> bool: """Checks a content against a message to verify if sender is authorized. TODO: implement "security" aggregate key check. """ - sender = message["sender"] + sender = message.sender address = content["address"] # if sender is the content address, all good. @@ -28,7 +29,7 @@ async def check_sender_authorization(message: Dict, content: Dict) -> bool: if auth.get("address", "") != sender: continue # not applicable, move on. 
- if auth.get("chain") and message["chain"] != auth.get("chain"): + if auth.get("chain") and message.chain != auth.get("chain"): continue channels = auth.get("channels", []) @@ -36,17 +37,17 @@ async def check_sender_authorization(message: Dict, content: Dict) -> bool: ptypes = auth.get("post_types", []) akeys = auth.get("aggregate_keys", []) - if len(channels) and message["channel"] not in channels: + if len(channels) and message.channel not in channels: continue - if len(mtypes) and message["type"] not in mtypes: + if len(mtypes) and message.type not in mtypes: continue - if message["type"] == "POST": + if message.type == "POST": if len(ptypes) and content["type"] not in ptypes: continue - if message["type"] == "AGGREGATE": + if message.type == "AGGREGATE": if len(akeys) and content["key"] not in akeys: continue diff --git a/src/aleph/services/ipfs/pubsub.py b/src/aleph/services/ipfs/pubsub.py index 97bcbf3af..46c8edb1f 100644 --- a/src/aleph/services/ipfs/pubsub.py +++ b/src/aleph/services/ipfs/pubsub.py @@ -41,14 +41,14 @@ async def pub(topic: str, message: Union[str, bytes]): async def incoming_channel(topic) -> None: - from aleph.network import incoming_check + from aleph.network import get_pubsub_message from aleph.chains.common import process_one_message while True: try: async for mvalue in sub(topic): try: - message = await incoming_check(mvalue) + message = await get_pubsub_message(mvalue) LOGGER.debug("New message %r" % message) asyncio.create_task(process_one_message(message)) except InvalidMessageError: diff --git a/src/aleph/services/p2p/protocol.py b/src/aleph/services/p2p/protocol.py index 530df1fb8..de632fbbe 100644 --- a/src/aleph/services/p2p/protocol.py +++ b/src/aleph/services/p2p/protocol.py @@ -14,7 +14,7 @@ from aleph import __version__ from aleph.exceptions import AlephStorageException, InvalidMessageError -from aleph.network import incoming_check +from aleph.network import get_pubsub_message from aleph.services.utils import 
pubsub_msg_to_dict from .pubsub import receive_pubsub_messages, subscribe @@ -191,7 +191,7 @@ async def incoming_channel(p2p_client: P2PClient, topic: str) -> None: # we should check the sender here to avoid spam # and such things... try: - message = await incoming_check(msg_dict) + message = await get_pubsub_message(msg_dict) except InvalidMessageError: continue diff --git a/src/aleph/storage.py b/src/aleph/storage.py index fa096c8fc..fe8ffc642 100644 --- a/src/aleph/storage.py +++ b/src/aleph/storage.py @@ -7,12 +7,13 @@ from dataclasses import dataclass from enum import Enum from hashlib import sha256 -from typing import Any, AnyStr, Dict, IO, Optional +from typing import Any, AnyStr, IO, Optional, Union, cast from aleph_message.models import ItemType from aleph.config import get_config from aleph.exceptions import InvalidContent, ContentCurrentlyUnavailable +from aleph.schemas.pending_messages import BasePendingMessage from aleph.services.filestore import get_value, set_value from aleph.services.ipfs.common import get_cid_version from aleph.services.ipfs.storage import add_bytes as add_ipfs_bytes @@ -57,7 +58,7 @@ def __len__(self): @dataclass class MessageContent(StoredContent): value: Any - raw_value: bytes + raw_value: Union[bytes, str] async def json_async_loads(s: AnyStr): @@ -66,19 +67,18 @@ async def json_async_loads(s: AnyStr): return await run_in_executor(None, json.loads, s) -async def get_message_content(message: Dict) -> MessageContent: - item_type: str = message.get("item_type", ItemType.ipfs) - item_hash = message["item_hash"] +async def get_message_content(message: BasePendingMessage) -> MessageContent: + item_type = message.item_type + item_hash = message.item_hash if item_type in (ItemType.ipfs, ItemType.storage): return await get_json(item_hash, engine=ItemType(item_type)) elif item_type == ItemType.inline: - if "item_content" not in message: - error_msg = f"No item_content in message {message.get('item_hash')}" - LOGGER.warning(error_msg) - 
raise InvalidContent(error_msg) # never retry, bogus data + # This hypothesis is validated at schema level + item_content = cast(str, message.item_content) + try: - item_content = await json_async_loads(message["item_content"]) + content = await json_async_loads(item_content) except (json.JSONDecodeError, KeyError) as e: error_msg = f"Can't decode JSON: {e}" LOGGER.warning(error_msg) @@ -87,8 +87,8 @@ async def get_message_content(message: Dict) -> MessageContent: return MessageContent( hash=item_hash, source=ContentSource.INLINE, - value=item_content, - raw_value=message["item_content"], + value=content, + raw_value=item_content, ) else: # unknown, could retry later? shouldn't have arrived this far though. diff --git a/tests/chains/test_common.py b/tests/chains/test_common.py index 1b4e566ab..490539e41 100644 --- a/tests/chains/test_common.py +++ b/tests/chains/test_common.py @@ -9,7 +9,7 @@ ) from unittest.mock import MagicMock -from aleph.schemas.pending_messages import BasePendingMessage +from aleph.schemas.pending_messages import BasePendingMessage, parse_message @pytest.mark.asyncio @@ -55,7 +55,7 @@ async def async_magic(): mocker.patch("aleph.model.db") - msg = { + message_dict = { "chain": "NULS", "channel": "SYSINFO", "sender": "TTapAav8g3fFjxQQCjwPd4ERPnai9oya", @@ -65,6 +65,8 @@ async def async_magic(): "item_hash": "84afd8484912d3fa11a402e480d17e949fbf600fcdedd69674253be0320fa62c", "signature": "21027c108022f992f090bbe5c78ca8822f5b7adceb705ae2cd5318543d7bcdd2a74700473045022100b59f7df5333d57080a93be53b9af74e66a284170ec493455e675eb2539ac21db022077ffc66fe8dde7707038344496a85266bf42af1240017d4e1fa0d7068c588ca7", } - msg["item_type"] = "inline" - status, ops = await incoming(msg, check_message=True) + message_dict["item_type"] = "inline" + + message = parse_message(message_dict) + status, ops = await incoming(message, check_message=True) assert status == IncomingStatus.MESSAGE_HANDLED diff --git a/tests/chains/test_confirmation.py 
b/tests/chains/test_confirmation.py index fec9fc6dc..5143150f8 100644 --- a/tests/chains/test_confirmation.py +++ b/tests/chains/test_confirmation.py @@ -5,19 +5,18 @@ from aleph.chains.common import process_one_message from aleph.model.messages import CappedMessage, Message +from aleph.schemas.pending_messages import parse_message - -MESSAGE = { +MESSAGE_DICT = { "chain": "ETH", - "sender": "0x971300C78A38e0F85E60A3b04ae3fA70b4276B64", - "type": "POST", "channel": "TEST", + "sender": "0x696879aE4F6d8DaDD5b8F1cbb1e663B89b08f106", + "type": "POST", + "time": 1652803407.1179411, "item_type": "inline", - "size": 70, - "time": 1646123806, - "item_content": '{"body": "Top 10 cutest Kodiak bears that will definitely murder you"}', - "item_hash": "fd14aaae5693710fae42fc58049f80ba7abdbf0cce00eb73e585bc89907eaad8", - "signature": "0xccb6a4c7e2a709accf941463a93064a9f34ea1d03b17fe9d117c80fb0878ee0a2f284af4afb37de187a1116c0cec5b3a8da89b40d5281919dbeebdffc50c86c71c", + "item_content": '{"address":"0x696879aE4F6d8DaDD5b8F1cbb1e663B89b08f106","time":1652803407.1178224,"content":{"body":"Top 10 cutest Kodiak bears that will definitely murder you"},"type":"test"}', + "item_hash": "85abdd0ea565ac0f282d1a86b5b3da87ed3d55426a78e9c0ec979ae58e947b9c", + "signature": "0xfd5183273be769aaa44ea494911c9e4702fde87dd7dd5e2d5ec76c0a251654544bc98eacd33ca204a536f55f726130683cab1d1ad5ac8da1cbbf39d4d7a124401b", } @@ -39,14 +38,15 @@ async def test_confirm_message(test_db): the websockets. 
""" - item_hash = MESSAGE["item_hash"] - content = json.loads(MESSAGE["item_content"]) + item_hash = MESSAGE_DICT["item_hash"] + content = json.loads(MESSAGE_DICT["item_content"]) - await process_one_message(MESSAGE) + message = parse_message(MESSAGE_DICT) + await process_one_message(message) message_in_db = await Message.collection.find_one({"item_hash": item_hash}) assert message_in_db is not None - assert message_in_db["content"]["body"] == content["body"] + assert message_in_db["content"] == content assert not message_in_db["confirmed"] capped_message_in_db = await CappedMessage.collection.find_one( @@ -58,7 +58,7 @@ async def test_confirm_message(test_db): # Now, confirm the message chain_name, tx_hash, height = "ETH", "123", 8000 await process_one_message( - MESSAGE, chain_name=chain_name, tx_hash=tx_hash, height=height + message, chain_name=chain_name, tx_hash=tx_hash, height=height ) message_in_db = await Message.collection.find_one({"item_hash": item_hash}) @@ -86,12 +86,13 @@ async def test_process_confirmed_message(test_db): in capped messages. 
""" - item_hash = MESSAGE["item_hash"] + item_hash = MESSAGE_DICT["item_hash"] - # Now, confirm the message + # Confirm the message chain_name, tx_hash, height = "ETH", "123", 8000 + message = parse_message(MESSAGE_DICT) await process_one_message( - MESSAGE, chain_name=chain_name, tx_hash=tx_hash, height=height + message, chain_name=chain_name, tx_hash=tx_hash, height=height ) message_in_db = await Message.collection.find_one({"item_hash": item_hash}) diff --git a/tests/permissions/test_check_sender_authorization.py b/tests/permissions/test_check_sender_authorization.py index 7535d0e88..cf32876cc 100644 --- a/tests/permissions/test_check_sender_authorization.py +++ b/tests/permissions/test_check_sender_authorization.py @@ -1,34 +1,30 @@ from aleph.permissions import check_sender_authorization import pytest from aleph.model.messages import Message +from aleph.schemas.pending_messages import parse_message, PendingStoreMessage @pytest.mark.asyncio async def test_owner_is_sender(): - message = { + message_dict = { "_id": {"$oid": "6278d1f451c0b9a4fb11c8a9"}, "chain": "ETH", "item_hash": "2a5aaf71c8767bda8eb235223a3387b310af117f42fac08f02461e90aee073b0", "sender": "0xdeF61fAadE93a8aaE303D083Ead5BF7a25E55a23", "type": "STORE", "channel": "TEST", - "confirmed": False, - "content": { - "address": "0xdeF61fAadE93a8aaE303D083Ead5BF7a25E55a23", - "item_type": "storage", - "item_hash": "e916165d63c9b1d455dc415859ec3e1da5a3c6c86cc743cbedf2203fd92a2b1b", - "time": 1652085236.777, - "size": 2780, - "content_type": "file", - }, "item_content": '{"address":"0xdeF61fAadE93a8aaE303D083Ead5BF7a25E55a23","item_type":"storage","item_hash":"e916165d63c9b1d455dc415859ec3e1da5a3c6c86cc743cbedf2203fd92a2b1b","time":1652085236.777}', "item_type": "inline", "signature": "0x51383ef8823665bd8ea1150175be0c3745a36ea1f0d503ceb51e0d7ff1fd88a5290665564bf9c2315d97884e7448efdb8d4b4f8293b47a641c2ff43f21b6c5b61c", - "size": 179, "time": 1652085236.777, } - is_authorized = await 
check_sender_authorization(message, message["content"]) + message = parse_message(message_dict) + + # For mypy + assert message.content is not None + + is_authorized = await check_sender_authorization(message, message.content.dict()) assert is_authorized @@ -36,7 +32,7 @@ async def test_owner_is_sender(): async def test_store_unauthorized(mocker): mocker.patch("aleph.permissions.get_computed_address_aggregates", return_value={}) - message = { + message_dict = { "chain": "ETH", "channel": "TEST", "item_content": '{"address":"VM on executor","time":1651050219.3481126,"content":{"date":"2022-04-27T09:03:38.361081","test":true,"answer":42,"something":"interesting"},"type":"test"}', @@ -48,19 +44,12 @@ async def test_store_unauthorized(mocker): "type": "POST", } - content = { - "address": "VM on executor", - "content": { - "answer": 42, - "date": "2022-04-27T09:03:38.361081", - "something": "interesting", - "test": True, - }, - "time": 1651050219.3481126, - "type": "test", - } + message = parse_message(message_dict) + + # For mypy + assert message.content is not None - is_authorized = await check_sender_authorization(message, content) + is_authorized = await check_sender_authorization(message, message.content.dict()) assert not is_authorized @@ -68,20 +57,12 @@ async def test_store_unauthorized(mocker): "chain": "ETH", "channel": "TEST", "item_content": '{"address":"0xA3c613b12e862EB6e0C9897E03F1deEb207b5B58","time":1651050219.3481126,"content":{"date":"2022-04-27T09:03:38.361081","test":true,"answer":42,"something":"interesting"},"type":"test"}', - "content": { - "address": "0xA3c613b12e862EB6e0C9897E03F1deEb207b5B58", - "content": { - "answer": 42, - "date": "2022-04-27T09:03:38.361081", - "something": "interesting", - "test": True, - }, - }, - "item_hash": "498a10255877a74609654b673af4f8f29eb8ef1aa5d6265d9a6bf9e342d352db", + "item_hash": "1d8c28dac67725dd9d0ed218127d5ef7870443c803cd35598bb6cbb03ec76383", "item_type": "inline", "sender": 
"0x86F39e17910E3E6d9F38412EB7F24Bf0Ba31eb2E", "time": 1651050219.3488848, "type": "POST", + "signature": "fake signature, not checked here<", } @@ -100,9 +81,12 @@ async def test_authorized(mocker): }, ) - is_authorized = await check_sender_authorization( - AUTHORIZED_MESSAGE, AUTHORIZED_MESSAGE["content"] - ) + message = parse_message(AUTHORIZED_MESSAGE) + + # For mypy + assert message.content is not None + + is_authorized = await check_sender_authorization(message, message.content.dict()) assert is_authorized @@ -141,7 +125,10 @@ async def test_authorized_with_db(test_db): await Message.collection.insert_one(security_message) - is_authorized = await check_sender_authorization( - AUTHORIZED_MESSAGE, AUTHORIZED_MESSAGE["content"] - ) + message = parse_message(AUTHORIZED_MESSAGE) + + # For mypy + assert message.content is not None + + is_authorized = await check_sender_authorization(message, message.content.dict()) assert is_authorized diff --git a/tests/storage/forget/test_forget_multi_users.py b/tests/storage/forget/test_forget_multi_users.py index 50c219e66..f3666b273 100644 --- a/tests/storage/forget/test_forget_multi_users.py +++ b/tests/storage/forget/test_forget_multi_users.py @@ -12,6 +12,7 @@ set_value as store_gridfs_file, ) from aleph.model.messages import Message +from aleph.schemas.pending_messages import parse_message FIXTURES_DIR = Path(__file__).parent / "fixtures" @@ -25,41 +26,38 @@ async def test_forget_multiusers_storage(mocker, test_db): file_hash = "05a123fe17aa6addeef5a97d1665878d10f076d84309d5ae674d4bb292b484c3" - message_user1 = { + message_user1_dict = { "chain": "ETH", "sender": "0x971300C78A38e0F85E60A3b04ae3fA70b4276B64", "type": "STORE", "channel": "TESTS_FORGET", "confirmed": False, "item_type": "inline", - "size": 202, "time": 1646123806, "item_content": '{"address": "0x971300C78A38e0F85E60A3b04ae3fA70b4276B64", "time": 1651757380.8522494, "item_type": "storage", "item_hash": 
"05a123fe17aa6addeef5a97d1665878d10f076d84309d5ae674d4bb292b484c3", "size": 220916, "content_type": "file"}', "item_hash": "50635384e43c7af6b3297f6571644c30f3f07ac681bfd14b9c556c63e661a69e", "signature": "0x71263de6b8d1ea4c0b028f5892287505f6ee73dfa165d1455ca665ffdf5318955345c193a5df2f5c4eb2185947689d7bf5be36155b00711572fec5f27764625c1b", } - message_user2 = { + message_user2_dict = { "chain": "ETH", "sender": "0xaC033C1cA5C49Eff98A1D9a56BeDBC4840010BA4", "type": "STORE", "channel": "TESTS_FORGET", "confirmed": False, "item_type": "inline", - "size": 202, "time": 1646123806, "item_content": '{"address": "0xaC033C1cA5C49Eff98A1D9a56BeDBC4840010BA4", "time": 1651757416.2203836, "item_type": "storage", "item_hash": "05a123fe17aa6addeef5a97d1665878d10f076d84309d5ae674d4bb292b484c3", "size": 220916, "content_type": "file"}', "item_hash": "dbe8199004b052108ec19618f43af1d2baf5c04974d0aec1c4de2d02c44a2483", "signature": "0x4c9ef501e1e4f4b0a05c1eebfa1063837a82788f80deeb59808d25ff481c855157dd65102eaa365e33c7572a78d551cf25075f49d00ebb60c8506c0a6647ab761b", } - forget_message_user1 = { + forget_message_user1_dict = { "chain": "ETH", "sender": "0x971300C78A38e0F85E60A3b04ae3fA70b4276B64", "type": "FORGET", "channel": "TESTS_FORGET", "item_type": "inline", - "size": 202, "time": 1651757583.497435, "item_content": '{"address": "0x971300C78A38e0F85E60A3b04ae3fA70b4276B64", "time": 1651757583.4974332, "hashes": ["50635384e43c7af6b3297f6571644c30f3f07ac681bfd14b9c556c63e661a69e"], "reason": "I do not like this file"}', "item_hash": "0223e74dbae53b45da6a443fa18fd2a25f88677c82ed2de93f17ab24f78f58cf", @@ -71,35 +69,38 @@ async def test_forget_multiusers_storage(mocker, test_db): file_content = f.read() await store_gridfs_file(key=file_hash, value=file_content) + message_user1 = parse_message(message_user1_dict) await process_one_message(message_user1) message1_db = await Message.collection.find_one( - {"item_hash": message_user1["item_hash"]} + {"item_hash": message_user1.item_hash} ) 
assert message1_db is not None + message_user2 = parse_message(message_user2_dict) await process_one_message(message_user2) # Sanity check: check that the file exists db_file_data = await read_gridfs_file(file_hash) assert db_file_data == file_content + forget_message_user1 = parse_message(forget_message_user1_dict) await process_one_message(forget_message_user1) # Check that the message was properly forgotten forgotten_message = await Message.collection.find_one( - {"item_hash": message_user1["item_hash"]} + {"item_hash": message_user1.item_hash} ) assert forgotten_message is not None - assert forgotten_message["forgotten_by"] == [forget_message_user1["item_hash"]] + assert forgotten_message["forgotten_by"] == [forget_message_user1.item_hash] # Check that the message from user 2 is not affected message_user2_db = await Message.collection.find_one( - {"item_hash": message_user2["item_hash"]} + {"item_hash": message_user2.item_hash} ) assert message_user2_db is not None assert "forgotten_by" not in message_user2_db - assert message_user2_db["item_content"] == message_user2["item_content"] + assert message_user2_db["item_content"] == message_user2.item_content # Check that the file still exists db_file_data = await read_gridfs_file(file_hash) diff --git a/tests/storage/test_get_content.py b/tests/storage/test_get_content.py index 18678cec4..8d94ef72e 100644 --- a/tests/storage/test_get_content.py +++ b/tests/storage/test_get_content.py @@ -3,6 +3,7 @@ import pytest from aleph.exceptions import InvalidContent, ContentCurrentlyUnavailable +from aleph.schemas.pending_messages import parse_message from aleph.storage import ContentSource, get_hash_content, get_json, get_message_content from aleph_message.models import ItemType @@ -130,33 +131,13 @@ async def test_get_invalid_json(mocker, mock_config): _content = await get_json("1234") -@pytest.mark.asyncio -async def test_get_inline_content(mock_config): - content_hash = "message-hash" - json_content = [ - {"post": "The 
joys of JavaScript (JK)"}, - {"post": "Does your cat plan to murder you?"}, - ] - json_bytes = json.dumps(json_content).encode("utf-8") - message = { - "item_type": ItemType.inline.value, - "item_hash": content_hash, - "item_content": json_bytes, - } - - content = await get_message_content(message) - assert content.value == json_content - assert content.hash == content_hash - assert content.raw_value == json_bytes - - @pytest.mark.asyncio async def test_get_inline_content_full_message(): """ - Same test as above, with a complete message. Reuse of an older test. + Get inline content from a message. Reuses an older test/fixture. """ - msg = { + message_dict = { "chain": "NULS", "channel": "SYSINFO", "sender": "TTapAav8g3fFjxQQCjwPd4ERPnai9oya", @@ -167,47 +148,37 @@ async def test_get_inline_content_full_message(): "signature": "21027c108022f992f090bbe5c78ca8822f5b7adceb705ae2cd5318543d7bcdd2a74700473045022100b59f7df5333d57080a93be53b9af74e66a284170ec493455e675eb2539ac21db022077ffc66fe8dde7707038344496a85266bf42af1240017d4e1fa0d7068c588ca7", "item_type": "inline", } - content = await get_message_content(msg) + + message = parse_message(message_dict) + content = await get_message_content(message) item_content = content.value - print(item_content) - assert len(content.raw_value) == len(msg['item_content']) - assert item_content['key'] == 'metrics' - assert item_content['address'] == 'TTapAav8g3fFjxQQCjwPd4ERPnai9oya' - assert 'memory' in item_content['content'] - assert 'cpu_cores' in item_content['content'] + + assert len(content.raw_value) == len(message.item_content) + assert item_content["key"] == "metrics" + assert item_content["address"] == "TTapAav8g3fFjxQQCjwPd4ERPnai9oya" + assert "memory" in item_content["content"] + assert "cpu_cores" in item_content["content"] @pytest.mark.asyncio async def test_get_stored_message_content(mocker, mock_config): - content_hash = "message-hash" + message_dict = { + "chain": "ETH", + "channel": "TEST", + "sender": 
"0x696879aE4F6d8DaDD5b8F1cbb1e663B89b08f106", + "type": "POST", + "item_type": "storage", + "item_hash": "315f7313eb97d2c8299e3ee9c19d81f226c44ccf81c387c9fb25c54fced245f5", + "item_content": None, + "signature": "unsigned fixture, deal with it", + "time": 1652805847.190618, + } json_content = {"I": "Inter", "P": "Planetary", "F": "File", "S": "System"} json_bytes = json.dumps(json_content).encode("utf-8") mocker.patch("aleph.storage.get_value", return_value=json_bytes) - message = { - "item_type": ItemType.ipfs.value, - "item_hash": content_hash, - } + message = parse_message(message_dict) content = await get_message_content(message) assert content.value == json_content - assert content.hash == content_hash - - -@pytest.mark.asyncio -async def test_get_message_content_unknown_item_type(mocker, mock_config): - """ - Checks that an unknown item type field in a message will mark it as currently unavailable. - """ - - json_content = {"No more": "inspiration"} - json_bytes = json.dumps(json_content).encode("utf-8") - mocker.patch("aleph.storage.get_value", return_value=json_bytes) - - message = { - "item_type": "invalid-item-type", - "item_hash": "message_hash", - } - - with pytest.raises(ContentCurrentlyUnavailable): - _content = await get_message_content(message) + assert content.hash == message.item_hash diff --git a/tests/storage/test_store_message.py b/tests/storage/test_store_message.py index b87f3580c..add7c68f3 100644 --- a/tests/storage/test_store_message.py +++ b/tests/storage/test_store_message.py @@ -1,14 +1,16 @@ +import json + import pytest +from aleph.exceptions import UnknownHashError from aleph.handlers.storage import handle_new_storage +from aleph.schemas.pending_messages import parse_message, PendingStoreMessage from aleph.storage import ContentSource, RawContent -import json -from aleph.exceptions import UnknownHashError @pytest.fixture def fixture_message_file(): - return { + message_dict = { "_id": {"$oid": "621908cb378bcd3ef596fa50"}, "chain": 
"ETH", "item_hash": "7e4f914865028356704919810073ec5690ecc4bb0ee3bd6bdb24829fd532398f", @@ -29,11 +31,12 @@ def fixture_message_file(): } ], } + return parse_message(message_dict) @pytest.fixture def fixture_message_directory(): - return { + message_dict = { "_id": {"$oid": "1234"}, "chain": "ETH", "item_hash": "b3d17833bcefb7a6eb2d9fa7c77cca3eed3a3fa901a904d35c529a71be25fc6d", @@ -47,11 +50,12 @@ def fixture_message_directory(): "size": 158, "time": 1644409598.782, } + return parse_message(message_dict) @pytest.mark.asyncio async def test_handle_new_storage_invalid_content( - mock_config, fixture_message_directory + mock_config, fixture_message_directory: PendingStoreMessage ): missing_item_hash_content = { "address": "0x2278d6A697B2Be8aE4Ddf090f918d1642Ee43c8C", @@ -80,8 +84,10 @@ async def test_handle_new_storage_invalid_content( @pytest.mark.asyncio -async def test_handle_new_storage_file(mocker, mock_config, fixture_message_file): - content = json.loads(fixture_message_file["item_content"]) +async def test_handle_new_storage_file( + mocker, mock_config, fixture_message_file: PendingStoreMessage +): + content = json.loads(fixture_message_file.item_content) raw_content = RawContent( hash=content["item_hash"], @@ -115,7 +121,7 @@ async def test_handle_new_storage_file(mocker, mock_config, fixture_message_file @pytest.mark.asyncio async def test_handle_new_storage_directory( - mocker, mock_config, fixture_message_directory + mocker, mock_config, fixture_message_directory: PendingStoreMessage ): get_hash_content_mock = mocker.patch("aleph.handlers.storage.get_hash_content") mock_ipfs_api = mocker.MagicMock() @@ -129,7 +135,7 @@ async def test_handle_new_storage_directory( mock_ipfs_api.files.stat = mocker.AsyncMock(return_value=ipfs_stats) mocker.patch("aleph.handlers.storage.get_ipfs_api", return_value=mock_ipfs_api) - content = json.loads(fixture_message_directory["item_content"]) + content = json.loads(fixture_message_directory.item_content) result = await 
handle_new_storage(fixture_message_directory, content) assert result and result != -1 @@ -144,9 +150,9 @@ async def test_handle_new_storage_directory( @pytest.mark.asyncio async def test_handle_new_storage_invalid_hash( - mocker, mock_config, fixture_message_file + mock_config, fixture_message_file: PendingStoreMessage ): - content = json.loads(fixture_message_file["item_content"]) + content = json.loads(fixture_message_file.item_content) content["item_hash"] = "some-invalid-hash" with pytest.raises(UnknownHashError): diff --git a/tests/test_network.py b/tests/test_network.py index 4e28d1733..918fb3b8d 100644 --- a/tests/test_network.py +++ b/tests/test_network.py @@ -4,7 +4,8 @@ import aleph.chains from aleph.exceptions import InvalidMessageError from aleph.chains.common import IncomingStatus -from aleph.network import check_message +from aleph.network import verify_signature +from aleph.schemas.pending_messages import parse_message __author__ = "Moshe Malawach" __copyright__ = "Moshe Malawach" @@ -20,25 +21,28 @@ # assert msg['item_type'] == 'ipfs', "ipfs should be the default" # assert msg is passed_msg, "same object should be returned" + @pytest.mark.skip("TODO: NULS signature verification does not work with the fixture.") @pytest.mark.asyncio async def test_valid_message(): - sample_message = { + sample_message_dict = { "item_hash": "QmfDkHXdGND7e8uwJr4yvXSAvbPc8rothM6UN5ABQPsLkF", + "item_type": "ipfs", "chain": "NULS", "channel": "SYSINFO", "sender": "TTanii7eCT93f45g2UpKH81mxpVNcCYw", "type": "AGGREGATE", "time": 1563279102.3155158, - "signature": "2103041b0b357446927d2c8c62fdddd27910d82f665f16a4907a2be927b5901f5e6c004730450221009a54ecaff6869664e94ad68554520c79c21d4f63822864bd910f9916c32c1b5602201576053180d225ec173fb0b6e4af5efb2dc474ce6aa77a3bdd67fd14e1d806b4" + "signature": 
"2103041b0b357446927d2c8c62fdddd27910d82f665f16a4907a2be927b5901f5e6c004730450221009a54ecaff6869664e94ad68554520c79c21d4f63822864bd910f9916c32c1b5602201576053180d225ec173fb0b6e4af5efb2dc474ce6aa77a3bdd67fd14e1d806b4", } - message = await check_message(sample_message) - assert message is not None + + sample_message = parse_message(sample_message_dict) + await verify_signature(sample_message) @pytest.mark.asyncio async def test_invalid_chain_message(): - sample_message = { + sample_message_dict = { "item_hash": "QmfDkHXdGND7e8uwJr4yvXSAvbPc8rothM6UN5ABQPsLkF", "item_type": "ipfs", "chain": "BAR", @@ -46,15 +50,16 @@ async def test_invalid_chain_message(): "sender": "TTanii7eCT93f45g2UpKH81mxpVNcCYw", "type": "AGGREGATE", "time": 1563279102.3155158, - "signature": "2103041b0b357446927d2c8c62fdddd27910d82f665f16a4907a2be927b5901f5e6c004730450221009a54ecaff6869664e94ad68554520c79c21d4f63822864bd910f9916c32c1b5602201576053180d225ec173fb0b6e4af5efb2dc474ce6aa77a3bdd67fd14e1d806b4" + "signature": "2103041b0b357446927d2c8c62fdddd27910d82f665f16a4907a2be927b5901f5e6c004730450221009a54ecaff6869664e94ad68554520c79c21d4f63822864bd910f9916c32c1b5602201576053180d225ec173fb0b6e4af5efb2dc474ce6aa77a3bdd67fd14e1d806b4", } + with pytest.raises(InvalidMessageError): - _ = await check_message(sample_message) + _ = parse_message(sample_message_dict) @pytest.mark.asyncio async def test_invalid_signature_message(): - sample_message = { + sample_message_dict = { "item_hash": "QmfDkHXdGND7e8uwJr4yvXSAvbPc8rothM6UN5ABQPsLkF", "item_type": "ipfs", "chain": "NULS", @@ -62,86 +67,55 @@ async def test_invalid_signature_message(): "sender": "TTanii7eCT93f45g2UpKH81mxpVNcCYw", "type": "AGGREGATE", "time": 1563279102.3155158, - "signature": "BAR" + "signature": "BAR", } + + sample_message = parse_message(sample_message_dict) with pytest.raises(InvalidMessageError): - _ = await check_message(sample_message) + _ = await verify_signature(sample_message) -@pytest.mark.skip("TODO: NULS signature 
verification does not fail as expected with this fixture.") @pytest.mark.asyncio async def test_invalid_signature_message_2(): - sample_message = { + sample_message_dict = { "item_hash": "QmfDkHXdGND7e8uwJr4yvXSAvbPc8rothM6UN5ABQPsLkF", + "item_type": "ipfs", "chain": "NULS", "channel": "SYSINFO", "sender": "TTanii7eCT93f45g2UpKH81mxpVNcCYw", "type": "AGGREGATE", "time": 1563279102.3155158, - "signature": "2153041b0b357446927d2c8c62fdddd27910d82f665f16a4907a2be927b5901f5e6c004730450221009a54ecaff6869664e94ad68554525c79c21d4f63822864bd910f9916c32c1b5602201576053180d225ec173fb0b6e4af5efb2dc474ce6aa77a3bdd67fd14e1d806b4" + "signature": "2153041b0b357446927d2c8c62fdddd27910d82f665f16a4907a2be927b5901f5e6c004730450221009a54ecaff6869664e94ad68554525c79c21d4f63822864bd910f9916c32c1b5602201576053180d225ec173fb0b6e4af5efb2dc474ce6aa77a3bdd67fd14e1d806b4", } - # with pytest.raises(InvalidMessageError): - x = await check_message(sample_message) - print(x) + sample_message = parse_message(sample_message_dict) + with pytest.raises(InvalidMessageError): + _ = await verify_signature(sample_message) -@pytest.mark.skip("TODO: NULS signature verification does not work with the fixture.") -@pytest.mark.asyncio -async def test_extraneous_fields(): - sample_message = { - "item_hash": "QmfDkHXdGND7e8uwJr4yvXSAvbPc8rothM6UN5ABQPsLkF", - "chain": "NULS", - "channel": "SYSINFO", - "sender": "TTanii7eCT93f45g2UpKH81mxpVNcCYw", - "type": "AGGREGATE", - "foo": "bar", - "time": 1563279102.3155158, - "signature": "2103041b0b357446927d2c8c62fdddd27910d82f665f16a4907a2be927b5901f5e6c004730450221009a54ecaff6869664e94ad68554520c79c21d4f63822864bd910f9916c32c1b5602201576053180d225ec173fb0b6e4af5efb2dc474ce6aa77a3bdd67fd14e1d806b4" - } - message = await check_message(sample_message) - # assert "type" not in message - assert "foo" not in message -# @pytest.mark.asyncio -# async def test_inline_content(): -# content = json.dumps({'foo': 'bar'}) -# h = hashlib.sha256() -# 
h.update(content.encode('utf-8')) -# sample_message = { -# "item_hash": h.hexdigest(), -# "item_content": content, -# "chain": "NULS" -# } -# message = await check_message(sample_message, trusted=True) -# assert message is not None -# assert message['item_hash'] == h.hexdigest() -# assert message['item_content'] == content -# assert message['item_type'] == 'inline' - - -@pytest.mark.skip("TODO: NULS signature verification does not work with the fixtures.") @pytest.mark.asyncio async def test_incoming_inline_content(mocker): from aleph.chains.common import incoming from unittest.mock import MagicMock - + async def async_magic(): pass MagicMock.__await__ = lambda x: async_magic().__await__() - - mocker.patch('aleph.model.db') - - msg = {'chain': 'NULS', - 'channel': 'SYSINFO', - 'sender': 'TTapAav8g3fFjxQQCjwPd4ERPnai9oya', - 'type': 'AGGREGATE', - 'time': 1564581054.0532622, - 'item_content': '{"key":"metrics","address":"TTapAav8g3fFjxQQCjwPd4ERPnai9oya","content":{"memory":{"total":12578275328,"available":5726081024,"percent":54.5,"used":6503415808,"free":238661632,"active":8694841344,"inactive":2322239488,"buffers":846553088,"cached":4989644800,"shared":172527616,"slab":948609024},"swap":{"total":7787769856,"free":7787495424,"used":274432,"percent":0.0,"swapped_in":0,"swapped_out":16384},"cpu":{"user":9.0,"nice":0.0,"system":3.1,"idle":85.4,"iowait":0.0,"irq":0.0,"softirq":2.5,"steal":0.0,"guest":0.0,"guest_nice":0.0},"cpu_cores":[{"user":8.9,"nice":0.0,"system":2.4,"idle":82.2,"iowait":0.0,"irq":0.0,"softirq":6.4,"steal":0.0,"guest":0.0,"guest_nice":0.0},{"user":9.6,"nice":0.0,"system":2.9,"idle":84.6,"iowait":0.0,"irq":0.0,"softirq":2.9,"steal":0.0,"guest":0.0,"guest_nice":0.0},{"user":7.2,"nice":0.0,"system":3.0,"idle":86.8,"iowait":0.0,"irq":0.0,"softirq":3.0,"steal":0.0,"guest":0.0,"guest_nice":0.0},{"user":11.4,"nice":0.0,"system":3.0,"idle":84.8,"iowait":0.1,"irq":0.0,"softirq":0.7,"steal":0.0,"guest":0.0,"guest_nice":0.0},{"user":9.3,"nice":0.0,"system":3.3
,"idle":87.0,"iowait":0.1,"irq":0.0,"softirq":0.3,"steal":0.0,"guest":0.0,"guest_nice":0.0},{"user":5.5,"nice":0.0,"system":4.4,"idle":89.9,"iowait":0.0,"irq":0.0,"softirq":0.1,"steal":0.0,"guest":0.0,"guest_nice":0.0},{"user":8.7,"nice":0.0,"system":3.3,"idle":87.9,"iowait":0.0,"irq":0.0,"softirq":0.1,"steal":0.0,"guest":0.0,"guest_nice":0.0},{"user":11.4,"nice":0.0,"system":2.3,"idle":80.3,"iowait":0.0,"irq":0.0,"softirq":6.1,"steal":0.0,"guest":0.0,"guest_nice":0.0}]},"time":1564581054.0358574}', - 'item_hash': '84afd8484912d3fa11a402e480d17e949fbf600fcdedd69674253be0320fa62c', - 'signature': '21027c108022f992f090bbe5c78ca8822f5b7adceb705ae2cd5318543d7bcdd2a74700473045022100b59f7df5333d57080a93be53b9af74e66a284170ec493455e675eb2539ac21db022077ffc66fe8dde7707038344496a85266bf42af1240017d4e1fa0d7068c588ca7' - } - # msg['item_type'] = 'inline' - msg = await check_message(msg) - status, ops = await incoming(msg) + + mocker.patch("aleph.model.db") + + message_dict = { + "chain": "NULS", + "channel": "SYSINFO", + "sender": "TTapAav8g3fFjxQQCjwPd4ERPnai9oya", + "type": "AGGREGATE", + "time": 1564581054.0532622, + "item_type": "inline", + "item_content": 
'{"key":"metrics","address":"TTapAav8g3fFjxQQCjwPd4ERPnai9oya","content":{"memory":{"total":12578275328,"available":5726081024,"percent":54.5,"used":6503415808,"free":238661632,"active":8694841344,"inactive":2322239488,"buffers":846553088,"cached":4989644800,"shared":172527616,"slab":948609024},"swap":{"total":7787769856,"free":7787495424,"used":274432,"percent":0.0,"swapped_in":0,"swapped_out":16384},"cpu":{"user":9.0,"nice":0.0,"system":3.1,"idle":85.4,"iowait":0.0,"irq":0.0,"softirq":2.5,"steal":0.0,"guest":0.0,"guest_nice":0.0},"cpu_cores":[{"user":8.9,"nice":0.0,"system":2.4,"idle":82.2,"iowait":0.0,"irq":0.0,"softirq":6.4,"steal":0.0,"guest":0.0,"guest_nice":0.0},{"user":9.6,"nice":0.0,"system":2.9,"idle":84.6,"iowait":0.0,"irq":0.0,"softirq":2.9,"steal":0.0,"guest":0.0,"guest_nice":0.0},{"user":7.2,"nice":0.0,"system":3.0,"idle":86.8,"iowait":0.0,"irq":0.0,"softirq":3.0,"steal":0.0,"guest":0.0,"guest_nice":0.0},{"user":11.4,"nice":0.0,"system":3.0,"idle":84.8,"iowait":0.1,"irq":0.0,"softirq":0.7,"steal":0.0,"guest":0.0,"guest_nice":0.0},{"user":9.3,"nice":0.0,"system":3.3,"idle":87.0,"iowait":0.1,"irq":0.0,"softirq":0.3,"steal":0.0,"guest":0.0,"guest_nice":0.0},{"user":5.5,"nice":0.0,"system":4.4,"idle":89.9,"iowait":0.0,"irq":0.0,"softirq":0.1,"steal":0.0,"guest":0.0,"guest_nice":0.0},{"user":8.7,"nice":0.0,"system":3.3,"idle":87.9,"iowait":0.0,"irq":0.0,"softirq":0.1,"steal":0.0,"guest":0.0,"guest_nice":0.0},{"user":11.4,"nice":0.0,"system":2.3,"idle":80.3,"iowait":0.0,"irq":0.0,"softirq":6.1,"steal":0.0,"guest":0.0,"guest_nice":0.0}]},"time":1564581054.0358574}', + "item_hash": "84afd8484912d3fa11a402e480d17e949fbf600fcdedd69674253be0320fa62c", + "signature": "21027c108022f992f090bbe5c78ca8822f5b7adceb705ae2cd5318543d7bcdd2a74700473045022100b59f7df5333d57080a93be53b9af74e66a284170ec493455e675eb2539ac21db022077ffc66fe8dde7707038344496a85266bf42af1240017d4e1fa0d7068c588ca7", + } + message = parse_message(message_dict) + status, ops = await incoming(message) 
assert status == IncomingStatus.MESSAGE_HANDLED