From e8bc64e4046910ee8c9aaf8a1065adbeb22fd2be Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Mon, 26 Sep 2022 14:20:40 +0200 Subject: [PATCH 01/10] Feature: support Micheline-style signatures for Tezos (#330) Problem: web wallets do not allow signing raw messages. Instead, they require binary payloads in a specific format. Solution: support Micheline-style signatures, i.e. signatures supported by wallets like Beacon. Users can now use Micheline or raw signatures by specifying the `signature.signingType` field to "micheline" or "raw". By default, "raw" is assumed. Co-authored-by: Mike Hukiewitz <70762838+MHHukiewitz@users.noreply.github.com> --- src/aleph/chains/tezos.py | 81 +++++++++++++++++++++++++++++++++++++- tests/chains/test_tezos.py | 24 ++++++++++- 2 files changed, 102 insertions(+), 3 deletions(-) diff --git a/src/aleph/chains/tezos.py b/src/aleph/chains/tezos.py index 765f1ddc7..f061378c9 100644 --- a/src/aleph/chains/tezos.py +++ b/src/aleph/chains/tezos.py @@ -1,5 +1,7 @@ +import datetime as dt import json import logging +from enum import Enum from aleph_pytezos.crypto.key import Key @@ -10,13 +12,83 @@ LOGGER = logging.getLogger(__name__) CHAIN_NAME = "TEZOS" +# Default dApp URL for Micheline-style signatures +DEFAULT_DAPP_URL = "aleph.im" + + +class TezosSignatureType(str, Enum): + RAW = "raw" + MICHELINE = "micheline" + + +def timestamp_to_iso_8601(timestamp: float) -> str: + """ + Returns the timestamp formatted to ISO-8601, JS-style. + + Compared to the regular `isoformat()`, this function only provides precision down + to milliseconds and prints a "Z" instead of +0000 for UTC. + This format is typically used by JavaScript applications, like our TS SDK. + + Example: 2022-09-23T14:41:19.029Z + + :param timestamp: The timestamp to format. + :return: The formatted timestamp. + """ + + return ( + dt.datetime.utcfromtimestamp(timestamp).isoformat(timespec="milliseconds") + "Z" + ) + + +def micheline_verification_buffer( + verification_buffer: bytes, + timestamp: float, + dapp_url: str, +) -> bytes: + """ + Computes the verification buffer for Micheline-type signatures. + + This verification buffer is used when signing data with a Tezos web wallet. + See https://tezostaquito.io/docs/signing/#generating-a-signature-with-beacon-sdk. + + :param verification_buffer: The original (non-Tezos) verification buffer for the Aleph message. + :param timestamp: Timestamp of the message. + :param dapp_url: The URL of the dApp, for use as part of the verification buffer. + :return: The verification buffer used for the signature by the web wallet. 
+ """ + + prefix = b"Tezos Signed Message:" + timestamp = timestamp_to_iso_8601(timestamp).encode("utf-8") + + payload = b" ".join( + (prefix, dapp_url.encode("utf-8"), timestamp, verification_buffer) + ) + hex_encoded_payload = payload.hex() + payload_size = str(len(hex_encoded_payload)).encode("utf-8") + + return b"\x05" + b"\x01\x00" + payload_size + payload + + +def get_tezos_verification_buffer( + message: BasePendingMessage, signature_type: TezosSignatureType, dapp_url: str +) -> bytes: + verification_buffer = get_verification_buffer(message) + + if signature_type == TezosSignatureType.RAW: + return verification_buffer + elif signature_type == TezosSignatureType.MICHELINE: + return micheline_verification_buffer( + verification_buffer, message.time, dapp_url + ) + + raise ValueError(f"Unsupported signature type: {signature_type}") + async def verify_signature(message: BasePendingMessage) -> bool: """ Verifies the cryptographic signature of a message signed with a Tezos key. """ - verification_buffer = get_verification_buffer(message) try: signature_dict = json.loads(message.signature) except json.JSONDecodeError: @@ -30,6 +102,9 @@ async def verify_signature(message: BasePendingMessage) -> bool: LOGGER.exception("'%s' key missing from Tezos signature dictionary.", e.args[0]) return False + signature_type = TezosSignatureType(signature_dict.get("signingType", "raw")) + dapp_url = signature_dict.get("dAppUrl", DEFAULT_DAPP_URL) + key = Key.from_encoded_key(public_key) # Check that the sender ID is equal to the public key hash public_key_hash = key.public_key_hash() @@ -41,6 +116,10 @@ async def verify_signature(message: BasePendingMessage) -> bool: public_key_hash, ) + verification_buffer = get_tezos_verification_buffer( + message, signature_type, dapp_url + ) + # Check the signature try: key.verify(signature, verification_buffer) diff --git a/tests/chains/test_tezos.py b/tests/chains/test_tezos.py index 4c6be4c37..14ca20402 100644 --- a/tests/chains/test_tezos.py +++ b/tests/chains/test_tezos.py @@ -2,10 +2,13 @@ from aleph.network import verify_signature from aleph.schemas.pending_messages import parse_message +from aleph.chains import ( + tezos, +) # TODO: this import is currently necessary because of circular dependencies @pytest.mark.asyncio -async def test_tezos_verify_signature(): +async def test_tezos_verify_signature_raw(): message_dict = { "chain": "TEZOS", "channel": "TEST", @@ -28,7 +31,7 @@ async def test_tezos_verify_signature(): @pytest.mark.asyncio -async def test_tezos_verify_signature_ed25519(): +async def test_tezos_verify_signature_raw_ed25519(): message_dict = { "chain": "TEZOS", "sender": "tz1SmGHzna3YhKropa3WudVq72jhTPDBn4r5", @@ -43,3 +46,20 @@ async def test_tezos_verify_signature_ed25519(): message = parse_message(message_dict) await verify_signature(message) + + +@pytest.mark.asyncio +async def test_tezos_verify_signature_micheline(): + message_dict = { + "chain": "TEZOS", + "sender": "tz1VrPqrVdMFsgykWyhGH7SYcQ9avHTjPcdD", + "type": "POST", + "channel": "ALEPH-TEST", + "signature": '{"signingType":"micheline","signature":"sigXD8iT5ivdawgPzE1AbtDwqqAjJhS5sHS1psyE74YjfiaQnxWZsATNjncdsuQw3b9xaK79krxtsC8uQoT5TcUXmo66aovT","publicKey":"edpkvapDnjnasrNcmUdMZXhQZwpX6viPyuGCq6nrP4W7ZJCm7EFTpS"}', + "time": 1663944079.029, + "item_type": "storage", + "item_hash": "72b2722b95582419cfa71f631ff6c6afc56344dc6a4609e772877621813040b7", + } + + message = parse_message(message_dict) + await verify_signature(message) From 2af24515d07940c2c495792412d36703c8f72620 Mon Sep 17 
00:00:00 2001 From: Olivier Desenfans Date: Tue, 4 Oct 2022 02:48:11 +0200 Subject: [PATCH 02/10] Fix: drop invalid pending messages (#329) Problem: if an invalid message somehow managed to reach the pending message collection, the message would be retried indefinitely logging exceptions on each run. Solution: drop invalid messages. --- src/aleph/jobs/process_pending_messages.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/aleph/jobs/process_pending_messages.py b/src/aleph/jobs/process_pending_messages.py index cb5aa69fd..dd35e445d 100644 --- a/src/aleph/jobs/process_pending_messages.py +++ b/src/aleph/jobs/process_pending_messages.py @@ -19,6 +19,7 @@ from aleph.model.pending import PendingMessage from aleph.services.p2p import singleton from .job_utils import prepare_loop, process_job_results +from ..exceptions import InvalidMessageError from ..schemas.pending_messages import parse_message LOGGER = getLogger("jobs.pending_messages") @@ -49,7 +50,15 @@ async def handle_pending_message( seen_ids: Dict[Tuple, int], ) -> List[DbBulkOperation]: - message = parse_message(pending["message"]) + delete_pending_message_op = DbBulkOperation( + PendingMessage, DeleteOne({"_id": pending["_id"]}) + ) + + try: + message = parse_message(pending["message"]) + except InvalidMessageError: + # If an invalid message somehow ended in pending messages, drop it. + return [delete_pending_message_op] async with sem: status, operations = await incoming( @@ -64,9 +73,7 @@ async def handle_pending_message( ) if status != IncomingStatus.RETRYING_LATER: - operations.append( - DbBulkOperation(PendingMessage, DeleteOne({"_id": pending["_id"]})) - ) + operations.append(delete_pending_message_op) return operations From 54c592115b5cb2942db2a5b56113e4fe13153176 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Tue, 4 Oct 2022 17:08:47 +0200 Subject: [PATCH 03/10] Chore: fix conflicting dependencies for Ethereum Problem: some dependencies were missing from the requirements, and letting pip sort it out led to dependency conflicts. Solution: add the dependencies to setup.cfg. --- setup.cfg | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/setup.cfg b/setup.cfg index 47a3d2519..db50ad63a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -29,7 +29,9 @@ package_dir = setup_requires = pyscaffold>=3.1a0,<3.2a0 pytest-runner>=2.0,<3dev -# Add here dependencies of your project (semicolon/line-separated), e.g. + +# Note: eth/web3 dependencies updates are sensitive and can trigger a lot of dependency conflicts. +# Update with care. Dependencies that were added to make it all work are annotated accordingly. 
install_requires = aiocache==0.11.1 aiohttp-cors==0.7.0 @@ -45,6 +47,8 @@ install_requires = configparser==5.2.0 cosmospy==6.0.0 dataclasses_json==0.5.6 + eciespy==0.3.11 # eth dependency + eth-hash==0.3.3 # eth dependency eth-keys==0.3.3 eth-rlp==0.2.1 eth_account==0.5.6 @@ -53,13 +57,14 @@ install_requires = msgpack==1.0.3 # required by aiocache nuls2-python@git+https://github.com/aleph-im/nuls2-python.git p2pclient==0.2.0 + protobuf==3.20.3 # eth dependency pymongo==3.12.3 pynacl==1.5.0 python-dateutil==2.8.2 python-socketio==5.5.1 pytz==2021.3 pyyaml==6.0 - requests==2.27.1 + requests==2.28.1 secp256k1==0.14.0 sentry-sdk==1.5.11 setproctitle==1.2.2 From aca8724b3ee0e5738bd1bc34b36072ae522c72eb Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Tue, 4 Oct 2022 22:38:07 +0200 Subject: [PATCH 04/10] Chore: bump aleph-message to 0.2.2 (#334) --- setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index db50ad63a..891e6e543 100644 --- a/setup.cfg +++ b/setup.cfg @@ -40,7 +40,7 @@ install_requires = aiohttp==3.8.1 aioipfs@git+https://github.com/aleph-im/aioipfs.git@76d5624661e879a13b70f3ea87dc9c9604c7eda7 aleph-client==0.4.6 - aleph-message==0.2.1 + aleph-message==0.2.2 aleph-pytezos@git+https://github.com/aleph-im/aleph-pytezos.git@97fe92ffa6e21ef5ec17ef4fa16c86022b30044c coincurve==15.0.1 configmanager==1.35.1 From 791be3f19f7832938e229432c4dbb5a8c1e80cc0 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Thu, 6 Oct 2022 05:37:41 +0200 Subject: [PATCH 05/10] Cleanup: remove useless assignments in message WS (#336) Some variable assignments were useless. --- src/aleph/web/controllers/messages.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/aleph/web/controllers/messages.py b/src/aleph/web/controllers/messages.py index 0ba2468d5..6625a0ad2 100644 --- a/src/aleph/web/controllers/messages.py +++ b/src/aleph/web/controllers/messages.py @@ -215,12 +215,10 @@ async def messages_ws(request: web.Request): break except ConnectionResetError: - closing = True break except Exception: if ws.closed: - closing = True break LOGGER.exception("Error processing") From 6859a58e5e5df2a293688484e058fe49b064e243 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Thu, 6 Oct 2022 23:03:29 +0200 Subject: [PATCH 06/10] Fix: general 500 error on posts endpoint (#338) Problem: the "/api/v0/posts.json" endpoint returns a 500 error because of an invalid parameter being used in a function (an attempt to remove the MongoDB object ID from the endpoint). Solution: remove the object ID directly in the aggregate query. Added a simple test to verify that the endpoint is at least working a little bit. 
--- src/aleph/model/messages.py | 2 +- src/aleph/web/__init__.py | 4 +- src/aleph/web/controllers/posts.py | 5 +-- tests/api/__init__.py | 0 tests/api/conftest.py | 16 ++++++++ tests/api/fixtures/fixture_posts.json | 0 tests/api/test_messages.py | 23 +---------- tests/api/test_posts.py | 59 +++++++++++++++++++++++++++ tests/api/utils/__init__.py | 27 ++++++++++++ tests/conftest.py | 2 +- 10 files changed, 109 insertions(+), 29 deletions(-) create mode 100644 tests/api/__init__.py create mode 100644 tests/api/conftest.py create mode 100644 tests/api/fixtures/fixture_posts.json create mode 100644 tests/api/test_posts.py create mode 100644 tests/api/utils/__init__.py diff --git a/src/aleph/model/messages.py b/src/aleph/model/messages.py index 01f214fb3..71dbe6256 100644 --- a/src/aleph/model/messages.py +++ b/src/aleph/model/messages.py @@ -228,7 +228,7 @@ async def get_merged_posts(filters, sort=None, limit=100, skip=0, amend_limit=1) } } }, - {"$project": {"amends": 0}}, + {"$project": {"_id": 0, "amends": 0}}, {"$replaceRoot": {"newRoot": {"$mergeObjects": ["$$ROOT", "$content"]}}}, ] diff --git a/src/aleph/web/__init__.py b/src/aleph/web/__init__.py index c3d4288ae..b4f8082fa 100644 --- a/src/aleph/web/__init__.py +++ b/src/aleph/web/__init__.py @@ -38,8 +38,8 @@ def init_sio(app: web.Application) -> socketio.AsyncServer: return sio -def create_app() -> web.Application: - app = web.Application(client_max_size=1024 ** 2 * 64) +def create_app(debug: bool = False) -> web.Application: + app = web.Application(client_max_size=1024**2 * 64, debug=debug) tpl_path = pkg_resources.resource_filename("aleph.web", "templates") jinja_loader = jinja2.ChoiceLoader( diff --git a/src/aleph/web/controllers/posts.py b/src/aleph/web/controllers/posts.py index 126da1b31..916f036e4 100644 --- a/src/aleph/web/controllers/posts.py +++ b/src/aleph/web/controllers/posts.py @@ -85,10 +85,7 @@ async def view_posts_list(request): context = {"posts": posts} if pagination_per_page is not None: - total_msgs = await Message.collection.count_documents( - filter=find_filters, - projection={"_id": 0}, - ) + total_msgs = await Message.collection.count_documents(filter=find_filters) pagination = Pagination( pagination_page, diff --git a/tests/api/__init__.py b/tests/api/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/api/conftest.py b/tests/api/conftest.py new file mode 100644 index 000000000..7dd3c87cf --- /dev/null +++ b/tests/api/conftest.py @@ -0,0 +1,16 @@ +import json +from pathlib import Path +import pytest_asyncio +from aleph.model.messages import Message + + +@pytest_asyncio.fixture +async def fixture_messages(test_db): + fixtures_dir = Path(__file__).parent / "fixtures" + fixtures_file = fixtures_dir / "fixture_messages.json" + + with fixtures_file.open() as f: + messages = json.load(f) + + await Message.collection.insert_many(messages) + return messages diff --git a/tests/api/fixtures/fixture_posts.json b/tests/api/fixtures/fixture_posts.json new file mode 100644 index 000000000..e69de29bb diff --git a/tests/api/test_messages.py b/tests/api/test_messages.py index 54d4e527d..3fd9337c9 100644 --- a/tests/api/test_messages.py +++ b/tests/api/test_messages.py @@ -1,20 +1,13 @@ import itertools -import json -from pathlib import Path -from typing import Dict, Iterable, List +from typing import Dict, Iterable import pytest -import pytest_asyncio -from aleph.model.messages import Message +from .utils import get_messages_by_keys MESSAGES_URI = "/api/v0/messages.json" -def 
get_messages_by_keys(messages: Iterable[Dict], **keys) -> List[Dict]: - return [msg for msg in messages if all(msg[k] == v for k, v in keys.items())] - - def check_message_fields(messages: Iterable[Dict]): """ Basic checks on fields. For example, check that we do not expose internal data @@ -38,18 +31,6 @@ def assert_messages_equal(messages: Iterable[Dict], expected_messages: Iterable[ assert message["signature"] == expected_message["signature"] -@pytest_asyncio.fixture -async def fixture_messages(test_db): - fixtures_dir = Path(__file__).parent / "fixtures" - fixtures_file = fixtures_dir / "fixture_messages.json" - - with fixtures_file.open() as f: - messages = json.load(f) - - await Message.collection.insert_many(messages) - return messages - - @pytest.mark.asyncio async def test_get_messages(fixture_messages, ccn_api_client): response = await ccn_api_client.get(MESSAGES_URI) diff --git a/tests/api/test_posts.py b/tests/api/test_posts.py new file mode 100644 index 000000000..436f6d932 --- /dev/null +++ b/tests/api/test_posts.py @@ -0,0 +1,59 @@ +from typing import Dict, Iterable + +import aiohttp +import pytest +from aleph_message.models import MessageType + +from .utils import get_messages_by_keys + +POSTS_URI = "/api/v0/posts.json" + + +def assert_posts_equal(posts: Iterable[Dict], expected_messages: Iterable[Dict]): + posts_by_hash = {post["item_hash"]: post for post in posts} + + for expected_message in expected_messages: + post = posts_by_hash[expected_message["item_hash"]] + assert "_id" not in post + + assert post["chain"] == expected_message["chain"] + assert post["channel"] == expected_message["channel"] + assert post["sender"] == expected_message["sender"] + assert post["signature"] == expected_message["signature"] + + if expected_message.get("forgotten_by", []): + assert post["content"] is None + continue + + if "content" not in expected_message["content"]: + # TODO: there is a problem with the spec of posts: they can be specified + # without an internal "content" field, which does not break the + # endpoint but returns the content of message["content"] instead. + # We skip the issue for now. + continue + + assert post["content"] == expected_message["content"]["content"] + + +async def get_posts(api_client, **params) -> aiohttp.ClientResponse: + return await api_client.get(POSTS_URI, params=params) + + +async def get_posts_expect_success(api_client, **params): + response = await get_posts(api_client, **params) + assert response.status == 200, await response.text() + data = await response.json() + return data["posts"] + + +@pytest.mark.asyncio +async def test_get_posts(fixture_messages, ccn_api_client): + # The POST messages in the fixtures file do not amend one another, so we should have + # 1 POST = 1 message. + post_messages = get_messages_by_keys( + fixture_messages, + type=MessageType.post, + ) + posts = await get_posts_expect_success(ccn_api_client) + + assert_posts_equal(posts, post_messages) diff --git a/tests/api/utils/__init__.py b/tests/api/utils/__init__.py new file mode 100644 index 000000000..182366ba5 --- /dev/null +++ b/tests/api/utils/__init__.py @@ -0,0 +1,27 @@ +from typing import Dict, Iterable, List, Callable + + +def get_messages_by_predicate( + messages: Iterable[Dict], predicate: Callable[[Dict], bool] +) -> List[Dict]: + """ + Filters messages based on a user-provided predicate + (=a function/lambda operating on a single message). 
+ """ + + return [msg for msg in messages if predicate(msg)] + + +def get_messages_by_keys(messages: Iterable[Dict], **keys) -> List[Dict]: + """ + Filters messages based on user-provided keys. + + Example: + >>> filtered_messages = get_messages_by_keys( + >>> message_list, item_hash="some-hash", channel="MY-CHANNEL" + >>> ) + + """ + return get_messages_by_predicate( + messages, lambda msg: all(msg[k] == v for k, v in keys.items()) + ) diff --git a/tests/conftest.py b/tests/conftest.py index 5f73a7158..d4d92281f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -62,7 +62,7 @@ async def ccn_api_client(aiohttp_client, mock_config): event_loop = asyncio.get_event_loop() event_loop.set_debug(True) - app = create_app() + app = create_app(debug=True) app["config"] = mock_config client = await aiohttp_client(app) From 6e2ea65fbcd412dde983a4f7141ca429b727775b Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Fri, 7 Oct 2022 00:43:49 +0200 Subject: [PATCH 07/10] Internal: add unit tests for the aggregates endpoint (#340) Problem: the /api/v0/aggregates/{address}.json endpoint was not tested enough. Solution: add tests. --- tests/api/conftest.py | 22 +++- tests/api/fixtures/fixture_aggregates.json | 123 ++++++++++++++++++ tests/api/test_aggregates.py | 141 +++++++++++++++++++++ 3 files changed, 283 insertions(+), 3 deletions(-) create mode 100644 tests/api/fixtures/fixture_aggregates.json create mode 100644 tests/api/test_aggregates.py diff --git a/tests/api/conftest.py b/tests/api/conftest.py index 7dd3c87cf..20e6ab15b 100644 --- a/tests/api/conftest.py +++ b/tests/api/conftest.py @@ -1,16 +1,32 @@ import json from pathlib import Path +from typing import Dict, List + import pytest_asyncio from aleph.model.messages import Message -@pytest_asyncio.fixture -async def fixture_messages(test_db): +async def _load_fixtures(filename: str): fixtures_dir = Path(__file__).parent / "fixtures" - fixtures_file = fixtures_dir / "fixture_messages.json" + fixtures_file = fixtures_dir / filename with fixtures_file.open() as f: messages = json.load(f) await Message.collection.insert_many(messages) return messages + + +@pytest_asyncio.fixture +async def fixture_messages(test_db) -> List[Dict]: + return await _load_fixtures("fixture_messages.json") + + +@pytest_asyncio.fixture +async def fixture_aggregate_messages(test_db) -> List[Dict]: + return await _load_fixtures("fixture_aggregates.json") + + +@pytest_asyncio.fixture +async def fixture_post_messages(test_db) -> List[Dict]: + return await _load_fixtures("fixture_posts.json") diff --git a/tests/api/fixtures/fixture_aggregates.json b/tests/api/fixtures/fixture_aggregates.json new file mode 100644 index 000000000..c488e78e8 --- /dev/null +++ b/tests/api/fixtures/fixture_aggregates.json @@ -0,0 +1,123 @@ +[ + { + "chain": "ETH", + "item_hash": "53c2b16aa84b10878982a2920844625546f5db32337ecd9dd15928095a30381c", + "sender": "0x720F319A9c3226dCDd7D8C49163D79EDa1084E98", + "type": "AGGREGATE", + "channel": "INTEGRATION_TESTS", + "content": { + "address": "0x720F319A9c3226dCDd7D8C49163D79EDa1084E98", + "time": 1644857371.391834, + "key": "test_reference", + "content": { + "a": 1, + "b": 2 + } + }, + "item_content": "{\"address\":\"0x720F319A9c3226dCDd7D8C49163D79EDa1084E98\",\"time\":1644857371.391834,\"key\":\"test_reference\",\"content\":{\"a\":1,\"b\":2}}", + "item_type": "inline", + "signature": "0x7eee4cfc03b963ec51f04f60f6f7d58b0f24e0309d209feecb55af9e411ed1c01cfb547bb13539e91308b044c3661d93ddf272426542bc1a47722614cb0cd3621c", + "size": 128, + "time": 
1644859283.101 + }, + { + "chain": "ETH", + "item_hash": "0022ed09d16a1c3d6cbb3c7e2645657ebaa0382eba65be06264b106f528b85bf", + "sender": "0x720F319A9c3226dCDd7D8C49163D79EDa1084E98", + "type": "AGGREGATE", + "channel": "INTEGRATION_TESTS", + "content": { + "address": "0x720F319A9c3226dCDd7D8C49163D79EDa1084E98", + "time": 1644857704.6253593, + "key": "test_reference", + "content": { + "c": 3, + "d": 4 + } + }, + "item_content": "{\"address\":\"0x720F319A9c3226dCDd7D8C49163D79EDa1084E98\",\"time\":1644857704.6253593,\"key\":\"test_reference\",\"content\":{\"c\":3,\"d\":4}}", + "item_type": "inline", + "signature": "0xe6129196c36b066302692b53bcb78a9d8c996854b170238ebfe56924f0b6be604883c30a66d75250de489e1edb683c7da8ddb1ccb50a39d1bbbdad617e5c958f1b", + "size": 129, + "time": 1644859283.12 + }, + { + "chain": "ETH", + "item_hash": "a87004aa03f8ae63d2c4bbe84b93b9ce70ca6482ce36c82ab0b0f689fc273f34", + "sender": "0xaC033C1cA5C49Eff98A1D9a56BeDBC4840010BA4", + "type": "AGGREGATE", + "channel": "INTEGRATION_TESTS", + "content": { + "address": "0xaC033C1cA5C49Eff98A1D9a56BeDBC4840010BA4", + "time": 1648215802.3821976, + "key": "test_reference", + "content": { + "c": 3, + "d": 4 + } + }, + "item_content": "{\"address\":\"0xaC033C1cA5C49Eff98A1D9a56BeDBC4840010BA4\",\"time\":1648215802.3821976,\"key\":\"test_reference\",\"content\":{\"c\":3,\"d\":4}}", + "item_type": "inline", + "signature": "0xc0f6ce2e4e9561b3949d51a97b8746125e1f031bbc13813cc74f1f61eea654fe300ad5e9ec098d41374bc0e43f83f2d66b834672abb811ae6a2dcdbd09f2565f1c", + "size": 129, + "time": 1648467547.771 + }, + { + "chain": "ETH", + "item_hash": "f875631a6c4a70ce44143bdd9a64861a5ce6f68e2267a00979ff0ad399a6c780", + "sender": "0x720F319A9c3226dCDd7D8C49163D79EDa1084E98", + "type": "AGGREGATE", + "channel": "INTEGRATION_TESTS", + "confirmations": [ + { + "chain": "ETH", + "height": 14205580, + "hash": "0x234b3cb25e893780c4cf50ec82c4ae9a61d61b766a367b559ae8192463e84a1b" + } + ], + "confirmed": true, + "content": { + "address": "0x720F319A9c3226dCDd7D8C49163D79EDa1084E98", + "time": 1644857371.1748412, + "key": "test_target", + "content": { + "a": 1, + "b": 2 + } + }, + "item_content": "{\"address\":\"0x720F319A9c3226dCDd7D8C49163D79EDa1084E98\",\"time\":1644857371.1748412,\"key\":\"test_target\",\"content\":{\"a\":1,\"b\":2}}", + "item_type": "inline", + "signature": "0xaa28dafaecfd063bd30f65c877260bcdab37931fe7d8ef13173a952ae57a79e544d9fc9ae9131ba6ce6638bdbd62996467eb4a999416603ff2d1eaff372427bd1b", + "size": 126, + "time": 1644859283.1 + }, + { + "chain": "ETH", + "item_hash": "8c83e020b1f0661de3238ecaf2a41fd2f9dfe4a6c56453ccdf3ddd3fa4fae147", + "sender": "0x720F319A9c3226dCDd7D8C49163D79EDa1084E98", + "type": "AGGREGATE", + "channel": "INTEGRATION_TESTS", + "confirmations": [ + { + "chain": "ETH", + "height": 14205311, + "hash": "0x805dcf51856c813d5524f5b64555145fce6487b81dc605e6657fd208bebb2e05" + } + ], + "confirmed": true, + "content": { + "address": "0x720F319A9c3226dCDd7D8C49163D79EDa1084E98", + "time": 1644853185.0710306, + "key": "test_key", + "content": { + "a": 1, + "b": 2 + } + }, + "item_content": "{\"address\":\"0x720F319A9c3226dCDd7D8C49163D79EDa1084E98\",\"time\":1644853185.0710306,\"key\":\"test_key\",\"content\":{\"a\":1,\"b\":2}}", + "item_type": "inline", + "signature": "0x4e3060c596de77b19f2791fbc34eff3d6c89c63d29250c960e9c1b752898d22d0f7d7759dc0b0d935b93e29534e6861d7a8deeb75cd69836acf0ad0e6e8626601b", + "size": 123, + "time": 1644855661.089 + } +] diff --git a/tests/api/test_aggregates.py b/tests/api/test_aggregates.py new 
file mode 100644 index 000000000..67cfd7966 --- /dev/null +++ b/tests/api/test_aggregates.py @@ -0,0 +1,141 @@ +import itertools +from typing import Dict, Iterable, List + +import aiohttp +import pytest + +AGGREGATES_URI = "/api/v0/aggregates/{address}.json" + +# Another address with three aggregates +ADDRESS_1 = "0x720F319A9c3226dCDd7D8C49163D79EDa1084E98" +# Another address with one aggregate +ADDRESS_2 = "0xaC033C1cA5C49Eff98A1D9a56BeDBC4840010BA4" + +EXPECTED_AGGREGATES = { + ADDRESS_1: { + "test_key": {"a": 1, "b": 2}, + "test_target": {"a": 1, "b": 2}, + "test_reference": {"a": 1, "b": 2, "c": 3, "d": 4}, + }, + ADDRESS_2: {"test_reference": {"c": 3, "d": 4}}, +} + + +def make_uri(address: str) -> str: + return AGGREGATES_URI.format(address=address) + + +def assert_aggregates_equal(expected: List[Dict], actual: Dict[str, Dict]): + for expected_aggregate in expected: + aggregate = actual[expected_aggregate["content"]["key"]] + assert "_id" not in aggregate + + assert aggregate == expected_aggregate["content"]["content"] + + +def merge_aggregates(messages: Iterable[Dict]) -> List[Dict]: + def merge_content(_messages: List[Dict]) -> Dict: + original = _messages[0] + for update in _messages[1:]: + original["content"]["content"].update(update["content"]["content"]) + return original + + aggregates = [] + + for key, group in itertools.groupby( + sorted(messages, key=lambda msg: msg["content"]["key"]), + lambda msg: msg["content"]["key"], + ): + sorted_messages = sorted(group, key=lambda msg: msg["time"]) + aggregates.append(merge_content(sorted_messages)) + + return aggregates + + +async def get_aggregates(api_client, address: str, **params) -> aiohttp.ClientResponse: + return await api_client.get(make_uri(address), params=params) + + +async def get_aggregates_expect_success(api_client, address: str, **params): + response = await get_aggregates(api_client, address, **params) + assert response.status == 200, await response.text() + return await response.json() + + +@pytest.fixture() +def fixture_aggregates(fixture_aggregate_messages): + return merge_aggregates(fixture_aggregate_messages) + + +@pytest.mark.asyncio +async def test_get_aggregates_no_update(ccn_api_client, fixture_aggregates): + """ + Tests receiving an aggregate from an address which posted one aggregate and never + updated it. + """ + + address = ADDRESS_2 + aggregates = await get_aggregates_expect_success(ccn_api_client, address) + + assert aggregates["address"] == address + assert aggregates["data"] == EXPECTED_AGGREGATES[address] + + +@pytest.mark.asyncio +async def test_get_aggregates(ccn_api_client, fixture_aggregates: List[Dict]): + """ + A more complex case with 3 aggregates, one of which was updated. + """ + + address = ADDRESS_1 + aggregates = await get_aggregates_expect_success(ccn_api_client, address) + + assert address == aggregates["address"] + assert aggregates["data"]["test_key"] == {"a": 1, "b": 2} + assert aggregates["data"]["test_target"] == {"a": 1, "b": 2} + assert aggregates["data"]["test_reference"] == {"a": 1, "b": 2, "c": 3, "d": 4} + + assert_aggregates_equal(fixture_aggregates, aggregates["data"]) + + +@pytest.mark.asyncio +async def test_get_aggregates_filter_by_key(ccn_api_client, fixture_aggregates: List[Dict]): + """ + Tests the 'keys' query parameter. 
+ """ + + address, key = ADDRESS_1, "test_target" + aggregates = await get_aggregates_expect_success(ccn_api_client, address=address, keys=key) + assert aggregates["address"] == address + assert aggregates["data"][key] == EXPECTED_AGGREGATES[address][key] + + # Multiple keys + address, keys = ADDRESS_1, ["test_target", "test_reference"] + aggregates = await get_aggregates_expect_success(ccn_api_client, address=address, keys=",".join(keys)) + assert aggregates["address"] == address + for key in keys: + assert aggregates["data"][key] == EXPECTED_AGGREGATES[address][key], f"Key {key} does not match" + + +@pytest.mark.asyncio +async def test_get_aggregates_limit(ccn_api_client, fixture_aggregates: List[Dict]): + """ + Tests the 'limit' query parameter. + """ + + address, key = ADDRESS_1, "test_reference" + aggregates = await get_aggregates_expect_success(ccn_api_client, address=address, keys=key, limit=1) + assert aggregates["address"] == address + assert aggregates["data"][key] == {"c": 3, "d": 4} + + +@pytest.mark.asyncio +async def test_get_aggregates_invalid_address(ccn_api_client, fixture_aggregates: List[Dict]): + """ + Pass an unknown address. + """ + + invalid_address = "unknown" + + response = await get_aggregates(ccn_api_client, invalid_address) + assert response.status == 404 From 8c51cf549514518c4f2fd2b79db291ba19f21d74 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Fri, 7 Oct 2022 00:50:09 +0200 Subject: [PATCH 08/10] Internal: improve validation of the aggregates endpoint (#341) Use a Pydantic model instead of manual validation. --- src/aleph/web/controllers/aggregates.py | 35 +++++++++++++++---- src/aleph/web/controllers/utils.py | 1 + tests/api/test_aggregates.py | 46 ++++++++++++++++++++----- 3 files changed, 67 insertions(+), 15 deletions(-) diff --git a/src/aleph/web/controllers/aggregates.py b/src/aleph/web/controllers/aggregates.py index 6f83a59d1..efc90ed31 100644 --- a/src/aleph/web/controllers/aggregates.py +++ b/src/aleph/web/controllers/aggregates.py @@ -1,6 +1,27 @@ +from typing import List, Optional + from aiohttp import web +from pydantic import BaseModel, validator, ValidationError from aleph.model.messages import get_computed_address_aggregates +from .utils import LIST_FIELD_SEPARATOR + + +DEFAULT_LIMIT = 1000 + + +class AggregatesQueryParams(BaseModel): + keys: Optional[List[str]] = None + limit: int = DEFAULT_LIMIT + + @validator( + "keys", + pre=True, + ) + def split_str(cls, v): + if isinstance(v, str): + return v.split(LIST_FIELD_SEPARATOR) + return v async def address_aggregate(request): @@ -10,15 +31,15 @@ async def address_aggregate(request): address = request.match_info["address"] - keys = request.query.get("keys", None) - if keys is not None: - keys = keys.split(",") - - limit = request.query.get("limit", "1000") - limit = int(limit) + try: + query_params = AggregatesQueryParams.parse_obj(request.query) + except ValidationError as e: + raise web.HTTPUnprocessableEntity( + text=e.json(), content_type="application/json" + ) aggregates = await get_computed_address_aggregates( - address_list=[address], key_list=keys, limit=limit + address_list=[address], key_list=query_params.keys, limit=query_params.limit ) if not aggregates.get(address): diff --git a/src/aleph/web/controllers/utils.py b/src/aleph/web/controllers/utils.py index 13f18fd95..b227013e4 100644 --- a/src/aleph/web/controllers/utils.py +++ b/src/aleph/web/controllers/utils.py @@ -7,6 +7,7 @@ PER_PAGE = 20 PER_PAGE_SUMMARY = 50 +LIST_FIELD_SEPARATOR = "," class Pagination(object): diff 
--git a/tests/api/test_aggregates.py b/tests/api/test_aggregates.py index 67cfd7966..6dc23502d 100644 --- a/tests/api/test_aggregates.py +++ b/tests/api/test_aggregates.py @@ -43,8 +43,8 @@ def merge_content(_messages: List[Dict]) -> Dict: aggregates = [] for key, group in itertools.groupby( - sorted(messages, key=lambda msg: msg["content"]["key"]), - lambda msg: msg["content"]["key"], + sorted(messages, key=lambda msg: msg["content"]["key"]), + lambda msg: msg["content"]["key"], ): sorted_messages = sorted(group, key=lambda msg: msg["time"]) aggregates.append(merge_content(sorted_messages)) @@ -99,22 +99,30 @@ async def test_get_aggregates(ccn_api_client, fixture_aggregates: List[Dict]): @pytest.mark.asyncio -async def test_get_aggregates_filter_by_key(ccn_api_client, fixture_aggregates: List[Dict]): +async def test_get_aggregates_filter_by_key( + ccn_api_client, fixture_aggregates: List[Dict] +): """ Tests the 'keys' query parameter. """ address, key = ADDRESS_1, "test_target" - aggregates = await get_aggregates_expect_success(ccn_api_client, address=address, keys=key) + aggregates = await get_aggregates_expect_success( + ccn_api_client, address=address, keys=key + ) assert aggregates["address"] == address assert aggregates["data"][key] == EXPECTED_AGGREGATES[address][key] # Multiple keys address, keys = ADDRESS_1, ["test_target", "test_reference"] - aggregates = await get_aggregates_expect_success(ccn_api_client, address=address, keys=",".join(keys)) + aggregates = await get_aggregates_expect_success( + ccn_api_client, address=address, keys=",".join(keys) + ) assert aggregates["address"] == address for key in keys: - assert aggregates["data"][key] == EXPECTED_AGGREGATES[address][key], f"Key {key} does not match" + assert ( + aggregates["data"][key] == EXPECTED_AGGREGATES[address][key] + ), f"Key {key} does not match" @pytest.mark.asyncio @@ -124,13 +132,17 @@ async def test_get_aggregates_limit(ccn_api_client, fixture_aggregates: List[Dic """ address, key = ADDRESS_1, "test_reference" - aggregates = await get_aggregates_expect_success(ccn_api_client, address=address, keys=key, limit=1) + aggregates = await get_aggregates_expect_success( + ccn_api_client, address=address, keys=key, limit=1 + ) assert aggregates["address"] == address assert aggregates["data"][key] == {"c": 3, "d": 4} @pytest.mark.asyncio -async def test_get_aggregates_invalid_address(ccn_api_client, fixture_aggregates: List[Dict]): +async def test_get_aggregates_invalid_address( + ccn_api_client, fixture_aggregates: List[Dict] +): """ Pass an unknown address. """ @@ -139,3 +151,21 @@ async def test_get_aggregates_invalid_address(ccn_api_client, fixture_aggregates response = await get_aggregates(ccn_api_client, invalid_address) assert response.status == 404 + + +@pytest.mark.asyncio +async def test_get_aggregates_invalid_params( + ccn_api_client, fixture_aggregates: List[Dict] +): + """ + Tests that passing invalid parameters returns a 422 error. 
+ """ + + # A string as limit + response = await get_aggregates(ccn_api_client, ADDRESS_1, limit="abc") + assert response.status == 422 + assert response.content_type == "application/json" + + errors = await response.json() + assert len(errors) == 1 + assert errors[0]["loc"] == ["limit"], errors From f4446ff0b78fbb82b27bfbe691b2227204899235 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Thu, 13 Oct 2022 14:02:16 +0200 Subject: [PATCH 09/10] Internal: store the full TX context in confirmations (#265) We now store the entire context of the transaction in the confirmations array of messages. This means that two additional fields are now preserved: the transaction block timestamp and the publisher. As we need to re-fetch this data from chain data, a new migration script resets the chain height to re-process all transactions. We reset the confirmation status of all messages to unconfirmed and deleted their confirmations array to let the node automatically migrate to the new format. Renamed some fields of the `TxContext` class in order to use the same format in all DB collections and to avoid a breaking change in the messages confirmation format. --- .../0003-retrieve-confirmation-time.py | 48 ++++++++++ src/aleph/chains/common.py | 90 ++++++++----------- src/aleph/chains/ethereum.py | 4 +- src/aleph/chains/nuls2.py | 4 +- src/aleph/chains/tx_context.py | 14 ++- src/aleph/jobs/process_pending_messages.py | 12 +-- src/aleph/jobs/process_pending_txs.py | 13 ++- src/aleph/schemas/message_confirmation.py | 16 ++++ src/aleph/schemas/validated_message.py | 5 +- tests/chains/test_confirmation.py | 32 ++++--- .../test_process_pending_txs.py | 4 +- tests/schemas/test_validated_messages.py | 5 +- tests/storage/test_store_message.py | 4 +- 13 files changed, 152 insertions(+), 99 deletions(-) create mode 100644 deployment/migrations/scripts/0003-retrieve-confirmation-time.py create mode 100644 src/aleph/schemas/message_confirmation.py diff --git a/deployment/migrations/scripts/0003-retrieve-confirmation-time.py b/deployment/migrations/scripts/0003-retrieve-confirmation-time.py new file mode 100644 index 000000000..abaebcd05 --- /dev/null +++ b/deployment/migrations/scripts/0003-retrieve-confirmation-time.py @@ -0,0 +1,48 @@ +""" +This migration retrieves additional metadata regarding chain confirmation of messages, +including the block timestamp. We reset the TX height of the node to reprocess +all the chain data messages and insert additional values +""" + + +import logging +import os +from configmanager import Config +from aleph.model.chains import Chain +from aleph.model.pending import PendingMessage, PendingTX +from aleph.model.messages import Message + +logger = logging.getLogger(os.path.basename(__file__)) + + +async def upgrade(config: Config, **kwargs): + logger.info("Resetting chain height to re-fetch all chaindata...") + start_height = config.ethereum.start_height.value + await Chain.set_last_height("ETH", start_height) + + logger.info("Dropping all pending transactions...") + await PendingTX.collection.delete_many({}) + + logger.info( + "Dropping all pending confirmation messages " + "(they will be reinserted automatically)..." + ) + await PendingMessage.collection.delete_many({"source.chain_name": {"$ne": None}}) + + logger.info("Removing confirmation data for all messages...") + # Confirmations will be automatically added again by the pending TX processor. 
+ # By removing the confirmation entirely, we make sure to avoid intermediate states + # if a message was confirmed in an unexpected way. + await Message.collection.update_many( + {"confirmed": True}, + { + "$set": { + "confirmed": False, + }, + "$unset": {"confirmations": 1}, + }, + ) + + +async def downgrade(**kwargs): + raise NotImplementedError("Downgrading this migration is not supported.") diff --git a/src/aleph/chains/common.py b/src/aleph/chains/common.py index 4c74a7400..3f5bd0093 100644 --- a/src/aleph/chains/common.py +++ b/src/aleph/chains/common.py @@ -1,11 +1,9 @@ import asyncio import json import logging -from dataclasses import asdict from enum import IntEnum from typing import Dict, Optional, Tuple, List -from aleph_message.models import MessageConfirmation from bson import ObjectId from pymongo import UpdateOne @@ -25,18 +23,17 @@ from aleph.model.pending import PendingMessage, PendingTX from aleph.network import verify_signature from aleph.permissions import check_sender_authorization -from aleph.storage import get_json, pin_hash, add_json, get_message_content -from .tx_context import TxContext -from aleph.schemas.pending_messages import ( - BasePendingMessage, -) +from aleph.schemas.pending_messages import BasePendingMessage from aleph.schemas.validated_message import ( validate_pending_message, ValidatedStoreMessage, ValidatedForgetMessage, make_confirmation_update_query, -make_message_upsert_query, + make_message_upsert_query, ) +from ..schemas.message_confirmation import MessageConfirmation +from aleph.storage import get_json, pin_hash, add_json, get_message_content +from .tx_context import TxContext LOGGER = logging.getLogger("chains.common") @@ -64,21 +61,17 @@ async def mark_confirmed_data(chain_name, tx_hash, height): async def delayed_incoming( message: BasePendingMessage, - chain_name: Optional[str] = None, - tx_hash: Optional[str] = None, - height: Optional[int] = None, + tx_context: Optional[TxContext] = None, + check_message: bool = True, ): if message is None: return + await PendingMessage.collection.insert_one( { "message": message.dict(exclude={"content"}), - "source": dict( - chain_name=chain_name, - tx_hash=tx_hash, - height=height, - check_message=True, # should we store this? - ), + "tx_context": tx_context.dict() if tx_context else None, + "check_message": check_message, } ) @@ -91,9 +84,7 @@ class IncomingStatus(IntEnum): async def mark_message_for_retry( message: BasePendingMessage, - chain_name: Optional[str], - tx_hash: Optional[str], - height: Optional[int], + tx_context: Optional[TxContext], check_message: bool, retrying: bool, existing_id, @@ -101,17 +92,7 @@ async def mark_message_for_retry( message_dict = message.dict(exclude={"content"}) if not retrying: - await PendingMessage.collection.insert_one( - { - "message": message_dict, - "source": dict( - chain_name=chain_name, - tx_hash=tx_hash, - height=height, - check_message=check_message, # should we store this? 
- ), - } - ) + await delayed_incoming(message, tx_context, check_message) else: LOGGER.debug(f"Incrementing for {existing_id}") result = await PendingMessage.collection.update_one( @@ -122,9 +103,7 @@ async def mark_message_for_retry( async def incoming( pending_message: BasePendingMessage, - chain_name: Optional[str] = None, - tx_hash: Optional[str] = None, - height: Optional[int] = None, + tx_context: Optional[TxContext] = None, seen_ids: Optional[Dict[Tuple, int]] = None, check_message: bool = False, retrying: bool = False, @@ -139,16 +118,23 @@ async def incoming( item_hash = pending_message.item_hash sender = pending_message.sender confirmations = [] + chain_name = tx_context.chain if tx_context is not None else None ids_key = (item_hash, sender, chain_name) - if chain_name and tx_hash and height: + if tx_context: if seen_ids is not None: if ids_key in seen_ids.keys(): - if height > seen_ids[ids_key]: + if tx_context.height > seen_ids[ids_key]: return IncomingStatus.MESSAGE_HANDLED, [] confirmations.append( - MessageConfirmation(chain=chain_name, hash=tx_hash, height=height) + MessageConfirmation( + chain=tx_context.chain, + hash=tx_context.hash, + height=tx_context.height, + time=tx_context.time, + publisher=tx_context.publisher, + ) ) filters = { @@ -178,14 +164,14 @@ async def incoming( updates: Dict[str, Dict] = {} if existing: - if seen_ids is not None and height is not None: + if seen_ids is not None and tx_context is not None: if ids_key in seen_ids.keys(): - if height > seen_ids[ids_key]: + if tx_context.height > seen_ids[ids_key]: return IncomingStatus.MESSAGE_HANDLED, [] else: - seen_ids[ids_key] = height + seen_ids[ids_key] = tx_context.height else: - seen_ids[ids_key] = height + seen_ids[ids_key] = tx_context.height LOGGER.debug("Updating %s." % item_hash) @@ -205,9 +191,7 @@ async def incoming( LOGGER.exception("Can't get content of object %r" % item_hash) await mark_message_for_retry( message=pending_message, - chain_name=chain_name, - tx_hash=tx_hash, - height=height, + tx_context=tx_context, check_message=check_message, retrying=retrying, existing_id=existing_id, @@ -215,7 +199,9 @@ async def incoming( return IncomingStatus.RETRYING_LATER, [] validated_message = validate_pending_message( - pending_message=pending_message, content=content, confirmations=confirmations + pending_message=pending_message, + content=content, + confirmations=confirmations, ) # warning: those handlers can modify message and content in place @@ -244,9 +230,7 @@ async def incoming( LOGGER.debug("Message type handler has failed, retrying later.") await mark_message_for_retry( message=pending_message, - chain_name=chain_name, - tx_hash=tx_hash, - height=height, + tx_context=tx_context, check_message=check_message, retrying=retrying, existing_id=existing_id, @@ -264,14 +248,14 @@ async def incoming( LOGGER.warning("Invalid sender for %s" % item_hash) return IncomingStatus.MESSAGE_HANDLED, [] - if seen_ids is not None and height is not None: + if seen_ids is not None and tx_context is not None: if ids_key in seen_ids.keys(): - if height > seen_ids[ids_key]: + if tx_context.height > seen_ids[ids_key]: return IncomingStatus.MESSAGE_HANDLED, [] else: - seen_ids[ids_key] = height + seen_ids[ids_key] = tx_context.height else: - seen_ids[ids_key] = height + seen_ids[ids_key] = tx_context.height LOGGER.debug("New message to store for %s." % item_hash) @@ -386,5 +370,5 @@ async def incoming_chaindata(content: Dict, context: TxContext): For now we only add it to the database, it will be processed later. 
""" await PendingTX.collection.insert_one( - {"content": content, "context": asdict(context)} + {"content": content, "context": context.dict()} ) diff --git a/src/aleph/chains/ethereum.py b/src/aleph/chains/ethereum.py index 1cbbdde53..cce6635ab 100644 --- a/src/aleph/chains/ethereum.py +++ b/src/aleph/chains/ethereum.py @@ -197,8 +197,8 @@ async def request_transactions( try: jdata = json.loads(message) context = TxContext( - chain_name=CHAIN_NAME, - tx_hash=event_data.transactionHash.hex(), + chain=CHAIN_NAME, + hash=event_data.transactionHash.hex(), time=timestamp, height=event_data.blockNumber, publisher=publisher, diff --git a/src/aleph/chains/nuls2.py b/src/aleph/chains/nuls2.py index d8afad46d..39f31269d 100644 --- a/src/aleph/chains/nuls2.py +++ b/src/aleph/chains/nuls2.py @@ -175,8 +175,8 @@ async def request_transactions(config, session, start_height) -> AsyncIterator[T jdata = json.loads(ddata) context = TxContext( - chain_name=CHAIN_NAME, - tx_hash=tx["hash"], + chain=CHAIN_NAME, + hash=tx["hash"], height=tx["height"], time=tx["createTime"], publisher=tx["coinFroms"][0]["address"], diff --git a/src/aleph/chains/tx_context.py b/src/aleph/chains/tx_context.py index 81839d3a6..a627af8b9 100644 --- a/src/aleph/chains/tx_context.py +++ b/src/aleph/chains/tx_context.py @@ -1,11 +1,7 @@ -from dataclasses import dataclass +from aleph.schemas.message_confirmation import MessageConfirmation -@dataclass -class TxContext: - chain_name: str - tx_hash: str - height: int - # Transaction timestamp, in Unix time (number of seconds since epoch). - time: int - publisher: str +# At the moment, confirmation = chain transaction. This might change, but in the meantime +# having TxContext inherit MessageConfirmation avoids code duplication. +class TxContext(MessageConfirmation): + pass diff --git a/src/aleph/jobs/process_pending_messages.py b/src/aleph/jobs/process_pending_messages.py index dd35e445d..0192ec778 100644 --- a/src/aleph/jobs/process_pending_messages.py +++ b/src/aleph/jobs/process_pending_messages.py @@ -14,13 +14,14 @@ from setproctitle import setproctitle from aleph.chains.common import incoming, IncomingStatus +from aleph.exceptions import InvalidMessageError from aleph.logging import setup_logging from aleph.model.db_bulk_operation import DbBulkOperation from aleph.model.pending import PendingMessage +from aleph.schemas.pending_messages import parse_message from aleph.services.p2p import singleton from .job_utils import prepare_loop, process_job_results -from ..exceptions import InvalidMessageError -from ..schemas.pending_messages import parse_message +from ..chains.tx_context import TxContext LOGGER = getLogger("jobs.pending_messages") @@ -60,12 +61,13 @@ async def handle_pending_message( # If an invalid message somehow ended in pending messages, drop it. 
return [delete_pending_message_op] + tx_context_dict = pending.get("tx_context") + tx_context = TxContext.parse_obj(tx_context_dict) if tx_context_dict else None + async with sem: status, operations = await incoming( pending_message=message, - chain_name=pending["source"].get("chain_name"), - tx_hash=pending["source"].get("tx_hash"), - height=pending["source"].get("height"), + tx_context=tx_context, seen_ids=seen_ids, check_message=pending["source"].get("check_message", True), retrying=True, diff --git a/src/aleph/jobs/process_pending_txs.py b/src/aleph/jobs/process_pending_txs.py index 588e8baa7..2658b8a2e 100644 --- a/src/aleph/jobs/process_pending_txs.py +++ b/src/aleph/jobs/process_pending_txs.py @@ -4,7 +4,8 @@ import asyncio import logging -from typing import List, Dict, Optional, Set +from typing import List, Dict, Optional +from typing import Set import sentry_sdk from configmanager import Config @@ -31,7 +32,7 @@ async def handle_pending_tx( db_operations: List[DbBulkOperation] = [] tx_context = TxContext(**pending_tx["context"]) - LOGGER.info("%s Handling TX in block %s", tx_context.chain_name, tx_context.height) + LOGGER.info("%s Handling TX in block %s", tx_context.chain, tx_context.height) messages = await get_chaindata_messages( pending_tx["content"], tx_context, seen_ids=seen_ids @@ -55,12 +56,8 @@ async def handle_pending_tx( operation=InsertOne( { "message": message.dict(exclude={"content"}), - "source": dict( - chain_name=tx_context.chain_name, - tx_hash=tx_context.tx_hash, - height=tx_context.height, - check_message=True, # should we store this? - ), + "tx_context": tx_context.dict(), + "check_message": True, } ), ) diff --git a/src/aleph/schemas/message_confirmation.py b/src/aleph/schemas/message_confirmation.py new file mode 100644 index 000000000..2613b25de --- /dev/null +++ b/src/aleph/schemas/message_confirmation.py @@ -0,0 +1,16 @@ +from aleph_message.models import Chain +from pydantic import BaseModel, Field + + +class MessageConfirmation(BaseModel): + chain: Chain = Field(..., description="Chain from which the confirmation was fetched.") + height: int = Field(..., description="Block in which the confirmation was published.") + hash: str = Field( + ..., + description="Hash of the transaction/block in which the confirmation was published.", + ) + time: float = Field( + ..., + description="Transaction timestamp, in Unix time (number of seconds since epoch).", + ) + publisher: str = Field(..., description="Publisher of the confirmation on chain.") diff --git a/src/aleph/schemas/validated_message.py b/src/aleph/schemas/validated_message.py index e112252af..b863230f9 100644 --- a/src/aleph/schemas/validated_message.py +++ b/src/aleph/schemas/validated_message.py @@ -7,14 +7,12 @@ from typing import List, Literal, Optional, Generic, Dict, Type, Any from aleph_message.models import ( - MessageConfirmation, AggregateContent, ForgetContent, MessageType, PostContent, ProgramContent, - StoreContent, -) + StoreContent, ) from pydantic import BaseModel, Field from aleph.schemas.base_messages import AlephBaseMessage, ContentType, MType @@ -26,6 +24,7 @@ PendingProgramMessage, PendingStoreMessage, ) +from .message_confirmation import MessageConfirmation from .message_content import MessageContent diff --git a/tests/chains/test_confirmation.py b/tests/chains/test_confirmation.py index 5143150f8..7964fa3a2 100644 --- a/tests/chains/test_confirmation.py +++ b/tests/chains/test_confirmation.py @@ -4,6 +4,7 @@ import pytest from aleph.chains.common import process_one_message +from 
aleph.chains.tx_context import TxContext from aleph.model.messages import CappedMessage, Message from aleph.schemas.pending_messages import parse_message @@ -56,18 +57,22 @@ async def test_confirm_message(test_db): assert remove_id_key(message_in_db) == remove_id_key(capped_message_in_db) # Now, confirm the message - chain_name, tx_hash, height = "ETH", "123", 8000 - await process_one_message( - message, chain_name=chain_name, tx_hash=tx_hash, height=height + tx_context = TxContext( + chain="ETH", + hash="123", + height=8000, + time=120000, + publisher="0xdeadbeef", ) + await process_one_message(message, tx_context) + message_in_db = await Message.collection.find_one({"item_hash": item_hash}) assert message_in_db is not None assert message_in_db["confirmed"] - assert {"chain": chain_name, "hash": tx_hash, "height": height} in message_in_db[ - "confirmations" - ] + expected_confirmations = [tx_context.dict()] + assert message_in_db["confirmations"] == expected_confirmations capped_message_after_confirmation = await CappedMessage.collection.find_one( {"item_hash": item_hash} @@ -89,18 +94,23 @@ async def test_process_confirmed_message(test_db): item_hash = MESSAGE_DICT["item_hash"] # Confirm the message - chain_name, tx_hash, height = "ETH", "123", 8000 message = parse_message(MESSAGE_DICT) - await process_one_message( - message, chain_name=chain_name, tx_hash=tx_hash, height=height + tx_context = TxContext( + chain="ETH", + hash="123", + height=8000, + time=120000, + publisher="0xdeadbeef", ) + await process_one_message(message, tx_context) + # Now, confirm the message message_in_db = await Message.collection.find_one({"item_hash": item_hash}) assert message_in_db is not None assert message_in_db["confirmed"] - expected_confirmations = [{"chain": chain_name, "hash": tx_hash, "height": height}] + expected_confirmations = [tx_context.dict()] assert message_in_db["confirmations"] == expected_confirmations capped_message_in_db = await CappedMessage.collection.find_one( @@ -109,4 +119,4 @@ async def test_process_confirmed_message(test_db): assert remove_id_key(message_in_db) == remove_id_key(capped_message_in_db) assert capped_message_in_db["confirmed"] - assert capped_message_in_db["confirmations"] == expected_confirmations + assert capped_message_in_db["confirmations"] == [tx_context.dict()] diff --git a/tests/message_processing/test_process_pending_txs.py b/tests/message_processing/test_process_pending_txs.py index 76a0f61c4..2b3a9814c 100644 --- a/tests/message_processing/test_process_pending_txs.py +++ b/tests/message_processing/test_process_pending_txs.py @@ -33,8 +33,8 @@ async def test_process_pending_tx(mocker, test_db): "content": "test-data-pending-tx-messages", }, "context": { - "chain_name": "ETH", - "tx_hash": "0xf49cb176c1ce4f6eb7b9721303994b05074f8fadc37b5f41ac6f78bdf4b14b6c", + "chain": "ETH", + "hash": "0xf49cb176c1ce4f6eb7b9721303994b05074f8fadc37b5f41ac6f78bdf4b14b6c", "time": 1632835747, "height": 13314512, "publisher": "0x23eC28598DCeB2f7082Cc3a9D670592DfEd6e0dC", diff --git a/tests/schemas/test_validated_messages.py b/tests/schemas/test_validated_messages.py index dda32fa73..c2cc9f39f 100644 --- a/tests/schemas/test_validated_messages.py +++ b/tests/schemas/test_validated_messages.py @@ -6,8 +6,6 @@ import json from typing import Dict -from aleph_message.models import MessageConfirmation - from aleph.schemas.message_content import MessageContent, ContentSource from aleph.schemas.pending_messages import ( PendingAggregateMessage, @@ -21,6 +19,7 @@ ValidatedAggregateMessage, 
ValidatedStoreMessage, ) +from aleph.schemas.message_confirmation import MessageConfirmation def check_basic_message_fields(message: BaseValidatedMessage, message_dict: Dict): @@ -132,7 +131,7 @@ def test_parse_store_message_inline_content(): pending_message = parse_message(message_dict) assert isinstance(pending_message, PendingStoreMessage) - confirmations = [MessageConfirmation(chain="ETH", height=1234, hash="abcd")] + confirmations = [MessageConfirmation(chain="ETH", height=1234, hash="abcd", time=8000, publisher="0xsomething")] message_content = MessageContent( pending_message.item_hash, ContentSource.INLINE, content, item_content ) diff --git a/tests/storage/test_store_message.py b/tests/storage/test_store_message.py index 458458cac..b6ed749e5 100644 --- a/tests/storage/test_store_message.py +++ b/tests/storage/test_store_message.py @@ -1,7 +1,6 @@ import json import pytest -from aleph_message.models import MessageConfirmation from aleph.handlers.storage import handle_new_storage from aleph.schemas.message_content import ContentSource, RawContent @@ -9,6 +8,7 @@ ValidatedStoreMessage, StoreContentWithMetadata, ) +from aleph.schemas.message_confirmation import MessageConfirmation from message_test_helpers import make_validated_message_from_dict @@ -33,6 +33,8 @@ def fixture_message_file(): chain="ETH", hash="0x28fd852984b1f2222ca1870a97f44cc34b535a49d2618f5689a10a67985935d5", height=14276536, + time=9000, + publisher="0xsomething", ) ], ) From d70793fcad031aae026113990b4434da88211c7f Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Sat, 14 May 2022 20:12:42 +0200 Subject: [PATCH 10/10] [Messages] Reception and confirmation times Added two new fields to the messages collection. We now store the initial time of reception of a message in the `reception_time` field, and the earliest on-chain confirmation time in the `confirmation_time` field. These fields will be used to order messages more precisely/securely and to compute metrics, for example the propagation time of a message on the network. The `time` field of messages is now deprecated, as it is defined by users and not signed. If you need the user time, use `content.time`. --- .../0004-create-new-message-time-fields.py | 32 +++++++++++++++++++ src/aleph/chains/common.py | 16 ++++++++-- src/aleph/jobs/process_pending_messages.py | 1 + src/aleph/jobs/process_pending_txs.py | 7 ++++ src/aleph/schemas/validated_message.py | 30 +++++++++++++++-- src/aleph/services/ipfs/pubsub.py | 8 +++-- src/aleph/services/p2p/protocol.py | 4 ++- tests/chains/test_common.py | 3 +- tests/chains/test_confirmation.py | 17 ++++++++-- tests/helpers/message_test_helpers.py | 4 +++ tests/schemas/test_validated_messages.py | 3 ++ .../storage/forget/test_forget_multi_users.py | 6 ++-- tests/test_network.py | 2 +- 13 files changed, 117 insertions(+), 16 deletions(-) create mode 100644 deployment/migrations/scripts/0004-create-new-message-time-fields.py diff --git a/deployment/migrations/scripts/0004-create-new-message-time-fields.py b/deployment/migrations/scripts/0004-create-new-message-time-fields.py new file mode 100644 index 000000000..80ebc533d --- /dev/null +++ b/deployment/migrations/scripts/0004-create-new-message-time-fields.py @@ -0,0 +1,32 @@ +""" +This migration adds the `confirmation_time` and `reception_time` fields. +`confirmation_time` serves as a cache of the first confirmation message seen +in on-chain data. +`reception_time` represents the first time the node became aware of +the message, confirmed or not. 
+""" + + +import logging +import os + +from configmanager import Config + +from aleph.model.messages import Message + +logger = logging.getLogger(os.path.basename(__file__)) + + +async def upgrade(config: Config, **kwargs): + logger.info("Creating confirmation_time field for messages...") + await Message.collection.update_many( + {"confirmed": True}, + [{"$set": {"confirmation_time": {"$min": "$confirmations.time"}}}], + ) + + +async def downgrade(**kwargs): + logger.info("Creating confirmation_time field for messages...") + await Message.collection.update_many( + {"$unset": {"confirmation_time": 1, "reception_time": 1}} + ) diff --git a/src/aleph/chains/common.py b/src/aleph/chains/common.py index 3f5bd0093..326f645cf 100644 --- a/src/aleph/chains/common.py +++ b/src/aleph/chains/common.py @@ -2,7 +2,7 @@ import json import logging from enum import IntEnum -from typing import Dict, Optional, Tuple, List +from typing import Any, Dict, Optional, Tuple, List from bson import ObjectId from pymongo import UpdateOne @@ -61,6 +61,7 @@ async def mark_confirmed_data(chain_name, tx_hash, height): async def delayed_incoming( message: BasePendingMessage, + reception_time: float, tx_context: Optional[TxContext] = None, check_message: bool = True, ): @@ -71,6 +72,7 @@ async def delayed_incoming( { "message": message.dict(exclude={"content"}), "tx_context": tx_context.dict() if tx_context else None, + "reception_time": reception_time, "check_message": check_message, } ) @@ -84,6 +86,7 @@ class IncomingStatus(IntEnum): async def mark_message_for_retry( message: BasePendingMessage, + reception_time: float, tx_context: Optional[TxContext], check_message: bool, retrying: bool, @@ -92,7 +95,12 @@ async def mark_message_for_retry( message_dict = message.dict(exclude={"content"}) if not retrying: - await delayed_incoming(message, tx_context, check_message) + await delayed_incoming( + message, + reception_time=reception_time, + tx_context=tx_context, + check_message=check_message, + ) else: LOGGER.debug(f"Incrementing for {existing_id}") result = await PendingMessage.collection.update_one( @@ -103,6 +111,7 @@ async def mark_message_for_retry( async def incoming( pending_message: BasePendingMessage, + reception_time: float, tx_context: Optional[TxContext] = None, seen_ids: Optional[Dict[Tuple, int]] = None, check_message: bool = False, @@ -191,6 +200,7 @@ async def incoming( LOGGER.exception("Can't get content of object %r" % item_hash) await mark_message_for_retry( message=pending_message, + reception_time=reception_time, tx_context=tx_context, check_message=check_message, retrying=retrying, @@ -201,6 +211,7 @@ async def incoming( validated_message = validate_pending_message( pending_message=pending_message, content=content, + reception_time=reception_time, confirmations=confirmations, ) @@ -230,6 +241,7 @@ async def incoming( LOGGER.debug("Message type handler has failed, retrying later.") await mark_message_for_retry( message=pending_message, + reception_time=reception_time, tx_context=tx_context, check_message=check_message, retrying=retrying, diff --git a/src/aleph/jobs/process_pending_messages.py b/src/aleph/jobs/process_pending_messages.py index 0192ec778..69e1fdfab 100644 --- a/src/aleph/jobs/process_pending_messages.py +++ b/src/aleph/jobs/process_pending_messages.py @@ -67,6 +67,7 @@ async def handle_pending_message( async with sem: status, operations = await incoming( pending_message=message, + reception_time=pending["reception_time"], tx_context=tx_context, seen_ids=seen_ids, 
check_message=pending["source"].get("check_message", True), diff --git a/src/aleph/jobs/process_pending_txs.py b/src/aleph/jobs/process_pending_txs.py index 2658b8a2e..488dcf887 100644 --- a/src/aleph/jobs/process_pending_txs.py +++ b/src/aleph/jobs/process_pending_txs.py @@ -4,6 +4,7 @@ import asyncio import logging +import time from typing import List, Dict, Optional from typing import Set @@ -39,6 +40,11 @@ async def handle_pending_tx( ) if messages: for i, message_dict in enumerate(messages): + reception_time = time.time() + # TODO: this update of the time field is unwanted, but needed to preserve + # the behavior of aggregates. Use the correct time field in aggregates + # and then remove this line. + message_dict["time"] = tx_context.time + (i / 1000) try: # we don't check signatures yet. @@ -57,6 +63,7 @@ async def handle_pending_tx( { "message": message.dict(exclude={"content"}), "tx_context": tx_context.dict(), + "reception_time": reception_time, "check_message": True, } ), diff --git a/src/aleph/schemas/validated_message.py b/src/aleph/schemas/validated_message.py index b863230f9..350401851 100644 --- a/src/aleph/schemas/validated_message.py +++ b/src/aleph/schemas/validated_message.py @@ -12,7 +12,8 @@ MessageType, PostContent, ProgramContent, - StoreContent, ) + StoreContent, +) from pydantic import BaseModel, Field from aleph.schemas.base_messages import AlephBaseMessage, ContentType, MType @@ -60,6 +61,8 @@ class BaseValidatedMessage(AlephBaseMessage, Generic[MType, ContentType]): content: ContentType confirmations: List[MessageConfirmation] = Field(default_factory=list) forgotten_by: List[str] = Field(default_factory=list) + reception_time: float + confirmation_time: Optional[float] class ValidatedAggregateMessage( @@ -95,6 +98,7 @@ class ValidatedStoreMessage( def validate_pending_message( pending_message: BasePendingMessage[MType, ContentType], content: MessageContent, + reception_time: float, confirmations: List[MessageConfirmation], ) -> BaseValidatedMessage[MType, ContentType]: @@ -114,6 +118,11 @@ def validate_pending_message( if json_content.get("time", None) is None: json_content["time"] = pending_message.time + if confirmations: + confirmation_time = min(confirmation.time for confirmation in confirmations) + else: + confirmation_time = None + # Note: we could use the construct method of Pydantic to bypass validation # and speed up the conversion process. However, this means giving up on validation. # At the time of writing, correctness seems more important than performance. 
@@ -123,6 +132,8 @@ def validate_pending_message( confirmed=bool(confirmations), confirmations=confirmations, size=len(content.raw_value), + reception_time=reception_time, + confirmation_time=confirmation_time, ) @@ -137,6 +148,11 @@ def make_confirmation_update_query(confirmations: List[MessageConfirmation]) -> return { "$max": {"confirmed": True}, + "$min": { + "confirmation_time": min( + confirmation.time for confirmation in confirmations + ) + }, "$addToSet": { "confirmations": { "$each": [confirmation.dict() for confirmation in confirmations] @@ -159,10 +175,18 @@ def make_message_upsert_query(message: BaseValidatedMessage[Any, Any]) -> Dict: "channel": message.channel, "signature": message.signature, }, - "$min": {"time": message.time}, + "$min": { + "time": message.time, + "reception_time": message.reception_time, + }, } # Add fields related to confirmations - updates.update(make_confirmation_update_query(message.confirmations)) + confirmation_updates = make_confirmation_update_query(message.confirmations) + for k, v in confirmation_updates.items(): + try: + updates[k].update(v) + except KeyError: + updates[k] = v return updates diff --git a/src/aleph/services/ipfs/pubsub.py b/src/aleph/services/ipfs/pubsub.py index 46c8edb1f..79dfe0622 100644 --- a/src/aleph/services/ipfs/pubsub.py +++ b/src/aleph/services/ipfs/pubsub.py @@ -1,12 +1,13 @@ import asyncio import base64 import logging +import time from typing import Union import base58 -from ...exceptions import InvalidMessageError from .common import get_ipfs_api +from aleph.exceptions import InvalidMessageError LOGGER = logging.getLogger("IPFS.PUBSUB") @@ -48,9 +49,12 @@ async def incoming_channel(topic) -> None: try: async for mvalue in sub(topic): try: + reception_time = time.time() message = await get_pubsub_message(mvalue) LOGGER.debug("New message %r" % message) - asyncio.create_task(process_one_message(message)) + asyncio.create_task( + process_one_message(message, reception_time=reception_time) + ) except InvalidMessageError: LOGGER.warning(f"Invalid message {mvalue}") except Exception: diff --git a/src/aleph/services/p2p/protocol.py b/src/aleph/services/p2p/protocol.py index de632fbbe..19bbbb8b3 100644 --- a/src/aleph/services/p2p/protocol.py +++ b/src/aleph/services/p2p/protocol.py @@ -3,6 +3,7 @@ import json import logging import random +import time from typing import Any, Dict, Optional, Set from anyio.abc import SocketStream @@ -186,6 +187,7 @@ async def incoming_channel(p2p_client: P2PClient, topic: str) -> None: try: async for pubsub_message in receive_pubsub_messages(stream): try: + reception_time = time.time() msg_dict = pubsub_msg_to_dict(pubsub_message) LOGGER.debug("Received from P2P:", msg_dict) # we should check the sender here to avoid spam @@ -196,7 +198,7 @@ async def incoming_channel(p2p_client: P2PClient, topic: str) -> None: continue LOGGER.debug("New message %r" % message) - await delayed_incoming(message) + await delayed_incoming(message, reception_time) except Exception: LOGGER.exception("Can't handle message") diff --git a/tests/chains/test_common.py b/tests/chains/test_common.py index 1d12a06c2..dca327e41 100644 --- a/tests/chains/test_common.py +++ b/tests/chains/test_common.py @@ -68,5 +68,6 @@ async def async_magic(): message_dict["item_type"] = "inline" message = parse_message(message_dict) - status, ops = await incoming(message, check_message=True) + status, ops = await incoming(message, reception_time=100000, check_message=True) + assert status == IncomingStatus.MESSAGE_HANDLED diff --git 
a/tests/chains/test_confirmation.py b/tests/chains/test_confirmation.py index 7964fa3a2..cfbadc635 100644 --- a/tests/chains/test_confirmation.py +++ b/tests/chains/test_confirmation.py @@ -43,12 +43,14 @@ async def test_confirm_message(test_db): content = json.loads(MESSAGE_DICT["item_content"]) message = parse_message(MESSAGE_DICT) - await process_one_message(message) + original_reception_time = 100000 + await process_one_message(message, reception_time=original_reception_time) message_in_db = await Message.collection.find_one({"item_hash": item_hash}) assert message_in_db is not None assert message_in_db["content"] == content assert not message_in_db["confirmed"] + assert message_in_db["reception_time"] == original_reception_time capped_message_in_db = await CappedMessage.collection.find_one( {"item_hash": item_hash} @@ -57,6 +59,7 @@ async def test_confirm_message(test_db): assert remove_id_key(message_in_db) == remove_id_key(capped_message_in_db) # Now, confirm the message + confirmation_reception_time = 123000 tx_context = TxContext( chain="ETH", hash="123", @@ -65,12 +68,15 @@ async def test_confirm_message(test_db): publisher="0xdeadbeef", ) - await process_one_message(message, tx_context) + await process_one_message(message, reception_time=confirmation_reception_time, tx_context=tx_context) message_in_db = await Message.collection.find_one({"item_hash": item_hash}) assert message_in_db is not None assert message_in_db["confirmed"] + assert message_in_db["confirmation_time"] == tx_context.time + assert message_in_db["reception_time"] == original_reception_time + expected_confirmations = [tx_context.dict()] assert message_in_db["confirmations"] == expected_confirmations @@ -92,6 +98,7 @@ async def test_process_confirmed_message(test_db): """ item_hash = MESSAGE_DICT["item_hash"] + reception_time = 1000000 # Confirm the message message = parse_message(MESSAGE_DICT) @@ -102,13 +109,17 @@ async def test_process_confirmed_message(test_db): time=120000, publisher="0xdeadbeef", ) - await process_one_message(message, tx_context) + await process_one_message( + message, reception_time=reception_time, tx_context=tx_context + ) # Now, confirm the message message_in_db = await Message.collection.find_one({"item_hash": item_hash}) assert message_in_db is not None assert message_in_db["confirmed"] + assert message_in_db["confirmation_time"] == tx_context.time + assert message_in_db["reception_time"] == reception_time expected_confirmations = [tx_context.dict()] assert message_in_db["confirmations"] == expected_confirmations diff --git a/tests/helpers/message_test_helpers.py b/tests/helpers/message_test_helpers.py index be6192315..5506e39d2 100644 --- a/tests/helpers/message_test_helpers.py +++ b/tests/helpers/message_test_helpers.py @@ -12,6 +12,7 @@ def make_validated_message_from_dict( message_dict: Dict, raw_content: Optional[Union[str, bytes]] = None, confirmations: Optional[List[MessageConfirmation]] = None, + reception_time: Optional[float] = None, ): """ Creates a validated message instance from a raw message dictionary. @@ -46,4 +47,7 @@ def make_validated_message_from_dict( pending_message=pending_message, content=message_content, confirmations=confirmations or [], + # If no reception time is specified, just set it to the message time as propagation + # across Aleph is instantaneous, obviously. 
+ reception_time=reception_time or pending_message.time, ) diff --git a/tests/schemas/test_validated_messages.py b/tests/schemas/test_validated_messages.py index c2cc9f39f..397ac54a7 100644 --- a/tests/schemas/test_validated_messages.py +++ b/tests/schemas/test_validated_messages.py @@ -59,6 +59,7 @@ def test_parse_aggregate_inline_message(): pending_message=pending_message, content=message_content, confirmations=confirmations, + reception_time=pending_message.time, ) assert isinstance(validated_message, ValidatedAggregateMessage) @@ -105,6 +106,7 @@ def test_parse_post_message_storage_content(): pending_message=pending_message, content=message_content, confirmations=confirmations, + reception_time=pending_message.time, ) check_basic_message_fields(validated_message, message_dict) @@ -139,6 +141,7 @@ def test_parse_store_message_inline_content(): pending_message=pending_message, content=message_content, confirmations=confirmations, + reception_time=pending_message.time, ) assert isinstance(validated_message, ValidatedStoreMessage) diff --git a/tests/storage/forget/test_forget_multi_users.py b/tests/storage/forget/test_forget_multi_users.py index f3666b273..6486ddca2 100644 --- a/tests/storage/forget/test_forget_multi_users.py +++ b/tests/storage/forget/test_forget_multi_users.py @@ -70,7 +70,7 @@ async def test_forget_multiusers_storage(mocker, test_db): await store_gridfs_file(key=file_hash, value=file_content) message_user1 = parse_message(message_user1_dict) - await process_one_message(message_user1) + await process_one_message(message_user1, reception_time=message_user1.time) message1_db = await Message.collection.find_one( {"item_hash": message_user1.item_hash} @@ -78,14 +78,14 @@ async def test_forget_multiusers_storage(mocker, test_db): assert message1_db is not None message_user2 = parse_message(message_user2_dict) - await process_one_message(message_user2) + await process_one_message(message_user2, reception_time=message_user2.time) # Sanity check: check that the file exists db_file_data = await read_gridfs_file(file_hash) assert db_file_data == file_content forget_message_user1 = parse_message(forget_message_user1_dict) - await process_one_message(forget_message_user1) + await process_one_message(forget_message_user1, reception_time=forget_message_user1.time) # Check that the message was properly forgotten forgotten_message = await Message.collection.find_one( diff --git a/tests/test_network.py b/tests/test_network.py index 918fb3b8d..8def4473a 100644 --- a/tests/test_network.py +++ b/tests/test_network.py @@ -117,5 +117,5 @@ async def async_magic(): "signature": "21027c108022f992f090bbe5c78ca8822f5b7adceb705ae2cd5318543d7bcdd2a74700473045022100b59f7df5333d57080a93be53b9af74e66a284170ec493455e675eb2539ac21db022077ffc66fe8dde7707038344496a85266bf42af1240017d4e1fa0d7068c588ca7", } message = parse_message(message_dict) - status, ops = await incoming(message) + status, ops = await incoming(message, reception_time=100000) assert status == IncomingStatus.MESSAGE_HANDLED