Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions src/aleph/handlers/message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,11 +379,28 @@ async def insert_costs(
insert_stmt = make_costs_upsert_query(costs)
session.execute(insert_stmt)

async def verify_message(self, pending_message: PendingMessageDb) -> MessageDb:
async def verify_and_fetch_message(
self, session: DbSession, pending_message: PendingMessageDb
) -> MessageDb:
await self.verify_signature(pending_message=pending_message)
validated_message = await self.fetch_pending_message(
pending_message=pending_message
)
content_handler = self.get_content_handler(pending_message.type)

# Check Permissions before the fetch
await content_handler.check_permissions(
session=session, message=validated_message
)

await content_handler.pre_check_balance(
session=session, message=validated_message
)

# Fetch related content like the IPFS associated file
await content_handler.fetch_related_content(
session=session, message=validated_message
)

return validated_message

Expand Down Expand Up @@ -432,14 +449,10 @@ async def process(
)

# First check the message content and verify it
message = await self.verify_message(pending_message=pending_message)

# Do a balance pre-check to avoid saving related data
message = await self.verify_and_fetch_message(
pending_message=pending_message, session=session
)
content_handler = self.get_content_handler(message.type)
await content_handler.pre_check_balance(session=session, message=message)

# Fetch related content like the IPFS associated file
await self.fetch_related_content(session=session, message=message)

await content_handler.check_dependencies(session=session, message=message)
await content_handler.check_permissions(session=session, message=message)
Expand Down
5 changes: 3 additions & 2 deletions src/aleph/jobs/fetch_pending_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ def __init__(
async def fetch_pending_message(self, pending_message: PendingMessageDb):
with self.session_factory() as session:
try:
message = await self.message_handler.verify_message(
pending_message=pending_message
message = await self.message_handler.verify_and_fetch_message(
pending_message=pending_message, session=session
)

session.execute(
make_pending_message_fetched_statement(
pending_message, message.content
Expand Down
11 changes: 5 additions & 6 deletions tests/message_processing/test_process_stores.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,15 +215,15 @@ async def test_process_store_no_signature(
)
session.commit()

file_hash = "c25b0525bc308797d3e35763faf5c560f2974dab802cb4a734ae4e9d1040319e"
file_content = b"Hello Aleph.im"

storage_service = StorageService(
storage_engine=MockStorageEngine(
files={
"c25b0525bc308797d3e35763faf5c560f2974dab802cb4a734ae4e9d1040319e": b"Hello Aleph.im"
}
),
storage_engine=MockStorageEngine(files={file_hash: file_content}),
ipfs_service=mocker.AsyncMock(),
node_cache=mocker.AsyncMock(),
)

message_processor.message_handler.storage_service = storage_service
storage_handler = message_processor.message_handler.content_handlers[
MessageType.store
Expand Down Expand Up @@ -738,7 +738,6 @@ async def test_pre_check_balance_with_existing_costs(
"aleph.storage.StorageService.get_hash_content",
return_value=small_file_content,
):

# Process first message to add existing costs
await message_handler.process(
session=session, pending_message=fixture_ipfs_store_message
Expand Down
8 changes: 5 additions & 3 deletions tests/test_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ async def test_incoming_inline_content(
reception_time=dt.datetime(2022, 1, 1),
fetched=True,
)

message = await message_handler.verify_message(pending_message=pending_message)
assert message is not None
with session_factory() as session:
message = await message_handler.verify_and_fetch_message(
pending_message=pending_message, session=session
)
assert message is not None
Loading