diff --git a/src/aleph/handlers/message_handler.py b/src/aleph/handlers/message_handler.py index 76d36c931..9f665c4ea 100644 --- a/src/aleph/handlers/message_handler.py +++ b/src/aleph/handlers/message_handler.py @@ -379,11 +379,28 @@ async def insert_costs( insert_stmt = make_costs_upsert_query(costs) session.execute(insert_stmt) - async def verify_message(self, pending_message: PendingMessageDb) -> MessageDb: + async def verify_and_fetch_message( + self, session: DbSession, pending_message: PendingMessageDb + ) -> MessageDb: await self.verify_signature(pending_message=pending_message) validated_message = await self.fetch_pending_message( pending_message=pending_message ) + content_handler = self.get_content_handler(pending_message.type) + + # Check Permissions before the fetch + await content_handler.check_permissions( + session=session, message=validated_message + ) + + await content_handler.pre_check_balance( + session=session, message=validated_message + ) + + # Fetch related content like the IPFS associated file + await content_handler.fetch_related_content( + session=session, message=validated_message + ) return validated_message @@ -432,14 +449,10 @@ async def process( ) # First check the message content and verify it - message = await self.verify_message(pending_message=pending_message) - - # Do a balance pre-check to avoid saving related data + message = await self.verify_and_fetch_message( + pending_message=pending_message, session=session + ) content_handler = self.get_content_handler(message.type) - await content_handler.pre_check_balance(session=session, message=message) - - # Fetch related content like the IPFS associated file - await self.fetch_related_content(session=session, message=message) await content_handler.check_dependencies(session=session, message=message) await content_handler.check_permissions(session=session, message=message) diff --git a/src/aleph/jobs/fetch_pending_messages.py b/src/aleph/jobs/fetch_pending_messages.py index 7da909c2f..ce85ab879 100644 --- a/src/aleph/jobs/fetch_pending_messages.py +++ b/src/aleph/jobs/fetch_pending_messages.py @@ -55,9 +55,10 @@ def __init__( async def fetch_pending_message(self, pending_message: PendingMessageDb): with self.session_factory() as session: try: - message = await self.message_handler.verify_message( - pending_message=pending_message + message = await self.message_handler.verify_and_fetch_message( + pending_message=pending_message, session=session ) + session.execute( make_pending_message_fetched_statement( pending_message, message.content diff --git a/tests/message_processing/test_process_stores.py b/tests/message_processing/test_process_stores.py index 6b1b3be5d..48e50c1f4 100644 --- a/tests/message_processing/test_process_stores.py +++ b/tests/message_processing/test_process_stores.py @@ -215,15 +215,15 @@ async def test_process_store_no_signature( ) session.commit() + file_hash = "c25b0525bc308797d3e35763faf5c560f2974dab802cb4a734ae4e9d1040319e" + file_content = b"Hello Aleph.im" + storage_service = StorageService( - storage_engine=MockStorageEngine( - files={ - "c25b0525bc308797d3e35763faf5c560f2974dab802cb4a734ae4e9d1040319e": b"Hello Aleph.im" - } - ), + storage_engine=MockStorageEngine(files={file_hash: file_content}), ipfs_service=mocker.AsyncMock(), node_cache=mocker.AsyncMock(), ) + message_processor.message_handler.storage_service = storage_service storage_handler = message_processor.message_handler.content_handlers[ MessageType.store @@ -738,7 +738,6 @@ async def test_pre_check_balance_with_existing_costs( "aleph.storage.StorageService.get_hash_content", return_value=small_file_content, ): - # Process first message to add existing costs await message_handler.process( session=session, pending_message=fixture_ipfs_store_message diff --git a/tests/test_network.py b/tests/test_network.py index 29733ab9c..9b61f5370 100644 --- a/tests/test_network.py +++ b/tests/test_network.py @@ -120,6 +120,8 @@ async def test_incoming_inline_content( reception_time=dt.datetime(2022, 1, 1), fetched=True, ) - - message = await message_handler.verify_message(pending_message=pending_message) - assert message is not None + with session_factory() as session: + message = await message_handler.verify_and_fetch_message( + pending_message=pending_message, session=session + ) + assert message is not None