22 changes: 11 additions & 11 deletions src/aleph/chains/common.py
@@ -4,10 +4,12 @@
from enum import IntEnum
from typing import Dict, Optional, Tuple, List

from aleph_message.models import MessageType
from bson import ObjectId
from pymongo import UpdateOne

from aleph.config import get_config
from aleph.exceptions import (
AlephStorageException,
InvalidContent,
ContentCurrentlyUnavailable, UnknownHashError,
)
from aleph.handlers.forget import handle_forget_message
from aleph.handlers.storage import handle_new_storage
from aleph.model.db_bulk_operation import DbBulkOperation
@@ -17,12 +19,9 @@
from aleph.network import check_message as check_message_fn
from aleph.permissions import check_sender_authorization
from aleph.storage import get_json, pin_hash, add_json, get_message_content
from aleph.web import app
from aleph.exceptions import (
AlephStorageException,
InvalidContent,
ContentCurrentlyUnavailable, UnknownHashError,
)
from aleph_message.models import MessageType
from bson import ObjectId
from pymongo import UpdateOne

LOGGER = logging.getLogger("chains.common")

@@ -347,6 +346,7 @@ async def get_chaindata(messages, bulk_threshold=2000):
async def get_chaindata_messages(
chaindata: Dict, context, seen_ids: Optional[List] = None
):
config = get_config()

protocol = chaindata.get("protocol", None)
version = chaindata.get("version", None)
Expand Down Expand Up @@ -384,7 +384,7 @@ async def get_chaindata_messages(
raise

LOGGER.info("Got bulk data with %d items" % len(messages))
if app["config"].ipfs.enabled.value:
if config.ipfs.enabled.value:
# wait for 4 seconds to try to pin that
try:
LOGGER.info(f"chaindatax {chaindata}")
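
The recurring edit in this file swaps the aiohttp-app lookup for a call into the config module, so non-web code paths no longer import aleph.web. A before/after sketch using the names from the diff:

# Before: config access was coupled to the aiohttp application object.
#   from aleph.web import app
#   config = app["config"]

# After: any module, including non-web subprocesses, resolves it directly.
from aleph.config import get_config

config = get_config()
if config.ipfs.enabled.value:
    ...  # e.g. pin chain data to IPFS, as in get_chaindata_messages above
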
2 changes: 0 additions & 2 deletions src/aleph/chains/ethereum.py
@@ -34,9 +34,7 @@

async def verify_signature(message):
"""Verifies a signature of a message, return True if verified, false if not"""
from aleph.web import app

config = app["config"]
# w3 = await loop.run_in_executor(None, get_web3, config)

verification = await get_verification_buffer(message)
4 changes: 2 additions & 2 deletions src/aleph/chains/nuls2.py
@@ -348,9 +348,9 @@ async def nuls2_outgoing_worker(config):
async def nuls2_balance_getter(address, config=None):
global DECIMALS
if config is None:
from aleph.web import app
from aleph.config import get_config
config = get_config()

config = app["config"]
server = get_server(config.nuls2.api_url.value)
contract_address = get_server(config.nuls2.contract_address.value)
chain_id = config.nuls2.chain_id.value
46 changes: 23 additions & 23 deletions src/aleph/commands.py
@@ -24,7 +24,7 @@
from aleph import model
from aleph.chains import connector_tasks
from aleph.cli.args import parse_args
from aleph.config import get_defaults
import aleph.config
from aleph.exceptions import InvalidConfigException, KeyNotFoundException
from aleph.jobs.job_utils import prepare_loop
from aleph.jobs import start_jobs
@@ -42,7 +42,9 @@
LOGGER = logging.getLogger(__name__)


async def run_server(host: str, port: int, shared_stats: dict, extra_web_config: dict):
async def run_server(
config: Config, host: str, port: int, shared_stats: dict, extra_web_config: dict
):
# These imports will run in different processes
from aiohttp import web
from aleph.web.controllers.listener import broadcast
@@ -52,6 +54,7 @@ async def run_server(host: str, port: int, shared_stats: dict, extra_web_config:

LOGGER.debug("Setup of runner")

app["config"] = config
app["extra_config"] = extra_web_config
app["shared_stats"] = shared_stats

@@ -81,30 +84,33 @@ def run_server_coroutine(
Used as target of multiprocessing.Process.
"""
setproctitle(f"pyaleph-run_server_coroutine-{port}")

loop, config = prepare_loop(config_values)

extra_web_config = extra_web_config or {}
setup_logging(
loglevel=config_values["logging"]["level"],
loglevel=config.logging.level.value,
filename=f"/tmp/run_server_coroutine-{port}.log",
)
if enable_sentry:
sentry_sdk.init(
dsn=config_values["sentry"]["dsn"],
traces_sample_rate=config_values["sentry"]["traces_sample_rate"],
dsn=config.sentry.dsn.value,
traces_sample_rate=config.sentry.traces_sample_rate.value,
ignore_errors=[KeyboardInterrupt],
)

# Use a try-catch-capture_exception to work with multiprocessing, see
# https://github.com/getsentry/raven-python/issues/1110
try:
loop = prepare_loop(config_values)
loop.run_until_complete(run_server(host, port, shared_stats, extra_web_config))
loop.run_until_complete(run_server(config, host, port, shared_stats, extra_web_config))
except Exception as e:
if enable_sentry:
sentry_sdk.capture_exception(e)
sentry_sdk.flush()
raise


def main(args):
async def main(args):
"""Main entry point allowing external calls

Args:
@@ -125,15 +131,14 @@ def main(args):
return

LOGGER.info("Loading configuration")
config = Config(schema=get_defaults())
config = aleph.config.app_config

if args.config_file is not None:
LOGGER.debug("Loading config file '%s'", args.config_file)
config.yaml.load(args.config_file)

# CLI config values override config file values
config.logging.level.value = args.loglevel
app["config"] = config

# Check for invalid/deprecated config
if "protocol" in config.p2p.clients.value:
@@ -155,10 +160,10 @@

if args.sentry_disabled:
LOGGER.info("Sentry disabled by CLI arguments")
elif app["config"].sentry.dsn.value:
elif config.sentry.dsn.value:
sentry_sdk.init(
dsn=app["config"].sentry.dsn.value,
traces_sample_rate=app["config"].sentry.traces_sample_rate.value,
dsn=config.sentry.dsn.value,
traces_sample_rate=config.sentry.traces_sample_rate.value,
ignore_errors=[KeyboardInterrupt],
)
LOGGER.info("Sentry enabled")
@@ -169,8 +174,6 @@
model.init_db(config, ensure_indexes=True)
LOGGER.info("Database initialized.")

# filestore.init_store(config)
# LOGGER.info("File store initalized.")
init_cors() # FIXME: This is stateful and process-dependent
set_start_method("spawn")

@@ -191,12 +194,9 @@
use_processes=True,
)

loop = asyncio.get_event_loop()

# handler = app.make_handler(loop=loop)
LOGGER.debug("Initializing p2p")
p2p_init_task = p2p.init_p2p(config, api_servers)
p2p_client, p2p_tasks = loop.run_until_complete(p2p_init_task)
p2p_client, p2p_tasks = await p2p.init_p2p(config, api_servers)
tasks += p2p_tasks
LOGGER.debug("Initialized p2p")

@@ -217,7 +217,7 @@
config.aleph.host.value,
config.p2p.http_port.value,
shared_stats,
args.sentry_disabled is False and app["config"].sentry.dsn.value,
args.sentry_disabled is False and config.sentry.dsn.value,
extra_web_config,
),
)
@@ -228,7 +228,7 @@
config.aleph.host.value,
config.aleph.port.value,
shared_stats,
args.sentry_disabled is False and app["config"].sentry.dsn.value,
args.sentry_disabled is False and config.sentry.dsn.value,
extra_web_config,
),
)
@@ -248,13 +248,13 @@ def main(args):
# srv = loop.run_until_complete(f)
# LOGGER.info('Serving on %s', srv.sockets[0].getsockname())
LOGGER.debug("Running event loop")
loop.run_until_complete(asyncio.gather(*tasks))
await asyncio.gather(*tasks)


def run():
"""Entry point for console_scripts"""
try:
main(sys.argv[1:])
asyncio.run(main(sys.argv[1:]))
except (KeyNotFoundException, InvalidConfigException):
sys.exit(1)

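
Converting main to a coroutine lets run() hand the whole lifecycle to asyncio.run, which creates, drives, and closes the loop itself. A minimal, self-contained sketch of the entry-point pattern (simplified from the diff above):

import asyncio
import sys
from typing import List


async def main(args: List[str]) -> None:
    # Stand-in for the real setup (config, DB, p2p) shown in the diff.
    tasks = [asyncio.sleep(0)]
    await asyncio.gather(*tasks)


def run() -> None:
    # asyncio.run replaces the manual get_event_loop()/run_until_complete
    # calls and guarantees the loop is closed on exit.
    asyncio.run(main(sys.argv[1:]))
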
9 changes: 9 additions & 0 deletions src/aleph/config.py
@@ -1,5 +1,7 @@
import logging

from configmanager import Config


def get_defaults():
return {
@@ -85,3 +87,10 @@ def get_defaults():
"traces_sample_rate": None,
},
}


app_config = Config(schema=get_defaults())
Review discussion:

Member: I see multiple uses of aleph.config.app_config. Why not use aleph.config.get_config() everywhere and mark this variable as private with a _ prefix?

Member: Is this variable useful versus using @lru_cache on def get_config()?

Collaborator (author): It conveys the intent better IMO. Using LRU cache when there's only one return value seems a bit counterintuitive.

Member: What about the first comment?

Collaborator (author): I think the usage in job_utils.py is legit, as we need to initialize the global variable. The other one, in get_ipfs_api, looks like an oversight on my part.


def get_config() -> Config:
return app_config
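
For comparison, a hypothetical sketch of the @lru_cache alternative raised in the review thread above — not what this PR merges, and it assumes get_defaults() from the same module:

from functools import lru_cache

from configmanager import Config


@lru_cache()
def get_config() -> Config:
    # Built once on the first call; every later call returns the same
    # cached instance, so load_values() mutations stay visible everywhere.
    return Config(schema=get_defaults())

Either way there is a single process-wide Config; the module-level variable just makes that explicit, which is the author's point above.
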
21 changes: 10 additions & 11 deletions src/aleph/handlers/storage.py
@@ -5,30 +5,29 @@
TODO:
- check balances and storage allowance
- handle incentives from 3rd party
- hjandle garbage collection of unused hashes
- handle garbage collection of unused hashes
"""

import logging

import aioipfs
import asyncio
import logging
from typing import Dict
from aleph_message.models import StoreMessage
from pydantic import ValidationError

import aioipfs
from aioipfs import InvalidCIDError

from aleph.config import get_config
from aleph.exceptions import AlephStorageException, UnknownHashError
from aleph.services.ipfs.common import get_ipfs_api
from aleph.storage import get_hash_content
from aleph.types import ItemType
from aleph.web import app
from aleph.exceptions import AlephStorageException, UnknownHashError
from aleph_message.models import StoreMessage
from pydantic import ValidationError

LOGGER = logging.getLogger("HANDLERS.STORAGE")


async def handle_new_storage(message: Dict, content: Dict):
if not app["config"].storage.store_files.value:
config = get_config()
if not config.storage.store_files.value:
return True # Ignore

# TODO: ideally the content should be transformed earlier, but this requires more clean up
Expand All @@ -49,7 +48,7 @@ async def handle_new_storage(message: Dict, content: Dict):
is_folder = False
item_hash = store_message.content.item_hash

ipfs_enabled = app["config"].ipfs.enabled.value
ipfs_enabled = config.ipfs.enabled.value
do_standard_lookup = True
size = 0

32 changes: 19 additions & 13 deletions src/aleph/jobs/job_utils.py
@@ -1,24 +1,30 @@
import asyncio
from typing import Dict
from typing import Tuple

import aleph.config
from aleph.model import init_db_globals
from aleph.services.ipfs.common import init_ipfs_globals
from aleph.services.p2p import init_p2p_client
from configmanager import Config

def prepare_loop(config_values: Dict) -> asyncio.AbstractEventLoop:
from aleph.model import init_db
from aleph.web import app
from configmanager import Config
from aleph.config import get_defaults
from aleph.services.ipfs.common import get_ipfs_api
from aleph.services.p2p import http, init_p2p_client

http.SESSION = None # type:ignore
def prepare_loop(config_values: Dict) -> Tuple[asyncio.AbstractEventLoop, Config]:
"""
Prepares all the global variables (sigh) needed to run an Aleph subprocess.

:param config_values: Dictionary of config values, as provided by the main process.
:returns: A preconfigured event loop, and the application config for convenience.
Use the event loop as event loop of the process as it is used by Motor. Using another
event loop will cause DB calls to fail.
"""

loop = asyncio.get_event_loop()

config = Config(schema=get_defaults())
app["config"] = config
config = aleph.config.app_config
config.load_values(config_values)

init_db(config, ensure_indexes=False)
loop.run_until_complete(get_ipfs_api(timeout=2, reset=True))
init_db_globals(config)
init_ipfs_globals(config)
_ = init_p2p_client(config)
return loop
return loop, config
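
The docstring's warning is the crux: Motor ties its client to the event loop that is current when the globals are initialized, so each subprocess must drive its work on the returned loop. A sketch of how the entry points below consume prepare_loop (some_task is illustrative):

from typing import Dict


def example_subprocess(config_values: Dict) -> None:
    # Wires up the DB, IPFS and p2p globals for this process and returns
    # the loop Motor is now bound to, plus the loaded config.
    loop, config = prepare_loop(config_values)

    # Running DB work on any other loop would make Motor calls fail.
    loop.run_until_complete(some_task(config))  # some_task: hypothetical coroutine
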
11 changes: 6 additions & 5 deletions src/aleph/jobs/process_pending_messages.py
@@ -194,16 +194,17 @@ def pending_messages_subprocess(
"""

setproctitle("aleph.jobs.messages_task_loop")
loop, config = prepare_loop(config_values)

sentry_sdk.init(
dsn=config_values["sentry"]["dsn"],
traces_sample_rate=config_values["sentry"]["traces_sample_rate"],
dsn=config.sentry.dsn.value,
traces_sample_rate=config.sentry.traces_sample_rate.value,
ignore_errors=[KeyboardInterrupt],
)
setup_logging(
loglevel=config_values["logging"]["level"],
loglevel=config.logging.level.value,
filename="/tmp/messages_task_loop.log",
)
singleton.api_servers = api_servers

loop = prepare_loop(config_values)
loop.run_until_complete(asyncio.gather(retry_messages_task(shared_stats)))
loop.run_until_complete(retry_messages_task(shared_stats))
9 changes: 5 additions & 4 deletions src/aleph/jobs/process_pending_txs.py
@@ -134,16 +134,17 @@ async def handle_txs_task():

def pending_txs_subprocess(config_values: Dict, api_servers: List):
setproctitle("aleph.jobs.txs_task_loop")
loop, config = prepare_loop(config_values)

sentry_sdk.init(
dsn=config_values["sentry"]["dsn"],
traces_sample_rate=config_values["sentry"]["traces_sample_rate"],
dsn=config.sentry.dsn.value,
traces_sample_rate=config.sentry.traces_sample_rate.value,
ignore_errors=[KeyboardInterrupt],
)
setup_logging(
loglevel=config_values["logging"]["level"],
loglevel=config.logging.level.value,
filename="/tmp/txs_task_loop.log",
)
singleton.api_servers = api_servers

loop = prepare_loop(config_values)
loop.run_until_complete(handle_txs_task())