From c648784d027748b49a087c7480ff681428b2e882 Mon Sep 17 00:00:00 2001 From: 1yam Date: Wed, 22 Oct 2025 20:18:47 +0200 Subject: [PATCH 1/5] Fix: messages & pre check balance should be done on fetch pipeline --- src/aleph/handlers/message_handler.py | 20 ++++++++++++-------- src/aleph/jobs/fetch_pending_messages.py | 7 ++++++- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/src/aleph/handlers/message_handler.py b/src/aleph/handlers/message_handler.py index 76d36c931..ef128adbe 100644 --- a/src/aleph/handlers/message_handler.py +++ b/src/aleph/handlers/message_handler.py @@ -379,12 +379,20 @@ async def insert_costs( insert_stmt = make_costs_upsert_query(costs) session.execute(insert_stmt) - async def verify_message(self, pending_message: PendingMessageDb) -> MessageDb: + async def verify_message( + self, session: DbSession, pending_message: PendingMessageDb + ) -> MessageDb: await self.verify_signature(pending_message=pending_message) validated_message = await self.fetch_pending_message( pending_message=pending_message ) + content_handler = self.get_content_handler(pending_message.type) + + await content_handler.pre_check_balance( + session=session, message=validated_message + ) + return validated_message async def process( @@ -432,14 +440,10 @@ async def process( ) # First check the message content and verify it - message = await self.verify_message(pending_message=pending_message) - - # Do a balance pre-check to avoid saving related data + message = await self.verify_message( + pending_message=pending_message, session=session + ) content_handler = self.get_content_handler(message.type) - await content_handler.pre_check_balance(session=session, message=message) - - # Fetch related content like the IPFS associated file - await self.fetch_related_content(session=session, message=message) await content_handler.check_dependencies(session=session, message=message) await content_handler.check_permissions(session=session, message=message) diff --git a/src/aleph/jobs/fetch_pending_messages.py b/src/aleph/jobs/fetch_pending_messages.py index 7da909c2f..302400fca 100644 --- a/src/aleph/jobs/fetch_pending_messages.py +++ b/src/aleph/jobs/fetch_pending_messages.py @@ -56,8 +56,13 @@ async def fetch_pending_message(self, pending_message: PendingMessageDb): with self.session_factory() as session: try: message = await self.message_handler.verify_message( - pending_message=pending_message + pending_message=pending_message, session=session ) + # Fetch related content like the IPFS associated file + await self.message_handler.fetch_related_content( + session=session, message=message + ) + session.execute( make_pending_message_fetched_statement( pending_message, message.content From 8d1fedca2f9f34649bb7edb326fce82f32ba5fa4 Mon Sep 17 00:00:00 2001 From: 1yam Date: Wed, 22 Oct 2025 20:22:15 +0200 Subject: [PATCH 2/5] Fix: unit test missing file in db due to fetching adding the pin in db instead of processing --- .../test_process_forgets.py | 27 +++++++++- .../message_processing/test_process_stores.py | 50 ++++++++++++++++--- tests/test_network.py | 8 +-- 3 files changed, 75 insertions(+), 10 deletions(-) diff --git a/tests/message_processing/test_process_forgets.py b/tests/message_processing/test_process_forgets.py index 64e92cbff..fbbe79ada 100644 --- a/tests/message_processing/test_process_forgets.py +++ b/tests/message_processing/test_process_forgets.py @@ -7,7 +7,7 @@ from more_itertools import one from sqlalchemy import select -from aleph.db.accessors.files import count_file_pins, get_file +from aleph.db.accessors.files import count_file_pins, get_file, upsert_file from aleph.db.accessors.messages import get_forgotten_message, get_message_status from aleph.db.accessors.posts import get_post from aleph.db.models import ( @@ -203,6 +203,16 @@ async def test_forget_store_message( ) with session_factory() as session: + # Add the file to the database before processing + upsert_file( + session=session, + file_hash=file_hash, + size=4, + file_type=FileType.FILE, + ) + session.commit() + + # Process the message target_message_result = one( await process_pending_messages( message_processor=message_processor, @@ -566,6 +576,21 @@ async def test_forget_store_message_dependent( ) with session_factory() as session: + # Add the file to the database before processing + upsert_file( + session=session, + file_hash=file_hash, + size=4, + file_type=FileType.FILE, + ) + + upsert_file( + session=session, + file_hash=runtime_hash, + size=7, + file_type=FileType.FILE, + ) + session.commit() target_message_result = one( await process_pending_messages( message_processor=message_processor, diff --git a/tests/message_processing/test_process_stores.py b/tests/message_processing/test_process_stores.py index 6b1b3be5d..45911403b 100644 --- a/tests/message_processing/test_process_stores.py +++ b/tests/message_processing/test_process_stores.py @@ -8,7 +8,7 @@ from aleph_message.models import Chain, ItemHash, ItemType, MessageType, StoreContent from configmanager import Config -from aleph.db.accessors.files import get_message_file_pin +from aleph.db.accessors.files import get_message_file_pin, upsert_file from aleph.db.accessors.messages import get_message_by_item_hash from aleph.db.models import AlephBalanceDb, MessageDb, MessageStatusDb, PendingMessageDb from aleph.handlers.content.store import StoreMessageHandler @@ -24,6 +24,7 @@ from aleph.toolkit.timestamp import timestamp_to_datetime from aleph.types.channel import Channel from aleph.types.db_session import DbSessionFactory +from aleph.types.files import FileType from aleph.types.message_status import InsufficientBalanceException, MessageStatus @@ -169,6 +170,10 @@ async def test_process_store( ) with session_factory() as session: + message = await message_handler.verify_message( + session=session, pending_message=fixture_store_message + ) + await message_handler.fetch_related_content(session=session, message=message) await message_handler.process( session=session, pending_message=fixture_store_message ) @@ -215,15 +220,24 @@ async def test_process_store_no_signature( ) session.commit() + file_hash = "c25b0525bc308797d3e35763faf5c560f2974dab802cb4a734ae4e9d1040319e" + file_content = b"Hello Aleph.im" + storage_service = StorageService( - storage_engine=MockStorageEngine( - files={ - "c25b0525bc308797d3e35763faf5c560f2974dab802cb4a734ae4e9d1040319e": b"Hello Aleph.im" - } - ), + storage_engine=MockStorageEngine(files={file_hash: file_content}), ipfs_service=mocker.AsyncMock(), node_cache=mocker.AsyncMock(), ) + + with session_factory() as session: + # Add the file to the database before processing + upsert_file( + session=session, + file_hash=file_hash, + size=len(file_content), + file_type=FileType.FILE, + ) + session.commit() message_processor.message_handler.storage_service = storage_service storage_handler = message_processor.message_handler.content_handlers[ MessageType.store @@ -236,6 +250,14 @@ async def test_process_store_no_signature( _ = [message async for message in pipeline] with session_factory() as session: + # Ensure content is here + message = await message_processor.message_handler.verify_message( + session=session, pending_message=fixture_store_message + ) + await message_processor.message_handler.fetch_related_content( + session=session, message=message + ) + message_db = get_message_by_item_hash( session=session, item_hash=ItemHash(fixture_store_message.item_hash) ) @@ -281,6 +303,10 @@ async def test_process_store_with_not_enough_balance( with session_factory() as session: # NOTE: Account balance is 0 at this point + message = await message_handler.verify_message( + session=session, pending_message=fixture_store_message_with_cost + ) + await message_handler.fetch_related_content(session=session, message=message) with pytest.raises(InsufficientBalanceException): await message_handler.process( session=session, pending_message=fixture_store_message_with_cost @@ -323,6 +349,10 @@ async def test_process_store_small_file_no_balance_required( with session_factory() as session: # NOTE: Account balance is 0 at this point, but since the file is small # it should still be processed + message = await message_handler.verify_message( + session=session, pending_message=fixture_store_message_with_cost + ) + await message_handler.fetch_related_content(session=session, message=message) await message_handler.process( session=session, pending_message=fixture_store_message_with_cost ) @@ -739,6 +769,14 @@ async def test_pre_check_balance_with_existing_costs( return_value=small_file_content, ): + # Those are fetch from the fetch pipeline so we need them + message_fetch = await message_handler.verify_message( + session=session, pending_message=fixture_ipfs_store_message + ) + await message_handler.fetch_related_content( + session=session, message=message_fetch + ) + # Process first message to add existing costs await message_handler.process( session=session, pending_message=fixture_ipfs_store_message diff --git a/tests/test_network.py b/tests/test_network.py index 29733ab9c..d6a522a19 100644 --- a/tests/test_network.py +++ b/tests/test_network.py @@ -120,6 +120,8 @@ async def test_incoming_inline_content( reception_time=dt.datetime(2022, 1, 1), fetched=True, ) - - message = await message_handler.verify_message(pending_message=pending_message) - assert message is not None + with session_factory() as session: + message = await message_handler.verify_message( + pending_message=pending_message, session=session + ) + assert message is not None From a9c655dea54467472d895a6cd484354ca25157ac Mon Sep 17 00:00:00 2001 From: 1yam Date: Thu, 23 Oct 2025 11:51:20 +0200 Subject: [PATCH 3/5] Refactor: `verify_message` renamed to `verify_and_fetch_message`. now fetch related content & verify permissions --- src/aleph/handlers/message_handler.py | 15 ++++++++++++--- src/aleph/jobs/fetch_pending_messages.py | 6 +----- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/aleph/handlers/message_handler.py b/src/aleph/handlers/message_handler.py index ef128adbe..9f665c4ea 100644 --- a/src/aleph/handlers/message_handler.py +++ b/src/aleph/handlers/message_handler.py @@ -379,20 +379,29 @@ async def insert_costs( insert_stmt = make_costs_upsert_query(costs) session.execute(insert_stmt) - async def verify_message( + async def verify_and_fetch_message( self, session: DbSession, pending_message: PendingMessageDb ) -> MessageDb: await self.verify_signature(pending_message=pending_message) validated_message = await self.fetch_pending_message( pending_message=pending_message ) - content_handler = self.get_content_handler(pending_message.type) + # Check Permissions before the fetch + await content_handler.check_permissions( + session=session, message=validated_message + ) + await content_handler.pre_check_balance( session=session, message=validated_message ) + # Fetch related content like the IPFS associated file + await content_handler.fetch_related_content( + session=session, message=validated_message + ) + return validated_message async def process( @@ -440,7 +449,7 @@ async def process( ) # First check the message content and verify it - message = await self.verify_message( + message = await self.verify_and_fetch_message( pending_message=pending_message, session=session ) content_handler = self.get_content_handler(message.type) diff --git a/src/aleph/jobs/fetch_pending_messages.py b/src/aleph/jobs/fetch_pending_messages.py index 302400fca..ce85ab879 100644 --- a/src/aleph/jobs/fetch_pending_messages.py +++ b/src/aleph/jobs/fetch_pending_messages.py @@ -55,13 +55,9 @@ def __init__( async def fetch_pending_message(self, pending_message: PendingMessageDb): with self.session_factory() as session: try: - message = await self.message_handler.verify_message( + message = await self.message_handler.verify_and_fetch_message( pending_message=pending_message, session=session ) - # Fetch related content like the IPFS associated file - await self.message_handler.fetch_related_content( - session=session, message=message - ) session.execute( make_pending_message_fetched_statement( From d03625375f59ea89ef11d2aa01679e6ba2fdf365 Mon Sep 17 00:00:00 2001 From: 1yam Date: Thu, 23 Oct 2025 11:54:29 +0200 Subject: [PATCH 4/5] fix: processing pipeline should still fetch the file if the fetch pipeline didn't --- .../test_process_forgets.py | 26 +------------ .../message_processing/test_process_stores.py | 39 +------------------ tests/test_network.py | 2 +- 3 files changed, 3 insertions(+), 64 deletions(-) diff --git a/tests/message_processing/test_process_forgets.py b/tests/message_processing/test_process_forgets.py index fbbe79ada..419dfecb5 100644 --- a/tests/message_processing/test_process_forgets.py +++ b/tests/message_processing/test_process_forgets.py @@ -7,7 +7,7 @@ from more_itertools import one from sqlalchemy import select -from aleph.db.accessors.files import count_file_pins, get_file, upsert_file +from aleph.db.accessors.files import count_file_pins, get_file from aleph.db.accessors.messages import get_forgotten_message, get_message_status from aleph.db.accessors.posts import get_post from aleph.db.models import ( @@ -203,15 +203,6 @@ async def test_forget_store_message( ) with session_factory() as session: - # Add the file to the database before processing - upsert_file( - session=session, - file_hash=file_hash, - size=4, - file_type=FileType.FILE, - ) - session.commit() - # Process the message target_message_result = one( await process_pending_messages( @@ -576,21 +567,6 @@ async def test_forget_store_message_dependent( ) with session_factory() as session: - # Add the file to the database before processing - upsert_file( - session=session, - file_hash=file_hash, - size=4, - file_type=FileType.FILE, - ) - - upsert_file( - session=session, - file_hash=runtime_hash, - size=7, - file_type=FileType.FILE, - ) - session.commit() target_message_result = one( await process_pending_messages( message_processor=message_processor, diff --git a/tests/message_processing/test_process_stores.py b/tests/message_processing/test_process_stores.py index 45911403b..176dbea63 100644 --- a/tests/message_processing/test_process_stores.py +++ b/tests/message_processing/test_process_stores.py @@ -8,7 +8,7 @@ from aleph_message.models import Chain, ItemHash, ItemType, MessageType, StoreContent from configmanager import Config -from aleph.db.accessors.files import get_message_file_pin, upsert_file +from aleph.db.accessors.files import get_message_file_pin from aleph.db.accessors.messages import get_message_by_item_hash from aleph.db.models import AlephBalanceDb, MessageDb, MessageStatusDb, PendingMessageDb from aleph.handlers.content.store import StoreMessageHandler @@ -24,7 +24,6 @@ from aleph.toolkit.timestamp import timestamp_to_datetime from aleph.types.channel import Channel from aleph.types.db_session import DbSessionFactory -from aleph.types.files import FileType from aleph.types.message_status import InsufficientBalanceException, MessageStatus @@ -170,10 +169,6 @@ async def test_process_store( ) with session_factory() as session: - message = await message_handler.verify_message( - session=session, pending_message=fixture_store_message - ) - await message_handler.fetch_related_content(session=session, message=message) await message_handler.process( session=session, pending_message=fixture_store_message ) @@ -230,13 +225,6 @@ async def test_process_store_no_signature( ) with session_factory() as session: - # Add the file to the database before processing - upsert_file( - session=session, - file_hash=file_hash, - size=len(file_content), - file_type=FileType.FILE, - ) session.commit() message_processor.message_handler.storage_service = storage_service storage_handler = message_processor.message_handler.content_handlers[ @@ -250,14 +238,6 @@ async def test_process_store_no_signature( _ = [message async for message in pipeline] with session_factory() as session: - # Ensure content is here - message = await message_processor.message_handler.verify_message( - session=session, pending_message=fixture_store_message - ) - await message_processor.message_handler.fetch_related_content( - session=session, message=message - ) - message_db = get_message_by_item_hash( session=session, item_hash=ItemHash(fixture_store_message.item_hash) ) @@ -303,10 +283,6 @@ async def test_process_store_with_not_enough_balance( with session_factory() as session: # NOTE: Account balance is 0 at this point - message = await message_handler.verify_message( - session=session, pending_message=fixture_store_message_with_cost - ) - await message_handler.fetch_related_content(session=session, message=message) with pytest.raises(InsufficientBalanceException): await message_handler.process( session=session, pending_message=fixture_store_message_with_cost @@ -348,11 +324,6 @@ async def test_process_store_small_file_no_balance_required( with session_factory() as session: # NOTE: Account balance is 0 at this point, but since the file is small - # it should still be processed - message = await message_handler.verify_message( - session=session, pending_message=fixture_store_message_with_cost - ) - await message_handler.fetch_related_content(session=session, message=message) await message_handler.process( session=session, pending_message=fixture_store_message_with_cost ) @@ -769,14 +740,6 @@ async def test_pre_check_balance_with_existing_costs( return_value=small_file_content, ): - # Those are fetch from the fetch pipeline so we need them - message_fetch = await message_handler.verify_message( - session=session, pending_message=fixture_ipfs_store_message - ) - await message_handler.fetch_related_content( - session=session, message=message_fetch - ) - # Process first message to add existing costs await message_handler.process( session=session, pending_message=fixture_ipfs_store_message diff --git a/tests/test_network.py b/tests/test_network.py index d6a522a19..9b61f5370 100644 --- a/tests/test_network.py +++ b/tests/test_network.py @@ -121,7 +121,7 @@ async def test_incoming_inline_content( fetched=True, ) with session_factory() as session: - message = await message_handler.verify_message( + message = await message_handler.verify_and_fetch_message( pending_message=pending_message, session=session ) assert message is not None From 74819cdeef7903fd58234e80c63601359d921972 Mon Sep 17 00:00:00 2001 From: 1yam Date: Fri, 24 Oct 2025 10:59:27 +0200 Subject: [PATCH 5/5] fix: remove useless change from test --- tests/message_processing/test_process_forgets.py | 1 - tests/message_processing/test_process_stores.py | 4 +--- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/message_processing/test_process_forgets.py b/tests/message_processing/test_process_forgets.py index 419dfecb5..64e92cbff 100644 --- a/tests/message_processing/test_process_forgets.py +++ b/tests/message_processing/test_process_forgets.py @@ -203,7 +203,6 @@ async def test_forget_store_message( ) with session_factory() as session: - # Process the message target_message_result = one( await process_pending_messages( message_processor=message_processor, diff --git a/tests/message_processing/test_process_stores.py b/tests/message_processing/test_process_stores.py index 176dbea63..48e50c1f4 100644 --- a/tests/message_processing/test_process_stores.py +++ b/tests/message_processing/test_process_stores.py @@ -224,8 +224,6 @@ async def test_process_store_no_signature( node_cache=mocker.AsyncMock(), ) - with session_factory() as session: - session.commit() message_processor.message_handler.storage_service = storage_service storage_handler = message_processor.message_handler.content_handlers[ MessageType.store @@ -324,6 +322,7 @@ async def test_process_store_small_file_no_balance_required( with session_factory() as session: # NOTE: Account balance is 0 at this point, but since the file is small + # it should still be processed await message_handler.process( session=session, pending_message=fixture_store_message_with_cost ) @@ -739,7 +738,6 @@ async def test_pre_check_balance_with_existing_costs( "aleph.storage.StorageService.get_hash_content", return_value=small_file_content, ): - # Process first message to add existing costs await message_handler.process( session=session, pending_message=fixture_ipfs_store_message