From d65eacfa0812929c77cdb2e9c34dde3c08b7072a Mon Sep 17 00:00:00 2001
From: Olivier Desenfans
Date: Tue, 22 Mar 2022 14:07:59 +0100
Subject: [PATCH 1/2] [Processing] Simplify incoming message functions

Simplified the `incoming` function and made it more generic. Removed
the `bulk_operation` flag, since we always want bulk DB operations.
The function now returns a status and a list of DB operations. DB
operations are instances of a new class that bundles a Mongo operation
with the collection it targets. This avoids passing lists as arguments
and modifying them in place.

The loop that awaits individual message tasks now waits for all tasks
to complete instead of failing on the first exception. This fixes an
issue where an exception occurring early in the batch could lose the
output of still-running tasks.

Added type hints.
---
 src/aleph/chains/common.py           | 94 +++++++++++++---------
 src/aleph/jobs.py                    | 90 +++++++++++---------------
 src/aleph/model/db_bulk_operation.py | 13 ++++
 src/aleph/services/ipfs/pubsub.py    |  6 +-
 tests/chains/test_common.py          |  4 +-
 tests/test_network.py                |  7 ++-
 6 files changed, 101 insertions(+), 113 deletions(-)
 create mode 100644 src/aleph/model/db_bulk_operation.py

diff --git a/src/aleph/chains/common.py b/src/aleph/chains/common.py
index 0b04f4e3c..b84a326e2 100644
--- a/src/aleph/chains/common.py
+++ b/src/aleph/chains/common.py
@@ -2,7 +2,7 @@
 import json
 import logging
 from enum import IntEnum
-from typing import Dict, Optional, Union, Tuple, List
+from typing import Dict, Optional, Tuple, List
 
 from aleph_message.models import MessageType
 from bson import ObjectId
@@ -10,8 +10,9 @@
 from aleph.handlers.forget import handle_forget_message
 from aleph.handlers.storage import handle_new_storage
+from aleph.model.db_bulk_operation import DbBulkOperation
 from aleph.model.filepin import PermanentPin
-from aleph.model.messages import Message, CappedMessage
+from aleph.model.messages import CappedMessage, Message
 from aleph.model.pending import PendingMessage, PendingTX
 from aleph.network import check_message as check_message_fn
 from aleph.permissions import check_sender_authorization
@@ -26,7 +27,7 @@
 LOGGER = logging.getLogger("chains.common")
 
 
-async def get_verification_buffer(message):
+async def get_verification_buffer(message: Dict) -> bytes:
     """Returns a serialized string to verify the message integrity
     (this is what is signed)
     """
@@ -98,33 +99,32 @@ async def mark_message_for_retry(
 
 
 async def incoming(
-    message,
+    message: Dict,
     chain_name: Optional[str] = None,
     tx_hash: Optional[str] = None,
     height: Optional[int] = None,
     seen_ids: Optional[Dict[Tuple, int]] = None,
     check_message: bool = False,
     retrying: bool = False,
-    bulk_operation: bool = False,
     existing_id: Optional[ObjectId] = None,
-) -> Union[IncomingStatus, UpdateOne]:
+) -> Tuple[IncomingStatus, List[DbBulkOperation]]:
     """New incoming message from underlying chain.
 
     For regular messages it will be marked as confirmed
     if existing in database, created if not.
""" - hash = message["item_hash"] + item_hash = message["item_hash"] sender = message["sender"] - ids_key = (hash, sender, chain_name) + ids_key = (item_hash, sender, chain_name) if chain_name and tx_hash and height and seen_ids is not None: if ids_key in seen_ids.keys(): if height > seen_ids[ids_key]: - return IncomingStatus.MESSAGE_HANDLED + return IncomingStatus.MESSAGE_HANDLED, [] filters = { - "item_hash": hash, + "item_hash": item_hash, "chain": message["chain"], "sender": message["sender"], "type": message["type"], @@ -142,12 +142,12 @@ async def incoming( ) if message is None: - return IncomingStatus.MESSAGE_HANDLED + return IncomingStatus.MESSAGE_HANDLED, [] if retrying: - LOGGER.debug("(Re)trying %s." % hash) + LOGGER.debug("(Re)trying %s." % item_hash) else: - LOGGER.info("Incoming %s." % hash) + LOGGER.info("Incoming %s." % item_hash) # we set the incoming chain as default for signature message["chain"] = message.get("chain", chain_name) @@ -182,7 +182,7 @@ async def incoming( if seen_ids is not None and height is not None: if ids_key in seen_ids.keys(): if height > seen_ids[ids_key]: - return IncomingStatus.MESSAGE_HANDLED + return IncomingStatus.MESSAGE_HANDLED, [] else: seen_ids[ids_key] = height else: @@ -194,7 +194,7 @@ async def incoming( # chain_name in [c['chain'] for c in existing['confirmations']]): # return - LOGGER.debug("Updating %s." % hash) + LOGGER.debug("Updating %s." % item_hash) if chain_name and tx_hash and height: # we need to update messages adding the confirmation @@ -209,12 +209,12 @@ async def incoming( content = await get_message_content(message) except InvalidContent: - LOGGER.warning("Can't get content of object %r, won't retry." % hash) - return IncomingStatus.FAILED_PERMANENTLY + LOGGER.warning("Can't get content of object %r, won't retry." % item_hash) + return IncomingStatus.FAILED_PERMANENTLY, [] except (ContentCurrentlyUnavailable, Exception) as e: if not isinstance(e, ContentCurrentlyUnavailable): - LOGGER.exception("Can't get content of object %r" % hash) + LOGGER.exception("Can't get content of object %r" % item_hash) await mark_message_for_retry( message=message, chain_name=chain_name, @@ -224,7 +224,7 @@ async def incoming( retrying=retrying, existing_id=existing_id, ) - return IncomingStatus.RETRYING_LATER + return IncomingStatus.RETRYING_LATER, [] json_content = content.value if json_content.get("address", None) is None: @@ -249,8 +249,8 @@ async def incoming( else: handling_result = True except UnknownHashError: - LOGGER.warning(f"Invalid IPFS hash for message {hash}, won't retry.") - return IncomingStatus.FAILED_PERMANENTLY + LOGGER.warning(f"Invalid IPFS hash for message {item_hash}, won't retry.") + return IncomingStatus.FAILED_PERMANENTLY, [] except Exception: LOGGER.exception("Error using the message type handler") handling_result = None @@ -266,29 +266,29 @@ async def incoming( retrying=retrying, existing_id=existing_id, ) - return IncomingStatus.RETRYING_LATER + return IncomingStatus.RETRYING_LATER, [] if not handling_result: LOGGER.warning( "Message type handler has failed permanently for " - "%r, won't retry." % hash + "%r, won't retry." 
         )
-        return IncomingStatus.FAILED_PERMANENTLY
+        return IncomingStatus.FAILED_PERMANENTLY, []
 
     if not await check_sender_authorization(message, json_content):
-        LOGGER.warning("Invalid sender for %s" % hash)
-        return IncomingStatus.MESSAGE_HANDLED
+        LOGGER.warning("Invalid sender for %s" % item_hash)
+        return IncomingStatus.MESSAGE_HANDLED, []
 
     if seen_ids is not None and height is not None:
         if ids_key in seen_ids.keys():
             if height > seen_ids[ids_key]:
-                return IncomingStatus.MESSAGE_HANDLED
+                return IncomingStatus.MESSAGE_HANDLED, []
             else:
                 seen_ids[ids_key] = height
         else:
            seen_ids[ids_key] = height
 
-    LOGGER.debug("New message to store for %s." % hash)
+    LOGGER.debug("New message to store for %s." % item_hash)
 
     # message.update(new_values)
     updates["$set"] = {
         "content": json_content,
@@ -308,20 +308,23 @@ async def incoming(
     #     await pin_hash(hash)
 
     if should_commit:
-        action = UpdateOne(filters, updates, upsert=True)
-        if not bulk_operation:
-            await Message.collection.bulk_write([action])
-            await CappedMessage.collection.bulk_write([action])
-        else:
-            return action
-    return IncomingStatus.MESSAGE_HANDLED
-
-
-async def invalidate(chain_name, block_height):
-    """Invalidates a particular block height from an underlying chain
-    (in case of forks)
+        update_op = UpdateOne(filters, updates, upsert=True)
+        bulk_ops = [
+            DbBulkOperation(CappedMessage, update_op),
+            DbBulkOperation(Message, update_op),
+        ]
+        return IncomingStatus.MESSAGE_HANDLED, bulk_ops
+
+    return IncomingStatus.MESSAGE_HANDLED, []
+
+
+async def process_one_message(message: Dict):
     """
-    pass
+    Helper function to process a message on the spot.
+    """
+    status, ops = await incoming(message)
+    for op in ops:
+        await op.collection.collection.bulk_write([op.operation])
 
 
 async def get_chaindata(messages, bulk_threshold=2000):
@@ -409,12 +412,3 @@ async def incoming_chaindata(content, context):
     For now we only add it to the database, it will be processed later.
""" await PendingTX.collection.insert_one({"content": content, "context": context}) - - -async def join_tasks(tasks, seen_ids): - try: - await asyncio.gather(*tasks) - except Exception: - LOGGER.exception("error in incoming task") - # seen_ids.clear() - tasks.clear() diff --git a/src/aleph/jobs.py b/src/aleph/jobs.py index 9b8f470ef..1b9aaaf96 100644 --- a/src/aleph/jobs.py +++ b/src/aleph/jobs.py @@ -1,25 +1,24 @@ import asyncio -import logging +from itertools import groupby from logging import getLogger from multiprocessing import Process from typing import Coroutine, List, Dict, Optional, Tuple import aioipfs import sentry_sdk -from pymongo import DeleteOne, InsertOne, DeleteMany, UpdateOne, ASCENDING -from pymongo.errors import CursorNotFound -from setproctitle import setproctitle - from aleph.chains.common import incoming, get_chaindata_messages, IncomingStatus -from aleph.model.messages import Message, CappedMessage +from aleph.exceptions import InvalidMessageError +from aleph.logging import setup_logging +from aleph.model.db_bulk_operation import DbBulkOperation from aleph.model.p2p import get_peers from aleph.model.pending import PendingMessage, PendingTX from aleph.network import check_message from aleph.services.ipfs.common import connect_ipfs_peer from aleph.services.p2p import singleton from aleph.types import ItemType -from aleph.exceptions import InvalidMessageError -from aleph.logging import setup_logging +from pymongo import DeleteOne, InsertOne, DeleteMany, ASCENDING +from pymongo.errors import CursorNotFound +from setproctitle import setproctitle LOGGER = getLogger("JOBS") @@ -27,10 +26,8 @@ async def handle_pending_message( pending: Dict, seen_ids: Dict[Tuple, int], - actions_list: List[DeleteOne], - messages_actions_list: List[UpdateOne], -): - result = await incoming( +) -> List[DbBulkOperation]: + status, operations = await incoming( pending["message"], chain_name=pending["source"].get("chain_name"), tx_hash=pending["source"].get("tx_hash"), @@ -38,37 +35,39 @@ async def handle_pending_message( seen_ids=seen_ids, check_message=pending["source"].get("check_message", True), retrying=True, - bulk_operation=True, existing_id=pending["_id"], ) - if result == IncomingStatus.RETRYING_LATER: - return + if status != IncomingStatus.RETRYING_LATER: + operations.append( + DbBulkOperation(PendingMessage, DeleteOne({"_id": pending["_id"]})) + ) - if not isinstance(result, IncomingStatus): - assert isinstance(result, UpdateOne) - messages_actions_list.append(result) + return operations - actions_list.append(DeleteOne({"_id": pending["_id"]})) +async def join_pending_message_tasks(tasks): + db_operations = await asyncio.gather(*tasks, return_exceptions=True) -async def join_pending_message_tasks( - tasks, actions_list=None, messages_actions_list=None -): - try: - await asyncio.gather(*tasks, return_exceptions=True) - except Exception: - LOGGER.exception("error in incoming task") - tasks.clear() + errors = [op for op in db_operations if isinstance(op, BaseException)] + for error in errors: + LOGGER.error("Error while processing message: %s", error) + + db_operations = sorted( + ( + op + for operations in db_operations + if not isinstance(operations, BaseException) + for op in operations + ), + key=lambda op: op.collection.__name__, + ) - if messages_actions_list is not None and len(messages_actions_list): - await Message.collection.bulk_write(messages_actions_list) - await CappedMessage.collection.bulk_write(messages_actions_list) - messages_actions_list.clear() + for collection, 
+        mongo_ops = [op.operation for op in operations]
+        await collection.collection.bulk_write(mongo_ops)
 
-    if actions_list is not None and len(actions_list):
-        await PendingMessage.collection.bulk_write(actions_list)
-        actions_list.clear()
+    tasks.clear()
 
 
 async def retry_messages_job(shared_stats: Dict):
     """Each few minutes, try to handle message that were added to the
@@ -76,8 +75,6 @@ async def retry_messages_job(shared_stats: Dict):
     pending queue (Unavailable messages)."""
 
     seen_ids: Dict[Tuple, int] = dict()
-    actions: List[DeleteOne] = []
-    messages_actions: List[UpdateOne] = []
     gtasks: List[asyncio.Task] = []
     tasks: List[asyncio.Task] = []
     i: int = 0
@@ -96,8 +93,6 @@ async def retry_messages_job(shared_stats: Dict):
         shared_stats["retry_messages_job_seen_ids"] = len(seen_ids)
         shared_stats["retry_messages_job_gtasks"] = len(gtasks)
         shared_stats["retry_messages_job_tasks"] = len(tasks)
-        shared_stats["retry_messages_job_actions"] = len(actions)
-        shared_stats["retry_messages_job_messages_actions"] = len(messages_actions)
        shared_stats["retry_messages_job_i"] = i
         shared_stats["retry_messages_job_j"] = j
@@ -123,11 +118,7 @@ async def retry_messages_job(shared_stats: Dict):
             i += 1
             j += 1
 
-            tasks.append(
-                asyncio.create_task(
-                    handle_pending_message(pending, seen_ids, actions, messages_actions)
-                )
-            )
+            tasks.append(asyncio.create_task(handle_pending_message(pending, seen_ids)))
 
             if j >= 20000:
                 # Group tasks using asyncio.gather in `gtasks`.
                 gtasks.append(
                     asyncio.create_task(
                         join_pending_message_tasks(
                             tasks,
-                            actions_list=actions,
-                            messages_actions_list=messages_actions,
                         )
                     )
                 )
                 tasks = []
-                actions = []
-                messages_actions = []
                 i = 0
                 j = 0
 
         if i >= 1024:
             await join_pending_message_tasks(tasks)
-            # gtasks.append(asyncio.create_task(join_pending_message_tasks(tasks)))
             tasks = []
             i = 0
 
-    gtasks.append(
-        asyncio.create_task(
-            join_pending_message_tasks(
-                tasks, actions_list=actions, messages_actions_list=messages_actions
-            )
-        )
-    )
+    gtasks.append(asyncio.create_task(join_pending_message_tasks(tasks)))
 
     await asyncio.gather(*gtasks, return_exceptions=True)
     gtasks = []
diff --git a/src/aleph/model/db_bulk_operation.py b/src/aleph/model/db_bulk_operation.py
new file mode 100644
index 000000000..bec66478e
--- /dev/null
+++ b/src/aleph/model/db_bulk_operation.py
@@ -0,0 +1,13 @@
+from .base import BaseClass
+from typing import Type, Union
+from dataclasses import dataclass
+from pymongo import DeleteMany, DeleteOne, InsertOne, UpdateMany, UpdateOne
+
+
+BulkOperation = Union[DeleteMany, DeleteOne, InsertOne, UpdateMany, UpdateOne]
+
+
+@dataclass
+class DbBulkOperation:
+    collection: Type[BaseClass]
+    operation: BulkOperation
diff --git a/src/aleph/services/ipfs/pubsub.py b/src/aleph/services/ipfs/pubsub.py
index 3aa170631..a21b84f11 100644
--- a/src/aleph/services/ipfs/pubsub.py
+++ b/src/aleph/services/ipfs/pubsub.py
@@ -42,8 +42,9 @@ async def pub(topic: str, message: Union[str, bytes]):
 
 async def incoming_channel(topic) -> None:
     from aleph.network import incoming_check
-    from aleph.chains.common import incoming
+    from aleph.chains.common import process_one_message
 
+    # TODO: implement a check at startup
     # When using some deployment strategies such as docker-compose,
     # the IPFS service may not be ready by the time this function
     # is called. This variable defines how many connection attempts
@@ -51,12 +52,11 @@ async def incoming_channel(topic) -> None:
     trials_before_exception: int = 5
     while True:
         try:
-            # seen_ids = []
             async for mvalue in sub(topic):
                 try:
                     message = await incoming_check(mvalue)
                     LOGGER.debug("New message %r" % message)
-                    asyncio.create_task(incoming(message, bulk_operation=False))
+                    asyncio.create_task(process_one_message(message))
                 except InvalidMessageError:
                     LOGGER.warning(f"Invalid message {mvalue}")
diff --git a/tests/chains/test_common.py b/tests/chains/test_common.py
index ee7cebdb9..478034eec 100644
--- a/tests/chains/test_common.py
+++ b/tests/chains/test_common.py
@@ -49,5 +49,5 @@ async def async_magic():
         'signature': '21027c108022f992f090bbe5c78ca8822f5b7adceb705ae2cd5318543d7bcdd2a74700473045022100b59f7df5333d57080a93be53b9af74e66a284170ec493455e675eb2539ac21db022077ffc66fe8dde7707038344496a85266bf42af1240017d4e1fa0d7068c588ca7'
     }
     msg['item_type'] = 'inline'
-    v = await incoming(msg, check_message=True)
-    assert v == IncomingStatus.MESSAGE_HANDLED
+    status, ops = await incoming(msg, check_message=True)
+    assert status == IncomingStatus.MESSAGE_HANDLED
diff --git a/tests/test_network.py b/tests/test_network.py
index 3b5c5844e..0cb1a52a6 100644
--- a/tests/test_network.py
+++ b/tests/test_network.py
@@ -2,8 +2,9 @@
 
 # Mandatory import, otherwise VERIFIER_REGISTER is not populated. TODO: improve the registration system.
 import aleph.chains
-from aleph.network import check_message
 from aleph.exceptions import InvalidMessageError
+from aleph.chains.common import IncomingStatus
+from aleph.network import check_message
 
 __author__ = "Moshe Malawach"
 __copyright__ = "Moshe Malawach"
@@ -140,5 +141,5 @@ async def async_magic():
     }
     # msg['item_type'] = 'inline'
     msg = await check_message(msg)
-    v = await incoming(msg)
-    assert v is True
+    status, ops = await incoming(msg)
+    assert status == IncomingStatus.MESSAGE_HANDLED

From b15444050a190d23e5b51566aff99086983dbfec Mon Sep 17 00:00:00 2001
From: Olivier Desenfans
Date: Tue, 26 Apr 2022 11:25:11 +0200
Subject: [PATCH 2/2] Fixes for review

---
 CHANGELOG.rst                        | 6 ++++++
 src/aleph/jobs.py                    | 1 +
 src/aleph/model/db_bulk_operation.py | 4 ++--
 3 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index e26662719..5cb662348 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -2,6 +2,12 @@
 Changelog
 =========
 
+Next release
+============
+
+- Breaking change: the "pyaleph_processing_pending_messages_action_total" and
+  "pyaleph_processing_pending_messages_messages_action_total" values are no longer part of the metrics.
+
 Version 0.2.1
 =============
 
diff --git a/src/aleph/jobs.py b/src/aleph/jobs.py
index 1b9aaaf96..f1027e47e 100644
--- a/src/aleph/jobs.py
+++ b/src/aleph/jobs.py
@@ -53,6 +53,7 @@ async def join_pending_message_tasks(tasks):
     for error in errors:
         LOGGER.error("Error while processing message: %s", error)
 
+    # Sort the operations by collection name before grouping and executing them.
     db_operations = sorted(
         (
             op
diff --git a/src/aleph/model/db_bulk_operation.py b/src/aleph/model/db_bulk_operation.py
index bec66478e..8ab3dee0d 100644
--- a/src/aleph/model/db_bulk_operation.py
+++ b/src/aleph/model/db_bulk_operation.py
@@ -4,10 +4,10 @@
 from pymongo import DeleteMany, DeleteOne, InsertOne, UpdateMany, UpdateOne
 
 
-BulkOperation = Union[DeleteMany, DeleteOne, InsertOne, UpdateMany, UpdateOne]
+MongoOperation = Union[DeleteMany, DeleteOne, InsertOne, UpdateMany, UpdateOne]
 
 
 @dataclass
 class DbBulkOperation:
     collection: Type[BaseClass]
-    operation: BulkOperation
+    operation: MongoOperation
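
Review note: after this series, `incoming` always returns a `(status, operations)`
pair and the caller decides how to flush the `DbBulkOperation` list. The sketch
below mirrors the grouping logic of `join_pending_message_tasks`; it is not part
of the patch, the helper name `execute_db_operations` is made up for
illustration, and it assumes the `DbBulkOperation` class added here plus model
classes that expose their underlying Motor collection as a `collection`
attribute, as the patched code does.

    from itertools import groupby
    from typing import List

    from aleph.model.db_bulk_operation import DbBulkOperation


    async def execute_db_operations(db_operations: List[DbBulkOperation]) -> None:
        # groupby() only merges adjacent entries, so sort by collection name
        # first to get exactly one group per collection.
        ordered = sorted(db_operations, key=lambda op: op.collection.__name__)
        for collection, ops in groupby(ordered, key=lambda op: op.collection):
            # Model classes expose the Motor collection as `.collection`,
            # hence the double attribute access before bulk_write().
            await collection.collection.bulk_write([op.operation for op in ops])

Grouping before writing keeps one `bulk_write` round trip per collection
instead of one write per operation, which is the point of always using bulk
DB operations.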
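Review note on the gather change: `asyncio.gather(..., return_exceptions=True)`
returns exceptions as values alongside the successful results instead of
aborting the batch, which is what prevents the loss of output from
still-running tasks described in the commit message. A minimal, self-contained
illustration (a standalone script, not repository code):

    import asyncio


    async def ok(n: int) -> int:
        return n


    async def boom() -> int:
        raise RuntimeError("transient failure")


    async def main() -> None:
        # The failing task does not cancel the others and does not discard
        # their results; it simply shows up as an exception in the list.
        results = await asyncio.gather(ok(1), boom(), ok(2), return_exceptions=True)
        successes = [r for r in results if not isinstance(r, BaseException)]
        errors = [r for r in results if isinstance(r, BaseException)]
        assert successes == [1, 2]
        assert len(errors) == 1


    asyncio.run(main())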