diff --git a/.env-wb-garbage-collector b/.env-wb-garbage-collector
index b4cbe1c49638..db74b108fb23 100644
--- a/.env-wb-garbage-collector
+++ b/.env-wb-garbage-collector
@@ -6,7 +6,7 @@
 WEBSERVER_ACTIVITY=null
 WEBSERVER_CATALOG=null
-WEBSERVER_COMPUTATION=0
+WEBSERVER_NOTIFICATIONS=0
 WEBSERVER_DIAGNOSTICS=null
 #WEBSERVER_DIRECTOR_V2 from .env
 WEBSERVER_DIRECTOR=null
diff --git a/services/web/server/src/simcore_service_webserver/application.py b/services/web/server/src/simcore_service_webserver/application.py
index 54b611d4b780..e3107e77982d 100644
--- a/services/web/server/src/simcore_service_webserver/application.py
+++ b/services/web/server/src/simcore_service_webserver/application.py
@@ -13,7 +13,6 @@
 from .application_settings import setup_settings
 from .catalog import setup_catalog
 from .clusters.plugin import setup_clusters
-from .computation import setup_computation
 from .db import setup_db
 from .diagnostics import setup_diagnostics
 from .director.plugin import setup_director
@@ -26,6 +25,7 @@
 from .login.plugin import setup_login
 from .long_running_tasks import setup_long_running_tasks
 from .meta_modeling.plugin import setup_meta_modeling
+from .notifications.plugin import setup_notifications
 from .products.plugin import setup_products
 from .projects.plugin import setup_projects
 from .publications import setup_publications
@@ -79,7 +79,7 @@ def create_application() -> web.Application:
     # monitoring
     setup_diagnostics(app)
     setup_activity(app)
-    setup_computation(app)
+    setup_notifications(app)
     setup_socketio(app)
 
     # login
diff --git a/services/web/server/src/simcore_service_webserver/application_settings.py b/services/web/server/src/simcore_service_webserver/application_settings.py
index 00024f5e3edf..a08a8f00d1ae 100644
--- a/services/web/server/src/simcore_service_webserver/application_settings.py
+++ b/services/web/server/src/simcore_service_webserver/application_settings.py
@@ -203,7 +203,9 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings):
     # These plugins only require (for the moment) an entry to toggle between enabled/disabled
     WEBSERVER_CLUSTERS: bool = False
-    WEBSERVER_COMPUTATION: bool = True
+    WEBSERVER_NOTIFICATIONS: bool = Field(
+        default=True, env=["WEBSERVER_NOTIFICATIONS", "WEBSERVER_COMPUTATION"]
+    )
 
     WEBSERVER_GROUPS: bool = True
     WEBSERVER_META_MODELING: bool = True
     WEBSERVER_PRODUCTS: bool = True
diff --git a/services/web/server/src/simcore_service_webserver/application_settings_utils.py b/services/web/server/src/simcore_service_webserver/application_settings_utils.py
index 586430cb4304..79bfd34345c3 100644
--- a/services/web/server/src/simcore_service_webserver/application_settings_utils.py
+++ b/services/web/server/src/simcore_service_webserver/application_settings_utils.py
@@ -145,7 +145,7 @@ def convert_to_app_config(app_settings: ApplicationSettings) -> dict[str, Any]:
             ),
         },
         "clusters": {"enabled": app_settings.WEBSERVER_CLUSTERS},
-        "computation": {"enabled": app_settings.is_enabled("WEBSERVER_COMPUTATION")},
+        "computation": {"enabled": app_settings.is_enabled("WEBSERVER_NOTIFICATIONS")},
         "diagnostics": {"enabled": app_settings.is_enabled("WEBSERVER_DIAGNOSTICS")},
         "director-v2": {"enabled": app_settings.is_enabled("WEBSERVER_DIRECTOR_V2")},
         "exporter": {"enabled": app_settings.WEBSERVER_EXPORTER is not None},
@@ -206,7 +206,6 @@ def _set_if_disabled(field_name, section):
 
     if db := cfg.get("db"):
         if section := db.get("postgres"):
-            envs["POSTGRES_DB"] = section.get("database")
             envs["POSTGRES_HOST"] = section.get("host")
envs["POSTGRES_MAXSIZE"] = section.get("maxsize") @@ -275,7 +274,7 @@ def _set_if_disabled(field_name, section): envs["PROMETHEUS_VTAG"] = section.get("prometheus_api_version") if section := cfg.get("computation"): - _set_if_disabled("WEBSERVER_COMPUTATION", section) + _set_if_disabled("WEBSERVER_NOTIFICATIONS", section) if section := cfg.get("diagnostics"): _set_if_disabled("WEBSERVER_DIAGNOSTICS", section) diff --git a/services/web/server/src/simcore_service_webserver/computation_comp_tasks_listening_task.py b/services/web/server/src/simcore_service_webserver/computation_comp_tasks_listening_task.py deleted file mode 100644 index cae3f2ce6943..000000000000 --- a/services/web/server/src/simcore_service_webserver/computation_comp_tasks_listening_task.py +++ /dev/null @@ -1,183 +0,0 @@ -"""this module creates a background task that monitors changes in the database. -First a procedure is registered in postgres that gets triggered whenever the outputs -of a record in comp_task table is changed. -""" -import asyncio -import json -import logging -from contextlib import suppress -from pprint import pformat - -from aiohttp import web -from aiopg.sa import Engine -from aiopg.sa.connection import SAConnection -from models_library.errors import ErrorDict -from models_library.projects import ProjectID -from models_library.projects_nodes_io import NodeIDStr -from models_library.projects_state import RunningState -from pydantic.types import PositiveInt -from servicelib.aiohttp.application_keys import APP_DB_ENGINE_KEY -from servicelib.logging_utils import log_decorator -from simcore_postgres_database.webserver_models import DB_CHANNEL_NAME, projects -from sqlalchemy.sql import select - -from .computation_utils import convert_state_from_db -from .projects import projects_api, projects_exceptions -from .projects.projects_nodes_utils import update_node_outputs - -log = logging.getLogger(__name__) - - -@log_decorator(logger=log) -async def _get_project_owner(conn: SAConnection, project_uuid: str) -> PositiveInt: - the_project_owner = await conn.scalar( - select([projects.c.prj_owner]).where(projects.c.uuid == project_uuid) - ) - if not the_project_owner: - raise projects_exceptions.ProjectOwnerNotFoundError(project_uuid) - return the_project_owner - - -async def _update_project_state( - app: web.Application, - user_id: PositiveInt, - project_uuid: str, - node_uuid: str, - new_state: RunningState, - node_errors: list[ErrorDict] | None, -) -> None: - project = await projects_api.update_project_node_state( - app, user_id, project_uuid, node_uuid, new_state - ) - - await projects_api.notify_project_node_update(app, project, node_uuid, node_errors) - await projects_api.notify_project_state_update(app, project) - - -async def listen(app: web.Application, db_engine: Engine): - listen_query = f"LISTEN {DB_CHANNEL_NAME};" - _LISTENING_TASK_BASE_SLEEPING_TIME_S = 1 - async with db_engine.acquire() as conn: - assert conn.connection # nosec - await conn.execute(listen_query) - - while True: - # NOTE: instead of using await get() we check first if the connection was closed - # since aiopg does not reset the await in such a case (if DB was restarted or so) - # see aiopg issue: https://github.com/aio-libs/aiopg/pull/559#issuecomment-826813082 - if conn.closed: - raise ConnectionError("connection with database is closed!") - if conn.connection.notifies.empty(): - await asyncio.sleep(_LISTENING_TASK_BASE_SLEEPING_TIME_S) - continue - notification = conn.connection.notifies.get_nowait() - log.debug( - "received update from 
database: %s", pformat(notification.payload) - ) - # get the data and the info on what changed - payload: dict = json.loads(notification.payload) - - # FIXME: all this should move to rabbitMQ instead of this - task_data = payload.get("data", {}) - task_changes = payload.get("changes", []) - - if not task_data: - log.error("task data invalid: %s", pformat(payload)) - continue - - if not task_changes: - log.error("no changes but still triggered: %s", pformat(payload)) - continue - - project_uuid = task_data.get("project_id", "undefined") - node_uuid = task_data.get("node_id", "undefined") - - # FIXME: we do not know who triggered these changes. we assume the user had the rights to do so - # therefore we'll use the prj_owner user id. This should be fixed when the new sidecar comes in - # and comp_tasks/comp_pipeline get deprecated. - try: - # find the user(s) linked to that project - the_project_owner = await _get_project_owner(conn, project_uuid) - - if any(f in task_changes for f in ["outputs", "run_hash"]): - new_outputs = task_data.get("outputs", {}) - new_run_hash = task_data.get("run_hash", None) - - await update_node_outputs( - app, - the_project_owner, - ProjectID(project_uuid), - NodeIDStr(node_uuid), - new_outputs, - new_run_hash, - node_errors=task_data.get("errors", None), - ui_changed_keys=None, - ) - - if "state" in task_changes: - new_state = convert_state_from_db(task_data["state"]).value - await _update_project_state( - app, - the_project_owner, - project_uuid, - node_uuid, - new_state, - node_errors=task_data.get("errors", None), - ) - - except projects_exceptions.ProjectNotFoundError as exc: - log.warning( - "Project %s was not found and cannot be updated. Maybe was it deleted?", - exc.project_uuid, - ) - continue - except projects_exceptions.ProjectOwnerNotFoundError as exc: - log.warning( - "Project owner of project %s could not be found, is the project valid?", - exc.project_uuid, - ) - continue - except projects_exceptions.NodeNotFoundError as exc: - log.warning( - "Node %s of project %s not found and cannot be updated. Maybe was it deleted?", - exc.node_uuid, - exc.project_uuid, - ) - continue - - -async def comp_tasks_listening_task(app: web.Application) -> None: - log.info("starting comp_task db listening task...") - while True: - try: - # create a special connection here - db_engine = app[APP_DB_ENGINE_KEY] - log.info("listening to comp_task events...") - await listen(app, db_engine) - except asyncio.CancelledError: - # we are closing the app.. 
- log.info("cancelled comp_tasks events") - raise - except Exception: # pylint: disable=broad-except - log.exception( - "caught unhandled comp_task db listening task exception, restarting...", - exc_info=True, - ) - # wait a bit and try restart the task - await asyncio.sleep(3) - - -async def create_comp_tasks_listening_task(app: web.Application): - task = asyncio.create_task( - comp_tasks_listening_task(app), name="computation db listener" - ) - log.debug("comp_tasks db listening task created %s", f"{task=}") - - yield - - log.debug("cancelling comp_tasks db listening %s task...", f"{task=}") - task.cancel() - log.debug("waiting for comp_tasks db listening %s to stop", f"{task=}") - with suppress(asyncio.CancelledError): - await task - log.debug("waiting for comp_tasks db listening %s to stop completed", f"{task=}") diff --git a/services/web/server/src/simcore_service_webserver/notifications/__init__.py b/services/web/server/src/simcore_service_webserver/notifications/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/services/web/server/src/simcore_service_webserver/notifications/_db_comp_tasks_listening_task.py b/services/web/server/src/simcore_service_webserver/notifications/_db_comp_tasks_listening_task.py new file mode 100644 index 000000000000..772a22d61fbc --- /dev/null +++ b/services/web/server/src/simcore_service_webserver/notifications/_db_comp_tasks_listening_task.py @@ -0,0 +1,190 @@ +"""this module creates a background task that monitors changes in the database. +First a procedure is registered in postgres that gets triggered whenever the outputs +of a record in comp_task table is changed. +""" +import asyncio +import json +import logging +from contextlib import suppress +from dataclasses import dataclass +from typing import AsyncIterator, NoReturn + +from aiohttp import web +from aiopg.sa import Engine +from aiopg.sa.connection import SAConnection +from models_library.errors import ErrorDict +from models_library.projects import ProjectID +from models_library.projects_nodes_io import NodeIDStr +from models_library.projects_state import RunningState +from pydantic.types import PositiveInt +from servicelib.aiohttp.application_keys import APP_DB_ENGINE_KEY +from simcore_postgres_database.webserver_models import DB_CHANNEL_NAME, projects +from sqlalchemy.sql import select + +from ..projects import projects_api, projects_exceptions +from ..projects.projects_nodes_utils import update_node_outputs +from ._utils import convert_state_from_db + +_logger = logging.getLogger(__name__) + + +async def _get_project_owner(conn: SAConnection, project_uuid: str) -> PositiveInt: + the_project_owner: PositiveInt | None = await conn.scalar( + select([projects.c.prj_owner]).where(projects.c.uuid == project_uuid) + ) + if not the_project_owner: + raise projects_exceptions.ProjectOwnerNotFoundError(project_uuid) + return the_project_owner + + +async def _update_project_state( + app: web.Application, + user_id: PositiveInt, + project_uuid: str, + node_uuid: str, + new_state: RunningState, + node_errors: list[ErrorDict] | None, +) -> None: + project = await projects_api.update_project_node_state( + app, user_id, project_uuid, node_uuid, new_state + ) + + await projects_api.notify_project_node_update(app, project, node_uuid, node_errors) + await projects_api.notify_project_state_update(app, project) + + +@dataclass(frozen=True) +class _CompTaskNotificationPayload: + action: str + data: dict + changes: dict + table: str + + +async def _handle_db_notification( + app: 
+    app: web.Application, payload: _CompTaskNotificationPayload, conn: SAConnection
+) -> None:
+    task_data = payload.data
+    task_changes = payload.changes
+
+    project_uuid = task_data.get("project_id", None)
+    node_uuid = task_data.get("node_id", None)
+    if any(x is None for x in [project_uuid, node_uuid]):
+        _logger.warning(
+            "comp_tasks row is corrupted. TIP: please check DB entry containing '%s'",
+            f"{task_data=}",
+        )
+        return
+
+    assert project_uuid  # nosec
+    assert node_uuid  # nosec
+
+    try:
+        # NOTE: we need someone with the rights to modify that project. the owner is one.
+        # find the user(s) linked to that project
+        the_project_owner = await _get_project_owner(conn, project_uuid)
+
+        if any(f in task_changes for f in ["outputs", "run_hash"]):
+            new_outputs = task_data.get("outputs", {})
+            new_run_hash = task_data.get("run_hash", None)
+
+            await update_node_outputs(
+                app,
+                the_project_owner,
+                ProjectID(project_uuid),
+                NodeIDStr(node_uuid),
+                new_outputs,
+                new_run_hash,
+                node_errors=task_data.get("errors", None),
+                ui_changed_keys=None,
+            )
+
+        if "state" in task_changes:
+            new_state = convert_state_from_db(task_data["state"]).value
+            await _update_project_state(
+                app,
+                the_project_owner,
+                project_uuid,
+                node_uuid,
+                new_state,
+                node_errors=task_data.get("errors", None),
+            )
+
+    except projects_exceptions.ProjectNotFoundError as exc:
+        _logger.warning(
+            "Project %s was not found and cannot be updated. Maybe was it deleted?",
+            exc.project_uuid,
+        )
+    except projects_exceptions.ProjectOwnerNotFoundError as exc:
+        _logger.warning(
+            "Project owner of project %s could not be found, is the project valid?",
+            exc.project_uuid,
+        )
+    except projects_exceptions.NodeNotFoundError as exc:
+        _logger.warning(
+            "Node %s of project %s not found and cannot be updated. Maybe was it deleted?",
+            exc.node_uuid,
+            exc.project_uuid,
+        )
+
+
+async def _listen(app: web.Application, db_engine: Engine) -> NoReturn:
+    listen_query = f"LISTEN {DB_CHANNEL_NAME};"
+    _LISTENING_TASK_BASE_SLEEPING_TIME_S = 1
+    async with db_engine.acquire() as conn:
+        assert conn.connection  # nosec
+        await conn.execute(listen_query)
+
+        while True:
+            # NOTE: instead of using await get() we check first if the connection was closed
+            # since aiopg does not reset the await in such a case (if DB was restarted or so)
+            # see aiopg issue: https://github.com/aio-libs/aiopg/pull/559#issuecomment-826813082
+            if conn.closed:
+                raise ConnectionError("connection with database is closed!")
+            if conn.connection.notifies.empty():
+                await asyncio.sleep(_LISTENING_TASK_BASE_SLEEPING_TIME_S)
+                continue
+            notification = conn.connection.notifies.get_nowait()
+            # get the data and the info on what changed
+            payload = _CompTaskNotificationPayload(**json.loads(notification.payload))
+            _logger.debug("received update from database: %s", f"{payload=}")
+            await _handle_db_notification(app, payload, conn)
+
+
+async def _comp_tasks_listening_task(app: web.Application) -> None:
+    _logger.info("starting comp_task db listening task...")
+    while True:
+        try:
+            # create a special connection here
+            db_engine = app[APP_DB_ENGINE_KEY]
+            _logger.info("listening to comp_task events...")
+            await _listen(app, db_engine)
+        except asyncio.CancelledError:
+            # we are closing the app..
+ _logger.info("cancelled comp_tasks events") + raise + except Exception: # pylint: disable=broad-except + _logger.exception( + "caught unhandled comp_task db listening task exception, restarting...", + exc_info=True, + ) + # wait a bit and try restart the task + await asyncio.sleep(3) + + +async def create_comp_tasks_listening_task(app: web.Application) -> AsyncIterator[None]: + task = asyncio.create_task( + _comp_tasks_listening_task(app), name="computation db listener" + ) + _logger.debug("comp_tasks db listening task created %s", f"{task=}") + + yield + + _logger.debug("cancelling comp_tasks db listening %s task...", f"{task=}") + task.cancel() + _logger.debug("waiting for comp_tasks db listening %s to stop", f"{task=}") + with suppress(asyncio.CancelledError): + await task + _logger.debug( + "waiting for comp_tasks db listening %s to stop completed", f"{task=}" + ) diff --git a/services/web/server/src/simcore_service_webserver/computation_subscribe.py b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_consumers.py similarity index 76% rename from services/web/server/src/simcore_service_webserver/computation_subscribe.py rename to services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_consumers.py index d63496424307..3587040a66d8 100644 --- a/services/web/server/src/simcore_service_webserver/computation_subscribe.py +++ b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_consumers.py @@ -1,6 +1,6 @@ import functools import logging -from typing import Union +from typing import Any, AsyncIterator, Callable, Coroutine, Final, Union from aiohttp import web from models_library.rabbitmq_messages import ( @@ -22,10 +22,10 @@ from servicelib.logging_utils import log_context from servicelib.rabbitmq import RabbitMQClient -from .projects import projects_api -from .projects.projects_exceptions import NodeNotFoundError, ProjectNotFoundError -from .rabbitmq import get_rabbitmq_client -from .socketio.events import ( +from ..projects import projects_api +from ..projects.projects_exceptions import NodeNotFoundError, ProjectNotFoundError +from ..rabbitmq import get_rabbitmq_client +from ..socketio.events import ( SOCKET_IO_EVENT, SOCKET_IO_LOG_EVENT, SOCKET_IO_NODE_PROGRESS_EVENT, @@ -35,7 +35,7 @@ send_messages, ) -log = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) async def _handle_computation_running_progress( @@ -65,13 +65,13 @@ async def _handle_computation_running_progress( await send_messages(app, f"{message.user_id}", messages) return True except ProjectNotFoundError: - log.warning( + _logger.warning( "project related to received rabbitMQ progress message not found: '%s'", json_dumps(message, indent=2), ) return True except NodeNotFoundError: - log.warning( + _logger.warning( "node related to received rabbitMQ progress message not found: '%s'", json_dumps(message, indent=2), ) @@ -79,19 +79,22 @@ async def _handle_computation_running_progress( return False -async def progress_message_parser(app: web.Application, data: bytes) -> bool: +async def _progress_message_parser(app: web.Application, data: bytes) -> bool: # update corresponding project, node, progress value - rabbit_message = parse_raw_as( + rabbit_message: ( + ProgressRabbitMessageNode | ProgressRabbitMessageProject + ) = parse_raw_as( Union[ProgressRabbitMessageNode, ProgressRabbitMessageProject], data ) if rabbit_message.progress_type is ProgressType.COMPUTATION_RUNNING: # NOTE: backward compatibility, this progress is kept in the project + 
+        assert isinstance(rabbit_message, ProgressRabbitMessageNode)  # nosec
         return await _handle_computation_running_progress(app, rabbit_message)
 
     # NOTE: other types of progress are transient
     is_type_message_node = type(rabbit_message) == ProgressRabbitMessageNode
-    message = {
+    socket_message: SocketMessageDict = {
         "event_type": (
             SOCKET_IO_NODE_PROGRESS_EVENT
             if is_type_message_node
@@ -105,12 +108,12 @@
         },
     }
     if is_type_message_node:
-        message["data"]["node_id"] = rabbit_message.node_id
-    await send_messages(app, f"{rabbit_message.user_id}", [message])
+        socket_message["data"]["node_id"] = rabbit_message.node_id
+    await send_messages(app, f"{rabbit_message.user_id}", [socket_message])
     return True
 
 
-async def log_message_parser(app: web.Application, data: bytes) -> bool:
+async def _log_message_parser(app: web.Application, data: bytes) -> bool:
     rabbit_message = LoggerRabbitMessage.parse_raw(data)
 
     if not await projects_api.is_project_hidden(app, rabbit_message.project_id):
@@ -124,7 +127,7 @@
     return True
 
 
-async def instrumentation_message_parser(app: web.Application, data: bytes) -> bool:
+async def _instrumentation_message_parser(app: web.Application, data: bytes) -> bool:
     rabbit_message = InstrumentationRabbitMessage.parse_raw(data)
     if rabbit_message.metrics == "service_started":
         service_started(
@@ -137,7 +140,7 @@
     return True
 
 
-async def events_message_parser(app: web.Application, data: bytes) -> bool:
+async def _events_message_parser(app: web.Application, data: bytes) -> bool:
     rabbit_message = EventRabbitMessage.parse_raw(data)
 
     socket_messages: list[SocketMessageDict] = [
@@ -153,35 +156,47 @@
     return True
 
 
-EXCHANGE_TO_PARSER_CONFIG = (
+EXCHANGE_TO_PARSER_CONFIG: Final[
+    tuple[
+        tuple[
+            str,
+            Callable[[web.Application, bytes], Coroutine[Any, Any, bool]],
+            dict[str, Any],
+        ],
+        ...,
+    ]
+] = (
     (
         LoggerRabbitMessage.get_channel_name(),
-        log_message_parser,
+        _log_message_parser,
         {},
     ),
     (
         ProgressRabbitMessageNode.get_channel_name(),
-        progress_message_parser,
+        _progress_message_parser,
         {},
     ),
     (
         InstrumentationRabbitMessage.get_channel_name(),
-        instrumentation_message_parser,
+        _instrumentation_message_parser,
        dict(exclusive_queue=False),
    ),
    (
        EventRabbitMessage.get_channel_name(),
-        events_message_parser,
+        _events_message_parser,
        {},
    ),
 )
 
 
-async def setup_rabbitmq_consumers(app: web.Application) -> None:
-    with log_context(log, logging.INFO, msg="Subscribing to rabbitmq channels"):
+async def setup_rabbitmq_consumers(app: web.Application) -> AsyncIterator[None]:
+    with log_context(_logger, logging.INFO, msg="Subscribing to rabbitmq channels"):
         rabbit_client: RabbitMQClient = get_rabbitmq_client(app)
 
         for exchange_name, parser_fct, queue_kwargs in EXCHANGE_TO_PARSER_CONFIG:
             await rabbit_client.subscribe(
                 exchange_name, functools.partial(parser_fct, app), **queue_kwargs
             )
+    yield
+
+    # cleanup?
diff --git a/services/web/server/src/simcore_service_webserver/computation_utils.py b/services/web/server/src/simcore_service_webserver/notifications/_utils.py
similarity index 100%
rename from services/web/server/src/simcore_service_webserver/computation_utils.py
rename to services/web/server/src/simcore_service_webserver/notifications/_utils.py
diff --git a/services/web/server/src/simcore_service_webserver/computation.py b/services/web/server/src/simcore_service_webserver/notifications/plugin.py
similarity index 62%
rename from services/web/server/src/simcore_service_webserver/computation.py
rename to services/web/server/src/simcore_service_webserver/notifications/plugin.py
index 8a8e383d934e..967f84b96c83 100644
--- a/services/web/server/src/simcore_service_webserver/computation.py
+++ b/services/web/server/src/simcore_service_webserver/notifications/plugin.py
@@ -7,9 +7,11 @@
 from aiohttp import web
 from servicelib.aiohttp.application_setup import ModuleCategory, app_module_setup
 
-from .computation_comp_tasks_listening_task import create_comp_tasks_listening_task
-from .computation_subscribe import setup_rabbitmq_consumers
-from .rabbitmq import setup_rabbitmq
+from ..db import setup_db
+from ..rabbitmq import setup_rabbitmq
+from ..socketio.plugin import setup_socketio
+from ._db_comp_tasks_listening_task import create_comp_tasks_listening_task
+from ._rabbitmq_consumers import setup_rabbitmq_consumers
 
 log = logging.getLogger(__name__)
 
@@ -17,17 +19,19 @@
 @app_module_setup(
     __name__,
     ModuleCategory.ADDON,
-    settings_name="WEBSERVER_COMPUTATION",
+    settings_name="WEBSERVER_NOTIFICATIONS",
     logger=log,
     depends=[
         "simcore_service_webserver.diagnostics",
     ],  # depends on diagnostics for setting the instrumentation
 )
-def setup_computation(app: web.Application):
+def setup_notifications(app: web.Application):
     setup_rabbitmq(app)
+    setup_socketio(app)
     # Subscribe to rabbit upon startup for logs, progress and other
     # metrics on the execution reported by sidecars
-    app.on_startup.append(setup_rabbitmq_consumers)
+    app.cleanup_ctx.append(setup_rabbitmq_consumers)
 
     # Creates a task to listen to comp_task pg-db's table events
+    setup_db(app)
     app.cleanup_ctx.append(create_comp_tasks_listening_task)
diff --git a/services/web/server/tests/integration/02/test_computation.py b/services/web/server/tests/integration/02/test_computation.py
index d2a22b384d31..f9408a0ce673 100644
--- a/services/web/server/tests/integration/02/test_computation.py
+++ b/services/web/server/tests/integration/02/test_computation.py
@@ -31,12 +31,12 @@
 )
 from simcore_service_webserver._meta import API_VTAG
 from simcore_service_webserver.application_settings import setup_settings
-from simcore_service_webserver.computation import setup_computation
-from simcore_service_webserver.computation_utils import DB_TO_RUNNING_STATE
 from simcore_service_webserver.db import setup_db
 from simcore_service_webserver.diagnostics import setup_diagnostics
 from simcore_service_webserver.director_v2 import setup_director_v2
 from simcore_service_webserver.login.plugin import setup_login
+from simcore_service_webserver.notifications._utils import DB_TO_RUNNING_STATE
+from simcore_service_webserver.notifications.plugin import setup_notifications
 from simcore_service_webserver.products.plugin import setup_products
 from simcore_service_webserver.projects.plugin import setup_projects
 from simcore_service_webserver.resource_manager.plugin import setup_resource_manager
@@ -89,6 +89,7 @@ class ExpectedResponse(NamedTuple):
         type[web.HTTPUnauthorized] | type[web.HTTPForbidden] | type[web.HTTPNoContent]
     )
     forbidden: (type[web.HTTPUnauthorized] | type[web.HTTPForbidden])
+    # pylint: disable=no-member
 
     def __str__(self) -> str:
         items = ", ".join(f"{k}={v.__name__}" for k, v in self._asdict().items())
@@ -151,7 +152,6 @@ def client(
     mocker: MockerFixture,
     monkeypatch_setenv_from_app_config: Callable,
 ) -> TestClient:
-
     cfg = deepcopy(app_config)
     assert cfg["rest"]["version"] == API_VTAG
 
@@ -173,7 +173,7 @@ def client(
     setup_users(app)
     setup_socketio(app)
     setup_projects(app)
-    setup_computation(app)
+    setup_notifications(app)
     setup_director_v2(app)
     setup_resource_manager(app)
     setup_products(app)
@@ -286,6 +286,7 @@ async def _assert_and_wait_for_pipeline_state(
     expected_state: RunningState,
     expected_api_response: ExpectedResponse,
 ):
+    assert client.app
     url_project_state = client.app.router["get_project_state"].url_for(
         project_id=project_id
     )
@@ -317,7 +318,6 @@ async def _assert_and_wait_for_comp_task_states_to_be_transmitted_in_projects(
     project_id: str,
     postgres_session: sa.orm.session.Session,
 ):
-
     async for attempt in AsyncRetrying(
         reraise=True,
         stop=stop_after_delay(120),
@@ -372,6 +372,7 @@ async def test_start_stop_computation(
     user_role: UserRole,
     expected: ExpectedResponse,
 ):
+    assert client.app
     project_id = user_project["uuid"]
     fake_workbench_payload = user_project["workbench"]
 
@@ -444,6 +445,7 @@ async def test_run_pipeline_and_check_state(
     user_role: UserRole,
     expected: ExpectedResponse,
 ):
+    assert client.app
     project_id = user_project["uuid"]
     fake_workbench_payload = user_project["workbench"]
 
diff --git a/services/web/server/tests/integration/02/test_rabbit.py b/services/web/server/tests/integration/02/test_rabbit.py
index c9043302a1a8..63667f3370fa 100644
--- a/services/web/server/tests/integration/02/test_rabbit.py
+++ b/services/web/server/tests/integration/02/test_rabbit.py
@@ -34,11 +34,11 @@
 from settings_library.rabbit import RabbitSettings
 from simcore_postgres_database.models.comp_tasks import NodeClass
 from simcore_service_webserver.application_settings import setup_settings
-from simcore_service_webserver.computation import setup_computation
 from simcore_service_webserver.db import setup_db
 from simcore_service_webserver.diagnostics import setup_diagnostics
 from simcore_service_webserver.director_v2 import setup_director_v2
 from simcore_service_webserver.login.plugin import setup_login
+from simcore_service_webserver.notifications.plugin import setup_notifications
 from simcore_service_webserver.projects.plugin import setup_projects
 from simcore_service_webserver.resource_manager.plugin import setup_resource_manager
 from simcore_service_webserver.rest import setup_rest
@@ -97,7 +97,6 @@ async def _publish_in_rabbit(
     num_messages: int,
     rabbit_exchanges: RabbitExchanges,
 ) -> tuple[LogMessages, ProgressMessages, InstrumMessages, EventMessages]:
-
     log_messages = [
         LoggerRabbitMessage(
             user_id=user_id,
@@ -207,7 +206,7 @@ def client(
     setup_diagnostics(app)
     setup_login(app)
     setup_projects(app)
-    setup_computation(app)
+    setup_notifications(app)
     setup_director_v2(app)
     setup_socketio(app)
     setup_resource_manager(app)
@@ -261,7 +260,6 @@ async def socketio_subscriber_handlers(
     client_session_id: UUIDStr,
     mocker: MockerFixture,
 ) -> AsyncIterator[SocketIoHandlers]:
-
     """socketio SUBSCRIBER
 
     Somehow this emulates the logic of the front-end:
@@ -323,7 +321,6 @@ async def rabbit_exchanges(
     rabbit_settings: RabbitSettings,
     rabbit_channel: aio_pika.Channel,
 ) -> AsyncIterator[RabbitExchanges]:
-
     logs_exchange = await rabbit_channel.declare_exchange(
         LoggerRabbitMessage.get_channel_name(),
         aio_pika.ExchangeType.FANOUT,
@@ -403,7 +400,6 @@ async def test_publish_to_other_user(
         Awaitable[tuple[LogMessages, ProgressMessages, InstrumMessages, EventMessages]],
     ],
 ):
-
     # Some other client publishes messages with wrong user id
     await publish_some_messages_in_rabbit(
         not_logged_user_id,
diff --git a/services/web/server/tests/unit/isolated/test_computation_subscribe.py b/services/web/server/tests/unit/isolated/notifications/test_rabbitmq_consumers.py
similarity index 78%
rename from services/web/server/tests/unit/isolated/test_computation_subscribe.py
rename to services/web/server/tests/unit/isolated/notifications/test_rabbitmq_consumers.py
index 21f853da5394..11c9aaa81962 100644
--- a/services/web/server/tests/unit/isolated/test_computation_subscribe.py
+++ b/services/web/server/tests/unit/isolated/notifications/test_rabbitmq_consumers.py
@@ -1,5 +1,7 @@
 # pylint: disable=redefined-outer-name
+# pylint: disable=protected-access
 
+from typing import Iterator
 from unittest.mock import AsyncMock
 
 import pytest
@@ -9,21 +11,22 @@
     ProgressRabbitMessageProject,
     ProgressType,
 )
+from pydantic import BaseModel
 from pytest_mock import MockerFixture
-from simcore_service_webserver import computation_subscribe
+from simcore_service_webserver.notifications import _rabbitmq_consumers
 
 _faker = Faker()
 
 
 @pytest.fixture
-def mock_send_messages(mocker: MockerFixture) -> dict:
+def mock_send_messages(mocker: MockerFixture) -> Iterator[dict]:
     reference = {}
 
     async def mock_send_message(*args) -> None:
         reference["args"] = args
 
     mocker.patch.object(
-        computation_subscribe, "send_messages", side_effect=mock_send_message
+        _rabbitmq_consumers, "send_messages", side_effect=mock_send_message
     )
 
     yield reference
@@ -60,9 +63,9 @@
     ],
 )
 async def test_regression_progress_message_parser(
-    mock_send_messages: dict, raw_data: bytes, class_type: type
+    mock_send_messages: dict, raw_data: bytes, class_type: type[BaseModel]
 ):
-    await computation_subscribe.progress_message_parser(AsyncMock(), raw_data)
+    await _rabbitmq_consumers._progress_message_parser(AsyncMock(), raw_data)
     serialized_sent_data = mock_send_messages["args"][2][0]["data"]
     # check that all fields are sent as expected
     assert class_type.parse_obj(serialized_sent_data)
diff --git a/services/web/server/tests/unit/with_dbs/01/test_comp_tasks_listening_task.py b/services/web/server/tests/unit/with_dbs/01/notifications/test_notifications__db_comp_tasks_listening_task.py
similarity index 90%
rename from services/web/server/tests/unit/with_dbs/01/test_comp_tasks_listening_task.py
rename to services/web/server/tests/unit/with_dbs/01/notifications/test_notifications__db_comp_tasks_listening_task.py
index 97bc6332c8d6..7d59f1bfb833 100644
--- a/services/web/server/tests/unit/with_dbs/01/test_comp_tasks_listening_task.py
+++ b/services/web/server/tests/unit/with_dbs/01/notifications/test_notifications__db_comp_tasks_listening_task.py
@@ -21,7 +21,7 @@
 from simcore_postgres_database.models.comp_pipeline import StateType
 from simcore_postgres_database.models.comp_tasks import NodeClass, comp_tasks
 from simcore_postgres_database.models.users import UserRole
-from simcore_service_webserver.computation_comp_tasks_listening_task import (
+from simcore_service_webserver.notifications._db_comp_tasks_listening_task import (
     create_comp_tasks_listening_task,
 )
 from tenacity._asyncio import AsyncRetrying
@@ -40,16 +40,16 @@ async def mock_project_subsystem(
     mocked_project_calls = {}
 
     mocked_project_calls["update_node_outputs"] = mocker.patch(
-        "simcore_service_webserver.computation_comp_tasks_listening_task.update_node_outputs",
+        "simcore_service_webserver.notifications._db_comp_tasks_listening_task.update_node_outputs",
         return_value="",
     )
 
     mocked_project_calls["_get_project_owner"] = mocker.patch(
-        "simcore_service_webserver.computation_comp_tasks_listening_task._get_project_owner",
+        "simcore_service_webserver.notifications._db_comp_tasks_listening_task._get_project_owner",
         return_value="",
     )
 
     mocked_project_calls["_update_project_state"] = mocker.patch(
-        "simcore_service_webserver.computation_comp_tasks_listening_task._update_project_state",
+        "simcore_service_webserver.notifications._db_comp_tasks_listening_task._update_project_state",
         return_value="",
     )
@@ -143,7 +143,10 @@ async def test_listen_comp_tasks_task(
     some_project = project(logged_user)
     pipeline(project_id=f"{some_project.uuid}")
     task = comp_task(
-        project_id=f"{some_project.uuid}", outputs=json.dumps({}), node_class=task_class
+        project_id=f"{some_project.uuid}",
+        node_id=faker.uuid4(),
+        outputs=json.dumps({}),
+        node_class=task_class,
     )
     async with db_engine.acquire() as conn:
         # let's update some values
@@ -153,7 +156,7 @@ async def test_listen_comp_tasks_task(
             .where(comp_tasks.c.task_id == task["task_id"])
         )
 
-        # tests whether listener gets hooked calls executed
+        # tests whether listener gets executed
         for call_name, mocked_call in mock_project_subsystem.items():
             if call_name in expected_calls:
                 async for attempt in AsyncRetrying(
diff --git a/services/web/server/tests/unit/with_dbs/01/test_computation_utils.py b/services/web/server/tests/unit/with_dbs/01/notifications/test_notifications__utils.py
similarity index 79%
rename from services/web/server/tests/unit/with_dbs/01/test_computation_utils.py
rename to services/web/server/tests/unit/with_dbs/01/notifications/test_notifications__utils.py
index defa60822699..1a99ecf6b706 100644
--- a/services/web/server/tests/unit/with_dbs/01/test_computation_utils.py
+++ b/services/web/server/tests/unit/with_dbs/01/notifications/test_notifications__utils.py
@@ -6,7 +6,7 @@
 import pytest
 from models_library.projects_state import RunningState
 from simcore_postgres_database.models.comp_pipeline import StateType
-from simcore_service_webserver.computation_utils import convert_state_from_db
+from simcore_service_webserver.notifications._utils import convert_state_from_db
 
 
 @pytest.mark.parametrize(
@@ -19,5 +19,5 @@
         (StateType.NOT_STARTED, RunningState.NOT_STARTED),
     ],
 )
-def test_convert_state_from_db(db_state: int, expected_state: RunningState):
+def test_convert_state_from_db(db_state: StateType, expected_state: RunningState):
     assert convert_state_from_db(db_state) == expected_state
diff --git a/services/web/server/tests/unit/with_dbs/01/studies_dispatcher/conftest.py b/services/web/server/tests/unit/with_dbs/01/studies_dispatcher/conftest.py
index c4fa799f6b58..2123b4e1d766 100644
--- a/services/web/server/tests/unit/with_dbs/01/studies_dispatcher/conftest.py
+++ b/services/web/server/tests/unit/with_dbs/01/studies_dispatcher/conftest.py
@@ -23,7 +23,7 @@ def app_environment(app_environment: EnvVarsDict, monkeypatch: MonkeyPatch):
         {
             "WEBSERVER_ACTIVITY": "null",
             "WEBSERVER_CATALOG": "null",
-            "WEBSERVER_COMPUTATION": "0",
+            "WEBSERVER_NOTIFICATIONS": "0",
             "WEBSERVER_DIAGNOSTICS": "null",
             "WEBSERVER_DIRECTOR": "null",
             "WEBSERVER_EXPORTER": "null",
diff --git a/services/web/server/tests/unit/with_dbs/03/invitations/test_invitations.py b/services/web/server/tests/unit/with_dbs/03/invitations/test_invitations.py
index 0d970cdc9438..004de381ff23 100644
--- a/services/web/server/tests/unit/with_dbs/03/invitations/test_invitations.py
+++ b/services/web/server/tests/unit/with_dbs/03/invitations/test_invitations.py
@@ -32,7 +32,7 @@ def app_environment(
         monkeypatch,
         {
             "WEBSERVER_ACTIVITY": "null",
-            "WEBSERVER_COMPUTATION": "0",
+            "WEBSERVER_NOTIFICATIONS": "0",
             "WEBSERVER_DIAGNOSTICS": "null",
             "WEBSERVER_DIRECTOR": "null",
             "WEBSERVER_EXPORTER": "null",
diff --git a/services/web/server/tests/unit/with_dbs/03/login/conftest.py b/services/web/server/tests/unit/with_dbs/03/login/conftest.py
index 3aa23d0e08cd..92172ce5c982 100644
--- a/services/web/server/tests/unit/with_dbs/03/login/conftest.py
+++ b/services/web/server/tests/unit/with_dbs/03/login/conftest.py
@@ -22,7 +22,7 @@ def app_environment(app_environment: EnvVarsDict, monkeypatch: MonkeyPatch):
         monkeypatch,
         {
             "WEBSERVER_ACTIVITY": "null",
-            "WEBSERVER_COMPUTATION": "0",
+            "WEBSERVER_NOTIFICATIONS": "0",
             "WEBSERVER_DIAGNOSTICS": "null",
             "WEBSERVER_DIRECTOR": "null",
             "WEBSERVER_EXPORTER": "null",
diff --git a/services/web/server/tests/unit/with_dbs/03/test_email.py b/services/web/server/tests/unit/with_dbs/03/test_email.py
index fbda9e9d32c9..05090ac77f94 100644
--- a/services/web/server/tests/unit/with_dbs/03/test_email.py
+++ b/services/web/server/tests/unit/with_dbs/03/test_email.py
@@ -34,7 +34,7 @@ def app_environment(app_environment: EnvVarsDict, monkeypatch: MonkeyPatch):
         monkeypatch,
         {
             "WEBSERVER_ACTIVITY": "null",
-            "WEBSERVER_COMPUTATION": "0",
+            "WEBSERVER_NOTIFICATIONS": "0",
             "WEBSERVER_DIAGNOSTICS": "null",
             "WEBSERVER_DIRECTOR": "null",
             "WEBSERVER_EXPORTER": "null",