Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions src/aleph/handlers/message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from aleph.toolkit.timestamp import timestamp_to_datetime
from aleph.types.db_session import DbSessionFactory, DbSession
from aleph.types.files import FileType
from aleph.types.message_processing_result import ProcessedMessage
from aleph.types.message_status import (
InvalidMessageException,
InvalidSignature,
Expand Down Expand Up @@ -283,7 +284,20 @@ async def verify_and_fetch(

async def process(
self, session: DbSession, pending_message: PendingMessageDb
) -> MessageDb:
) -> ProcessedMessage:
"""
Process a pending message.

If the message is successfully processed, returns a handled message object
representing the processed message and some additional metadata.
Throws a MessageProcessingException if the message cannot be processed.

:param session: DB session.
:param pending_message: Pending message to process.
:return: The processed message with some metadata indicating whether the message
is a new one or a confirmation.
"""

existing_message = get_message_by_item_hash(
session=session, item_hash=pending_message.item_hash
)
Expand All @@ -293,7 +307,7 @@ async def process(
existing_message=existing_message,
pending_message=pending_message,
)
return existing_message
return ProcessedMessage(message=existing_message, is_confirmation=True)

message = await self.verify_and_fetch(
session=session, pending_message=pending_message
Expand All @@ -305,7 +319,7 @@ async def process(
session=session, pending_message=pending_message, message=message
)
await content_handler.process(session=session, messages=[message])
return message
return ProcessedMessage(message=message, is_confirmation=False)

async def check_permissions(self, session: DbSession, message: MessageDb):
content_handler = self.get_content_handler(message.type)
Expand All @@ -314,5 +328,5 @@ async def check_permissions(self, session: DbSession, message: MessageDb):
# TODO: this method is only used in tests. Consider removing it.
async def fetch_and_process_one_message_db(self, pending_message: PendingMessageDb):
with self.session_factory() as session:
await self.process(session=session, pending_message=pending_message)
_ = await self.process(session=session, pending_message=pending_message)
session.commit()
58 changes: 3 additions & 55 deletions src/aleph/jobs/job_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import datetime as dt
import logging
from typing import Dict, Union, Protocol
from typing import Dict, Union
from typing import Tuple

from configmanager import Config
Expand All @@ -10,73 +10,21 @@
import aleph.config
from aleph.db.accessors.messages import reject_existing_pending_message
from aleph.db.accessors.pending_messages import set_next_retry
from aleph.db.models import PendingMessageDb, MessageDb
from aleph.db.models import PendingMessageDb
from aleph.handlers.message_handler import MessageHandler
from aleph.toolkit.timestamp import utc_now
from aleph.types.db_session import DbSession, DbSessionFactory
from aleph.types.message_processing_result import RejectedMessage, WillRetryMessage
from aleph.types.message_status import (
ErrorCode,
RetryMessageException,
FileNotFoundException,
InvalidMessageException,
MessageProcessingStatus,
)

LOGGER = logging.getLogger(__name__)


class MessageProcessingResult(Protocol):
status: MessageProcessingStatus

@property
def item_hash(self) -> str:
pass


class ProcessedMessage(MessageProcessingResult):
def __init__(self, message: MessageDb, is_confirmation: bool = False):
self.message = message
self.status = (
MessageProcessingStatus.PROCESSED_CONFIRMATION
if is_confirmation
else MessageProcessingStatus.PROCESSED_NEW_MESSAGE
)

@property
def item_hash(self) -> str:
return self.message.item_hash


class FailedMessage(MessageProcessingResult):
status = MessageProcessingStatus.FAILED_WILL_RETRY

def __init__(
self, pending_message: PendingMessageDb, error_code: ErrorCode, will_retry: bool
):
self.pending_message = pending_message
self.error_code = error_code

self.status = (
MessageProcessingStatus.FAILED_WILL_RETRY
if will_retry
else MessageProcessingStatus.FAILED_REJECTED
)

@property
def item_hash(self) -> str:
return self.pending_message.item_hash


class WillRetryMessage(FailedMessage):
def __init__(self, pending_message: PendingMessageDb, error_code: ErrorCode):
super().__init__(pending_message, error_code, will_retry=True)


class RejectedMessage(FailedMessage):
def __init__(self, pending_message: PendingMessageDb, error_code: ErrorCode):
super().__init__(pending_message, error_code, will_retry=False)


MAX_RETRY_INTERVAL: int = 300


Expand Down
10 changes: 5 additions & 5 deletions src/aleph/jobs/process_pending_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@
from .job_utils import (
prepare_loop,
MessageJob,
MessageProcessingResult,
ProcessedMessage,
)
from ..services.cache.node_cache import NodeCache
from ..types.message_processing_result import MessageProcessingResult

LOGGER = getLogger(__name__)

Expand Down Expand Up @@ -99,11 +98,12 @@ async def process_messages(
break

try:
message = await self.message_handler.process(
session=session, pending_message=pending_message
result: MessageProcessingResult = (
await self.message_handler.process(
session=session, pending_message=pending_message
)
)
session.commit()
result: MessageProcessingResult = ProcessedMessage(message)

except Exception as e:
session.rollback()
Expand Down
59 changes: 59 additions & 0 deletions src/aleph/types/message_processing_result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from typing import Protocol

from aleph.db.models import PendingMessageDb, MessageDb
from aleph.types.message_status import (
ErrorCode,
MessageProcessingStatus,
)


class MessageProcessingResult(Protocol):
status: MessageProcessingStatus

@property
def item_hash(self) -> str:
pass


class ProcessedMessage(MessageProcessingResult):
def __init__(self, message: MessageDb, is_confirmation: bool = False):
self.message = message
self.status = (
MessageProcessingStatus.PROCESSED_CONFIRMATION
if is_confirmation
else MessageProcessingStatus.PROCESSED_NEW_MESSAGE
)

@property
def item_hash(self) -> str:
return self.message.item_hash


class FailedMessage(MessageProcessingResult):
status = MessageProcessingStatus.FAILED_WILL_RETRY

def __init__(
self, pending_message: PendingMessageDb, error_code: ErrorCode, will_retry: bool
):
self.pending_message = pending_message
self.error_code = error_code

self.status = (
MessageProcessingStatus.FAILED_WILL_RETRY
if will_retry
else MessageProcessingStatus.FAILED_REJECTED
)

@property
def item_hash(self) -> str:
return self.pending_message.item_hash


class WillRetryMessage(FailedMessage):
def __init__(self, pending_message: PendingMessageDb, error_code: ErrorCode):
super().__init__(pending_message, error_code, will_retry=True)


class RejectedMessage(FailedMessage):
def __init__(self, pending_message: PendingMessageDb, error_code: ErrorCode):
super().__init__(pending_message, error_code, will_retry=False)
2 changes: 1 addition & 1 deletion tests/helpers/message_test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
from aleph_message.models import ItemType, MessageConfirmation

from aleph.db.models import MessageDb, PendingMessageDb, MessageStatusDb
from aleph.jobs.job_utils import MessageProcessingResult
from aleph.jobs.process_pending_messages import PendingMessageProcessor
from aleph.toolkit.timestamp import utc_now
from aleph.types.db_session import DbSession
from aleph.types.message_processing_result import MessageProcessingResult
from aleph.types.message_status import MessageStatus


Expand Down
2 changes: 1 addition & 1 deletion tests/message_processing/test_process_aggregates.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
from aleph.db.models import PendingMessageDb, MessageDb, AggregateElementDb, AggregateDb
from aleph.handlers.content.aggregate import AggregateMessageHandler
from aleph.handlers.message_handler import MessageHandler
from aleph.jobs.job_utils import ProcessedMessage
from aleph.jobs.process_pending_messages import PendingMessageProcessor
from aleph.storage import StorageService
from aleph.toolkit.timestamp import timestamp_to_datetime
from aleph.types.channel import Channel
from aleph.types.db_session import DbSessionFactory, DbSession
from aleph.types.message_processing_result import ProcessedMessage
from message_test_helpers import process_pending_messages


Expand Down
2 changes: 1 addition & 1 deletion tests/message_processing/test_process_forgets.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
from aleph.handlers.content.post import PostMessageHandler
from aleph.handlers.content.program import ProgramMessageHandler
from aleph.handlers.content.store import StoreMessageHandler
from aleph.jobs.job_utils import ProcessedMessage, RejectedMessage
from aleph.jobs.process_pending_messages import PendingMessageProcessor
from aleph.toolkit.timestamp import timestamp_to_datetime
from aleph.types.channel import Channel
from aleph.types.db_session import DbSessionFactory
from aleph.types.files import FileType
from aleph.types.message_processing_result import ProcessedMessage, RejectedMessage
from aleph.types.message_status import MessageStatus
from message_test_helpers import (
process_pending_messages,
Expand Down