From 6fa9a598aa09eb840fa178768f6f247847a882d4 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Tue, 9 May 2023 16:32:13 +0200 Subject: [PATCH] Fix: do not send confirmations to message websocket Problem: websocket users report messages appearing twice, as they are received on both the P2P and IPFS pubsub topics. Solution: ignore confirmations. The scaffolding was already in place, we just need to return a `ProcessedMessage` instead of a raw `MessageDb` from `MessageHandler.process()`. --- src/aleph/handlers/message_handler.py | 22 +++++-- src/aleph/jobs/job_utils.py | 58 +----------------- src/aleph/jobs/process_pending_messages.py | 10 ++-- src/aleph/types/message_processing_result.py | 59 +++++++++++++++++++ tests/helpers/message_test_helpers.py | 2 +- .../test_process_aggregates.py | 2 +- .../test_process_forgets.py | 2 +- 7 files changed, 88 insertions(+), 67 deletions(-) create mode 100644 src/aleph/types/message_processing_result.py diff --git a/src/aleph/handlers/message_handler.py b/src/aleph/handlers/message_handler.py index 91d20d962..35b5a82a9 100644 --- a/src/aleph/handlers/message_handler.py +++ b/src/aleph/handlers/message_handler.py @@ -40,6 +40,7 @@ from aleph.toolkit.timestamp import timestamp_to_datetime from aleph.types.db_session import DbSessionFactory, DbSession from aleph.types.files import FileType +from aleph.types.message_processing_result import ProcessedMessage from aleph.types.message_status import ( InvalidMessageException, InvalidSignature, @@ -283,7 +284,20 @@ async def verify_and_fetch( async def process( self, session: DbSession, pending_message: PendingMessageDb - ) -> MessageDb: + ) -> ProcessedMessage: + """ + Process a pending message. + + If the message is successfully processed, returns a handled message object + representing the processed message and some additional metadata. + Throws a MessageProcessingException if the message cannot be processed. + + :param session: DB session. + :param pending_message: Pending message to process. + :return: The processed message with some metadata indicating whether the message + is a new one or a confirmation. + """ + existing_message = get_message_by_item_hash( session=session, item_hash=pending_message.item_hash ) @@ -293,7 +307,7 @@ async def process( existing_message=existing_message, pending_message=pending_message, ) - return existing_message + return ProcessedMessage(message=existing_message, is_confirmation=True) message = await self.verify_and_fetch( session=session, pending_message=pending_message @@ -305,7 +319,7 @@ async def process( session=session, pending_message=pending_message, message=message ) await content_handler.process(session=session, messages=[message]) - return message + return ProcessedMessage(message=message, is_confirmation=False) async def check_permissions(self, session: DbSession, message: MessageDb): content_handler = self.get_content_handler(message.type) @@ -314,5 +328,5 @@ async def check_permissions(self, session: DbSession, message: MessageDb): # TODO: this method is only used in tests. Consider removing it. async def fetch_and_process_one_message_db(self, pending_message: PendingMessageDb): with self.session_factory() as session: - await self.process(session=session, pending_message=pending_message) + _ = await self.process(session=session, pending_message=pending_message) session.commit() diff --git a/src/aleph/jobs/job_utils.py b/src/aleph/jobs/job_utils.py index 235142e6d..5fe7ec953 100644 --- a/src/aleph/jobs/job_utils.py +++ b/src/aleph/jobs/job_utils.py @@ -1,7 +1,7 @@ import asyncio import datetime as dt import logging -from typing import Dict, Union, Protocol +from typing import Dict, Union from typing import Tuple from configmanager import Config @@ -10,73 +10,21 @@ import aleph.config from aleph.db.accessors.messages import reject_existing_pending_message from aleph.db.accessors.pending_messages import set_next_retry -from aleph.db.models import PendingMessageDb, MessageDb +from aleph.db.models import PendingMessageDb from aleph.handlers.message_handler import MessageHandler from aleph.toolkit.timestamp import utc_now from aleph.types.db_session import DbSession, DbSessionFactory +from aleph.types.message_processing_result import RejectedMessage, WillRetryMessage from aleph.types.message_status import ( ErrorCode, RetryMessageException, FileNotFoundException, InvalidMessageException, - MessageProcessingStatus, ) LOGGER = logging.getLogger(__name__) -class MessageProcessingResult(Protocol): - status: MessageProcessingStatus - - @property - def item_hash(self) -> str: - pass - - -class ProcessedMessage(MessageProcessingResult): - def __init__(self, message: MessageDb, is_confirmation: bool = False): - self.message = message - self.status = ( - MessageProcessingStatus.PROCESSED_CONFIRMATION - if is_confirmation - else MessageProcessingStatus.PROCESSED_NEW_MESSAGE - ) - - @property - def item_hash(self) -> str: - return self.message.item_hash - - -class FailedMessage(MessageProcessingResult): - status = MessageProcessingStatus.FAILED_WILL_RETRY - - def __init__( - self, pending_message: PendingMessageDb, error_code: ErrorCode, will_retry: bool - ): - self.pending_message = pending_message - self.error_code = error_code - - self.status = ( - MessageProcessingStatus.FAILED_WILL_RETRY - if will_retry - else MessageProcessingStatus.FAILED_REJECTED - ) - - @property - def item_hash(self) -> str: - return self.pending_message.item_hash - - -class WillRetryMessage(FailedMessage): - def __init__(self, pending_message: PendingMessageDb, error_code: ErrorCode): - super().__init__(pending_message, error_code, will_retry=True) - - -class RejectedMessage(FailedMessage): - def __init__(self, pending_message: PendingMessageDb, error_code: ErrorCode): - super().__init__(pending_message, error_code, will_retry=False) - - MAX_RETRY_INTERVAL: int = 300 diff --git a/src/aleph/jobs/process_pending_messages.py b/src/aleph/jobs/process_pending_messages.py index 7b00dc7e5..5a0c22d95 100644 --- a/src/aleph/jobs/process_pending_messages.py +++ b/src/aleph/jobs/process_pending_messages.py @@ -29,10 +29,9 @@ from .job_utils import ( prepare_loop, MessageJob, - MessageProcessingResult, - ProcessedMessage, ) from ..services.cache.node_cache import NodeCache +from ..types.message_processing_result import MessageProcessingResult LOGGER = getLogger(__name__) @@ -99,11 +98,12 @@ async def process_messages( break try: - message = await self.message_handler.process( - session=session, pending_message=pending_message + result: MessageProcessingResult = ( + await self.message_handler.process( + session=session, pending_message=pending_message + ) ) session.commit() - result: MessageProcessingResult = ProcessedMessage(message) except Exception as e: session.rollback() diff --git a/src/aleph/types/message_processing_result.py b/src/aleph/types/message_processing_result.py new file mode 100644 index 000000000..4a9de0f0d --- /dev/null +++ b/src/aleph/types/message_processing_result.py @@ -0,0 +1,59 @@ +from typing import Protocol + +from aleph.db.models import PendingMessageDb, MessageDb +from aleph.types.message_status import ( + ErrorCode, + MessageProcessingStatus, +) + + +class MessageProcessingResult(Protocol): + status: MessageProcessingStatus + + @property + def item_hash(self) -> str: + pass + + +class ProcessedMessage(MessageProcessingResult): + def __init__(self, message: MessageDb, is_confirmation: bool = False): + self.message = message + self.status = ( + MessageProcessingStatus.PROCESSED_CONFIRMATION + if is_confirmation + else MessageProcessingStatus.PROCESSED_NEW_MESSAGE + ) + + @property + def item_hash(self) -> str: + return self.message.item_hash + + +class FailedMessage(MessageProcessingResult): + status = MessageProcessingStatus.FAILED_WILL_RETRY + + def __init__( + self, pending_message: PendingMessageDb, error_code: ErrorCode, will_retry: bool + ): + self.pending_message = pending_message + self.error_code = error_code + + self.status = ( + MessageProcessingStatus.FAILED_WILL_RETRY + if will_retry + else MessageProcessingStatus.FAILED_REJECTED + ) + + @property + def item_hash(self) -> str: + return self.pending_message.item_hash + + +class WillRetryMessage(FailedMessage): + def __init__(self, pending_message: PendingMessageDb, error_code: ErrorCode): + super().__init__(pending_message, error_code, will_retry=True) + + +class RejectedMessage(FailedMessage): + def __init__(self, pending_message: PendingMessageDb, error_code: ErrorCode): + super().__init__(pending_message, error_code, will_retry=False) diff --git a/tests/helpers/message_test_helpers.py b/tests/helpers/message_test_helpers.py index 745049f0b..0acdf4061 100644 --- a/tests/helpers/message_test_helpers.py +++ b/tests/helpers/message_test_helpers.py @@ -6,10 +6,10 @@ from aleph_message.models import ItemType, MessageConfirmation from aleph.db.models import MessageDb, PendingMessageDb, MessageStatusDb -from aleph.jobs.job_utils import MessageProcessingResult from aleph.jobs.process_pending_messages import PendingMessageProcessor from aleph.toolkit.timestamp import utc_now from aleph.types.db_session import DbSession +from aleph.types.message_processing_result import MessageProcessingResult from aleph.types.message_status import MessageStatus diff --git a/tests/message_processing/test_process_aggregates.py b/tests/message_processing/test_process_aggregates.py index 43f53183c..286aa078e 100644 --- a/tests/message_processing/test_process_aggregates.py +++ b/tests/message_processing/test_process_aggregates.py @@ -14,12 +14,12 @@ from aleph.db.models import PendingMessageDb, MessageDb, AggregateElementDb, AggregateDb from aleph.handlers.content.aggregate import AggregateMessageHandler from aleph.handlers.message_handler import MessageHandler -from aleph.jobs.job_utils import ProcessedMessage from aleph.jobs.process_pending_messages import PendingMessageProcessor from aleph.storage import StorageService from aleph.toolkit.timestamp import timestamp_to_datetime from aleph.types.channel import Channel from aleph.types.db_session import DbSessionFactory, DbSession +from aleph.types.message_processing_result import ProcessedMessage from message_test_helpers import process_pending_messages diff --git a/tests/message_processing/test_process_forgets.py b/tests/message_processing/test_process_forgets.py index fa29290ce..6be4b8c4f 100644 --- a/tests/message_processing/test_process_forgets.py +++ b/tests/message_processing/test_process_forgets.py @@ -22,12 +22,12 @@ from aleph.handlers.content.post import PostMessageHandler from aleph.handlers.content.program import ProgramMessageHandler from aleph.handlers.content.store import StoreMessageHandler -from aleph.jobs.job_utils import ProcessedMessage, RejectedMessage from aleph.jobs.process_pending_messages import PendingMessageProcessor from aleph.toolkit.timestamp import timestamp_to_datetime from aleph.types.channel import Channel from aleph.types.db_session import DbSessionFactory from aleph.types.files import FileType +from aleph.types.message_processing_result import ProcessedMessage, RejectedMessage from aleph.types.message_status import MessageStatus from message_test_helpers import ( process_pending_messages,