diff --git a/src/aleph/commands.py b/src/aleph/commands.py index 6d33ebc8c..8e20434fb 100644 --- a/src/aleph/commands.py +++ b/src/aleph/commands.py @@ -26,7 +26,8 @@ from aleph.cli.args import parse_args from aleph.config import get_defaults from aleph.exceptions import InvalidConfigException, KeyNotFoundException -from aleph.jobs import start_jobs, prepare_loop +from aleph.jobs.job_utils import prepare_loop +from aleph.jobs import start_jobs from aleph.logging import setup_logging from aleph.network import listener_tasks from aleph.services import p2p diff --git a/src/aleph/jobs.py b/src/aleph/jobs.py deleted file mode 100644 index f1027e47e..000000000 --- a/src/aleph/jobs.py +++ /dev/null @@ -1,426 +0,0 @@ -import asyncio -from itertools import groupby -from logging import getLogger -from multiprocessing import Process -from typing import Coroutine, List, Dict, Optional, Tuple - -import aioipfs -import sentry_sdk -from aleph.chains.common import incoming, get_chaindata_messages, IncomingStatus -from aleph.exceptions import InvalidMessageError -from aleph.logging import setup_logging -from aleph.model.db_bulk_operation import DbBulkOperation -from aleph.model.p2p import get_peers -from aleph.model.pending import PendingMessage, PendingTX -from aleph.network import check_message -from aleph.services.ipfs.common import connect_ipfs_peer -from aleph.services.p2p import singleton -from aleph.types import ItemType -from pymongo import DeleteOne, InsertOne, DeleteMany, ASCENDING -from pymongo.errors import CursorNotFound -from setproctitle import setproctitle - -LOGGER = getLogger("JOBS") - - -async def handle_pending_message( - pending: Dict, - seen_ids: Dict[Tuple, int], -) -> List[DbBulkOperation]: - status, operations = await incoming( - pending["message"], - chain_name=pending["source"].get("chain_name"), - tx_hash=pending["source"].get("tx_hash"), - height=pending["source"].get("height"), - seen_ids=seen_ids, - check_message=pending["source"].get("check_message", True), - retrying=True, - existing_id=pending["_id"], - ) - - if status != IncomingStatus.RETRYING_LATER: - operations.append( - DbBulkOperation(PendingMessage, DeleteOne({"_id": pending["_id"]})) - ) - - return operations - - -async def join_pending_message_tasks(tasks): - db_operations = await asyncio.gather(*tasks, return_exceptions=True) - - errors = [op for op in db_operations if isinstance(op, BaseException)] - for error in errors: - LOGGER.error("Error while processing message: %s", error) - - # Sort the operations by collection name before grouping and executing them. 
- db_operations = sorted( - ( - op - for operations in db_operations - if not isinstance(operations, BaseException) - for op in operations - ), - key=lambda op: op.collection.__name__, - ) - - for collection, operations in groupby(db_operations, lambda op: op.collection): - mongo_ops = [op.operation for op in operations] - await collection.collection.bulk_write(mongo_ops) - - tasks.clear() - - -async def retry_messages_job(shared_stats: Dict): - """Each few minutes, try to handle message that were added to the - pending queue (Unavailable messages).""" - - seen_ids: Dict[Tuple, int] = dict() - gtasks: List[asyncio.Task] = [] - tasks: List[asyncio.Task] = [] - i: int = 0 - j: int = 0 - find_params: Dict = {} - - while await PendingMessage.collection.count_documents(find_params): - async for pending in PendingMessage.collection.find(find_params).sort( - [("retries", ASCENDING), ("message.time", ASCENDING)] - ).batch_size(256): - LOGGER.debug( - f"retry_message_job len_seen_ids={len(seen_ids)} " - f"len_gtasks={len(gtasks)} len_tasks={len(tasks)}" - ) - - shared_stats["retry_messages_job_seen_ids"] = len(seen_ids) - shared_stats["retry_messages_job_gtasks"] = len(gtasks) - shared_stats["retry_messages_job_tasks"] = len(tasks) - shared_stats["retry_messages_job_i"] = i - shared_stats["retry_messages_job_j"] = j - - if pending.get("message") is None: - LOGGER.warning( - "Found PendingMessage with empty message, this should be caught before insertion" - ) - await PendingMessage.collection.delete_one({"_id": pending["_id"]}) - continue - - if not isinstance(pending["message"], dict): - raise ValueError( - "Pending message is not a dictionary and cannot be read." - ) - - if ( - pending["message"]["item_type"] == ItemType.IPFS - or pending["message"]["type"] == "STORE" - ): - i += 15 - j += 100 - else: - i += 1 - j += 1 - - tasks.append(asyncio.create_task(handle_pending_message(pending, seen_ids))) - - if j >= 20000: - # Group tasks using asyncio.gather in `gtasks`. - gtasks.append( - asyncio.create_task( - join_pending_message_tasks( - tasks, - ) - ) - ) - tasks = [] - i = 0 - j = 0 - - if i >= 1024: - await join_pending_message_tasks(tasks) - tasks = [] - i = 0 - - gtasks.append(asyncio.create_task(join_pending_message_tasks(tasks))) - - await asyncio.gather(*gtasks, return_exceptions=True) - gtasks = [] - - if await PendingMessage.collection.count_documents(find_params) > 100000: - LOGGER.info("Cleaning messages") - clean_actions = [] - # big collection, try to remove dups. 
- for key, height in seen_ids.items(): - clean_actions.append( - DeleteMany( - { - "message.item_hash": key[0], - "message.sender": key[1], - "source.chain_name": key[2], - "source.height": {"$gt": height}, - } - ) - ) - result = await PendingMessage.collection.bulk_write(clean_actions) - LOGGER.info(repr(result)) - - await asyncio.sleep(5) - - -async def retry_messages_task(shared_stats: Dict): - """Handle message that were added to the pending queue""" - await asyncio.sleep(4) - while True: - try: - await retry_messages_job(shared_stats=shared_stats) - except Exception: - LOGGER.exception("Error in pending messages retry job") - - LOGGER.debug("Waiting 5 seconds for new pending messages...") - await asyncio.sleep(5) - - -async def handle_pending_tx( - pending, actions_list: List, seen_ids: Optional[List] = None -): - LOGGER.info( - "%s Handling TX in block %s" - % (pending["context"]["chain_name"], pending["context"]["height"]) - ) - messages = await get_chaindata_messages( - pending["content"], pending["context"], seen_ids=seen_ids - ) - if isinstance(messages, list): - message_actions = list() - for i, message in enumerate(messages): - message["time"] = pending["context"]["time"] + (i / 1000) # force order - - try: - message = await check_message( - message, trusted=True - ) # we don't check signatures yet. - except InvalidMessageError as error: - LOGGER.warning(error) - continue - - # we add it to the message queue... bad idea? should we process it asap? - message_actions.append( - InsertOne( - { - "message": message, - "source": dict( - chain_name=pending["context"]["chain_name"], - tx_hash=pending["context"]["tx_hash"], - height=pending["context"]["height"], - check_message=True, # should we store this? - ), - } - ) - ) - await asyncio.sleep(0) - - if message_actions: - await PendingMessage.collection.bulk_write(message_actions) - else: - LOGGER.debug("TX contains no message") - - if messages is not None: - # bogus or handled, we remove it. 
- actions_list.append(DeleteOne({"_id": pending["_id"]})) - - -async def join_pending_txs_tasks(tasks, actions_list): - results = await asyncio.gather(*tasks, return_exceptions=True) - - for result in results: - if isinstance(result, BaseException): - LOGGER.exception( - "error in incoming txs task", - exc_info=(type(result), result, result.__traceback__), - ) - - tasks.clear() - - if len(actions_list): - await PendingTX.collection.bulk_write(actions_list) - actions_list.clear() - - -async def handle_txs_job(): - """Each few minutes, try to handle message that were added to the - pending queue (Unavailable messages).""" - if not await PendingTX.collection.count_documents({}): - await asyncio.sleep(5) - return - - actions = [] - tasks = [] - seen_offchain_hashes = [] - seen_ids = [] - i = 0 - LOGGER.info("handling TXs") - async for pending in PendingTX.collection.find().sort([("context.time", 1)]): - if pending["content"]["protocol"] == "aleph-offchain": - if pending["content"]["content"] not in seen_offchain_hashes: - seen_offchain_hashes.append(pending["content"]["content"]) - else: - continue - - i += 1 - tasks.append(handle_pending_tx(pending, actions, seen_ids=seen_ids)) - - if i > 200: - await join_pending_txs_tasks(tasks, actions) - i = 0 - - await join_pending_txs_tasks(tasks, actions) - - -async def handle_txs_task(): - await asyncio.sleep(4) - while True: - try: - await handle_txs_job() - await asyncio.sleep(5) - except CursorNotFound: - LOGGER.exception("Cursor error in pending txs job ") - except Exception: - LOGGER.exception("Error in pending txs job") - - await asyncio.sleep(0.01) - - -def prepare_loop(config_values: Dict) -> asyncio.AbstractEventLoop: - from aleph.model import init_db - from aleph.web import app - from configmanager import Config - from aleph.config import get_defaults - from aleph.services.ipfs.common import get_ipfs_api - from aleph.services.p2p import http, init_p2p_client - - http.SESSION = None # type:ignore - - loop = asyncio.get_event_loop() - - config = Config(schema=get_defaults()) - app["config"] = config - config.load_values(config_values) - - init_db(config, ensure_indexes=False) - loop.run_until_complete(get_ipfs_api(timeout=2, reset=True)) - _ = init_p2p_client(config) - return loop - - -def txs_task_loop(config_values: Dict, api_servers: List): - setproctitle("aleph.jobs.txs_task_loop") - sentry_sdk.init( - dsn=config_values["sentry"]["dsn"], - traces_sample_rate=config_values["sentry"]["traces_sample_rate"], - ignore_errors=[KeyboardInterrupt], - ) - setup_logging( - loglevel=config_values["logging"]["level"], - filename="/tmp/txs_task_loop.log", - ) - singleton.api_servers = api_servers - - loop = prepare_loop(config_values) - loop.run_until_complete(handle_txs_task()) - - -def messages_task_loop(config_values: Dict, shared_stats: Dict, api_servers: List): - """ - Background task that processes all the messages received by the node. - - :param config_values: Application configuration, as a dictionary. - :param shared_stats: Dictionary of application metrics. This dictionary is updated by othe - processes and must be allocated from shared memory. - :param api_servers: List of Core Channel Nodes with an HTTP interface found on the network. - This list is updated by other processes and must be allocated from - shared memory by the caller. 
- """ - - setproctitle("aleph.jobs.messages_task_loop") - sentry_sdk.init( - dsn=config_values["sentry"]["dsn"], - traces_sample_rate=config_values["sentry"]["traces_sample_rate"], - ignore_errors=[KeyboardInterrupt], - ) - setup_logging( - loglevel=config_values["logging"]["level"], - filename="/tmp/messages_task_loop.log", - ) - singleton.api_servers = api_servers - - loop = prepare_loop(config_values) - loop.run_until_complete(asyncio.gather(retry_messages_task(shared_stats))) - - -async def reconnect_ipfs_job(config): - from aleph.services.utils import get_IP - - my_ip = await get_IP() - await asyncio.sleep(2) - while True: - try: - LOGGER.info("Reconnecting to peers") - for peer in config.ipfs.peers.value: - try: - ret = await connect_ipfs_peer(peer) - if "Strings" in ret: - LOGGER.info("\n".join(ret["Strings"])) - except aioipfs.APIError: - LOGGER.warning("Can't reconnect to %s" % peer) - - async for peer in get_peers(peer_type="IPFS"): - if peer in config.ipfs.peers.value: - continue - - if my_ip in peer: - continue - - try: - ret = await connect_ipfs_peer(peer) - if ret and "Strings" in ret: - LOGGER.info("\n".join(ret["Strings"])) - except aioipfs.APIError: - LOGGER.warning("Can't reconnect to %s" % peer) - - except Exception: - LOGGER.exception("Error reconnecting to peers") - - await asyncio.sleep(config.ipfs.reconnect_delay.value) - - -def start_jobs( - config, - shared_stats: Dict, - api_servers: List[str], - use_processes=True, -) -> List[Coroutine]: - LOGGER.info("starting jobs") - tasks: List[Coroutine] = [] - - if use_processes: - config_values = config.dump_values() - p1 = Process( - target=messages_task_loop, - args=( - config_values, - shared_stats, - api_servers, - ), - ) - p2 = Process( - target=txs_task_loop, - args=(config_values, api_servers), - ) - p1.start() - p2.start() - else: - tasks.append(retry_messages_task(shared_stats=shared_stats)) - tasks.append(handle_txs_task()) - - if config.ipfs.enabled.value: - tasks.append(reconnect_ipfs_job(config)) - - return tasks diff --git a/src/aleph/jobs/__init__.py b/src/aleph/jobs/__init__.py new file mode 100644 index 000000000..5a455ff5e --- /dev/null +++ b/src/aleph/jobs/__init__.py @@ -0,0 +1,44 @@ +import logging +from multiprocessing import Process +from typing import Dict, List, Coroutine + +from aleph.jobs.process_pending_messages import pending_messages_subprocess, retry_messages_task +from aleph.jobs.process_pending_txs import pending_txs_subprocess, handle_txs_task +from aleph.jobs.reconnect_ipfs import reconnect_ipfs_job + +LOGGER = logging.getLogger("jobs") + + +def start_jobs( + config, + shared_stats: Dict, + api_servers: List[str], + use_processes=True, +) -> List[Coroutine]: + LOGGER.info("starting jobs") + tasks: List[Coroutine] = [] + + if use_processes: + config_values = config.dump_values() + p1 = Process( + target=pending_messages_subprocess, + args=( + config_values, + shared_stats, + api_servers, + ), + ) + p2 = Process( + target=pending_txs_subprocess, + args=(config_values, api_servers), + ) + p1.start() + p2.start() + else: + tasks.append(retry_messages_task(shared_stats=shared_stats)) + tasks.append(handle_txs_task()) + + if config.ipfs.enabled.value: + tasks.append(reconnect_ipfs_job(config)) + + return tasks diff --git a/src/aleph/jobs/job_utils.py b/src/aleph/jobs/job_utils.py new file mode 100644 index 000000000..d8fdd4041 --- /dev/null +++ b/src/aleph/jobs/job_utils.py @@ -0,0 +1,24 @@ +import asyncio +from typing import Dict + + +def prepare_loop(config_values: Dict) -> 
asyncio.AbstractEventLoop:
+    from aleph.model import init_db
+    from aleph.web import app
+    from configmanager import Config
+    from aleph.config import get_defaults
+    from aleph.services.ipfs.common import get_ipfs_api
+    from aleph.services.p2p import http, init_p2p_client
+
+    http.SESSION = None  # type:ignore
+
+    loop = asyncio.get_event_loop()
+
+    config = Config(schema=get_defaults())
+    app["config"] = config
+    config.load_values(config_values)
+
+    init_db(config, ensure_indexes=False)
+    loop.run_until_complete(get_ipfs_api(timeout=2, reset=True))
+    _ = init_p2p_client(config)
+    return loop
diff --git a/src/aleph/jobs/process_pending_messages.py b/src/aleph/jobs/process_pending_messages.py
new file mode 100644
index 000000000..5a76b061b
--- /dev/null
+++ b/src/aleph/jobs/process_pending_messages.py
@@ -0,0 +1,208 @@
+"""
+Job in charge of (re-) processing Aleph messages waiting in the pending queue.
+"""
+
+import asyncio
+from itertools import groupby
+from logging import getLogger
+from typing import List, Dict, Tuple
+
+import sentry_sdk
+from aleph.chains.common import incoming, IncomingStatus
+from aleph.logging import setup_logging
+from aleph.model.db_bulk_operation import DbBulkOperation
+from aleph.model.pending import PendingMessage
+from aleph.services.p2p import singleton
+from aleph.types import ItemType
+from pymongo import DeleteOne, DeleteMany, ASCENDING
+from setproctitle import setproctitle
+
+from .job_utils import prepare_loop
+
+LOGGER = getLogger("jobs.pending_messages")
+
+
+async def handle_pending_message(
+    pending: Dict,
+    seen_ids: Dict[Tuple, int],
+) -> List[DbBulkOperation]:
+    status, operations = await incoming(
+        pending["message"],
+        chain_name=pending["source"].get("chain_name"),
+        tx_hash=pending["source"].get("tx_hash"),
+        height=pending["source"].get("height"),
+        seen_ids=seen_ids,
+        check_message=pending["source"].get("check_message", True),
+        retrying=True,
+        existing_id=pending["_id"],
+    )
+
+    if status != IncomingStatus.RETRYING_LATER:
+        operations.append(
+            DbBulkOperation(PendingMessage, DeleteOne({"_id": pending["_id"]}))
+        )
+
+    return operations
+
+
+async def join_pending_message_tasks(tasks):
+    db_operations = await asyncio.gather(*tasks, return_exceptions=True)
+
+    errors = [op for op in db_operations if isinstance(op, BaseException)]
+    for error in errors:
+        LOGGER.error("Error while processing message: %s", error)
+
+    # Sort the operations by collection name before grouping and executing them.
+    db_operations = sorted(
+        (
+            op
+            for operations in db_operations
+            if not isinstance(operations, BaseException)
+            for op in operations
+        ),
+        key=lambda op: op.collection.__name__,
+    )
+
+    for collection, operations in groupby(db_operations, lambda op: op.collection):
+        mongo_ops = [op.operation for op in operations]
+        # `collection` is the model class; the Mongo collection is its
+        # `collection` attribute, as in the code removed from jobs.py.
+        await collection.collection.bulk_write(mongo_ops)
+
+    tasks.clear()
+
+
+async def process_pending_messages(shared_stats: Dict):
+    """
+    Processes all the messages in the pending message queue.
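+
+    Messages are handled in weighted batches of concurrent tasks: IPFS and
+    STORE messages add 15 to ``i`` and 100 to ``j`` because their content may
+    have to be fetched, while other messages add 1 to each counter. When ``i``
+    reaches 1024 the current batch is awaited inline; when ``j`` reaches 20000
+    the batch is scheduled in the background and collected in ``gtasks``.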
+    """
+
+    seen_ids: Dict[Tuple, int] = dict()
+    gtasks: List[asyncio.Task] = []
+    tasks: List[asyncio.Task] = []
+    i: int = 0
+    j: int = 0
+    find_params: Dict = {}
+
+    while await PendingMessage.collection.count_documents(find_params):
+        async for pending in PendingMessage.collection.find(find_params).sort(
+            [("retries", ASCENDING), ("message.time", ASCENDING)]
+        ).batch_size(256):
+            LOGGER.debug(
+                f"retry_message_job len_seen_ids={len(seen_ids)} "
+                f"len_gtasks={len(gtasks)} len_tasks={len(tasks)}"
+            )
+
+            shared_stats["retry_messages_job_seen_ids"] = len(seen_ids)
+            shared_stats["retry_messages_job_gtasks"] = len(gtasks)
+            shared_stats["retry_messages_job_tasks"] = len(tasks)
+            shared_stats["retry_messages_job_i"] = i
+            shared_stats["retry_messages_job_j"] = j
+
+            if pending.get("message") is None:
+                LOGGER.warning(
+                    "Found PendingMessage with empty message, this should be caught before insertion"
+                )
+                await PendingMessage.collection.delete_one({"_id": pending["_id"]})
+                continue
+
+            if not isinstance(pending["message"], dict):
+                raise ValueError(
+                    "Pending message is not a dictionary and cannot be read."
+                )
+
+            if (
+                pending["message"]["item_type"] == ItemType.IPFS
+                or pending["message"]["type"] == "STORE"
+            ):
+                i += 15
+                j += 100
+            else:
+                i += 1
+                j += 1
+
+            tasks.append(asyncio.create_task(handle_pending_message(pending, seen_ids)))
+
+            if j >= 20000:
+                # Group tasks using asyncio.gather in `gtasks`.
+                gtasks.append(
+                    asyncio.create_task(
+                        join_pending_message_tasks(
+                            tasks,
+                        )
+                    )
+                )
+                tasks = []
+                i = 0
+                j = 0
+
+            if i >= 1024:
+                await join_pending_message_tasks(tasks)
+                tasks = []
+                i = 0
+
+        gtasks.append(asyncio.create_task(join_pending_message_tasks(tasks)))
+
+        await asyncio.gather(*gtasks, return_exceptions=True)
+        gtasks = []
+
+        if await PendingMessage.collection.count_documents(find_params) > 100000:
+            LOGGER.info("Cleaning messages")
+            clean_actions = []
+            # big collection, try to remove dups.
+            for key, height in seen_ids.items():
+                clean_actions.append(
+                    DeleteMany(
+                        {
+                            "message.item_hash": key[0],
+                            "message.sender": key[1],
+                            "source.chain_name": key[2],
+                            "source.height": {"$gt": height},
+                        }
+                    )
+                )
+            result = await PendingMessage.collection.bulk_write(clean_actions)
+            LOGGER.info(repr(result))
+
+
+async def retry_messages_task(shared_stats: Dict):
+    """Handle messages that were added to the pending queue."""
+    await asyncio.sleep(4)
+    while True:
+        try:
+            await process_pending_messages(shared_stats=shared_stats)
+            await asyncio.sleep(5)
+
+        except Exception:
+            LOGGER.exception("Error in pending messages retry job")
+
+        LOGGER.debug("Waiting 5 seconds for new pending messages...")
+        await asyncio.sleep(5)
+
+
+def pending_messages_subprocess(
+    config_values: Dict, shared_stats: Dict, api_servers: List
+):
+    """
+    Background task that processes all the messages received by the node.
+
+    :param config_values: Application configuration, as a dictionary.
+    :param shared_stats: Dictionary of application metrics. This dictionary is updated by other
+                         processes and must be allocated from shared memory.
+    :param api_servers: List of Core Channel Nodes with an HTTP interface found on the network.
+                        This list is updated by other processes and must be allocated from
+                        shared memory by the caller.
+    """
+
+    setproctitle("aleph.jobs.messages_task_loop")
+    sentry_sdk.init(
+        dsn=config_values["sentry"]["dsn"],
+        traces_sample_rate=config_values["sentry"]["traces_sample_rate"],
+        ignore_errors=[KeyboardInterrupt],
+    )
+    setup_logging(
+        loglevel=config_values["logging"]["level"],
+        filename="/tmp/messages_task_loop.log",
+    )
+    singleton.api_servers = api_servers
+
+    loop = prepare_loop(config_values)
+    loop.run_until_complete(asyncio.gather(retry_messages_task(shared_stats)))
diff --git a/src/aleph/jobs/process_pending_txs.py b/src/aleph/jobs/process_pending_txs.py
new file mode 100644
index 000000000..7aab4e6d6
--- /dev/null
+++ b/src/aleph/jobs/process_pending_txs.py
@@ -0,0 +1,149 @@
+"""
+Job in charge of loading messages stored on-chain and putting them in the pending message queue.
+"""
+
+import asyncio
+import logging
+from typing import List, Dict, Optional
+
+import sentry_sdk
+from aleph.chains.common import get_chaindata_messages
+from aleph.exceptions import InvalidMessageError
+from aleph.logging import setup_logging
+from aleph.model.pending import PendingMessage, PendingTX
+from aleph.network import check_message
+from aleph.services.p2p import singleton
+from pymongo import DeleteOne, InsertOne
+from pymongo.errors import CursorNotFound
+from setproctitle import setproctitle
+
+from .job_utils import prepare_loop
+
+LOGGER = logging.getLogger("jobs.pending_txs")
+
+
+async def handle_pending_tx(
+    pending, actions_list: List, seen_ids: Optional[List] = None
+):
+    LOGGER.info(
+        "%s Handling TX in block %s"
+        % (pending["context"]["chain_name"], pending["context"]["height"])
+    )
+    messages = await get_chaindata_messages(
+        pending["content"], pending["context"], seen_ids=seen_ids
+    )
+    if isinstance(messages, list):
+        message_actions = list()
+        for i, message in enumerate(messages):
+            message["time"] = pending["context"]["time"] + (i / 1000)  # force order
+
+            try:
+                message = await check_message(
+                    message, trusted=True
+                )  # we don't check signatures yet.
+            except InvalidMessageError as error:
+                LOGGER.warning(error)
+                continue
+
+            # we add it to the message queue... bad idea? should we process it asap?
+            message_actions.append(
+                InsertOne(
+                    {
+                        "message": message,
+                        "source": dict(
+                            chain_name=pending["context"]["chain_name"],
+                            tx_hash=pending["context"]["tx_hash"],
+                            height=pending["context"]["height"],
+                            check_message=True,  # should we store this?
+                        ),
+                    }
+                )
+            )
+            await asyncio.sleep(0)
+
+        if message_actions:
+            await PendingMessage.collection.bulk_write(message_actions)
+        else:
+            LOGGER.debug("TX contains no message")
+
+    if messages is not None:
+        # bogus or handled, we remove it.
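+        # (when messages is None the TX is kept in the queue and retried later)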
+        actions_list.append(DeleteOne({"_id": pending["_id"]}))
+
+
+async def join_pending_txs_tasks(tasks, actions_list):
+    results = await asyncio.gather(*tasks, return_exceptions=True)
+
+    for result in results:
+        if isinstance(result, BaseException):
+            LOGGER.exception(
+                "error in incoming txs task",
+                exc_info=(type(result), result, result.__traceback__),
+            )
+
+    tasks.clear()
+
+    if len(actions_list):
+        await PendingTX.collection.bulk_write(actions_list)
+        actions_list.clear()
+
+
+async def process_pending_txs():
+    """Every few minutes, try to handle messages that were added to the
+    pending queue (unavailable messages)."""
+    if not await PendingTX.collection.count_documents({}):
+        await asyncio.sleep(5)
+        return
+
+    actions = []
+    tasks = []
+    seen_offchain_hashes = []
+    seen_ids = []
+    i = 0
+    LOGGER.info("handling TXs")
+    async for pending in PendingTX.collection.find().sort([("context.time", 1)]):
+        if pending["content"]["protocol"] == "aleph-offchain":
+            if pending["content"]["content"] not in seen_offchain_hashes:
+                seen_offchain_hashes.append(pending["content"]["content"])
+            else:
+                continue
+
+        i += 1
+        tasks.append(handle_pending_tx(pending, actions, seen_ids=seen_ids))
+
+        if i > 200:
+            await join_pending_txs_tasks(tasks, actions)
+            i = 0
+
+    await join_pending_txs_tasks(tasks, actions)
+
+
+async def handle_txs_task():
+    await asyncio.sleep(4)
+    while True:
+        try:
+            await process_pending_txs()
+            await asyncio.sleep(5)
+        except CursorNotFound:
+            LOGGER.exception("Cursor error in pending txs job")
+        except Exception:
+            LOGGER.exception("Error in pending txs job")
+
+        await asyncio.sleep(0.01)
+
+
+def pending_txs_subprocess(config_values: Dict, api_servers: List):
+    setproctitle("aleph.jobs.txs_task_loop")
+    sentry_sdk.init(
+        dsn=config_values["sentry"]["dsn"],
+        traces_sample_rate=config_values["sentry"]["traces_sample_rate"],
+        ignore_errors=[KeyboardInterrupt],
+    )
+    setup_logging(
+        loglevel=config_values["logging"]["level"],
+        filename="/tmp/txs_task_loop.log",
+    )
+    singleton.api_servers = api_servers
+
+    loop = prepare_loop(config_values)
+    loop.run_until_complete(handle_txs_task())
diff --git a/src/aleph/jobs/reconnect_ipfs.py b/src/aleph/jobs/reconnect_ipfs.py
new file mode 100644
index 000000000..259484ad8
--- /dev/null
+++ b/src/aleph/jobs/reconnect_ipfs.py
@@ -0,0 +1,48 @@
+"""
+Job in charge of reconnecting to IPFS peers periodically.
+"""
+
+import asyncio
+import logging
+
+import aioipfs
+from aleph.model.p2p import get_peers
+from aleph.services.ipfs.common import connect_ipfs_peer
+
+LOGGER = logging.getLogger("jobs.reconnect_ipfs")
+
+
+async def reconnect_ipfs_job(config):
+    from aleph.services.utils import get_IP
+
+    my_ip = await get_IP()
+    await asyncio.sleep(2)
+    while True:
+        try:
+            LOGGER.info("Reconnecting to peers")
+            for peer in config.ipfs.peers.value:
+                try:
+                    ret = await connect_ipfs_peer(peer)
+                    if "Strings" in ret:
+                        LOGGER.info("\n".join(ret["Strings"]))
+                except aioipfs.APIError:
+                    LOGGER.warning("Can't reconnect to %s" % peer)
+
+            async for peer in get_peers(peer_type="IPFS"):
+                if peer in config.ipfs.peers.value:
+                    continue
+
+                if my_ip in peer:
+                    continue
+
+                try:
+                    ret = await connect_ipfs_peer(peer)
+                    if ret and "Strings" in ret:
+                        LOGGER.info("\n".join(ret["Strings"]))
+                except aioipfs.APIError:
+                    LOGGER.warning("Can't reconnect to %s" % peer)
+
+        except Exception:
+            LOGGER.exception("Error reconnecting to peers")
+
+        await asyncio.sleep(config.ipfs.reconnect_delay.value)
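
Reviewer note (not part of the patch): a minimal sketch of how the refactored entry point is expected to be driven by a caller such as commands.py. Only start_jobs and its signature come from this diff; the Manager-based allocation of the shared structures and the run_jobs wrapper are illustrative assumptions.

    # Illustrative sketch only -- names other than start_jobs are assumptions.
    import asyncio
    from multiprocessing import Manager

    from aleph.jobs import start_jobs

    def run_jobs(config) -> None:
        manager = Manager()
        # Both structures must live in shared memory: the messages subprocess
        # updates shared_stats, and other processes update api_servers.
        shared_stats = manager.dict()
        api_servers = manager.list()

        # With use_processes=True, the pending-message and pending-TX jobs run
        # in their own subprocesses; start_jobs only returns the coroutines
        # that must run on the caller's event loop (the IPFS reconnect job,
        # when config.ipfs.enabled is true).
        coroutines = start_jobs(config, shared_stats, api_servers, use_processes=True)

        loop = asyncio.get_event_loop()
        for coroutine in coroutines:
            loop.create_task(coroutine)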