diff --git a/src/aleph/chains/common.py b/src/aleph/chains/common.py index 1f5885c0e..d8b7c7c73 100644 --- a/src/aleph/chains/common.py +++ b/src/aleph/chains/common.py @@ -5,7 +5,6 @@ from enum import IntEnum from typing import Dict, Optional, Tuple, List -from aleph_message.models import MessageType from bson import ObjectId from pymongo import UpdateOne @@ -23,11 +22,11 @@ from aleph.model.filepin import PermanentPin from aleph.model.messages import CappedMessage, Message from aleph.model.pending import PendingMessage, PendingTX -from aleph.network import check_message as check_message_fn +from aleph.network import verify_signature from aleph.permissions import check_sender_authorization from aleph.storage import get_json, pin_hash, add_json, get_message_content from .tx_context import TxContext -from ..schemas.pending_messages import BasePendingMessage +from ..schemas.pending_messages import BasePendingMessage, PendingForgetMessage, PendingStoreMessage LOGGER = logging.getLogger("chains.common") @@ -53,12 +52,17 @@ async def mark_confirmed_data(chain_name, tx_hash, height): } -async def delayed_incoming(message, chain_name=None, tx_hash=None, height=None): +async def delayed_incoming( + message: BasePendingMessage, + chain_name: Optional[str] = None, + tx_hash: Optional[str] = None, + height: Optional[int] = None, +): if message is None: return await PendingMessage.collection.insert_one( { - "message": message, + "message": message.dict(exclude={"content"}), "source": dict( chain_name=chain_name, tx_hash=tx_hash, @@ -76,7 +80,7 @@ class IncomingStatus(IntEnum): async def mark_message_for_retry( - message: Dict, + message: BasePendingMessage, chain_name: Optional[str], tx_hash: Optional[str], height: Optional[int], @@ -84,10 +88,12 @@ async def mark_message_for_retry( retrying: bool, existing_id, ): + message_dict = message.dict(exclude={"content"}) + if not retrying: await PendingMessage.collection.insert_one( { - "message": message, + "message": message_dict, "source": dict( chain_name=chain_name, tx_hash=tx_hash, @@ -105,7 +111,7 @@ async def mark_message_for_retry( async def incoming( - message: Dict, + message: BasePendingMessage, chain_name: Optional[str] = None, tx_hash: Optional[str] = None, height: Optional[int] = None, @@ -120,8 +126,8 @@ async def incoming( if existing in database, created if not. """ - item_hash = message["item_hash"] - sender = message["sender"] + item_hash = message.item_hash + sender = message.sender ids_key = (item_hash, sender, chain_name) if chain_name and tx_hash and height and seen_ids is not None: @@ -131,9 +137,9 @@ async def incoming( filters = { "item_hash": item_hash, - "chain": message["chain"], - "sender": message["sender"], - "type": message["type"], + "chain": message.chain, + "sender": message.sender, + "type": message.type, } existing = await Message.collection.find_one( filters, @@ -141,40 +147,28 @@ async def incoming( ) if check_message: - if existing is None or (existing["signature"] != message["signature"]): + if existing is None or (existing["signature"] != message.signature): # check/sanitize the message if needed try: - message = await check_message_fn( - message, from_chain=(chain_name is not None) - ) + await verify_signature(message) except InvalidMessageError: return IncomingStatus.FAILED_PERMANENTLY, [] - if message is None: - return IncomingStatus.MESSAGE_HANDLED, [] - if retrying: LOGGER.debug("(Re)trying %s." % item_hash) else: LOGGER.info("Incoming %s." % item_hash) - # we set the incoming chain as default for signature - message["chain"] = message.get("chain", chain_name) - - # if existing is None: - # # TODO: verify if search key is ok. do we need an unique key for messages? - # existing = await Message.collection.find_one( - # filters, projection={'confirmed': 1, 'confirmations': 1, 'time': 1}) + updates: Dict[str, Dict] if chain_name and tx_hash and height: # We are getting a confirmation here new_values = await mark_confirmed_data(chain_name, tx_hash, height) - updates = { "$set": { "confirmed": True, }, - "$min": {"time": message["time"]}, + "$min": {"time": message.time}, "$addToSet": {"confirmations": new_values["confirmations"][0]}, } else: @@ -182,10 +176,10 @@ async def incoming( "$max": { "confirmed": False, }, - "$min": {"time": message["time"]}, + "$min": {"time": message.time}, + "$set": {}, } - # new_values = {'confirmed': False} # this should be our default. should_commit = False if existing: if seen_ids is not None and height is not None: @@ -237,19 +231,19 @@ async def incoming( json_content = content.value if json_content.get("address", None) is None: - json_content["address"] = message["sender"] + json_content["address"] = message.sender if json_content.get("time", None) is None: - json_content["time"] = message["time"] + json_content["time"] = message.time # warning: those handlers can modify message and content in place # and return a status. None has to be retried, -1 is discarded, True is # handled and kept. # TODO: change this, it's messy. try: - if message["type"] == MessageType.store: + if isinstance(message, PendingStoreMessage): handling_result = await handle_new_storage(message, json_content) - elif message["type"] == MessageType.forget: + elif isinstance(message, PendingForgetMessage): # Handling it here means that there we ensure that the message # has been forgotten before it is saved on the node. # We may want the opposite instead: ensure that the message has @@ -298,14 +292,14 @@ async def incoming( seen_ids[ids_key] = height LOGGER.debug("New message to store for %s." % item_hash) - # message.update(new_values) + updates["$set"] = { "content": json_content, "size": len(content.raw_value), - "item_content": message.get("item_content"), - "item_type": message.get("item_type"), - "channel": message.get("channel"), - "signature": message.get("signature"), + "item_content": message.item_content, + "item_type": message.item_type.value, + "channel": message.channel, + "signature": message.signature, **updates.get("$set", {}), } should_commit = True @@ -324,7 +318,7 @@ async def incoming( return IncomingStatus.MESSAGE_HANDLED, [] -async def process_one_message(message: Dict, *args, **kwargs): +async def process_one_message(message: BasePendingMessage, *args, **kwargs): """ Helper function to process a message on the spot. """ diff --git a/src/aleph/handlers/forget.py b/src/aleph/handlers/forget.py index 5daa9faaa..31e5ef627 100644 --- a/src/aleph/handlers/forget.py +++ b/src/aleph/handlers/forget.py @@ -6,12 +6,12 @@ from aioipfs.api import RepoAPI from aioipfs.exceptions import NotPinnedError -from aleph_message.models import ForgetMessage, MessageType -from aleph_message.models import ItemType +from aleph_message.models import ItemType, MessageType, ForgetContent from aleph.model.filepin import PermanentPin from aleph.model.hashes import delete_value from aleph.model.messages import Message +from aleph.schemas.pending_messages import PendingForgetMessage from aleph.services.ipfs.common import get_ipfs_api from aleph.utils import item_type_from_hash @@ -118,7 +118,7 @@ async def garbage_collect(storage_hash: str, storage_type: ItemType): async def is_allowed_to_forget( - target_info: TargetMessageInfo, by: ForgetMessage + target_info: TargetMessageInfo, by: PendingForgetMessage ) -> bool: """Check if a forget message is allowed to 'forget' the target message given its hash.""" # Both senders are identical: @@ -136,7 +136,7 @@ async def is_allowed_to_forget( async def forget_if_allowed( - target_info: TargetMessageInfo, forget_message: ForgetMessage + target_info: TargetMessageInfo, forget_message: PendingForgetMessage ) -> None: """Forget a message. @@ -210,10 +210,9 @@ async def get_target_message_info(target_hash: str) -> Optional[TargetMessageInf return TargetMessageInfo.from_db_object(message_dict) -async def handle_forget_message(message: Dict, content: Dict): +async def handle_forget_message(forget_message: PendingForgetMessage, content: Dict): # Parsing and validation - forget_message = ForgetMessage(**message, content=content) - logger.debug(f"Handling forget message {forget_message.item_hash}") + forget_message.content = ForgetContent(**content) for target_hash in forget_message.content.hashes: target_info = await get_target_message_info(target_hash) diff --git a/src/aleph/handlers/storage.py b/src/aleph/handlers/storage.py index 89f938aa7..bf07af6be 100644 --- a/src/aleph/handlers/storage.py +++ b/src/aleph/handlers/storage.py @@ -19,6 +19,7 @@ from aleph.config import get_config from aleph.exceptions import AlephStorageException, UnknownHashError +from aleph.schemas.pending_messages import PendingStoreMessage from aleph.services.ipfs.common import get_ipfs_api from aleph.storage import get_hash_content from aleph.utils import item_type_from_hash @@ -26,7 +27,7 @@ LOGGER = logging.getLogger("HANDLERS.STORAGE") -async def handle_new_storage(message: Dict, content: Dict): +async def handle_new_storage(message: PendingStoreMessage, content: Dict): config = get_config() if not config.storage.store_files.value: return True # Ignore @@ -34,7 +35,7 @@ async def handle_new_storage(message: Dict, content: Dict): # TODO: ideally the content should be transformed earlier, but this requires more clean up # (ex: no more in place modification of content, simplification of the flow) try: - store_message = StoreMessage(**message, content=content) + store_message = StoreMessage(**message.dict(exclude={"content"}), content=content) except ValidationError as e: print(e) return -1 # Invalid store message, discard diff --git a/src/aleph/jobs/process_pending_messages.py b/src/aleph/jobs/process_pending_messages.py index 3fbfd779d..d2640865d 100644 --- a/src/aleph/jobs/process_pending_messages.py +++ b/src/aleph/jobs/process_pending_messages.py @@ -19,6 +19,7 @@ from aleph.model.pending import PendingMessage from aleph.services.p2p import singleton from .job_utils import prepare_loop, process_job_results +from ..schemas.pending_messages import parse_message LOGGER = getLogger("jobs.pending_messages") @@ -48,9 +49,11 @@ async def handle_pending_message( seen_ids: Dict[Tuple, int], ) -> List[DbBulkOperation]: + message = parse_message(pending["message"]) + async with sem: status, operations = await incoming( - pending["message"], + message=message, chain_name=pending["source"].get("chain_name"), tx_hash=pending["source"].get("tx_hash"), height=pending["source"].get("height"), diff --git a/src/aleph/jobs/process_pending_txs.py b/src/aleph/jobs/process_pending_txs.py index 7aefa1797..588e8baa7 100644 --- a/src/aleph/jobs/process_pending_txs.py +++ b/src/aleph/jobs/process_pending_txs.py @@ -18,9 +18,9 @@ from aleph.logging import setup_logging from aleph.model.db_bulk_operation import DbBulkOperation from aleph.model.pending import PendingMessage, PendingTX -from aleph.network import check_message from aleph.services.p2p import singleton from .job_utils import prepare_loop, process_job_results +from ..schemas.pending_messages import parse_message LOGGER = logging.getLogger("jobs.pending_txs") @@ -37,24 +37,24 @@ async def handle_pending_tx( pending_tx["content"], tx_context, seen_ids=seen_ids ) if messages: - for i, message in enumerate(messages): - message["time"] = tx_context.time + (i / 1000) # force order + for i, message_dict in enumerate(messages): try: - message = await check_message( - message, trusted=True - ) # we don't check signatures yet. + # we don't check signatures yet. + message = parse_message(message_dict) except InvalidMessageError as error: LOGGER.warning(error) continue + message.time = tx_context.time + (i / 1000) # force order + # we add it to the message queue... bad idea? should we process it asap? db_operations.append( DbBulkOperation( collection=PendingMessage, operation=InsertOne( { - "message": message, + "message": message.dict(exclude={"content"}), "source": dict( chain_name=tx_context.chain_name, tx_hash=tx_context.tx_hash, diff --git a/src/aleph/network.py b/src/aleph/network.py index 7074d5b6f..05ab8c514 100644 --- a/src/aleph/network.py +++ b/src/aleph/network.py @@ -1,4 +1,3 @@ -import asyncio import json import logging from typing import Coroutine, Dict, List @@ -8,7 +7,7 @@ from aleph.exceptions import InvalidMessageError from aleph.register_chain import VERIFIER_REGISTER -from aleph.schemas.pending_messages import parse_message +from aleph.schemas.pending_messages import BasePendingMessage, parse_message from aleph.services.ipfs.pubsub import incoming_channel as incoming_ipfs_channel LOGGER = logging.getLogger("NETWORK") @@ -27,64 +26,40 @@ ] -async def incoming_check(ipfs_pubsub_message: Dict) -> Dict: - """Verifies an incoming message is sane, protecting from spam in the - meantime. +async def get_pubsub_message(ipfs_pubsub_message: Dict) -> BasePendingMessage: + """ + Extracts an Aleph message out of a pubsub message. - TODO: actually implement this, no check done here yet. IMPORTANT. + Note: this function validates the format of the message, but does not + perform extra validation (ex: signature checks). """ + message_data = ipfs_pubsub_message.get("data", b"").decode("utf-8") try: - message_data = ipfs_pubsub_message.get("data", b"").decode("utf-8") - message = json.loads(unquote(message_data)) - LOGGER.debug("New message! %r" % message) - - if message is None: - raise InvalidMessageError("Message may not be None") - - message = await check_message(message, from_network=True) - return message + message_dict = json.loads(unquote(message_data)) except json.JSONDecodeError: raise InvalidMessageError( "Data is not JSON: {}".format(ipfs_pubsub_message.get("data", "")) ) - -async def check_message( - message_dict: Dict, - from_chain: bool = False, - from_network: bool = False, - trusted: bool = False, -) -> Dict: - """This function should check the incoming message and verify any - extraneous or dangerous information for the rest of the process. - It also checks the data hash if it's not done by an external provider (ipfs) - and the data length. - Example of dangerous data: fake confirmations, fake tx_hash, bad times... - - If a item_content is there, set the item_type to inline, else to ipfs (default). - - TODO: Implement it fully! Dangerous! - """ + LOGGER.debug("New message! %r" % message_dict) message = parse_message(message_dict) + return message - if trusted: - # only in the case of a message programmatically built here - # from legacy native chain signing for example (signing offloaded) - return message_dict - else: - chain = message.chain - signer = VERIFIER_REGISTER.get(chain, None) - if signer is None: - raise InvalidMessageError("Unknown chain for validation %r" % chain) - try: - if await signer(message): - return message_dict - else: - raise InvalidMessageError("The signature of the message is invalid") - except ValueError: - raise InvalidMessageError("Signature validation error") + +async def verify_signature(message: BasePendingMessage) -> None: + chain = message.chain + signer = VERIFIER_REGISTER.get(chain, None) + if signer is None: + raise InvalidMessageError("Unknown chain for validation %r" % chain) + try: + if await signer(message): + return + else: + raise InvalidMessageError("The signature of the message is invalid") + except ValueError: + raise InvalidMessageError("Signature validation error") def listener_tasks(config, p2p_client: P2PClient) -> List[Coroutine]: diff --git a/src/aleph/permissions.py b/src/aleph/permissions.py index 3989b5db3..b0023ba6d 100644 --- a/src/aleph/permissions.py +++ b/src/aleph/permissions.py @@ -1,15 +1,16 @@ from typing import Dict from aleph.model.messages import get_computed_address_aggregates +from aleph.schemas.pending_messages import BasePendingMessage -async def check_sender_authorization(message: Dict, content: Dict) -> bool: +async def check_sender_authorization(message: BasePendingMessage, content: Dict) -> bool: """Checks a content against a message to verify if sender is authorized. TODO: implement "security" aggregate key check. """ - sender = message["sender"] + sender = message.sender address = content["address"] # if sender is the content address, all good. @@ -28,7 +29,7 @@ async def check_sender_authorization(message: Dict, content: Dict) -> bool: if auth.get("address", "") != sender: continue # not applicable, move on. - if auth.get("chain") and message["chain"] != auth.get("chain"): + if auth.get("chain") and message.chain != auth.get("chain"): continue channels = auth.get("channels", []) @@ -36,17 +37,17 @@ async def check_sender_authorization(message: Dict, content: Dict) -> bool: ptypes = auth.get("post_types", []) akeys = auth.get("aggregate_keys", []) - if len(channels) and message["channel"] not in channels: + if len(channels) and message.channel not in channels: continue - if len(mtypes) and message["type"] not in mtypes: + if len(mtypes) and message.type not in mtypes: continue - if message["type"] == "POST": + if message.type == "POST": if len(ptypes) and content["type"] not in ptypes: continue - if message["type"] == "AGGREGATE": + if message.type == "AGGREGATE": if len(akeys) and content["key"] not in akeys: continue diff --git a/src/aleph/services/ipfs/pubsub.py b/src/aleph/services/ipfs/pubsub.py index 97bcbf3af..46c8edb1f 100644 --- a/src/aleph/services/ipfs/pubsub.py +++ b/src/aleph/services/ipfs/pubsub.py @@ -41,14 +41,14 @@ async def pub(topic: str, message: Union[str, bytes]): async def incoming_channel(topic) -> None: - from aleph.network import incoming_check + from aleph.network import get_pubsub_message from aleph.chains.common import process_one_message while True: try: async for mvalue in sub(topic): try: - message = await incoming_check(mvalue) + message = await get_pubsub_message(mvalue) LOGGER.debug("New message %r" % message) asyncio.create_task(process_one_message(message)) except InvalidMessageError: diff --git a/src/aleph/services/p2p/protocol.py b/src/aleph/services/p2p/protocol.py index 530df1fb8..de632fbbe 100644 --- a/src/aleph/services/p2p/protocol.py +++ b/src/aleph/services/p2p/protocol.py @@ -14,7 +14,7 @@ from aleph import __version__ from aleph.exceptions import AlephStorageException, InvalidMessageError -from aleph.network import incoming_check +from aleph.network import get_pubsub_message from aleph.services.utils import pubsub_msg_to_dict from .pubsub import receive_pubsub_messages, subscribe @@ -191,7 +191,7 @@ async def incoming_channel(p2p_client: P2PClient, topic: str) -> None: # we should check the sender here to avoid spam # and such things... try: - message = await incoming_check(msg_dict) + message = await get_pubsub_message(msg_dict) except InvalidMessageError: continue diff --git a/src/aleph/storage.py b/src/aleph/storage.py index fa096c8fc..fe8ffc642 100644 --- a/src/aleph/storage.py +++ b/src/aleph/storage.py @@ -7,12 +7,13 @@ from dataclasses import dataclass from enum import Enum from hashlib import sha256 -from typing import Any, AnyStr, Dict, IO, Optional +from typing import Any, AnyStr, IO, Optional, Union, cast from aleph_message.models import ItemType from aleph.config import get_config from aleph.exceptions import InvalidContent, ContentCurrentlyUnavailable +from aleph.schemas.pending_messages import BasePendingMessage from aleph.services.filestore import get_value, set_value from aleph.services.ipfs.common import get_cid_version from aleph.services.ipfs.storage import add_bytes as add_ipfs_bytes @@ -57,7 +58,7 @@ def __len__(self): @dataclass class MessageContent(StoredContent): value: Any - raw_value: bytes + raw_value: Union[bytes, str] async def json_async_loads(s: AnyStr): @@ -66,19 +67,18 @@ async def json_async_loads(s: AnyStr): return await run_in_executor(None, json.loads, s) -async def get_message_content(message: Dict) -> MessageContent: - item_type: str = message.get("item_type", ItemType.ipfs) - item_hash = message["item_hash"] +async def get_message_content(message: BasePendingMessage) -> MessageContent: + item_type = message.item_type + item_hash = message.item_hash if item_type in (ItemType.ipfs, ItemType.storage): return await get_json(item_hash, engine=ItemType(item_type)) elif item_type == ItemType.inline: - if "item_content" not in message: - error_msg = f"No item_content in message {message.get('item_hash')}" - LOGGER.warning(error_msg) - raise InvalidContent(error_msg) # never retry, bogus data + # This hypothesis is validated at schema level + item_content = cast(str, message.item_content) + try: - item_content = await json_async_loads(message["item_content"]) + content = await json_async_loads(item_content) except (json.JSONDecodeError, KeyError) as e: error_msg = f"Can't decode JSON: {e}" LOGGER.warning(error_msg) @@ -87,8 +87,8 @@ async def get_message_content(message: Dict) -> MessageContent: return MessageContent( hash=item_hash, source=ContentSource.INLINE, - value=item_content, - raw_value=message["item_content"], + value=content, + raw_value=item_content, ) else: # unknown, could retry later? shouldn't have arrived this far though. diff --git a/tests/chains/test_common.py b/tests/chains/test_common.py index 1b4e566ab..490539e41 100644 --- a/tests/chains/test_common.py +++ b/tests/chains/test_common.py @@ -9,7 +9,7 @@ ) from unittest.mock import MagicMock -from aleph.schemas.pending_messages import BasePendingMessage +from aleph.schemas.pending_messages import BasePendingMessage, parse_message @pytest.mark.asyncio @@ -55,7 +55,7 @@ async def async_magic(): mocker.patch("aleph.model.db") - msg = { + message_dict = { "chain": "NULS", "channel": "SYSINFO", "sender": "TTapAav8g3fFjxQQCjwPd4ERPnai9oya", @@ -65,6 +65,8 @@ async def async_magic(): "item_hash": "84afd8484912d3fa11a402e480d17e949fbf600fcdedd69674253be0320fa62c", "signature": "21027c108022f992f090bbe5c78ca8822f5b7adceb705ae2cd5318543d7bcdd2a74700473045022100b59f7df5333d57080a93be53b9af74e66a284170ec493455e675eb2539ac21db022077ffc66fe8dde7707038344496a85266bf42af1240017d4e1fa0d7068c588ca7", } - msg["item_type"] = "inline" - status, ops = await incoming(msg, check_message=True) + message_dict["item_type"] = "inline" + + message = parse_message(message_dict) + status, ops = await incoming(message, check_message=True) assert status == IncomingStatus.MESSAGE_HANDLED diff --git a/tests/chains/test_confirmation.py b/tests/chains/test_confirmation.py index fec9fc6dc..5143150f8 100644 --- a/tests/chains/test_confirmation.py +++ b/tests/chains/test_confirmation.py @@ -5,19 +5,18 @@ from aleph.chains.common import process_one_message from aleph.model.messages import CappedMessage, Message +from aleph.schemas.pending_messages import parse_message - -MESSAGE = { +MESSAGE_DICT = { "chain": "ETH", - "sender": "0x971300C78A38e0F85E60A3b04ae3fA70b4276B64", - "type": "POST", "channel": "TEST", + "sender": "0x696879aE4F6d8DaDD5b8F1cbb1e663B89b08f106", + "type": "POST", + "time": 1652803407.1179411, "item_type": "inline", - "size": 70, - "time": 1646123806, - "item_content": '{"body": "Top 10 cutest Kodiak bears that will definitely murder you"}', - "item_hash": "fd14aaae5693710fae42fc58049f80ba7abdbf0cce00eb73e585bc89907eaad8", - "signature": "0xccb6a4c7e2a709accf941463a93064a9f34ea1d03b17fe9d117c80fb0878ee0a2f284af4afb37de187a1116c0cec5b3a8da89b40d5281919dbeebdffc50c86c71c", + "item_content": '{"address":"0x696879aE4F6d8DaDD5b8F1cbb1e663B89b08f106","time":1652803407.1178224,"content":{"body":"Top 10 cutest Kodiak bears that will definitely murder you"},"type":"test"}', + "item_hash": "85abdd0ea565ac0f282d1a86b5b3da87ed3d55426a78e9c0ec979ae58e947b9c", + "signature": "0xfd5183273be769aaa44ea494911c9e4702fde87dd7dd5e2d5ec76c0a251654544bc98eacd33ca204a536f55f726130683cab1d1ad5ac8da1cbbf39d4d7a124401b", } @@ -39,14 +38,15 @@ async def test_confirm_message(test_db): the websockets. """ - item_hash = MESSAGE["item_hash"] - content = json.loads(MESSAGE["item_content"]) + item_hash = MESSAGE_DICT["item_hash"] + content = json.loads(MESSAGE_DICT["item_content"]) - await process_one_message(MESSAGE) + message = parse_message(MESSAGE_DICT) + await process_one_message(message) message_in_db = await Message.collection.find_one({"item_hash": item_hash}) assert message_in_db is not None - assert message_in_db["content"]["body"] == content["body"] + assert message_in_db["content"] == content assert not message_in_db["confirmed"] capped_message_in_db = await CappedMessage.collection.find_one( @@ -58,7 +58,7 @@ async def test_confirm_message(test_db): # Now, confirm the message chain_name, tx_hash, height = "ETH", "123", 8000 await process_one_message( - MESSAGE, chain_name=chain_name, tx_hash=tx_hash, height=height + message, chain_name=chain_name, tx_hash=tx_hash, height=height ) message_in_db = await Message.collection.find_one({"item_hash": item_hash}) @@ -86,12 +86,13 @@ async def test_process_confirmed_message(test_db): in capped messages. """ - item_hash = MESSAGE["item_hash"] + item_hash = MESSAGE_DICT["item_hash"] - # Now, confirm the message + # Confirm the message chain_name, tx_hash, height = "ETH", "123", 8000 + message = parse_message(MESSAGE_DICT) await process_one_message( - MESSAGE, chain_name=chain_name, tx_hash=tx_hash, height=height + message, chain_name=chain_name, tx_hash=tx_hash, height=height ) message_in_db = await Message.collection.find_one({"item_hash": item_hash}) diff --git a/tests/permissions/test_check_sender_authorization.py b/tests/permissions/test_check_sender_authorization.py index 7535d0e88..cf32876cc 100644 --- a/tests/permissions/test_check_sender_authorization.py +++ b/tests/permissions/test_check_sender_authorization.py @@ -1,34 +1,30 @@ from aleph.permissions import check_sender_authorization import pytest from aleph.model.messages import Message +from aleph.schemas.pending_messages import parse_message, PendingStoreMessage @pytest.mark.asyncio async def test_owner_is_sender(): - message = { + message_dict = { "_id": {"$oid": "6278d1f451c0b9a4fb11c8a9"}, "chain": "ETH", "item_hash": "2a5aaf71c8767bda8eb235223a3387b310af117f42fac08f02461e90aee073b0", "sender": "0xdeF61fAadE93a8aaE303D083Ead5BF7a25E55a23", "type": "STORE", "channel": "TEST", - "confirmed": False, - "content": { - "address": "0xdeF61fAadE93a8aaE303D083Ead5BF7a25E55a23", - "item_type": "storage", - "item_hash": "e916165d63c9b1d455dc415859ec3e1da5a3c6c86cc743cbedf2203fd92a2b1b", - "time": 1652085236.777, - "size": 2780, - "content_type": "file", - }, "item_content": '{"address":"0xdeF61fAadE93a8aaE303D083Ead5BF7a25E55a23","item_type":"storage","item_hash":"e916165d63c9b1d455dc415859ec3e1da5a3c6c86cc743cbedf2203fd92a2b1b","time":1652085236.777}', "item_type": "inline", "signature": "0x51383ef8823665bd8ea1150175be0c3745a36ea1f0d503ceb51e0d7ff1fd88a5290665564bf9c2315d97884e7448efdb8d4b4f8293b47a641c2ff43f21b6c5b61c", - "size": 179, "time": 1652085236.777, } - is_authorized = await check_sender_authorization(message, message["content"]) + message = parse_message(message_dict) + + # For mypy + assert message.content is not None + + is_authorized = await check_sender_authorization(message, message.content.dict()) assert is_authorized @@ -36,7 +32,7 @@ async def test_owner_is_sender(): async def test_store_unauthorized(mocker): mocker.patch("aleph.permissions.get_computed_address_aggregates", return_value={}) - message = { + message_dict = { "chain": "ETH", "channel": "TEST", "item_content": '{"address":"VM on executor","time":1651050219.3481126,"content":{"date":"2022-04-27T09:03:38.361081","test":true,"answer":42,"something":"interesting"},"type":"test"}', @@ -48,19 +44,12 @@ async def test_store_unauthorized(mocker): "type": "POST", } - content = { - "address": "VM on executor", - "content": { - "answer": 42, - "date": "2022-04-27T09:03:38.361081", - "something": "interesting", - "test": True, - }, - "time": 1651050219.3481126, - "type": "test", - } + message = parse_message(message_dict) + + # For mypy + assert message.content is not None - is_authorized = await check_sender_authorization(message, content) + is_authorized = await check_sender_authorization(message, message.content.dict()) assert not is_authorized @@ -68,20 +57,12 @@ async def test_store_unauthorized(mocker): "chain": "ETH", "channel": "TEST", "item_content": '{"address":"0xA3c613b12e862EB6e0C9897E03F1deEb207b5B58","time":1651050219.3481126,"content":{"date":"2022-04-27T09:03:38.361081","test":true,"answer":42,"something":"interesting"},"type":"test"}', - "content": { - "address": "0xA3c613b12e862EB6e0C9897E03F1deEb207b5B58", - "content": { - "answer": 42, - "date": "2022-04-27T09:03:38.361081", - "something": "interesting", - "test": True, - }, - }, - "item_hash": "498a10255877a74609654b673af4f8f29eb8ef1aa5d6265d9a6bf9e342d352db", + "item_hash": "1d8c28dac67725dd9d0ed218127d5ef7870443c803cd35598bb6cbb03ec76383", "item_type": "inline", "sender": "0x86F39e17910E3E6d9F38412EB7F24Bf0Ba31eb2E", "time": 1651050219.3488848, "type": "POST", + "signature": "fake signature, not checked here<", } @@ -100,9 +81,12 @@ async def test_authorized(mocker): }, ) - is_authorized = await check_sender_authorization( - AUTHORIZED_MESSAGE, AUTHORIZED_MESSAGE["content"] - ) + message = parse_message(AUTHORIZED_MESSAGE) + + # For mypy + assert message.content is not None + + is_authorized = await check_sender_authorization(message, message.content.dict()) assert is_authorized @@ -141,7 +125,10 @@ async def test_authorized_with_db(test_db): await Message.collection.insert_one(security_message) - is_authorized = await check_sender_authorization( - AUTHORIZED_MESSAGE, AUTHORIZED_MESSAGE["content"] - ) + message = parse_message(AUTHORIZED_MESSAGE) + + # For mypy + assert message.content is not None + + is_authorized = await check_sender_authorization(message, message.content.dict()) assert is_authorized diff --git a/tests/storage/forget/test_forget_multi_users.py b/tests/storage/forget/test_forget_multi_users.py index 50c219e66..f3666b273 100644 --- a/tests/storage/forget/test_forget_multi_users.py +++ b/tests/storage/forget/test_forget_multi_users.py @@ -12,6 +12,7 @@ set_value as store_gridfs_file, ) from aleph.model.messages import Message +from aleph.schemas.pending_messages import parse_message FIXTURES_DIR = Path(__file__).parent / "fixtures" @@ -25,41 +26,38 @@ async def test_forget_multiusers_storage(mocker, test_db): file_hash = "05a123fe17aa6addeef5a97d1665878d10f076d84309d5ae674d4bb292b484c3" - message_user1 = { + message_user1_dict = { "chain": "ETH", "sender": "0x971300C78A38e0F85E60A3b04ae3fA70b4276B64", "type": "STORE", "channel": "TESTS_FORGET", "confirmed": False, "item_type": "inline", - "size": 202, "time": 1646123806, "item_content": '{"address": "0x971300C78A38e0F85E60A3b04ae3fA70b4276B64", "time": 1651757380.8522494, "item_type": "storage", "item_hash": "05a123fe17aa6addeef5a97d1665878d10f076d84309d5ae674d4bb292b484c3", "size": 220916, "content_type": "file"}', "item_hash": "50635384e43c7af6b3297f6571644c30f3f07ac681bfd14b9c556c63e661a69e", "signature": "0x71263de6b8d1ea4c0b028f5892287505f6ee73dfa165d1455ca665ffdf5318955345c193a5df2f5c4eb2185947689d7bf5be36155b00711572fec5f27764625c1b", } - message_user2 = { + message_user2_dict = { "chain": "ETH", "sender": "0xaC033C1cA5C49Eff98A1D9a56BeDBC4840010BA4", "type": "STORE", "channel": "TESTS_FORGET", "confirmed": False, "item_type": "inline", - "size": 202, "time": 1646123806, "item_content": '{"address": "0xaC033C1cA5C49Eff98A1D9a56BeDBC4840010BA4", "time": 1651757416.2203836, "item_type": "storage", "item_hash": "05a123fe17aa6addeef5a97d1665878d10f076d84309d5ae674d4bb292b484c3", "size": 220916, "content_type": "file"}', "item_hash": "dbe8199004b052108ec19618f43af1d2baf5c04974d0aec1c4de2d02c44a2483", "signature": "0x4c9ef501e1e4f4b0a05c1eebfa1063837a82788f80deeb59808d25ff481c855157dd65102eaa365e33c7572a78d551cf25075f49d00ebb60c8506c0a6647ab761b", } - forget_message_user1 = { + forget_message_user1_dict = { "chain": "ETH", "sender": "0x971300C78A38e0F85E60A3b04ae3fA70b4276B64", "type": "FORGET", "channel": "TESTS_FORGET", "item_type": "inline", - "size": 202, "time": 1651757583.497435, "item_content": '{"address": "0x971300C78A38e0F85E60A3b04ae3fA70b4276B64", "time": 1651757583.4974332, "hashes": ["50635384e43c7af6b3297f6571644c30f3f07ac681bfd14b9c556c63e661a69e"], "reason": "I do not like this file"}', "item_hash": "0223e74dbae53b45da6a443fa18fd2a25f88677c82ed2de93f17ab24f78f58cf", @@ -71,35 +69,38 @@ async def test_forget_multiusers_storage(mocker, test_db): file_content = f.read() await store_gridfs_file(key=file_hash, value=file_content) + message_user1 = parse_message(message_user1_dict) await process_one_message(message_user1) message1_db = await Message.collection.find_one( - {"item_hash": message_user1["item_hash"]} + {"item_hash": message_user1.item_hash} ) assert message1_db is not None + message_user2 = parse_message(message_user2_dict) await process_one_message(message_user2) # Sanity check: check that the file exists db_file_data = await read_gridfs_file(file_hash) assert db_file_data == file_content + forget_message_user1 = parse_message(forget_message_user1_dict) await process_one_message(forget_message_user1) # Check that the message was properly forgotten forgotten_message = await Message.collection.find_one( - {"item_hash": message_user1["item_hash"]} + {"item_hash": message_user1.item_hash} ) assert forgotten_message is not None - assert forgotten_message["forgotten_by"] == [forget_message_user1["item_hash"]] + assert forgotten_message["forgotten_by"] == [forget_message_user1.item_hash] # Check that the message from user 2 is not affected message_user2_db = await Message.collection.find_one( - {"item_hash": message_user2["item_hash"]} + {"item_hash": message_user2.item_hash} ) assert message_user2_db is not None assert "forgotten_by" not in message_user2_db - assert message_user2_db["item_content"] == message_user2["item_content"] + assert message_user2_db["item_content"] == message_user2.item_content # Check that the file still exists db_file_data = await read_gridfs_file(file_hash) diff --git a/tests/storage/test_get_content.py b/tests/storage/test_get_content.py index 18678cec4..8d94ef72e 100644 --- a/tests/storage/test_get_content.py +++ b/tests/storage/test_get_content.py @@ -3,6 +3,7 @@ import pytest from aleph.exceptions import InvalidContent, ContentCurrentlyUnavailable +from aleph.schemas.pending_messages import parse_message from aleph.storage import ContentSource, get_hash_content, get_json, get_message_content from aleph_message.models import ItemType @@ -130,33 +131,13 @@ async def test_get_invalid_json(mocker, mock_config): _content = await get_json("1234") -@pytest.mark.asyncio -async def test_get_inline_content(mock_config): - content_hash = "message-hash" - json_content = [ - {"post": "The joys of JavaScript (JK)"}, - {"post": "Does your cat plan to murder you?"}, - ] - json_bytes = json.dumps(json_content).encode("utf-8") - message = { - "item_type": ItemType.inline.value, - "item_hash": content_hash, - "item_content": json_bytes, - } - - content = await get_message_content(message) - assert content.value == json_content - assert content.hash == content_hash - assert content.raw_value == json_bytes - - @pytest.mark.asyncio async def test_get_inline_content_full_message(): """ - Same test as above, with a complete message. Reuse of an older test. + Get inline content from a message. Reuses an older test/fixture. """ - msg = { + message_dict = { "chain": "NULS", "channel": "SYSINFO", "sender": "TTapAav8g3fFjxQQCjwPd4ERPnai9oya", @@ -167,47 +148,37 @@ async def test_get_inline_content_full_message(): "signature": "21027c108022f992f090bbe5c78ca8822f5b7adceb705ae2cd5318543d7bcdd2a74700473045022100b59f7df5333d57080a93be53b9af74e66a284170ec493455e675eb2539ac21db022077ffc66fe8dde7707038344496a85266bf42af1240017d4e1fa0d7068c588ca7", "item_type": "inline", } - content = await get_message_content(msg) + + message = parse_message(message_dict) + content = await get_message_content(message) item_content = content.value - print(item_content) - assert len(content.raw_value) == len(msg['item_content']) - assert item_content['key'] == 'metrics' - assert item_content['address'] == 'TTapAav8g3fFjxQQCjwPd4ERPnai9oya' - assert 'memory' in item_content['content'] - assert 'cpu_cores' in item_content['content'] + + assert len(content.raw_value) == len(message.item_content) + assert item_content["key"] == "metrics" + assert item_content["address"] == "TTapAav8g3fFjxQQCjwPd4ERPnai9oya" + assert "memory" in item_content["content"] + assert "cpu_cores" in item_content["content"] @pytest.mark.asyncio async def test_get_stored_message_content(mocker, mock_config): - content_hash = "message-hash" + message_dict = { + "chain": "ETH", + "channel": "TEST", + "sender": "0x696879aE4F6d8DaDD5b8F1cbb1e663B89b08f106", + "type": "POST", + "item_type": "storage", + "item_hash": "315f7313eb97d2c8299e3ee9c19d81f226c44ccf81c387c9fb25c54fced245f5", + "item_content": None, + "signature": "unsigned fixture, deal with it", + "time": 1652805847.190618, + } json_content = {"I": "Inter", "P": "Planetary", "F": "File", "S": "System"} json_bytes = json.dumps(json_content).encode("utf-8") mocker.patch("aleph.storage.get_value", return_value=json_bytes) - message = { - "item_type": ItemType.ipfs.value, - "item_hash": content_hash, - } + message = parse_message(message_dict) content = await get_message_content(message) assert content.value == json_content - assert content.hash == content_hash - - -@pytest.mark.asyncio -async def test_get_message_content_unknown_item_type(mocker, mock_config): - """ - Checks that an unknown item type field in a message will mark it as currently unavailable. - """ - - json_content = {"No more": "inspiration"} - json_bytes = json.dumps(json_content).encode("utf-8") - mocker.patch("aleph.storage.get_value", return_value=json_bytes) - - message = { - "item_type": "invalid-item-type", - "item_hash": "message_hash", - } - - with pytest.raises(ContentCurrentlyUnavailable): - _content = await get_message_content(message) + assert content.hash == message.item_hash diff --git a/tests/storage/test_store_message.py b/tests/storage/test_store_message.py index b87f3580c..add7c68f3 100644 --- a/tests/storage/test_store_message.py +++ b/tests/storage/test_store_message.py @@ -1,14 +1,16 @@ +import json + import pytest +from aleph.exceptions import UnknownHashError from aleph.handlers.storage import handle_new_storage +from aleph.schemas.pending_messages import parse_message, PendingStoreMessage from aleph.storage import ContentSource, RawContent -import json -from aleph.exceptions import UnknownHashError @pytest.fixture def fixture_message_file(): - return { + message_dict = { "_id": {"$oid": "621908cb378bcd3ef596fa50"}, "chain": "ETH", "item_hash": "7e4f914865028356704919810073ec5690ecc4bb0ee3bd6bdb24829fd532398f", @@ -29,11 +31,12 @@ def fixture_message_file(): } ], } + return parse_message(message_dict) @pytest.fixture def fixture_message_directory(): - return { + message_dict = { "_id": {"$oid": "1234"}, "chain": "ETH", "item_hash": "b3d17833bcefb7a6eb2d9fa7c77cca3eed3a3fa901a904d35c529a71be25fc6d", @@ -47,11 +50,12 @@ def fixture_message_directory(): "size": 158, "time": 1644409598.782, } + return parse_message(message_dict) @pytest.mark.asyncio async def test_handle_new_storage_invalid_content( - mock_config, fixture_message_directory + mock_config, fixture_message_directory: PendingStoreMessage ): missing_item_hash_content = { "address": "0x2278d6A697B2Be8aE4Ddf090f918d1642Ee43c8C", @@ -80,8 +84,10 @@ async def test_handle_new_storage_invalid_content( @pytest.mark.asyncio -async def test_handle_new_storage_file(mocker, mock_config, fixture_message_file): - content = json.loads(fixture_message_file["item_content"]) +async def test_handle_new_storage_file( + mocker, mock_config, fixture_message_file: PendingStoreMessage +): + content = json.loads(fixture_message_file.item_content) raw_content = RawContent( hash=content["item_hash"], @@ -115,7 +121,7 @@ async def test_handle_new_storage_file(mocker, mock_config, fixture_message_file @pytest.mark.asyncio async def test_handle_new_storage_directory( - mocker, mock_config, fixture_message_directory + mocker, mock_config, fixture_message_directory: PendingStoreMessage ): get_hash_content_mock = mocker.patch("aleph.handlers.storage.get_hash_content") mock_ipfs_api = mocker.MagicMock() @@ -129,7 +135,7 @@ async def test_handle_new_storage_directory( mock_ipfs_api.files.stat = mocker.AsyncMock(return_value=ipfs_stats) mocker.patch("aleph.handlers.storage.get_ipfs_api", return_value=mock_ipfs_api) - content = json.loads(fixture_message_directory["item_content"]) + content = json.loads(fixture_message_directory.item_content) result = await handle_new_storage(fixture_message_directory, content) assert result and result != -1 @@ -144,9 +150,9 @@ async def test_handle_new_storage_directory( @pytest.mark.asyncio async def test_handle_new_storage_invalid_hash( - mocker, mock_config, fixture_message_file + mock_config, fixture_message_file: PendingStoreMessage ): - content = json.loads(fixture_message_file["item_content"]) + content = json.loads(fixture_message_file.item_content) content["item_hash"] = "some-invalid-hash" with pytest.raises(UnknownHashError): diff --git a/tests/test_network.py b/tests/test_network.py index 4e28d1733..918fb3b8d 100644 --- a/tests/test_network.py +++ b/tests/test_network.py @@ -4,7 +4,8 @@ import aleph.chains from aleph.exceptions import InvalidMessageError from aleph.chains.common import IncomingStatus -from aleph.network import check_message +from aleph.network import verify_signature +from aleph.schemas.pending_messages import parse_message __author__ = "Moshe Malawach" __copyright__ = "Moshe Malawach" @@ -20,25 +21,28 @@ # assert msg['item_type'] == 'ipfs', "ipfs should be the default" # assert msg is passed_msg, "same object should be returned" + @pytest.mark.skip("TODO: NULS signature verification does not work with the fixture.") @pytest.mark.asyncio async def test_valid_message(): - sample_message = { + sample_message_dict = { "item_hash": "QmfDkHXdGND7e8uwJr4yvXSAvbPc8rothM6UN5ABQPsLkF", + "item_type": "ipfs", "chain": "NULS", "channel": "SYSINFO", "sender": "TTanii7eCT93f45g2UpKH81mxpVNcCYw", "type": "AGGREGATE", "time": 1563279102.3155158, - "signature": "2103041b0b357446927d2c8c62fdddd27910d82f665f16a4907a2be927b5901f5e6c004730450221009a54ecaff6869664e94ad68554520c79c21d4f63822864bd910f9916c32c1b5602201576053180d225ec173fb0b6e4af5efb2dc474ce6aa77a3bdd67fd14e1d806b4" + "signature": "2103041b0b357446927d2c8c62fdddd27910d82f665f16a4907a2be927b5901f5e6c004730450221009a54ecaff6869664e94ad68554520c79c21d4f63822864bd910f9916c32c1b5602201576053180d225ec173fb0b6e4af5efb2dc474ce6aa77a3bdd67fd14e1d806b4", } - message = await check_message(sample_message) - assert message is not None + + sample_message = parse_message(sample_message_dict) + await verify_signature(sample_message) @pytest.mark.asyncio async def test_invalid_chain_message(): - sample_message = { + sample_message_dict = { "item_hash": "QmfDkHXdGND7e8uwJr4yvXSAvbPc8rothM6UN5ABQPsLkF", "item_type": "ipfs", "chain": "BAR", @@ -46,15 +50,16 @@ async def test_invalid_chain_message(): "sender": "TTanii7eCT93f45g2UpKH81mxpVNcCYw", "type": "AGGREGATE", "time": 1563279102.3155158, - "signature": "2103041b0b357446927d2c8c62fdddd27910d82f665f16a4907a2be927b5901f5e6c004730450221009a54ecaff6869664e94ad68554520c79c21d4f63822864bd910f9916c32c1b5602201576053180d225ec173fb0b6e4af5efb2dc474ce6aa77a3bdd67fd14e1d806b4" + "signature": "2103041b0b357446927d2c8c62fdddd27910d82f665f16a4907a2be927b5901f5e6c004730450221009a54ecaff6869664e94ad68554520c79c21d4f63822864bd910f9916c32c1b5602201576053180d225ec173fb0b6e4af5efb2dc474ce6aa77a3bdd67fd14e1d806b4", } + with pytest.raises(InvalidMessageError): - _ = await check_message(sample_message) + _ = parse_message(sample_message_dict) @pytest.mark.asyncio async def test_invalid_signature_message(): - sample_message = { + sample_message_dict = { "item_hash": "QmfDkHXdGND7e8uwJr4yvXSAvbPc8rothM6UN5ABQPsLkF", "item_type": "ipfs", "chain": "NULS", @@ -62,86 +67,55 @@ async def test_invalid_signature_message(): "sender": "TTanii7eCT93f45g2UpKH81mxpVNcCYw", "type": "AGGREGATE", "time": 1563279102.3155158, - "signature": "BAR" + "signature": "BAR", } + + sample_message = parse_message(sample_message_dict) with pytest.raises(InvalidMessageError): - _ = await check_message(sample_message) + _ = await verify_signature(sample_message) -@pytest.mark.skip("TODO: NULS signature verification does not fail as expected with this fixture.") @pytest.mark.asyncio async def test_invalid_signature_message_2(): - sample_message = { + sample_message_dict = { "item_hash": "QmfDkHXdGND7e8uwJr4yvXSAvbPc8rothM6UN5ABQPsLkF", + "item_type": "ipfs", "chain": "NULS", "channel": "SYSINFO", "sender": "TTanii7eCT93f45g2UpKH81mxpVNcCYw", "type": "AGGREGATE", "time": 1563279102.3155158, - "signature": "2153041b0b357446927d2c8c62fdddd27910d82f665f16a4907a2be927b5901f5e6c004730450221009a54ecaff6869664e94ad68554525c79c21d4f63822864bd910f9916c32c1b5602201576053180d225ec173fb0b6e4af5efb2dc474ce6aa77a3bdd67fd14e1d806b4" + "signature": "2153041b0b357446927d2c8c62fdddd27910d82f665f16a4907a2be927b5901f5e6c004730450221009a54ecaff6869664e94ad68554525c79c21d4f63822864bd910f9916c32c1b5602201576053180d225ec173fb0b6e4af5efb2dc474ce6aa77a3bdd67fd14e1d806b4", } - # with pytest.raises(InvalidMessageError): - x = await check_message(sample_message) - print(x) + sample_message = parse_message(sample_message_dict) + with pytest.raises(InvalidMessageError): + _ = await verify_signature(sample_message) -@pytest.mark.skip("TODO: NULS signature verification does not work with the fixture.") -@pytest.mark.asyncio -async def test_extraneous_fields(): - sample_message = { - "item_hash": "QmfDkHXdGND7e8uwJr4yvXSAvbPc8rothM6UN5ABQPsLkF", - "chain": "NULS", - "channel": "SYSINFO", - "sender": "TTanii7eCT93f45g2UpKH81mxpVNcCYw", - "type": "AGGREGATE", - "foo": "bar", - "time": 1563279102.3155158, - "signature": "2103041b0b357446927d2c8c62fdddd27910d82f665f16a4907a2be927b5901f5e6c004730450221009a54ecaff6869664e94ad68554520c79c21d4f63822864bd910f9916c32c1b5602201576053180d225ec173fb0b6e4af5efb2dc474ce6aa77a3bdd67fd14e1d806b4" - } - message = await check_message(sample_message) - # assert "type" not in message - assert "foo" not in message -# @pytest.mark.asyncio -# async def test_inline_content(): -# content = json.dumps({'foo': 'bar'}) -# h = hashlib.sha256() -# h.update(content.encode('utf-8')) -# sample_message = { -# "item_hash": h.hexdigest(), -# "item_content": content, -# "chain": "NULS" -# } -# message = await check_message(sample_message, trusted=True) -# assert message is not None -# assert message['item_hash'] == h.hexdigest() -# assert message['item_content'] == content -# assert message['item_type'] == 'inline' - - -@pytest.mark.skip("TODO: NULS signature verification does not work with the fixtures.") @pytest.mark.asyncio async def test_incoming_inline_content(mocker): from aleph.chains.common import incoming from unittest.mock import MagicMock - + async def async_magic(): pass MagicMock.__await__ = lambda x: async_magic().__await__() - - mocker.patch('aleph.model.db') - - msg = {'chain': 'NULS', - 'channel': 'SYSINFO', - 'sender': 'TTapAav8g3fFjxQQCjwPd4ERPnai9oya', - 'type': 'AGGREGATE', - 'time': 1564581054.0532622, - 'item_content': '{"key":"metrics","address":"TTapAav8g3fFjxQQCjwPd4ERPnai9oya","content":{"memory":{"total":12578275328,"available":5726081024,"percent":54.5,"used":6503415808,"free":238661632,"active":8694841344,"inactive":2322239488,"buffers":846553088,"cached":4989644800,"shared":172527616,"slab":948609024},"swap":{"total":7787769856,"free":7787495424,"used":274432,"percent":0.0,"swapped_in":0,"swapped_out":16384},"cpu":{"user":9.0,"nice":0.0,"system":3.1,"idle":85.4,"iowait":0.0,"irq":0.0,"softirq":2.5,"steal":0.0,"guest":0.0,"guest_nice":0.0},"cpu_cores":[{"user":8.9,"nice":0.0,"system":2.4,"idle":82.2,"iowait":0.0,"irq":0.0,"softirq":6.4,"steal":0.0,"guest":0.0,"guest_nice":0.0},{"user":9.6,"nice":0.0,"system":2.9,"idle":84.6,"iowait":0.0,"irq":0.0,"softirq":2.9,"steal":0.0,"guest":0.0,"guest_nice":0.0},{"user":7.2,"nice":0.0,"system":3.0,"idle":86.8,"iowait":0.0,"irq":0.0,"softirq":3.0,"steal":0.0,"guest":0.0,"guest_nice":0.0},{"user":11.4,"nice":0.0,"system":3.0,"idle":84.8,"iowait":0.1,"irq":0.0,"softirq":0.7,"steal":0.0,"guest":0.0,"guest_nice":0.0},{"user":9.3,"nice":0.0,"system":3.3,"idle":87.0,"iowait":0.1,"irq":0.0,"softirq":0.3,"steal":0.0,"guest":0.0,"guest_nice":0.0},{"user":5.5,"nice":0.0,"system":4.4,"idle":89.9,"iowait":0.0,"irq":0.0,"softirq":0.1,"steal":0.0,"guest":0.0,"guest_nice":0.0},{"user":8.7,"nice":0.0,"system":3.3,"idle":87.9,"iowait":0.0,"irq":0.0,"softirq":0.1,"steal":0.0,"guest":0.0,"guest_nice":0.0},{"user":11.4,"nice":0.0,"system":2.3,"idle":80.3,"iowait":0.0,"irq":0.0,"softirq":6.1,"steal":0.0,"guest":0.0,"guest_nice":0.0}]},"time":1564581054.0358574}', - 'item_hash': '84afd8484912d3fa11a402e480d17e949fbf600fcdedd69674253be0320fa62c', - 'signature': '21027c108022f992f090bbe5c78ca8822f5b7adceb705ae2cd5318543d7bcdd2a74700473045022100b59f7df5333d57080a93be53b9af74e66a284170ec493455e675eb2539ac21db022077ffc66fe8dde7707038344496a85266bf42af1240017d4e1fa0d7068c588ca7' - } - # msg['item_type'] = 'inline' - msg = await check_message(msg) - status, ops = await incoming(msg) + + mocker.patch("aleph.model.db") + + message_dict = { + "chain": "NULS", + "channel": "SYSINFO", + "sender": "TTapAav8g3fFjxQQCjwPd4ERPnai9oya", + "type": "AGGREGATE", + "time": 1564581054.0532622, + "item_type": "inline", + "item_content": '{"key":"metrics","address":"TTapAav8g3fFjxQQCjwPd4ERPnai9oya","content":{"memory":{"total":12578275328,"available":5726081024,"percent":54.5,"used":6503415808,"free":238661632,"active":8694841344,"inactive":2322239488,"buffers":846553088,"cached":4989644800,"shared":172527616,"slab":948609024},"swap":{"total":7787769856,"free":7787495424,"used":274432,"percent":0.0,"swapped_in":0,"swapped_out":16384},"cpu":{"user":9.0,"nice":0.0,"system":3.1,"idle":85.4,"iowait":0.0,"irq":0.0,"softirq":2.5,"steal":0.0,"guest":0.0,"guest_nice":0.0},"cpu_cores":[{"user":8.9,"nice":0.0,"system":2.4,"idle":82.2,"iowait":0.0,"irq":0.0,"softirq":6.4,"steal":0.0,"guest":0.0,"guest_nice":0.0},{"user":9.6,"nice":0.0,"system":2.9,"idle":84.6,"iowait":0.0,"irq":0.0,"softirq":2.9,"steal":0.0,"guest":0.0,"guest_nice":0.0},{"user":7.2,"nice":0.0,"system":3.0,"idle":86.8,"iowait":0.0,"irq":0.0,"softirq":3.0,"steal":0.0,"guest":0.0,"guest_nice":0.0},{"user":11.4,"nice":0.0,"system":3.0,"idle":84.8,"iowait":0.1,"irq":0.0,"softirq":0.7,"steal":0.0,"guest":0.0,"guest_nice":0.0},{"user":9.3,"nice":0.0,"system":3.3,"idle":87.0,"iowait":0.1,"irq":0.0,"softirq":0.3,"steal":0.0,"guest":0.0,"guest_nice":0.0},{"user":5.5,"nice":0.0,"system":4.4,"idle":89.9,"iowait":0.0,"irq":0.0,"softirq":0.1,"steal":0.0,"guest":0.0,"guest_nice":0.0},{"user":8.7,"nice":0.0,"system":3.3,"idle":87.9,"iowait":0.0,"irq":0.0,"softirq":0.1,"steal":0.0,"guest":0.0,"guest_nice":0.0},{"user":11.4,"nice":0.0,"system":2.3,"idle":80.3,"iowait":0.0,"irq":0.0,"softirq":6.1,"steal":0.0,"guest":0.0,"guest_nice":0.0}]},"time":1564581054.0358574}', + "item_hash": "84afd8484912d3fa11a402e480d17e949fbf600fcdedd69674253be0320fa62c", + "signature": "21027c108022f992f090bbe5c78ca8822f5b7adceb705ae2cd5318543d7bcdd2a74700473045022100b59f7df5333d57080a93be53b9af74e66a284170ec493455e675eb2539ac21db022077ffc66fe8dde7707038344496a85266bf42af1240017d4e1fa0d7068c588ca7", + } + message = parse_message(message_dict) + status, ops = await incoming(message) assert status == IncomingStatus.MESSAGE_HANDLED