diff --git a/deployment/migrations/scripts/0003-retrieve-confirmation-time.py b/deployment/migrations/scripts/0003-retrieve-confirmation-time.py
new file mode 100644
index 000000000..abaebcd05
--- /dev/null
+++ b/deployment/migrations/scripts/0003-retrieve-confirmation-time.py
@@ -0,0 +1,48 @@
+"""
+This migration retrieves additional metadata regarding chain confirmation of messages,
+including the block timestamp. We reset the TX height of the node to reprocess
+all the chain data messages and insert the additional values.
+"""
+
+
+import logging
+import os
+from configmanager import Config
+from aleph.model.chains import Chain
+from aleph.model.pending import PendingMessage, PendingTX
+from aleph.model.messages import Message
+
+logger = logging.getLogger(os.path.basename(__file__))
+
+
+async def upgrade(config: Config, **kwargs):
+    logger.info("Resetting chain height to re-fetch all chaindata...")
+    start_height = config.ethereum.start_height.value
+    await Chain.set_last_height("ETH", start_height)
+
+    logger.info("Dropping all pending transactions...")
+    await PendingTX.collection.delete_many({})
+
+    logger.info(
+        "Dropping all pending confirmation messages "
+        "(they will be reinserted automatically)..."
+    )
+    await PendingMessage.collection.delete_many({"source.chain_name": {"$ne": None}})
+
+    logger.info("Removing confirmation data for all messages...")
+    # Confirmations will be automatically added again by the pending TX processor.
+    # By removing the confirmation entirely, we make sure to avoid intermediate states
+    # if a message was confirmed in an unexpected way.
+    await Message.collection.update_many(
+        {"confirmed": True},
+        {
+            "$set": {
+                "confirmed": False,
+            },
+            "$unset": {"confirmations": 1},
+        },
+    )
+
+
+async def downgrade(**kwargs):
+    raise NotImplementedError("Downgrading this migration is not supported.")
diff --git a/deployment/migrations/scripts/0004-create-new-message-time-fields.py b/deployment/migrations/scripts/0004-create-new-message-time-fields.py
new file mode 100644
index 000000000..80ebc533d
--- /dev/null
+++ b/deployment/migrations/scripts/0004-create-new-message-time-fields.py
@@ -0,0 +1,32 @@
+"""
+This migration adds the `confirmation_time` and `reception_time` fields.
+`confirmation_time` serves as a cache of the first confirmation message seen
+in on-chain data.
+`reception_time` represents the first time the node became aware of
+the message, confirmed or not.
+"""
+
+
+import logging
+import os
+
+from configmanager import Config
+
+from aleph.model.messages import Message
+
+logger = logging.getLogger(os.path.basename(__file__))
+
+
+async def upgrade(config: Config, **kwargs):
+    logger.info("Creating confirmation_time field for messages...")
+    await Message.collection.update_many(
+        {"confirmed": True},
+        [{"$set": {"confirmation_time": {"$min": "$confirmations.time"}}}],
+    )
+
+
+async def downgrade(**kwargs):
+    logger.info("Removing confirmation_time and reception_time fields from messages...")
+    await Message.collection.update_many(
+        {}, {"$unset": {"confirmation_time": 1, "reception_time": 1}}
+    )
diff --git a/setup.cfg b/setup.cfg
index 47a3d2519..891e6e543 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -29,7 +29,9 @@ package_dir =
 setup_requires =
     pyscaffold>=3.1a0,<3.2a0
    pytest-runner>=2.0,<3dev
-# Add here dependencies of your project (semicolon/line-separated), e.g.
+
+# Note: eth/web3 dependency updates are sensitive and can trigger a lot of dependency conflicts.
+# Update with care. Dependencies that were added to make it all work are annotated accordingly.
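The `upgrade` step of migration 0004 above relies on a MongoDB aggregation-pipeline update. A minimal standalone pymongo sketch of the same operation (the connection string, database name and collection name are placeholders; the real script goes through `aleph.model.messages.Message`) looks like this:

```python
# Rough equivalent of migration 0004's upgrade step, for illustration only.
from pymongo import MongoClient

db = MongoClient("mongodb://localhost:27017")["aleph"]  # placeholder connection / DB name

# Pipeline-style update (requires MongoDB >= 4.2): for every confirmed message,
# cache the earliest confirmation timestamp in the new `confirmation_time` field.
db.messages.update_many(
    {"confirmed": True},
    [{"$set": {"confirmation_time": {"$min": "$confirmations.time"}}}],
)
```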
install_requires = aiocache==0.11.1 aiohttp-cors==0.7.0 @@ -38,13 +40,15 @@ install_requires = aiohttp==3.8.1 aioipfs@git+https://github.com/aleph-im/aioipfs.git@76d5624661e879a13b70f3ea87dc9c9604c7eda7 aleph-client==0.4.6 - aleph-message==0.2.1 + aleph-message==0.2.2 aleph-pytezos@git+https://github.com/aleph-im/aleph-pytezos.git@97fe92ffa6e21ef5ec17ef4fa16c86022b30044c coincurve==15.0.1 configmanager==1.35.1 configparser==5.2.0 cosmospy==6.0.0 dataclasses_json==0.5.6 + eciespy==0.3.11 # eth dependency + eth-hash==0.3.3 # eth dependency eth-keys==0.3.3 eth-rlp==0.2.1 eth_account==0.5.6 @@ -53,13 +57,14 @@ install_requires = msgpack==1.0.3 # required by aiocache nuls2-python@git+https://github.com/aleph-im/nuls2-python.git p2pclient==0.2.0 + protobuf==3.20.3 # eth dependency pymongo==3.12.3 pynacl==1.5.0 python-dateutil==2.8.2 python-socketio==5.5.1 pytz==2021.3 pyyaml==6.0 - requests==2.27.1 + requests==2.28.1 secp256k1==0.14.0 sentry-sdk==1.5.11 setproctitle==1.2.2 diff --git a/src/aleph/chains/common.py b/src/aleph/chains/common.py index 4c74a7400..326f645cf 100644 --- a/src/aleph/chains/common.py +++ b/src/aleph/chains/common.py @@ -1,11 +1,9 @@ import asyncio import json import logging -from dataclasses import asdict from enum import IntEnum -from typing import Dict, Optional, Tuple, List +from typing import Any, Dict, Optional, Tuple, List -from aleph_message.models import MessageConfirmation from bson import ObjectId from pymongo import UpdateOne @@ -25,18 +23,17 @@ from aleph.model.pending import PendingMessage, PendingTX from aleph.network import verify_signature from aleph.permissions import check_sender_authorization -from aleph.storage import get_json, pin_hash, add_json, get_message_content -from .tx_context import TxContext -from aleph.schemas.pending_messages import ( - BasePendingMessage, -) +from aleph.schemas.pending_messages import BasePendingMessage from aleph.schemas.validated_message import ( validate_pending_message, ValidatedStoreMessage, ValidatedForgetMessage, make_confirmation_update_query, -make_message_upsert_query, + make_message_upsert_query, ) +from ..schemas.message_confirmation import MessageConfirmation +from aleph.storage import get_json, pin_hash, add_json, get_message_content +from .tx_context import TxContext LOGGER = logging.getLogger("chains.common") @@ -64,21 +61,19 @@ async def mark_confirmed_data(chain_name, tx_hash, height): async def delayed_incoming( message: BasePendingMessage, - chain_name: Optional[str] = None, - tx_hash: Optional[str] = None, - height: Optional[int] = None, + reception_time: float, + tx_context: Optional[TxContext] = None, + check_message: bool = True, ): if message is None: return + await PendingMessage.collection.insert_one( { "message": message.dict(exclude={"content"}), - "source": dict( - chain_name=chain_name, - tx_hash=tx_hash, - height=height, - check_message=True, # should we store this? 
- ), + "tx_context": tx_context.dict() if tx_context else None, + "reception_time": reception_time, + "check_message": check_message, } ) @@ -91,9 +86,8 @@ class IncomingStatus(IntEnum): async def mark_message_for_retry( message: BasePendingMessage, - chain_name: Optional[str], - tx_hash: Optional[str], - height: Optional[int], + reception_time: float, + tx_context: Optional[TxContext], check_message: bool, retrying: bool, existing_id, @@ -101,16 +95,11 @@ async def mark_message_for_retry( message_dict = message.dict(exclude={"content"}) if not retrying: - await PendingMessage.collection.insert_one( - { - "message": message_dict, - "source": dict( - chain_name=chain_name, - tx_hash=tx_hash, - height=height, - check_message=check_message, # should we store this? - ), - } + await delayed_incoming( + message, + reception_time=reception_time, + tx_context=tx_context, + check_message=check_message, ) else: LOGGER.debug(f"Incrementing for {existing_id}") @@ -122,9 +111,8 @@ async def mark_message_for_retry( async def incoming( pending_message: BasePendingMessage, - chain_name: Optional[str] = None, - tx_hash: Optional[str] = None, - height: Optional[int] = None, + reception_time: float, + tx_context: Optional[TxContext] = None, seen_ids: Optional[Dict[Tuple, int]] = None, check_message: bool = False, retrying: bool = False, @@ -139,16 +127,23 @@ async def incoming( item_hash = pending_message.item_hash sender = pending_message.sender confirmations = [] + chain_name = tx_context.chain if tx_context is not None else None ids_key = (item_hash, sender, chain_name) - if chain_name and tx_hash and height: + if tx_context: if seen_ids is not None: if ids_key in seen_ids.keys(): - if height > seen_ids[ids_key]: + if tx_context.height > seen_ids[ids_key]: return IncomingStatus.MESSAGE_HANDLED, [] confirmations.append( - MessageConfirmation(chain=chain_name, hash=tx_hash, height=height) + MessageConfirmation( + chain=tx_context.chain, + hash=tx_context.hash, + height=tx_context.height, + time=tx_context.time, + publisher=tx_context.publisher, + ) ) filters = { @@ -178,14 +173,14 @@ async def incoming( updates: Dict[str, Dict] = {} if existing: - if seen_ids is not None and height is not None: + if seen_ids is not None and tx_context is not None: if ids_key in seen_ids.keys(): - if height > seen_ids[ids_key]: + if tx_context.height > seen_ids[ids_key]: return IncomingStatus.MESSAGE_HANDLED, [] else: - seen_ids[ids_key] = height + seen_ids[ids_key] = tx_context.height else: - seen_ids[ids_key] = height + seen_ids[ids_key] = tx_context.height LOGGER.debug("Updating %s." 
% item_hash) @@ -205,9 +200,8 @@ async def incoming( LOGGER.exception("Can't get content of object %r" % item_hash) await mark_message_for_retry( message=pending_message, - chain_name=chain_name, - tx_hash=tx_hash, - height=height, + reception_time=reception_time, + tx_context=tx_context, check_message=check_message, retrying=retrying, existing_id=existing_id, @@ -215,7 +209,10 @@ async def incoming( return IncomingStatus.RETRYING_LATER, [] validated_message = validate_pending_message( - pending_message=pending_message, content=content, confirmations=confirmations + pending_message=pending_message, + content=content, + reception_time=reception_time, + confirmations=confirmations, ) # warning: those handlers can modify message and content in place @@ -244,9 +241,8 @@ async def incoming( LOGGER.debug("Message type handler has failed, retrying later.") await mark_message_for_retry( message=pending_message, - chain_name=chain_name, - tx_hash=tx_hash, - height=height, + reception_time=reception_time, + tx_context=tx_context, check_message=check_message, retrying=retrying, existing_id=existing_id, @@ -264,14 +260,14 @@ async def incoming( LOGGER.warning("Invalid sender for %s" % item_hash) return IncomingStatus.MESSAGE_HANDLED, [] - if seen_ids is not None and height is not None: + if seen_ids is not None and tx_context is not None: if ids_key in seen_ids.keys(): - if height > seen_ids[ids_key]: + if tx_context.height > seen_ids[ids_key]: return IncomingStatus.MESSAGE_HANDLED, [] else: - seen_ids[ids_key] = height + seen_ids[ids_key] = tx_context.height else: - seen_ids[ids_key] = height + seen_ids[ids_key] = tx_context.height LOGGER.debug("New message to store for %s." % item_hash) @@ -386,5 +382,5 @@ async def incoming_chaindata(content: Dict, context: TxContext): For now we only add it to the database, it will be processed later. 
""" await PendingTX.collection.insert_one( - {"content": content, "context": asdict(context)} + {"content": content, "context": context.dict()} ) diff --git a/src/aleph/chains/ethereum.py b/src/aleph/chains/ethereum.py index 1cbbdde53..cce6635ab 100644 --- a/src/aleph/chains/ethereum.py +++ b/src/aleph/chains/ethereum.py @@ -197,8 +197,8 @@ async def request_transactions( try: jdata = json.loads(message) context = TxContext( - chain_name=CHAIN_NAME, - tx_hash=event_data.transactionHash.hex(), + chain=CHAIN_NAME, + hash=event_data.transactionHash.hex(), time=timestamp, height=event_data.blockNumber, publisher=publisher, diff --git a/src/aleph/chains/nuls2.py b/src/aleph/chains/nuls2.py index d8afad46d..39f31269d 100644 --- a/src/aleph/chains/nuls2.py +++ b/src/aleph/chains/nuls2.py @@ -175,8 +175,8 @@ async def request_transactions(config, session, start_height) -> AsyncIterator[T jdata = json.loads(ddata) context = TxContext( - chain_name=CHAIN_NAME, - tx_hash=tx["hash"], + chain=CHAIN_NAME, + hash=tx["hash"], height=tx["height"], time=tx["createTime"], publisher=tx["coinFroms"][0]["address"], diff --git a/src/aleph/chains/tezos.py b/src/aleph/chains/tezos.py index 765f1ddc7..f061378c9 100644 --- a/src/aleph/chains/tezos.py +++ b/src/aleph/chains/tezos.py @@ -1,5 +1,7 @@ +import datetime as dt import json import logging +from enum import Enum from aleph_pytezos.crypto.key import Key @@ -10,13 +12,83 @@ LOGGER = logging.getLogger(__name__) CHAIN_NAME = "TEZOS" +# Default dApp URL for Micheline-style signatures +DEFAULT_DAPP_URL = "aleph.im" + + +class TezosSignatureType(str, Enum): + RAW = "raw" + MICHELINE = "micheline" + + +def timestamp_to_iso_8601(timestamp: float) -> str: + """ + Returns the timestamp formatted to ISO-8601, JS-style. + + Compared to the regular `isoformat()`, this function only provides precision down + to milliseconds and prints a "Z" instead of +0000 for UTC. + This format is typically used by JavaScript applications, like our TS SDK. + + Example: 2022-09-23T14:41:19.029Z + + :param timestamp: The timestamp to format. + :return: The formatted timestamp. + """ + + return ( + dt.datetime.utcfromtimestamp(timestamp).isoformat(timespec="milliseconds") + "Z" + ) + + +def micheline_verification_buffer( + verification_buffer: bytes, + timestamp: float, + dapp_url: str, +) -> bytes: + """ + Computes the verification buffer for Micheline-type signatures. + + This verification buffer is used when signing data with a Tezos web wallet. + See https://tezostaquito.io/docs/signing/#generating-a-signature-with-beacon-sdk. + + :param verification_buffer: The original (non-Tezos) verification buffer for the Aleph message. + :param timestamp: Timestamp of the message. + :param dapp_url: The URL of the dApp, for use as part of the verification buffer. + :return: The verification buffer used for the signature by the web wallet. 
+ """ + + prefix = b"Tezos Signed Message:" + timestamp = timestamp_to_iso_8601(timestamp).encode("utf-8") + + payload = b" ".join( + (prefix, dapp_url.encode("utf-8"), timestamp, verification_buffer) + ) + hex_encoded_payload = payload.hex() + payload_size = str(len(hex_encoded_payload)).encode("utf-8") + + return b"\x05" + b"\x01\x00" + payload_size + payload + + +def get_tezos_verification_buffer( + message: BasePendingMessage, signature_type: TezosSignatureType, dapp_url: str +) -> bytes: + verification_buffer = get_verification_buffer(message) + + if signature_type == TezosSignatureType.RAW: + return verification_buffer + elif signature_type == TezosSignatureType.MICHELINE: + return micheline_verification_buffer( + verification_buffer, message.time, dapp_url + ) + + raise ValueError(f"Unsupported signature type: {signature_type}") + async def verify_signature(message: BasePendingMessage) -> bool: """ Verifies the cryptographic signature of a message signed with a Tezos key. """ - verification_buffer = get_verification_buffer(message) try: signature_dict = json.loads(message.signature) except json.JSONDecodeError: @@ -30,6 +102,9 @@ async def verify_signature(message: BasePendingMessage) -> bool: LOGGER.exception("'%s' key missing from Tezos signature dictionary.", e.args[0]) return False + signature_type = TezosSignatureType(signature_dict.get("signingType", "raw")) + dapp_url = signature_dict.get("dAppUrl", DEFAULT_DAPP_URL) + key = Key.from_encoded_key(public_key) # Check that the sender ID is equal to the public key hash public_key_hash = key.public_key_hash() @@ -41,6 +116,10 @@ async def verify_signature(message: BasePendingMessage) -> bool: public_key_hash, ) + verification_buffer = get_tezos_verification_buffer( + message, signature_type, dapp_url + ) + # Check the signature try: key.verify(signature, verification_buffer) diff --git a/src/aleph/chains/tx_context.py b/src/aleph/chains/tx_context.py index 81839d3a6..a627af8b9 100644 --- a/src/aleph/chains/tx_context.py +++ b/src/aleph/chains/tx_context.py @@ -1,11 +1,7 @@ -from dataclasses import dataclass +from aleph.schemas.message_confirmation import MessageConfirmation -@dataclass -class TxContext: - chain_name: str - tx_hash: str - height: int - # Transaction timestamp, in Unix time (number of seconds since epoch). - time: int - publisher: str +# At the moment, confirmation = chain transaction. This might change, but in the meantime +# having TxContext inherit MessageConfirmation avoids code duplication. 
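The two-line alias class follows just below. As a rough usage sketch (field values are borrowed from the pending-TX test fixture later in this diff), a context parsed by a chain connector can be stored and later reused as a confirmation without any field mapping:

```python
# Illustrative sketch only; the values come from the pending-TX test fixture in this PR.
from aleph.chains.tx_context import TxContext

ctx = TxContext(
    chain="ETH",
    hash="0xf49cb176c1ce4f6eb7b9721303994b05074f8fadc37b5f41ac6f78bdf4b14b6c",
    height=13314512,
    time=1632835747,
    publisher="0x23eC28598DCeB2f7082Cc3a9D670592DfEd6e0dC",
)

# ctx.dict() is what ends up stored under "context" for pending TXs and "tx_context"
# for pending messages; the same five fields are copied into MessageConfirmation
# entries once the message is confirmed.
context_doc = ctx.dict()
```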
+class TxContext(MessageConfirmation): + pass diff --git a/src/aleph/jobs/process_pending_messages.py b/src/aleph/jobs/process_pending_messages.py index cb5aa69fd..69e1fdfab 100644 --- a/src/aleph/jobs/process_pending_messages.py +++ b/src/aleph/jobs/process_pending_messages.py @@ -14,12 +14,14 @@ from setproctitle import setproctitle from aleph.chains.common import incoming, IncomingStatus +from aleph.exceptions import InvalidMessageError from aleph.logging import setup_logging from aleph.model.db_bulk_operation import DbBulkOperation from aleph.model.pending import PendingMessage +from aleph.schemas.pending_messages import parse_message from aleph.services.p2p import singleton from .job_utils import prepare_loop, process_job_results -from ..schemas.pending_messages import parse_message +from ..chains.tx_context import TxContext LOGGER = getLogger("jobs.pending_messages") @@ -49,14 +51,24 @@ async def handle_pending_message( seen_ids: Dict[Tuple, int], ) -> List[DbBulkOperation]: - message = parse_message(pending["message"]) + delete_pending_message_op = DbBulkOperation( + PendingMessage, DeleteOne({"_id": pending["_id"]}) + ) + + try: + message = parse_message(pending["message"]) + except InvalidMessageError: + # If an invalid message somehow ended in pending messages, drop it. + return [delete_pending_message_op] + + tx_context_dict = pending.get("tx_context") + tx_context = TxContext.parse_obj(tx_context_dict) if tx_context_dict else None async with sem: status, operations = await incoming( pending_message=message, - chain_name=pending["source"].get("chain_name"), - tx_hash=pending["source"].get("tx_hash"), - height=pending["source"].get("height"), + reception_time=pending["reception_time"], + tx_context=tx_context, seen_ids=seen_ids, check_message=pending["source"].get("check_message", True), retrying=True, @@ -64,9 +76,7 @@ async def handle_pending_message( ) if status != IncomingStatus.RETRYING_LATER: - operations.append( - DbBulkOperation(PendingMessage, DeleteOne({"_id": pending["_id"]})) - ) + operations.append(delete_pending_message_op) return operations diff --git a/src/aleph/jobs/process_pending_txs.py b/src/aleph/jobs/process_pending_txs.py index 588e8baa7..488dcf887 100644 --- a/src/aleph/jobs/process_pending_txs.py +++ b/src/aleph/jobs/process_pending_txs.py @@ -4,7 +4,9 @@ import asyncio import logging -from typing import List, Dict, Optional, Set +import time +from typing import List, Dict, Optional +from typing import Set import sentry_sdk from configmanager import Config @@ -31,13 +33,18 @@ async def handle_pending_tx( db_operations: List[DbBulkOperation] = [] tx_context = TxContext(**pending_tx["context"]) - LOGGER.info("%s Handling TX in block %s", tx_context.chain_name, tx_context.height) + LOGGER.info("%s Handling TX in block %s", tx_context.chain, tx_context.height) messages = await get_chaindata_messages( pending_tx["content"], tx_context, seen_ids=seen_ids ) if messages: for i, message_dict in enumerate(messages): + reception_time = time.time() + # TODO: this update of the time field is unwanted, but needed to preserve + # the behavior of aggregates. Use the correct time field in aggregates + # and then remove this line. + message_dict["time"] = tx_context.time + (i / 1000) try: # we don't check signatures yet. 
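A quick illustration of the `i / 1000` offset applied in the hunk above: every message unpacked from the same transaction inherits the block time, shifted by one millisecond per message, so that relative ordering is preserved while staying close to the on-chain timestamp (block time borrowed from the test fixture further down):

```python
# Sketch: three messages unpacked from one chain transaction.
tx_time = 1632835747  # block timestamp from the test fixture

times = [tx_time + (i / 1000) for i in range(3)]
# ~ [1632835747.0, 1632835747.001, 1632835747.002]
# Close to the on-chain time, yet strictly increasing per message.
```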
@@ -55,12 +62,9 @@ async def handle_pending_tx( operation=InsertOne( { "message": message.dict(exclude={"content"}), - "source": dict( - chain_name=tx_context.chain_name, - tx_hash=tx_context.tx_hash, - height=tx_context.height, - check_message=True, # should we store this? - ), + "tx_context": tx_context.dict(), + "reception_time": reception_time, + "check_message": True, } ), ) diff --git a/src/aleph/model/messages.py b/src/aleph/model/messages.py index 01f214fb3..71dbe6256 100644 --- a/src/aleph/model/messages.py +++ b/src/aleph/model/messages.py @@ -228,7 +228,7 @@ async def get_merged_posts(filters, sort=None, limit=100, skip=0, amend_limit=1) } } }, - {"$project": {"amends": 0}}, + {"$project": {"_id": 0, "amends": 0}}, {"$replaceRoot": {"newRoot": {"$mergeObjects": ["$$ROOT", "$content"]}}}, ] diff --git a/src/aleph/schemas/message_confirmation.py b/src/aleph/schemas/message_confirmation.py new file mode 100644 index 000000000..2613b25de --- /dev/null +++ b/src/aleph/schemas/message_confirmation.py @@ -0,0 +1,16 @@ +from aleph_message.models import Chain +from pydantic import BaseModel, Field + + +class MessageConfirmation(BaseModel): + chain: Chain = Field(..., description="Chain from which the confirmation was fetched.") + height: int = Field(..., description="Block in which the confirmation was published.") + hash: str = Field( + ..., + description="Hash of the transaction/block in which the confirmation was published.", + ) + time: float = Field( + ..., + description="Transaction timestamp, in Unix time (number of seconds since epoch).", + ) + publisher: str = Field(..., description="Publisher of the confirmation on chain.") diff --git a/src/aleph/schemas/validated_message.py b/src/aleph/schemas/validated_message.py index e112252af..350401851 100644 --- a/src/aleph/schemas/validated_message.py +++ b/src/aleph/schemas/validated_message.py @@ -7,7 +7,6 @@ from typing import List, Literal, Optional, Generic, Dict, Type, Any from aleph_message.models import ( - MessageConfirmation, AggregateContent, ForgetContent, MessageType, @@ -26,6 +25,7 @@ PendingProgramMessage, PendingStoreMessage, ) +from .message_confirmation import MessageConfirmation from .message_content import MessageContent @@ -61,6 +61,8 @@ class BaseValidatedMessage(AlephBaseMessage, Generic[MType, ContentType]): content: ContentType confirmations: List[MessageConfirmation] = Field(default_factory=list) forgotten_by: List[str] = Field(default_factory=list) + reception_time: float + confirmation_time: Optional[float] class ValidatedAggregateMessage( @@ -96,6 +98,7 @@ class ValidatedStoreMessage( def validate_pending_message( pending_message: BasePendingMessage[MType, ContentType], content: MessageContent, + reception_time: float, confirmations: List[MessageConfirmation], ) -> BaseValidatedMessage[MType, ContentType]: @@ -115,6 +118,11 @@ def validate_pending_message( if json_content.get("time", None) is None: json_content["time"] = pending_message.time + if confirmations: + confirmation_time = min(confirmation.time for confirmation in confirmations) + else: + confirmation_time = None + # Note: we could use the construct method of Pydantic to bypass validation # and speed up the conversion process. However, this means giving up on validation. # At the time of writing, correctness seems more important than performance. 
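To make the trade-off in the note above concrete, here is a minimal pydantic (v1) sketch with a hypothetical model: `construct()` skips validation entirely, which is faster but silently keeps bad data.

```python
from pydantic import BaseModel


class Point(BaseModel):  # hypothetical model, for illustration only
    x: int
    y: int


Point.parse_obj({"x": "1", "y": 2})  # validates and coerces: x becomes the int 1
Point.construct(x="1", y=2)          # no validation: fast, but x stays the string "1"
```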
@@ -124,6 +132,8 @@ def validate_pending_message( confirmed=bool(confirmations), confirmations=confirmations, size=len(content.raw_value), + reception_time=reception_time, + confirmation_time=confirmation_time, ) @@ -138,6 +148,11 @@ def make_confirmation_update_query(confirmations: List[MessageConfirmation]) -> return { "$max": {"confirmed": True}, + "$min": { + "confirmation_time": min( + confirmation.time for confirmation in confirmations + ) + }, "$addToSet": { "confirmations": { "$each": [confirmation.dict() for confirmation in confirmations] @@ -160,10 +175,18 @@ def make_message_upsert_query(message: BaseValidatedMessage[Any, Any]) -> Dict: "channel": message.channel, "signature": message.signature, }, - "$min": {"time": message.time}, + "$min": { + "time": message.time, + "reception_time": message.reception_time, + }, } # Add fields related to confirmations - updates.update(make_confirmation_update_query(message.confirmations)) + confirmation_updates = make_confirmation_update_query(message.confirmations) + for k, v in confirmation_updates.items(): + try: + updates[k].update(v) + except KeyError: + updates[k] = v return updates diff --git a/src/aleph/services/ipfs/pubsub.py b/src/aleph/services/ipfs/pubsub.py index 46c8edb1f..79dfe0622 100644 --- a/src/aleph/services/ipfs/pubsub.py +++ b/src/aleph/services/ipfs/pubsub.py @@ -1,12 +1,13 @@ import asyncio import base64 import logging +import time from typing import Union import base58 -from ...exceptions import InvalidMessageError from .common import get_ipfs_api +from aleph.exceptions import InvalidMessageError LOGGER = logging.getLogger("IPFS.PUBSUB") @@ -48,9 +49,12 @@ async def incoming_channel(topic) -> None: try: async for mvalue in sub(topic): try: + reception_time = time.time() message = await get_pubsub_message(mvalue) LOGGER.debug("New message %r" % message) - asyncio.create_task(process_one_message(message)) + asyncio.create_task( + process_one_message(message, reception_time=reception_time) + ) except InvalidMessageError: LOGGER.warning(f"Invalid message {mvalue}") except Exception: diff --git a/src/aleph/services/p2p/protocol.py b/src/aleph/services/p2p/protocol.py index de632fbbe..19bbbb8b3 100644 --- a/src/aleph/services/p2p/protocol.py +++ b/src/aleph/services/p2p/protocol.py @@ -3,6 +3,7 @@ import json import logging import random +import time from typing import Any, Dict, Optional, Set from anyio.abc import SocketStream @@ -186,6 +187,7 @@ async def incoming_channel(p2p_client: P2PClient, topic: str) -> None: try: async for pubsub_message in receive_pubsub_messages(stream): try: + reception_time = time.time() msg_dict = pubsub_msg_to_dict(pubsub_message) LOGGER.debug("Received from P2P:", msg_dict) # we should check the sender here to avoid spam @@ -196,7 +198,7 @@ async def incoming_channel(p2p_client: P2PClient, topic: str) -> None: continue LOGGER.debug("New message %r" % message) - await delayed_incoming(message) + await delayed_incoming(message, reception_time) except Exception: LOGGER.exception("Can't handle message") diff --git a/src/aleph/web/__init__.py b/src/aleph/web/__init__.py index c3d4288ae..b4f8082fa 100644 --- a/src/aleph/web/__init__.py +++ b/src/aleph/web/__init__.py @@ -38,8 +38,8 @@ def init_sio(app: web.Application) -> socketio.AsyncServer: return sio -def create_app() -> web.Application: - app = web.Application(client_max_size=1024 ** 2 * 64) +def create_app(debug: bool = False) -> web.Application: + app = web.Application(client_max_size=1024**2 * 64, debug=debug) tpl_path = 
pkg_resources.resource_filename("aleph.web", "templates") jinja_loader = jinja2.ChoiceLoader( diff --git a/src/aleph/web/controllers/aggregates.py b/src/aleph/web/controllers/aggregates.py index 6f83a59d1..efc90ed31 100644 --- a/src/aleph/web/controllers/aggregates.py +++ b/src/aleph/web/controllers/aggregates.py @@ -1,6 +1,27 @@ +from typing import List, Optional + from aiohttp import web +from pydantic import BaseModel, validator, ValidationError from aleph.model.messages import get_computed_address_aggregates +from .utils import LIST_FIELD_SEPARATOR + + +DEFAULT_LIMIT = 1000 + + +class AggregatesQueryParams(BaseModel): + keys: Optional[List[str]] = None + limit: int = DEFAULT_LIMIT + + @validator( + "keys", + pre=True, + ) + def split_str(cls, v): + if isinstance(v, str): + return v.split(LIST_FIELD_SEPARATOR) + return v async def address_aggregate(request): @@ -10,15 +31,15 @@ async def address_aggregate(request): address = request.match_info["address"] - keys = request.query.get("keys", None) - if keys is not None: - keys = keys.split(",") - - limit = request.query.get("limit", "1000") - limit = int(limit) + try: + query_params = AggregatesQueryParams.parse_obj(request.query) + except ValidationError as e: + raise web.HTTPUnprocessableEntity( + text=e.json(), content_type="application/json" + ) aggregates = await get_computed_address_aggregates( - address_list=[address], key_list=keys, limit=limit + address_list=[address], key_list=query_params.keys, limit=query_params.limit ) if not aggregates.get(address): diff --git a/src/aleph/web/controllers/messages.py b/src/aleph/web/controllers/messages.py index 0ba2468d5..6625a0ad2 100644 --- a/src/aleph/web/controllers/messages.py +++ b/src/aleph/web/controllers/messages.py @@ -215,12 +215,10 @@ async def messages_ws(request: web.Request): break except ConnectionResetError: - closing = True break except Exception: if ws.closed: - closing = True break LOGGER.exception("Error processing") diff --git a/src/aleph/web/controllers/posts.py b/src/aleph/web/controllers/posts.py index 126da1b31..916f036e4 100644 --- a/src/aleph/web/controllers/posts.py +++ b/src/aleph/web/controllers/posts.py @@ -85,10 +85,7 @@ async def view_posts_list(request): context = {"posts": posts} if pagination_per_page is not None: - total_msgs = await Message.collection.count_documents( - filter=find_filters, - projection={"_id": 0}, - ) + total_msgs = await Message.collection.count_documents(filter=find_filters) pagination = Pagination( pagination_page, diff --git a/src/aleph/web/controllers/utils.py b/src/aleph/web/controllers/utils.py index 13f18fd95..b227013e4 100644 --- a/src/aleph/web/controllers/utils.py +++ b/src/aleph/web/controllers/utils.py @@ -7,6 +7,7 @@ PER_PAGE = 20 PER_PAGE_SUMMARY = 50 +LIST_FIELD_SEPARATOR = "," class Pagination(object): diff --git a/tests/api/__init__.py b/tests/api/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/api/conftest.py b/tests/api/conftest.py new file mode 100644 index 000000000..20e6ab15b --- /dev/null +++ b/tests/api/conftest.py @@ -0,0 +1,32 @@ +import json +from pathlib import Path +from typing import Dict, List + +import pytest_asyncio +from aleph.model.messages import Message + + +async def _load_fixtures(filename: str): + fixtures_dir = Path(__file__).parent / "fixtures" + fixtures_file = fixtures_dir / filename + + with fixtures_file.open() as f: + messages = json.load(f) + + await Message.collection.insert_many(messages) + return messages + + +@pytest_asyncio.fixture +async def 
fixture_messages(test_db) -> List[Dict]: + return await _load_fixtures("fixture_messages.json") + + +@pytest_asyncio.fixture +async def fixture_aggregate_messages(test_db) -> List[Dict]: + return await _load_fixtures("fixture_aggregates.json") + + +@pytest_asyncio.fixture +async def fixture_post_messages(test_db) -> List[Dict]: + return await _load_fixtures("fixture_posts.json") diff --git a/tests/api/fixtures/fixture_aggregates.json b/tests/api/fixtures/fixture_aggregates.json new file mode 100644 index 000000000..c488e78e8 --- /dev/null +++ b/tests/api/fixtures/fixture_aggregates.json @@ -0,0 +1,123 @@ +[ + { + "chain": "ETH", + "item_hash": "53c2b16aa84b10878982a2920844625546f5db32337ecd9dd15928095a30381c", + "sender": "0x720F319A9c3226dCDd7D8C49163D79EDa1084E98", + "type": "AGGREGATE", + "channel": "INTEGRATION_TESTS", + "content": { + "address": "0x720F319A9c3226dCDd7D8C49163D79EDa1084E98", + "time": 1644857371.391834, + "key": "test_reference", + "content": { + "a": 1, + "b": 2 + } + }, + "item_content": "{\"address\":\"0x720F319A9c3226dCDd7D8C49163D79EDa1084E98\",\"time\":1644857371.391834,\"key\":\"test_reference\",\"content\":{\"a\":1,\"b\":2}}", + "item_type": "inline", + "signature": "0x7eee4cfc03b963ec51f04f60f6f7d58b0f24e0309d209feecb55af9e411ed1c01cfb547bb13539e91308b044c3661d93ddf272426542bc1a47722614cb0cd3621c", + "size": 128, + "time": 1644859283.101 + }, + { + "chain": "ETH", + "item_hash": "0022ed09d16a1c3d6cbb3c7e2645657ebaa0382eba65be06264b106f528b85bf", + "sender": "0x720F319A9c3226dCDd7D8C49163D79EDa1084E98", + "type": "AGGREGATE", + "channel": "INTEGRATION_TESTS", + "content": { + "address": "0x720F319A9c3226dCDd7D8C49163D79EDa1084E98", + "time": 1644857704.6253593, + "key": "test_reference", + "content": { + "c": 3, + "d": 4 + } + }, + "item_content": "{\"address\":\"0x720F319A9c3226dCDd7D8C49163D79EDa1084E98\",\"time\":1644857704.6253593,\"key\":\"test_reference\",\"content\":{\"c\":3,\"d\":4}}", + "item_type": "inline", + "signature": "0xe6129196c36b066302692b53bcb78a9d8c996854b170238ebfe56924f0b6be604883c30a66d75250de489e1edb683c7da8ddb1ccb50a39d1bbbdad617e5c958f1b", + "size": 129, + "time": 1644859283.12 + }, + { + "chain": "ETH", + "item_hash": "a87004aa03f8ae63d2c4bbe84b93b9ce70ca6482ce36c82ab0b0f689fc273f34", + "sender": "0xaC033C1cA5C49Eff98A1D9a56BeDBC4840010BA4", + "type": "AGGREGATE", + "channel": "INTEGRATION_TESTS", + "content": { + "address": "0xaC033C1cA5C49Eff98A1D9a56BeDBC4840010BA4", + "time": 1648215802.3821976, + "key": "test_reference", + "content": { + "c": 3, + "d": 4 + } + }, + "item_content": "{\"address\":\"0xaC033C1cA5C49Eff98A1D9a56BeDBC4840010BA4\",\"time\":1648215802.3821976,\"key\":\"test_reference\",\"content\":{\"c\":3,\"d\":4}}", + "item_type": "inline", + "signature": "0xc0f6ce2e4e9561b3949d51a97b8746125e1f031bbc13813cc74f1f61eea654fe300ad5e9ec098d41374bc0e43f83f2d66b834672abb811ae6a2dcdbd09f2565f1c", + "size": 129, + "time": 1648467547.771 + }, + { + "chain": "ETH", + "item_hash": "f875631a6c4a70ce44143bdd9a64861a5ce6f68e2267a00979ff0ad399a6c780", + "sender": "0x720F319A9c3226dCDd7D8C49163D79EDa1084E98", + "type": "AGGREGATE", + "channel": "INTEGRATION_TESTS", + "confirmations": [ + { + "chain": "ETH", + "height": 14205580, + "hash": "0x234b3cb25e893780c4cf50ec82c4ae9a61d61b766a367b559ae8192463e84a1b" + } + ], + "confirmed": true, + "content": { + "address": "0x720F319A9c3226dCDd7D8C49163D79EDa1084E98", + "time": 1644857371.1748412, + "key": "test_target", + "content": { + "a": 1, + "b": 2 + } + }, + "item_content": 
"{\"address\":\"0x720F319A9c3226dCDd7D8C49163D79EDa1084E98\",\"time\":1644857371.1748412,\"key\":\"test_target\",\"content\":{\"a\":1,\"b\":2}}", + "item_type": "inline", + "signature": "0xaa28dafaecfd063bd30f65c877260bcdab37931fe7d8ef13173a952ae57a79e544d9fc9ae9131ba6ce6638bdbd62996467eb4a999416603ff2d1eaff372427bd1b", + "size": 126, + "time": 1644859283.1 + }, + { + "chain": "ETH", + "item_hash": "8c83e020b1f0661de3238ecaf2a41fd2f9dfe4a6c56453ccdf3ddd3fa4fae147", + "sender": "0x720F319A9c3226dCDd7D8C49163D79EDa1084E98", + "type": "AGGREGATE", + "channel": "INTEGRATION_TESTS", + "confirmations": [ + { + "chain": "ETH", + "height": 14205311, + "hash": "0x805dcf51856c813d5524f5b64555145fce6487b81dc605e6657fd208bebb2e05" + } + ], + "confirmed": true, + "content": { + "address": "0x720F319A9c3226dCDd7D8C49163D79EDa1084E98", + "time": 1644853185.0710306, + "key": "test_key", + "content": { + "a": 1, + "b": 2 + } + }, + "item_content": "{\"address\":\"0x720F319A9c3226dCDd7D8C49163D79EDa1084E98\",\"time\":1644853185.0710306,\"key\":\"test_key\",\"content\":{\"a\":1,\"b\":2}}", + "item_type": "inline", + "signature": "0x4e3060c596de77b19f2791fbc34eff3d6c89c63d29250c960e9c1b752898d22d0f7d7759dc0b0d935b93e29534e6861d7a8deeb75cd69836acf0ad0e6e8626601b", + "size": 123, + "time": 1644855661.089 + } +] diff --git a/tests/api/fixtures/fixture_posts.json b/tests/api/fixtures/fixture_posts.json new file mode 100644 index 000000000..e69de29bb diff --git a/tests/api/test_aggregates.py b/tests/api/test_aggregates.py new file mode 100644 index 000000000..6dc23502d --- /dev/null +++ b/tests/api/test_aggregates.py @@ -0,0 +1,171 @@ +import itertools +from typing import Dict, Iterable, List + +import aiohttp +import pytest + +AGGREGATES_URI = "/api/v0/aggregates/{address}.json" + +# Another address with three aggregates +ADDRESS_1 = "0x720F319A9c3226dCDd7D8C49163D79EDa1084E98" +# Another address with one aggregate +ADDRESS_2 = "0xaC033C1cA5C49Eff98A1D9a56BeDBC4840010BA4" + +EXPECTED_AGGREGATES = { + ADDRESS_1: { + "test_key": {"a": 1, "b": 2}, + "test_target": {"a": 1, "b": 2}, + "test_reference": {"a": 1, "b": 2, "c": 3, "d": 4}, + }, + ADDRESS_2: {"test_reference": {"c": 3, "d": 4}}, +} + + +def make_uri(address: str) -> str: + return AGGREGATES_URI.format(address=address) + + +def assert_aggregates_equal(expected: List[Dict], actual: Dict[str, Dict]): + for expected_aggregate in expected: + aggregate = actual[expected_aggregate["content"]["key"]] + assert "_id" not in aggregate + + assert aggregate == expected_aggregate["content"]["content"] + + +def merge_aggregates(messages: Iterable[Dict]) -> List[Dict]: + def merge_content(_messages: List[Dict]) -> Dict: + original = _messages[0] + for update in _messages[1:]: + original["content"]["content"].update(update["content"]["content"]) + return original + + aggregates = [] + + for key, group in itertools.groupby( + sorted(messages, key=lambda msg: msg["content"]["key"]), + lambda msg: msg["content"]["key"], + ): + sorted_messages = sorted(group, key=lambda msg: msg["time"]) + aggregates.append(merge_content(sorted_messages)) + + return aggregates + + +async def get_aggregates(api_client, address: str, **params) -> aiohttp.ClientResponse: + return await api_client.get(make_uri(address), params=params) + + +async def get_aggregates_expect_success(api_client, address: str, **params): + response = await get_aggregates(api_client, address, **params) + assert response.status == 200, await response.text() + return await response.json() + + +@pytest.fixture() +def 
fixture_aggregates(fixture_aggregate_messages): + return merge_aggregates(fixture_aggregate_messages) + + +@pytest.mark.asyncio +async def test_get_aggregates_no_update(ccn_api_client, fixture_aggregates): + """ + Tests receiving an aggregate from an address which posted one aggregate and never + updated it. + """ + + address = ADDRESS_2 + aggregates = await get_aggregates_expect_success(ccn_api_client, address) + + assert aggregates["address"] == address + assert aggregates["data"] == EXPECTED_AGGREGATES[address] + + +@pytest.mark.asyncio +async def test_get_aggregates(ccn_api_client, fixture_aggregates: List[Dict]): + """ + A more complex case with 3 aggregates, one of which was updated. + """ + + address = ADDRESS_1 + aggregates = await get_aggregates_expect_success(ccn_api_client, address) + + assert address == aggregates["address"] + assert aggregates["data"]["test_key"] == {"a": 1, "b": 2} + assert aggregates["data"]["test_target"] == {"a": 1, "b": 2} + assert aggregates["data"]["test_reference"] == {"a": 1, "b": 2, "c": 3, "d": 4} + + assert_aggregates_equal(fixture_aggregates, aggregates["data"]) + + +@pytest.mark.asyncio +async def test_get_aggregates_filter_by_key( + ccn_api_client, fixture_aggregates: List[Dict] +): + """ + Tests the 'keys' query parameter. + """ + + address, key = ADDRESS_1, "test_target" + aggregates = await get_aggregates_expect_success( + ccn_api_client, address=address, keys=key + ) + assert aggregates["address"] == address + assert aggregates["data"][key] == EXPECTED_AGGREGATES[address][key] + + # Multiple keys + address, keys = ADDRESS_1, ["test_target", "test_reference"] + aggregates = await get_aggregates_expect_success( + ccn_api_client, address=address, keys=",".join(keys) + ) + assert aggregates["address"] == address + for key in keys: + assert ( + aggregates["data"][key] == EXPECTED_AGGREGATES[address][key] + ), f"Key {key} does not match" + + +@pytest.mark.asyncio +async def test_get_aggregates_limit(ccn_api_client, fixture_aggregates: List[Dict]): + """ + Tests the 'limit' query parameter. + """ + + address, key = ADDRESS_1, "test_reference" + aggregates = await get_aggregates_expect_success( + ccn_api_client, address=address, keys=key, limit=1 + ) + assert aggregates["address"] == address + assert aggregates["data"][key] == {"c": 3, "d": 4} + + +@pytest.mark.asyncio +async def test_get_aggregates_invalid_address( + ccn_api_client, fixture_aggregates: List[Dict] +): + """ + Pass an unknown address. + """ + + invalid_address = "unknown" + + response = await get_aggregates(ccn_api_client, invalid_address) + assert response.status == 404 + + +@pytest.mark.asyncio +async def test_get_aggregates_invalid_params( + ccn_api_client, fixture_aggregates: List[Dict] +): + """ + Tests that passing invalid parameters returns a 422 error. 
+ """ + + # A string as limit + response = await get_aggregates(ccn_api_client, ADDRESS_1, limit="abc") + assert response.status == 422 + assert response.content_type == "application/json" + + errors = await response.json() + assert len(errors) == 1 + assert errors[0]["loc"] == ["limit"], errors diff --git a/tests/api/test_messages.py b/tests/api/test_messages.py index 54d4e527d..3fd9337c9 100644 --- a/tests/api/test_messages.py +++ b/tests/api/test_messages.py @@ -1,20 +1,13 @@ import itertools -import json -from pathlib import Path -from typing import Dict, Iterable, List +from typing import Dict, Iterable import pytest -import pytest_asyncio -from aleph.model.messages import Message +from .utils import get_messages_by_keys MESSAGES_URI = "/api/v0/messages.json" -def get_messages_by_keys(messages: Iterable[Dict], **keys) -> List[Dict]: - return [msg for msg in messages if all(msg[k] == v for k, v in keys.items())] - - def check_message_fields(messages: Iterable[Dict]): """ Basic checks on fields. For example, check that we do not expose internal data @@ -38,18 +31,6 @@ def assert_messages_equal(messages: Iterable[Dict], expected_messages: Iterable[ assert message["signature"] == expected_message["signature"] -@pytest_asyncio.fixture -async def fixture_messages(test_db): - fixtures_dir = Path(__file__).parent / "fixtures" - fixtures_file = fixtures_dir / "fixture_messages.json" - - with fixtures_file.open() as f: - messages = json.load(f) - - await Message.collection.insert_many(messages) - return messages - - @pytest.mark.asyncio async def test_get_messages(fixture_messages, ccn_api_client): response = await ccn_api_client.get(MESSAGES_URI) diff --git a/tests/api/test_posts.py b/tests/api/test_posts.py new file mode 100644 index 000000000..436f6d932 --- /dev/null +++ b/tests/api/test_posts.py @@ -0,0 +1,59 @@ +from typing import Dict, Iterable + +import aiohttp +import pytest +from aleph_message.models import MessageType + +from .utils import get_messages_by_keys + +POSTS_URI = "/api/v0/posts.json" + + +def assert_posts_equal(posts: Iterable[Dict], expected_messages: Iterable[Dict]): + posts_by_hash = {post["item_hash"]: post for post in posts} + + for expected_message in expected_messages: + post = posts_by_hash[expected_message["item_hash"]] + assert "_id" not in post + + assert post["chain"] == expected_message["chain"] + assert post["channel"] == expected_message["channel"] + assert post["sender"] == expected_message["sender"] + assert post["signature"] == expected_message["signature"] + + if expected_message.get("forgotten_by", []): + assert post["content"] is None + continue + + if "content" not in expected_message["content"]: + # TODO: there is a problem with the spec of posts: they can be specified + # without an internal "content" field, which does not break the + # endpoint but returns the content of message["content"] instead. + # We skip the issue for now. + continue + + assert post["content"] == expected_message["content"]["content"] + + +async def get_posts(api_client, **params) -> aiohttp.ClientResponse: + return await api_client.get(POSTS_URI, params=params) + + +async def get_posts_expect_success(api_client, **params): + response = await get_posts(api_client, **params) + assert response.status == 200, await response.text() + data = await response.json() + return data["posts"] + + +@pytest.mark.asyncio +async def test_get_posts(fixture_messages, ccn_api_client): + # The POST messages in the fixtures file do not amend one another, so we should have + # 1 POST = 1 message. 
+ post_messages = get_messages_by_keys( + fixture_messages, + type=MessageType.post, + ) + posts = await get_posts_expect_success(ccn_api_client) + + assert_posts_equal(posts, post_messages) diff --git a/tests/api/utils/__init__.py b/tests/api/utils/__init__.py new file mode 100644 index 000000000..182366ba5 --- /dev/null +++ b/tests/api/utils/__init__.py @@ -0,0 +1,27 @@ +from typing import Dict, Iterable, List, Callable + + +def get_messages_by_predicate( + messages: Iterable[Dict], predicate: Callable[[Dict], bool] +) -> List[Dict]: + """ + Filters messages based on a user-provided predicate + (=a function/lambda operating on a single message). + """ + + return [msg for msg in messages if predicate(msg)] + + +def get_messages_by_keys(messages: Iterable[Dict], **keys) -> List[Dict]: + """ + Filters messages based on user-provided keys. + + Example: + >>> filtered_messages = get_messages_by_keys( + >>> message_list, item_hash="some-hash", channel="MY-CHANNEL" + >>> ) + + """ + return get_messages_by_predicate( + messages, lambda msg: all(msg[k] == v for k, v in keys.items()) + ) diff --git a/tests/chains/test_common.py b/tests/chains/test_common.py index 1d12a06c2..dca327e41 100644 --- a/tests/chains/test_common.py +++ b/tests/chains/test_common.py @@ -68,5 +68,6 @@ async def async_magic(): message_dict["item_type"] = "inline" message = parse_message(message_dict) - status, ops = await incoming(message, check_message=True) + status, ops = await incoming(message, reception_time=100000, check_message=True) + assert status == IncomingStatus.MESSAGE_HANDLED diff --git a/tests/chains/test_confirmation.py b/tests/chains/test_confirmation.py index 5143150f8..cfbadc635 100644 --- a/tests/chains/test_confirmation.py +++ b/tests/chains/test_confirmation.py @@ -4,6 +4,7 @@ import pytest from aleph.chains.common import process_one_message +from aleph.chains.tx_context import TxContext from aleph.model.messages import CappedMessage, Message from aleph.schemas.pending_messages import parse_message @@ -42,12 +43,14 @@ async def test_confirm_message(test_db): content = json.loads(MESSAGE_DICT["item_content"]) message = parse_message(MESSAGE_DICT) - await process_one_message(message) + original_reception_time = 100000 + await process_one_message(message, reception_time=original_reception_time) message_in_db = await Message.collection.find_one({"item_hash": item_hash}) assert message_in_db is not None assert message_in_db["content"] == content assert not message_in_db["confirmed"] + assert message_in_db["reception_time"] == original_reception_time capped_message_in_db = await CappedMessage.collection.find_one( {"item_hash": item_hash} @@ -56,18 +59,26 @@ async def test_confirm_message(test_db): assert remove_id_key(message_in_db) == remove_id_key(capped_message_in_db) # Now, confirm the message - chain_name, tx_hash, height = "ETH", "123", 8000 - await process_one_message( - message, chain_name=chain_name, tx_hash=tx_hash, height=height + confirmation_reception_time = 123000 + tx_context = TxContext( + chain="ETH", + hash="123", + height=8000, + time=120000, + publisher="0xdeadbeef", ) + await process_one_message(message, reception_time=confirmation_reception_time, tx_context=tx_context) + message_in_db = await Message.collection.find_one({"item_hash": item_hash}) assert message_in_db is not None assert message_in_db["confirmed"] - assert {"chain": chain_name, "hash": tx_hash, "height": height} in message_in_db[ - "confirmations" - ] + assert message_in_db["confirmation_time"] == tx_context.time + assert 
message_in_db["reception_time"] == original_reception_time + + expected_confirmations = [tx_context.dict()] + assert message_in_db["confirmations"] == expected_confirmations capped_message_after_confirmation = await CappedMessage.collection.find_one( {"item_hash": item_hash} @@ -87,20 +98,30 @@ async def test_process_confirmed_message(test_db): """ item_hash = MESSAGE_DICT["item_hash"] + reception_time = 1000000 # Confirm the message - chain_name, tx_hash, height = "ETH", "123", 8000 message = parse_message(MESSAGE_DICT) + tx_context = TxContext( + chain="ETH", + hash="123", + height=8000, + time=120000, + publisher="0xdeadbeef", + ) await process_one_message( - message, chain_name=chain_name, tx_hash=tx_hash, height=height + message, reception_time=reception_time, tx_context=tx_context ) + # Now, confirm the message message_in_db = await Message.collection.find_one({"item_hash": item_hash}) assert message_in_db is not None assert message_in_db["confirmed"] + assert message_in_db["confirmation_time"] == tx_context.time + assert message_in_db["reception_time"] == reception_time - expected_confirmations = [{"chain": chain_name, "hash": tx_hash, "height": height}] + expected_confirmations = [tx_context.dict()] assert message_in_db["confirmations"] == expected_confirmations capped_message_in_db = await CappedMessage.collection.find_one( @@ -109,4 +130,4 @@ async def test_process_confirmed_message(test_db): assert remove_id_key(message_in_db) == remove_id_key(capped_message_in_db) assert capped_message_in_db["confirmed"] - assert capped_message_in_db["confirmations"] == expected_confirmations + assert capped_message_in_db["confirmations"] == [tx_context.dict()] diff --git a/tests/chains/test_tezos.py b/tests/chains/test_tezos.py index 4c6be4c37..14ca20402 100644 --- a/tests/chains/test_tezos.py +++ b/tests/chains/test_tezos.py @@ -2,10 +2,13 @@ from aleph.network import verify_signature from aleph.schemas.pending_messages import parse_message +from aleph.chains import ( + tezos, +) # TODO: this import is currently necessary because of circular dependencies @pytest.mark.asyncio -async def test_tezos_verify_signature(): +async def test_tezos_verify_signature_raw(): message_dict = { "chain": "TEZOS", "channel": "TEST", @@ -28,7 +31,7 @@ async def test_tezos_verify_signature(): @pytest.mark.asyncio -async def test_tezos_verify_signature_ed25519(): +async def test_tezos_verify_signature_raw_ed25519(): message_dict = { "chain": "TEZOS", "sender": "tz1SmGHzna3YhKropa3WudVq72jhTPDBn4r5", @@ -43,3 +46,20 @@ async def test_tezos_verify_signature_ed25519(): message = parse_message(message_dict) await verify_signature(message) + + +@pytest.mark.asyncio +async def test_tezos_verify_signature_micheline(): + message_dict = { + "chain": "TEZOS", + "sender": "tz1VrPqrVdMFsgykWyhGH7SYcQ9avHTjPcdD", + "type": "POST", + "channel": "ALEPH-TEST", + "signature": '{"signingType":"micheline","signature":"sigXD8iT5ivdawgPzE1AbtDwqqAjJhS5sHS1psyE74YjfiaQnxWZsATNjncdsuQw3b9xaK79krxtsC8uQoT5TcUXmo66aovT","publicKey":"edpkvapDnjnasrNcmUdMZXhQZwpX6viPyuGCq6nrP4W7ZJCm7EFTpS"}', + "time": 1663944079.029, + "item_type": "storage", + "item_hash": "72b2722b95582419cfa71f631ff6c6afc56344dc6a4609e772877621813040b7", + } + + message = parse_message(message_dict) + await verify_signature(message) diff --git a/tests/conftest.py b/tests/conftest.py index 5f73a7158..d4d92281f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -62,7 +62,7 @@ async def ccn_api_client(aiohttp_client, mock_config): event_loop = 
asyncio.get_event_loop() event_loop.set_debug(True) - app = create_app() + app = create_app(debug=True) app["config"] = mock_config client = await aiohttp_client(app) diff --git a/tests/helpers/message_test_helpers.py b/tests/helpers/message_test_helpers.py index be6192315..5506e39d2 100644 --- a/tests/helpers/message_test_helpers.py +++ b/tests/helpers/message_test_helpers.py @@ -12,6 +12,7 @@ def make_validated_message_from_dict( message_dict: Dict, raw_content: Optional[Union[str, bytes]] = None, confirmations: Optional[List[MessageConfirmation]] = None, + reception_time: Optional[float] = None, ): """ Creates a validated message instance from a raw message dictionary. @@ -46,4 +47,7 @@ def make_validated_message_from_dict( pending_message=pending_message, content=message_content, confirmations=confirmations or [], + # If no reception time is specified, just set it to the message time as propagation + # across Aleph is instantaneous, obviously. + reception_time=reception_time or pending_message.time, ) diff --git a/tests/message_processing/test_process_pending_txs.py b/tests/message_processing/test_process_pending_txs.py index 76a0f61c4..2b3a9814c 100644 --- a/tests/message_processing/test_process_pending_txs.py +++ b/tests/message_processing/test_process_pending_txs.py @@ -33,8 +33,8 @@ async def test_process_pending_tx(mocker, test_db): "content": "test-data-pending-tx-messages", }, "context": { - "chain_name": "ETH", - "tx_hash": "0xf49cb176c1ce4f6eb7b9721303994b05074f8fadc37b5f41ac6f78bdf4b14b6c", + "chain": "ETH", + "hash": "0xf49cb176c1ce4f6eb7b9721303994b05074f8fadc37b5f41ac6f78bdf4b14b6c", "time": 1632835747, "height": 13314512, "publisher": "0x23eC28598DCeB2f7082Cc3a9D670592DfEd6e0dC", diff --git a/tests/schemas/test_validated_messages.py b/tests/schemas/test_validated_messages.py index dda32fa73..397ac54a7 100644 --- a/tests/schemas/test_validated_messages.py +++ b/tests/schemas/test_validated_messages.py @@ -6,8 +6,6 @@ import json from typing import Dict -from aleph_message.models import MessageConfirmation - from aleph.schemas.message_content import MessageContent, ContentSource from aleph.schemas.pending_messages import ( PendingAggregateMessage, @@ -21,6 +19,7 @@ ValidatedAggregateMessage, ValidatedStoreMessage, ) +from aleph.schemas.message_confirmation import MessageConfirmation def check_basic_message_fields(message: BaseValidatedMessage, message_dict: Dict): @@ -60,6 +59,7 @@ def test_parse_aggregate_inline_message(): pending_message=pending_message, content=message_content, confirmations=confirmations, + reception_time=pending_message.time, ) assert isinstance(validated_message, ValidatedAggregateMessage) @@ -106,6 +106,7 @@ def test_parse_post_message_storage_content(): pending_message=pending_message, content=message_content, confirmations=confirmations, + reception_time=pending_message.time, ) check_basic_message_fields(validated_message, message_dict) @@ -132,7 +133,7 @@ def test_parse_store_message_inline_content(): pending_message = parse_message(message_dict) assert isinstance(pending_message, PendingStoreMessage) - confirmations = [MessageConfirmation(chain="ETH", height=1234, hash="abcd")] + confirmations = [MessageConfirmation(chain="ETH", height=1234, hash="abcd", time=8000, publisher="0xsomething")] message_content = MessageContent( pending_message.item_hash, ContentSource.INLINE, content, item_content ) @@ -140,6 +141,7 @@ def test_parse_store_message_inline_content(): pending_message=pending_message, content=message_content, confirmations=confirmations, 
+ reception_time=pending_message.time, ) assert isinstance(validated_message, ValidatedStoreMessage) diff --git a/tests/storage/forget/test_forget_multi_users.py b/tests/storage/forget/test_forget_multi_users.py index f3666b273..6486ddca2 100644 --- a/tests/storage/forget/test_forget_multi_users.py +++ b/tests/storage/forget/test_forget_multi_users.py @@ -70,7 +70,7 @@ async def test_forget_multiusers_storage(mocker, test_db): await store_gridfs_file(key=file_hash, value=file_content) message_user1 = parse_message(message_user1_dict) - await process_one_message(message_user1) + await process_one_message(message_user1, reception_time=message_user1.time) message1_db = await Message.collection.find_one( {"item_hash": message_user1.item_hash} @@ -78,14 +78,14 @@ async def test_forget_multiusers_storage(mocker, test_db): assert message1_db is not None message_user2 = parse_message(message_user2_dict) - await process_one_message(message_user2) + await process_one_message(message_user2, reception_time=message_user2.time) # Sanity check: check that the file exists db_file_data = await read_gridfs_file(file_hash) assert db_file_data == file_content forget_message_user1 = parse_message(forget_message_user1_dict) - await process_one_message(forget_message_user1) + await process_one_message(forget_message_user1, reception_time=forget_message_user1.time) # Check that the message was properly forgotten forgotten_message = await Message.collection.find_one( diff --git a/tests/storage/test_store_message.py b/tests/storage/test_store_message.py index 458458cac..b6ed749e5 100644 --- a/tests/storage/test_store_message.py +++ b/tests/storage/test_store_message.py @@ -1,7 +1,6 @@ import json import pytest -from aleph_message.models import MessageConfirmation from aleph.handlers.storage import handle_new_storage from aleph.schemas.message_content import ContentSource, RawContent @@ -9,6 +8,7 @@ ValidatedStoreMessage, StoreContentWithMetadata, ) +from aleph.schemas.message_confirmation import MessageConfirmation from message_test_helpers import make_validated_message_from_dict @@ -33,6 +33,8 @@ def fixture_message_file(): chain="ETH", hash="0x28fd852984b1f2222ca1870a97f44cc34b535a49d2618f5689a10a67985935d5", height=14276536, + time=9000, + publisher="0xsomething", ) ], ) diff --git a/tests/test_network.py b/tests/test_network.py index 918fb3b8d..8def4473a 100644 --- a/tests/test_network.py +++ b/tests/test_network.py @@ -117,5 +117,5 @@ async def async_magic(): "signature": "21027c108022f992f090bbe5c78ca8822f5b7adceb705ae2cd5318543d7bcdd2a74700473045022100b59f7df5333d57080a93be53b9af74e66a284170ec493455e675eb2539ac21db022077ffc66fe8dde7707038344496a85266bf42af1240017d4e1fa0d7068c588ca7", } message = parse_message(message_dict) - status, ops = await incoming(message) + status, ops = await incoming(message, reception_time=100000) assert status == IncomingStatus.MESSAGE_HANDLED