From d13507966fb017dce8d955473fc58cc156ab31db Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Mon, 11 Apr 2022 14:54:05 +0200 Subject: [PATCH 1/2] [Types] New dataclass for tx context The new `TxContext` class encapsulates metadata regarding chain transactions. --- src/aleph/chains/common.py | 15 +++++++++------ src/aleph/chains/ethereum.py | 22 +++++++++++++--------- src/aleph/chains/nuls2.py | 26 ++++++++++++++------------ src/aleph/chains/tx_context.py | 10 ++++++++++ src/aleph/jobs/process_pending_txs.py | 26 +++++++++++++------------- 5 files changed, 59 insertions(+), 40 deletions(-) create mode 100644 src/aleph/chains/tx_context.py diff --git a/src/aleph/chains/common.py b/src/aleph/chains/common.py index cde886f71..1a5e54869 100644 --- a/src/aleph/chains/common.py +++ b/src/aleph/chains/common.py @@ -1,9 +1,14 @@ import asyncio import json import logging +from dataclasses import asdict from enum import IntEnum from typing import Dict, Optional, Tuple, List +from aleph_message.models import MessageType +from bson import ObjectId +from pymongo import UpdateOne + from aleph.config import get_config from aleph.exceptions import ( AlephStorageException, @@ -21,9 +26,7 @@ from aleph.network import check_message as check_message_fn from aleph.permissions import check_sender_authorization from aleph.storage import get_json, pin_hash, add_json, get_message_content -from aleph_message.models import MessageType -from bson import ObjectId -from pymongo import UpdateOne +from .tx_context import TxContext LOGGER = logging.getLogger("chains.common") @@ -349,7 +352,7 @@ async def get_chaindata(messages, bulk_threshold=2000): async def get_chaindata_messages( - chaindata: Dict, context, seen_ids: Optional[List] = None + chaindata: Dict, context: TxContext, seen_ids: Optional[List] = None ): config = get_config() @@ -411,9 +414,9 @@ async def get_chaindata_messages( raise InvalidContent(error_msg) -async def incoming_chaindata(content, context): +async def incoming_chaindata(content: Dict, context: TxContext): """Incoming data from a chain. Content can be inline of "offchain" through an ipfs hash. For now we only add it to the database, it will be processed later. """ - await PendingTX.collection.insert_one({"content": content, "context": context}) + await PendingTX.collection.insert_one({"content": content, "context": asdict(context)}) diff --git a/src/aleph/chains/ethereum.py b/src/aleph/chains/ethereum.py index 66d8227ac..18f279d14 100644 --- a/src/aleph/chains/ethereum.py +++ b/src/aleph/chains/ethereum.py @@ -2,6 +2,7 @@ import functools import json import logging +from typing import AsyncIterator, Dict, Tuple import pkg_resources from eth_account import Account @@ -27,6 +28,7 @@ register_outgoing_worker, ) from aleph.utils import run_in_executor +from .tx_context import TxContext LOGGER = logging.getLogger("chains.ethereum") CHAIN_NAME = "ETH" @@ -161,7 +163,9 @@ async def get_logs(config, web3: Web3, contract, start_height): raise -async def request_transactions(config, web3: Web3, contract, abi, start_height): +async def request_transactions( + config, web3: Web3, contract, abi, start_height +) -> AsyncIterator[Tuple[Dict, TxContext]]: """Continuously request data from the Ethereum blockchain. TODO: support websocket API. """ @@ -191,14 +195,14 @@ async def request_transactions(config, web3: Web3, contract, abi, start_height): message = event_data.args.message try: jdata = json.loads(message) - context = { - "chain_name": CHAIN_NAME, - "tx_hash": event_data.transactionHash.hex(), - "time": timestamp, - "height": event_data.blockNumber, - "publisher": publisher, - } - yield (jdata, context) + context = TxContext( + chain_name=CHAIN_NAME, + tx_hash=event_data.transactionHash.hex(), + time=timestamp, + height=event_data.blockNumber, + publisher=publisher, + ) + yield jdata, context except json.JSONDecodeError: # if it's not valid json, just ignore it... diff --git a/src/aleph/chains/nuls2.py b/src/aleph/chains/nuls2.py index da4cb3a85..d946b7a34 100644 --- a/src/aleph/chains/nuls2.py +++ b/src/aleph/chains/nuls2.py @@ -6,6 +6,7 @@ import struct import time from operator import itemgetter +from typing import AsyncIterator, Dict, Tuple import aiohttp from aiocache import cached, SimpleMemoryCache @@ -24,16 +25,17 @@ get_chaindata, incoming_chaindata, ) +from aleph.model.chains import Chain +from aleph.model.messages import Message +from aleph.model.pending import pending_messages_count, pending_txs_count from aleph.register_chain import ( register_verifier, register_incoming_worker, register_outgoing_worker, register_balance_getter, ) -from aleph.model.chains import Chain -from aleph.model.messages import Message -from aleph.model.pending import pending_messages_count, pending_txs_count from aleph.utils import run_in_executor +from .tx_context import TxContext LOGGER = logging.getLogger("chains.nuls2") CHAIN_NAME = "NULS2" @@ -154,7 +156,7 @@ async def get_transactions( yield tx -async def request_transactions(config, session, start_height): +async def request_transactions(config, session, start_height) -> AsyncIterator[Tuple[Dict, TxContext]]: """Continuously request data from the NULS blockchain.""" target_addr = config.nuls2.sync_address.value remark = config.nuls2.remark.value @@ -171,14 +173,14 @@ async def request_transactions(config, session, start_height): last_height = tx["height"] jdata = json.loads(ddata) - context = { - "chain_name": CHAIN_NAME, - "tx_hash": tx["hash"], - "height": tx["height"], - "time": tx["createTime"], - "publisher": tx["coinFroms"][0]["address"], - } - yield (jdata, context) + context = TxContext( + chain_name=CHAIN_NAME, + tx_hash=tx["hash"], + height=tx["height"], + time=tx["createTime"], + publisher=tx["coinFroms"][0]["address"], + ) + yield jdata, context except json.JSONDecodeError: # if it's not valid json, just ignore it... diff --git a/src/aleph/chains/tx_context.py b/src/aleph/chains/tx_context.py new file mode 100644 index 000000000..7147b9be4 --- /dev/null +++ b/src/aleph/chains/tx_context.py @@ -0,0 +1,10 @@ +from dataclasses import dataclass + + +@dataclass +class TxContext: + chain_name: str + tx_hash: str + height: int + time: int + publisher: str diff --git a/src/aleph/jobs/process_pending_txs.py b/src/aleph/jobs/process_pending_txs.py index d55c08447..fa4a15b02 100644 --- a/src/aleph/jobs/process_pending_txs.py +++ b/src/aleph/jobs/process_pending_txs.py @@ -7,16 +7,17 @@ from typing import List, Dict, Optional import sentry_sdk +from pymongo import DeleteOne, InsertOne +from pymongo.errors import CursorNotFound +from setproctitle import setproctitle + from aleph.chains.common import get_chaindata_messages +from aleph.chains.tx_context import TxContext from aleph.exceptions import InvalidMessageError from aleph.logging import setup_logging from aleph.model.pending import PendingMessage, PendingTX from aleph.network import check_message from aleph.services.p2p import singleton -from pymongo import DeleteOne, InsertOne -from pymongo.errors import CursorNotFound -from setproctitle import setproctitle - from .job_utils import prepare_loop LOGGER = logging.getLogger("jobs.pending_txs") @@ -25,17 +26,16 @@ async def handle_pending_tx( pending, actions_list: List, seen_ids: Optional[List] = None ): - LOGGER.info( - "%s Handling TX in block %s" - % (pending["context"]["chain_name"], pending["context"]["height"]) - ) + tx_context = TxContext(**pending["context"]) + LOGGER.info("%s Handling TX in block %s", tx_context.chain_name, tx_context.height) + messages = await get_chaindata_messages( - pending["content"], pending["context"], seen_ids=seen_ids + pending["content"], tx_context, seen_ids=seen_ids ) if messages: message_actions = list() for i, message in enumerate(messages): - message["time"] = pending["context"]["time"] + (i / 1000) # force order + message["time"] = tx_context.time + (i / 1000) # force order try: message = await check_message( @@ -51,9 +51,9 @@ async def handle_pending_tx( { "message": message, "source": dict( - chain_name=pending["context"]["chain_name"], - tx_hash=pending["context"]["tx_hash"], - height=pending["context"]["height"], + chain_name=tx_context.chain_name, + tx_hash=tx_context.tx_hash, + height=tx_context.height, check_message=True, # should we store this? ), } From a831488c47a19aea19cb8fc95bdcce85d8e1b148 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Sun, 1 May 2022 20:54:48 +0200 Subject: [PATCH 2/2] add comment for time field --- src/aleph/chains/tx_context.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/aleph/chains/tx_context.py b/src/aleph/chains/tx_context.py index 7147b9be4..81839d3a6 100644 --- a/src/aleph/chains/tx_context.py +++ b/src/aleph/chains/tx_context.py @@ -6,5 +6,6 @@ class TxContext: chain_name: str tx_hash: str height: int + # Transaction timestamp, in Unix time (number of seconds since epoch). time: int publisher: str