Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions src/aleph/chains/common.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import asyncio
import json
import logging
from dataclasses import asdict
from enum import IntEnum
from typing import Dict, Optional, Tuple, List

from aleph_message.models import MessageType
from bson import ObjectId
from pymongo import UpdateOne

from aleph.config import get_config
from aleph.exceptions import (
AlephStorageException,
Expand All @@ -21,9 +26,7 @@
from aleph.network import check_message as check_message_fn
from aleph.permissions import check_sender_authorization
from aleph.storage import get_json, pin_hash, add_json, get_message_content
from aleph_message.models import MessageType
from bson import ObjectId
from pymongo import UpdateOne
from .tx_context import TxContext

LOGGER = logging.getLogger("chains.common")

Expand Down Expand Up @@ -349,7 +352,7 @@ async def get_chaindata(messages, bulk_threshold=2000):


async def get_chaindata_messages(
chaindata: Dict, context, seen_ids: Optional[List] = None
chaindata: Dict, context: TxContext, seen_ids: Optional[List] = None
):
config = get_config()

Expand Down Expand Up @@ -411,9 +414,9 @@ async def get_chaindata_messages(
raise InvalidContent(error_msg)


async def incoming_chaindata(content, context):
async def incoming_chaindata(content: Dict, context: TxContext):
"""Incoming data from a chain.
Content can be inline of "offchain" through an ipfs hash.
For now we only add it to the database, it will be processed later.
"""
await PendingTX.collection.insert_one({"content": content, "context": context})
await PendingTX.collection.insert_one({"content": content, "context": asdict(context)})
22 changes: 13 additions & 9 deletions src/aleph/chains/ethereum.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import functools
import json
import logging
from typing import AsyncIterator, Dict, Tuple

import pkg_resources
from eth_account import Account
Expand All @@ -27,6 +28,7 @@
register_outgoing_worker,
)
from aleph.utils import run_in_executor
from .tx_context import TxContext

LOGGER = logging.getLogger("chains.ethereum")
CHAIN_NAME = "ETH"
Expand Down Expand Up @@ -161,7 +163,9 @@ async def get_logs(config, web3: Web3, contract, start_height):
raise


async def request_transactions(config, web3: Web3, contract, abi, start_height):
async def request_transactions(
config, web3: Web3, contract, abi, start_height
) -> AsyncIterator[Tuple[Dict, TxContext]]:
"""Continuously request data from the Ethereum blockchain.
TODO: support websocket API.
"""
Expand Down Expand Up @@ -191,14 +195,14 @@ async def request_transactions(config, web3: Web3, contract, abi, start_height):
message = event_data.args.message
try:
jdata = json.loads(message)
context = {
"chain_name": CHAIN_NAME,
"tx_hash": event_data.transactionHash.hex(),
"time": timestamp,
"height": event_data.blockNumber,
"publisher": publisher,
}
yield (jdata, context)
context = TxContext(
chain_name=CHAIN_NAME,
tx_hash=event_data.transactionHash.hex(),
time=timestamp,
height=event_data.blockNumber,
publisher=publisher,
)
yield jdata, context

except json.JSONDecodeError:
# if it's not valid json, just ignore it...
Expand Down
26 changes: 14 additions & 12 deletions src/aleph/chains/nuls2.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import struct
import time
from operator import itemgetter
from typing import AsyncIterator, Dict, Tuple

import aiohttp
from aiocache import cached, SimpleMemoryCache
Expand All @@ -24,16 +25,17 @@
get_chaindata,
incoming_chaindata,
)
from aleph.model.chains import Chain
from aleph.model.messages import Message
from aleph.model.pending import pending_messages_count, pending_txs_count
from aleph.register_chain import (
register_verifier,
register_incoming_worker,
register_outgoing_worker,
register_balance_getter,
)
from aleph.model.chains import Chain
from aleph.model.messages import Message
from aleph.model.pending import pending_messages_count, pending_txs_count
from aleph.utils import run_in_executor
from .tx_context import TxContext

LOGGER = logging.getLogger("chains.nuls2")
CHAIN_NAME = "NULS2"
Expand Down Expand Up @@ -154,7 +156,7 @@ async def get_transactions(
yield tx


async def request_transactions(config, session, start_height):
async def request_transactions(config, session, start_height) -> AsyncIterator[Tuple[Dict, TxContext]]:
"""Continuously request data from the NULS blockchain."""
target_addr = config.nuls2.sync_address.value
remark = config.nuls2.remark.value
Expand All @@ -171,14 +173,14 @@ async def request_transactions(config, session, start_height):
last_height = tx["height"]
jdata = json.loads(ddata)

context = {
"chain_name": CHAIN_NAME,
"tx_hash": tx["hash"],
"height": tx["height"],
"time": tx["createTime"],
"publisher": tx["coinFroms"][0]["address"],
}
yield (jdata, context)
context = TxContext(
chain_name=CHAIN_NAME,
tx_hash=tx["hash"],
height=tx["height"],
time=tx["createTime"],
publisher=tx["coinFroms"][0]["address"],
)
yield jdata, context

except json.JSONDecodeError:
# if it's not valid json, just ignore it...
Expand Down
11 changes: 11 additions & 0 deletions src/aleph/chains/tx_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from dataclasses import dataclass


@dataclass
class TxContext:
chain_name: str
tx_hash: str
height: int
# Transaction timestamp, in Unix time (number of seconds since epoch).
time: int
publisher: str
26 changes: 13 additions & 13 deletions src/aleph/jobs/process_pending_txs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,17 @@
from typing import List, Dict, Optional

import sentry_sdk
from pymongo import DeleteOne, InsertOne
from pymongo.errors import CursorNotFound
from setproctitle import setproctitle

from aleph.chains.common import get_chaindata_messages
from aleph.chains.tx_context import TxContext
from aleph.exceptions import InvalidMessageError
from aleph.logging import setup_logging
from aleph.model.pending import PendingMessage, PendingTX
from aleph.network import check_message
from aleph.services.p2p import singleton
from pymongo import DeleteOne, InsertOne
from pymongo.errors import CursorNotFound
from setproctitle import setproctitle

from .job_utils import prepare_loop

LOGGER = logging.getLogger("jobs.pending_txs")
Expand All @@ -25,17 +26,16 @@
async def handle_pending_tx(
pending, actions_list: List, seen_ids: Optional[List] = None
):
LOGGER.info(
"%s Handling TX in block %s"
% (pending["context"]["chain_name"], pending["context"]["height"])
)
tx_context = TxContext(**pending["context"])
LOGGER.info("%s Handling TX in block %s", tx_context.chain_name, tx_context.height)

messages = await get_chaindata_messages(
pending["content"], pending["context"], seen_ids=seen_ids
pending["content"], tx_context, seen_ids=seen_ids
)
if messages:
message_actions = list()
for i, message in enumerate(messages):
message["time"] = pending["context"]["time"] + (i / 1000) # force order
message["time"] = tx_context.time + (i / 1000) # force order

try:
message = await check_message(
Expand All @@ -51,9 +51,9 @@ async def handle_pending_tx(
{
"message": message,
"source": dict(
chain_name=pending["context"]["chain_name"],
tx_hash=pending["context"]["tx_hash"],
height=pending["context"]["height"],
chain_name=tx_context.chain_name,
tx_hash=tx_context.tx_hash,
height=tx_context.height,
check_message=True, # should we store this?
),
}
Expand Down