2 changes: 1 addition & 1 deletion src/aleph/chains/common.py
@@ -395,7 +395,7 @@ async def get_chaindata_messages(
if config.ipfs.enabled.value:
# wait for 4 seconds to try to pin that
try:
LOGGER.info(f"chaindatax {chaindata}")
LOGGER.info(f"chaindata {chaindata}")
await PermanentPin.register(
multihash=chaindata["content"],
reason={
48 changes: 47 additions & 1 deletion src/aleph/jobs/job_utils.py
@@ -1,12 +1,15 @@
import asyncio
from typing import Dict
from typing import Tuple
from typing import Iterable, Tuple

import aleph.config
from aleph.model import init_db_globals
from aleph.services.ipfs.common import init_ipfs_globals
from aleph.services.p2p import init_p2p_client
from configmanager import Config
from typing import Awaitable, Callable, List
from aleph.model.db_bulk_operation import DbBulkOperation
from itertools import groupby


def prepare_loop(config_values: Dict) -> Tuple[asyncio.AbstractEventLoop, Config]:
@@ -28,3 +31,46 @@ def prepare_loop(config_values: Dict) -> Tuple[asyncio.AbstractEventLoop, Config]:
init_ipfs_globals(config)
_ = init_p2p_client(config)
return loop, config


async def perform_db_operations(db_operations: Iterable[DbBulkOperation]) -> None:
# Sort the operations by collection name before grouping and executing them.
sorted_operations = sorted(
db_operations,
key=lambda op: op.collection.__name__,
)

for collection, operations in groupby(sorted_operations, lambda op: op.collection):
mongo_ops = [op.operation for op in operations]
await collection.collection.bulk_write(mongo_ops)


async def gather_and_perform_db_operations(
tasks: List[Awaitable[List[DbBulkOperation]]],
on_error: Callable[[BaseException], None],
) -> None:
"""
Processes the result of the pending TX/message tasks.

Gathers the results of the tasks passed in input, handles exceptions
and performs DB operations.

:param tasks: Job tasks. Each of these tasks must return a list of
DbBulkOperation objects.
:param on_error: Error callback function. This function will be called
on each error from one of the tasks.
"""
task_results = await asyncio.gather(*tasks, return_exceptions=True)

errors = [op for op in task_results if isinstance(op, BaseException)]
for error in errors:
on_error(error)

db_operations = (
op
for operations in task_results
if not isinstance(operations, BaseException)
for op in operations
)

await perform_db_operations(db_operations)
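
A minimal usage sketch of the two new helpers (the process_one coroutine, document shapes and logger name are illustrative only, not part of this PR): each task returns its Mongo writes as DbBulkOperation objects, gather_and_perform_db_operations awaits the tasks and routes failures to the callback, and perform_db_operations then issues one bulk_write per collection.

import logging
from typing import List

from pymongo import DeleteOne, InsertOne

from aleph.jobs.job_utils import gather_and_perform_db_operations
from aleph.model.db_bulk_operation import DbBulkOperation
from aleph.model.pending import PendingMessage, PendingTX

LOGGER = logging.getLogger(__name__)


async def process_one(pending_tx) -> List[DbBulkOperation]:
    # Illustrative task: describe the writes instead of touching Mongo directly.
    return [
        DbBulkOperation(
            collection=PendingMessage,
            operation=InsertOne({"message": pending_tx["content"]}),
        ),
        DbBulkOperation(
            collection=PendingTX,
            operation=DeleteOne({"_id": pending_tx["_id"]}),
        ),
    ]


async def process_batch(pending_txs) -> None:
    await gather_and_perform_db_operations(
        [process_one(tx) for tx in pending_txs],
        on_error=lambda e: LOGGER.error("error in pending TX task: %s", e),
    )
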
35 changes: 9 additions & 26 deletions src/aleph/jobs/process_pending_messages.py
@@ -3,22 +3,22 @@
"""

import asyncio
from itertools import groupby
from functools import partial
from logging import getLogger
from typing import List, Dict, Tuple

import sentry_sdk
from aleph_message.models import MessageType
from pymongo import DeleteOne, DeleteMany, ASCENDING
from setproctitle import setproctitle

from aleph.chains.common import incoming, IncomingStatus
from aleph.logging import setup_logging
from aleph.model.db_bulk_operation import DbBulkOperation
from aleph.model.pending import PendingMessage
from aleph.services.p2p import singleton
from aleph.types import ItemType
from pymongo import DeleteOne, DeleteMany, ASCENDING
from setproctitle import setproctitle
from aleph_message.models import MessageType

from .job_utils import prepare_loop
from .job_utils import prepare_loop, gather_and_perform_db_operations

LOGGER = getLogger("jobs.pending_messages")

@@ -47,27 +47,10 @@ async def handle_pending_message(


async def join_pending_message_tasks(tasks):
db_operations = await asyncio.gather(*tasks, return_exceptions=True)

errors = [op for op in db_operations if isinstance(op, BaseException)]
for error in errors:
LOGGER.error("Error while processing message: %s", error)

# Sort the operations by collection name before grouping and executing them.
db_operations = sorted(
(
op
for operations in db_operations
if not isinstance(operations, BaseException)
for op in operations
),
key=lambda op: op.collection.__name__,
await gather_and_perform_db_operations(
tasks,
on_error=partial(LOGGER.error, "Error while processing message: %s"),
)

for collection, operations in groupby(db_operations, lambda op: op.collection):
mongo_ops = [op.operation for op in operations]
await collection.collection.bulk_write(mongo_ops)

tasks.clear()


114 changes: 56 additions & 58 deletions src/aleph/jobs/process_pending_txs.py
@@ -18,22 +18,25 @@
from aleph.model.pending import PendingMessage, PendingTX
from aleph.network import check_message
from aleph.services.p2p import singleton
from .job_utils import prepare_loop
from .job_utils import prepare_loop, gather_and_perform_db_operations
from aleph.model.db_bulk_operation import DbBulkOperation
from aleph.toolkit.batch import async_batch

LOGGER = logging.getLogger("jobs.pending_txs")


async def handle_pending_tx(
pending, actions_list: List, seen_ids: Optional[List] = None
):
tx_context = TxContext(**pending["context"])
pending_tx, seen_ids: Optional[List] = None
) -> List[DbBulkOperation]:

db_operations: List[DbBulkOperation] = []
tx_context = TxContext(**pending_tx["context"])
LOGGER.info("%s Handling TX in block %s", tx_context.chain_name, tx_context.height)

messages = await get_chaindata_messages(
pending["content"], tx_context, seen_ids=seen_ids
pending_tx["content"], tx_context, seen_ids=seen_ids
)
if messages:
message_actions = list()
for i, message in enumerate(messages):
message["time"] = tx_context.time + (i / 1000) # force order

@@ -46,76 +49,71 @@ async def handle_pending_tx(
continue

# we add it to the message queue... bad idea? should we process it asap?
message_actions.append(
InsertOne(
{
"message": message,
"source": dict(
chain_name=tx_context.chain_name,
tx_hash=tx_context.tx_hash,
height=tx_context.height,
check_message=True, # should we store this?
),
}
db_operations.append(
DbBulkOperation(
collection=PendingMessage,
operation=InsertOne(
{
"message": message,
"source": dict(
chain_name=tx_context.chain_name,
tx_hash=tx_context.tx_hash,
height=tx_context.height,
check_message=True, # should we store this?
),
}
),
)
)
await asyncio.sleep(0)

if message_actions:
await PendingMessage.collection.bulk_write(message_actions)
else:
LOGGER.debug("TX contains no message")

if messages is not None:
# bogus or handled, we remove it.
actions_list.append(DeleteOne({"_id": pending["_id"]}))
db_operations.append(
DbBulkOperation(
collection=PendingTX, operation=DeleteOne({"_id": pending_tx["_id"]})
)
)

return db_operations

async def join_pending_txs_tasks(tasks, actions_list):
results = await asyncio.gather(*tasks, return_exceptions=True)

for result in results:
if isinstance(result, BaseException):
LOGGER.exception(
"error in incoming txs task",
exc_info=(type(result), result, result.__traceback__),
)
async def join_pending_txs_tasks(tasks):
await gather_and_perform_db_operations(
tasks,
on_error=lambda e: LOGGER.exception(
"error in incoming txs task",
exc_info=(type(e), e, e.__traceback__),
),
)

tasks.clear()

if len(actions_list):
await PendingTX.collection.bulk_write(actions_list)
actions_list.clear()
async def process_pending_txs():
"""
Process chain transactions in the Pending TX queue.
"""

batch_size = 200

async def process_pending_txs():
"""Each few minutes, try to handle message that were added to the
pending queue (Unavailable messages)."""
if not await PendingTX.collection.count_documents({}):
await asyncio.sleep(5)
return

actions = []
tasks = []
seen_offchain_hashes = []
seen_offchain_hashes = set()
seen_ids = []
i = 0
LOGGER.info("handling TXs")
async for pending in PendingTX.collection.find().sort([("context.time", 1)]):
if pending["content"]["protocol"] == "aleph-offchain":
if pending["content"]["content"] not in seen_offchain_hashes:
seen_offchain_hashes.append(pending["content"]["content"])
else:
continue

i += 1
tasks.append(handle_pending_tx(pending, actions, seen_ids=seen_ids))

if i > 200:
await join_pending_txs_tasks(tasks, actions)
i = 0

await join_pending_txs_tasks(tasks, actions)
async for pending_tx_batch in async_batch(
PendingTX.collection.find().sort([("context.time", 1)]), batch_size
):
tasks = []
for pending_tx in pending_tx_batch:
if pending_tx["content"]["protocol"] == "aleph-offchain":
if pending_tx["content"]["content"] in seen_offchain_hashes:
continue

seen_offchain_hashes.add(pending_tx["content"]["content"])
tasks.append(handle_pending_tx(pending_tx, seen_ids=seen_ids))

await join_pending_txs_tasks(tasks)


async def handle_txs_task():
Empty file added src/aleph/toolkit/__init__.py
Empty file.
18 changes: 18 additions & 0 deletions src/aleph/toolkit/batch.py
@@ -0,0 +1,18 @@
from typing import AsyncIterator, List, TypeVar

T = TypeVar("T")


async def async_batch(
async_iterable: AsyncIterator[T], n: int
) -> AsyncIterator[List[T]]:
batch = []
async for item in async_iterable:
batch.append(item)
if len(batch) == n:
yield batch
batch = []

# Yield the last batch
if batch:
yield batch
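
A quick, hypothetical usage example of async_batch (the numbers generator stands in for an async Mongo cursor): it groups any async iterator into lists of at most n items and yields the final, possibly shorter, batch.

import asyncio

from aleph.toolkit.batch import async_batch


async def numbers(up_to: int):
    # Stand-in for an async Mongo cursor.
    for i in range(up_to):
        yield i


async def main():
    async for batch in async_batch(numbers(7), 3):
        print(batch)  # [0, 1, 2] then [3, 4, 5] then [6]


asyncio.run(main())
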
8 changes: 8 additions & 0 deletions tests/message_processing/conftest.py
@@ -0,0 +1,8 @@
import pytest

from .load_fixtures import load_fixture_messages


@pytest.fixture
def fixture_messages():
return load_fixture_messages("test-data-pending-tx-messages.json")
11 changes: 11 additions & 0 deletions tests/message_processing/load_fixtures.py
@@ -0,0 +1,11 @@
import json
import os
from typing import Dict, List
from pathlib import Path


def load_fixture_messages(fixture: str) -> List[Dict]:
fixture_path = Path(__file__).parent / "fixtures" / fixture

with open(fixture_path) as f:
return json.load(f)["content"]["messages"]
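
A hypothetical test (not part of this diff) showing how the conftest.py fixture above exposes the parsed messages to tests by argument name:

# tests/message_processing/test_fixtures.py -- hypothetical example
def test_fixture_messages_are_loaded(fixture_messages):
    # load_fixture_messages returns the "messages" list from the JSON file.
    assert isinstance(fixture_messages, list)
    assert all(isinstance(message, dict) for message in fixture_messages)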