diff --git a/api/specs/webserver/openapi-services.yaml b/api/specs/webserver/openapi-services.yaml index 33d2def1e06e..a00e9ee3e764 100644 --- a/api/specs/webserver/openapi-services.yaml +++ b/api/specs/webserver/openapi-services.yaml @@ -7,9 +7,24 @@ paths: operationId: start_pipeline parameters: - $ref: "#/components/parameters/ProjectId" + requestBody: + required: false + content: + application/json: + schema: + type: object + properties: + subgraph: + description: The node uuids selected for running a partial pipeline + type: array + uniqueItems: true + items: + type: string + format: uuid + responses: "201": - description: Succesffully started the pipeline + description: Successfully started the pipeline content: application/json: schema: diff --git a/packages/pytest-simcore/src/pytest_simcore/services_api_mocks_for_aiohttp_clients.py b/packages/pytest-simcore/src/pytest_simcore/services_api_mocks_for_aiohttp_clients.py index 1576528780c0..b93208ad4b6c 100644 --- a/packages/pytest-simcore/src/pytest_simcore/services_api_mocks_for_aiohttp_clients.py +++ b/packages/pytest-simcore/src/pytest_simcore/services_api_mocks_for_aiohttp_clients.py @@ -1,4 +1,5 @@ import re +from typing import Dict, List import pytest from aioresponses import aioresponses @@ -6,6 +7,15 @@ from models_library.projects_state import RunningState from yarl import URL +# The adjacency list is defined as a dictionary with the key to the node and its list of successors +FULL_PROJECT_PIPELINE_ADJACENCY: Dict[str, List[str]] = { + "node id 1": ["node id 2", "node id 3", "node id 4"], + "node id 2": ["node_id 5"], + "node id 3": ["node_id 5"], + "node id 4": ["node_id 5"], + "node id 5": [], +} + def creation_cb(url, **kwargs) -> CallbackResult: @@ -18,9 +28,19 @@ def creation_cb(url, **kwargs) -> CallbackResult: if "start_pipeline" in body and body["start_pipeline"] else RunningState.NOT_STARTED ) + pipeline: Dict[str, List[str]] = FULL_PROJECT_PIPELINE_ADJACENCY + if body.get("subgraph"): + # create some fake adjacency list + for node_id in body.get("subgraph"): + pipeline[node_id] = ["some node 5334", "some node 2324"] return CallbackResult( - status=201, payload={"id": kwargs["json"]["project_id"], "state": state} + status=201, + payload={ + "id": kwargs["json"]["project_id"], + "state": state, + "pipeline": pipeline, + }, ) diff --git a/packages/service-library/src/servicelib/logging_utils.py b/packages/service-library/src/servicelib/logging_utils.py index 5d730975e206..f7375c7a8447 100644 --- a/packages/service-library/src/servicelib/logging_utils.py +++ b/packages/service-library/src/servicelib/logging_utils.py @@ -93,7 +93,7 @@ def set_logging_handler( def _log_arguments( - logger_obj: logging.Logger, func: Callable, *args, **kwargs + logger_obj: logging.Logger, level: int, func: Callable, *args, **kwargs ) -> Dict[str, str]: args_passed_in_function = [repr(a) for a in args] kwargs_passed_in_function = [f"{k}={v!r}" for k, v in kwargs.items()] @@ -112,7 +112,8 @@ def _log_arguments( } # Before to the function execution, log function details. 
- logger_obj.info( + logger_obj.log( + level, "Arguments: %s - Begin function", formatted_arguments, extra=extra_args, @@ -121,7 +122,7 @@ def _log_arguments( return extra_args -def log_decorator(logger=None): +def log_decorator(logger=None, level: int = logging.DEBUG): # Build logger object logger_obj = logger or log @@ -130,12 +131,12 @@ def log_decorator_info(func): @functools.wraps(func) async def log_decorator_wrapper(*args, **kwargs): - extra_args = _log_arguments(logger_obj, func, *args, **kwargs) + extra_args = _log_arguments(logger_obj, level, func, *args, **kwargs) try: # log return value from the function value = await func(*args, **kwargs) - logger_obj.debug( - "Returned: - End function %r", value, extra=extra_args + logger_obj.log( + level, "Returned: - End function %r", value, extra=extra_args ) except: # log exception if occurs in function @@ -154,8 +155,8 @@ def log_decorator_wrapper(*args, **kwargs): try: # log return value from the function value = func(*args, **kwargs) - logger_obj.debug( - "Returned: - End function %r", value, extra=extra_args + logger_obj.log( + level, "Returned: - End function %r", value, extra=extra_args ) except: # log exception if occurs in function diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports/filemanager.py b/packages/simcore-sdk/src/simcore_sdk/node_ports/filemanager.py index 40debd6ac17b..ee3fb14b0f4b 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports/filemanager.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports/filemanager.py @@ -1,3 +1,5 @@ +import json + # pylint: disable=too-many-arguments import logging import warnings @@ -7,12 +9,11 @@ import aiofiles from aiohttp import ClientPayloadError, ClientSession, ClientTimeout -from yarl import URL - from models_library.settings.services_common import ServicesCommonSettings from simcore_service_storage_sdk import ApiClient, Configuration, UsersApi from simcore_service_storage_sdk.rest import ApiException from tqdm import tqdm +from yarl import URL from ..config.http_clients import client_request_settings from . 
import config, exceptions @@ -177,7 +178,7 @@ async def _upload_file_to_link( ) pbar.update(file_size) # get the S3 etag from the headers - e_tag = resp.headers.get("Etag", None) + e_tag = json.loads(resp.headers.get("Etag", None)) log.debug("Uploaded %s to %s, received Etag %s", file_path, url, e_tag) return e_tag diff --git a/services/director-v2/openapi.json b/services/director-v2/openapi.json index be71afaa295b..50e13d3f6516 100644 --- a/services/director-v2/openapi.json +++ b/services/director-v2/openapi.json @@ -102,7 +102,7 @@ "required": true, "schema": { "title": "Service Key", - "pattern": "^(simcore)/(services)/(comp|dynamic|frontend)(/[^\\s/]+)+$", + "pattern": "^(simcore)/(services)/(comp|dynamic|frontend)(/[\\w/-]+)+$", "type": "string", "description": "Distinctive name for the node based on the docker registry path" }, @@ -160,7 +160,7 @@ "required": true, "schema": { "title": "Service Key", - "pattern": "^(simcore)/(services)/(comp|dynamic|frontend)(/[^\\s/]+)+$", + "pattern": "^(simcore)/(services)/(comp|dynamic|frontend)(/[\\w/-]+)+$", "type": "string", "description": "Distinctive name for the node based on the docker registry path" }, @@ -242,7 +242,7 @@ "content": { "application/json": { "schema": { - "$ref": "#/components/schemas/RunningServicesEnveloped" + "$ref": "#/components/schemas/RunningServicesDetailsArrayEnveloped" } } } @@ -294,7 +294,7 @@ "required": true, "schema": { "title": "Service Key", - "pattern": "^(simcore)/(services)/(comp|dynamic|frontend)(/[^\\s/]+)+$", + "pattern": "^(simcore)/(services)/(comp|dynamic|frontend)(/[\\w/-]+)+$", "type": "string", "description": "distinctive name for the node based on the docker registry path", "example": [ @@ -391,7 +391,7 @@ "tags": [ "computations" ], - "summary": "Create and optionaly Start a new computation", + "summary": "Create and optionally start a new computation", "operationId": "create_computation_v2_computations_post", "requestBody": { "content": { @@ -574,6 +574,59 @@ } } } + }, + "/v2/dynamic_services/{node_uuid}:retrieve": { + "post": { + "tags": [ + "dynamic services" + ], + "summary": "Calls the dynamic service's retrieve endpoint with optional port_keys", + "operationId": "service_retrieve_data_on_ports_v2_dynamic_services__node_uuid__retrieve_post", + "parameters": [ + { + "required": true, + "schema": { + "title": "Node Uuid", + "type": "string", + "format": "uuid" + }, + "name": "node_uuid", + "in": "path" + } + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/RetrieveDataIn" + } + } + }, + "required": true + }, + "responses": { + "200": { + "description": "Successful Response", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/RetrieveDataOutEnveloped" + } + } + } + }, + "422": { + "description": "Validation Error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/HTTPValidationError" + } + } + } + } + } + } } }, "components": { @@ -596,20 +649,12 @@ "title": "Email", "type": "string", "description": "Email address", - "format": "email", - "example": [ - "sun@sense.eight", - "deleen@minbar.bab" - ] + "format": "email" }, "affiliation": { "title": "Affiliation", "type": "string", - "description": "Affiliation of the author", - "example": [ - "Sense8", - "Babylon 5" - ] + "description": "Affiliation of the author" } }, "additionalProperties": false @@ -626,12 +671,7 @@ "name": { "title": "Name", "type": "string", - "description": "Name of the subject", - "example": [ - 
"travis-ci", - "coverals.io", - "github.io" - ] + "description": "Name of the subject" }, "image": { "title": "Image", @@ -639,12 +679,7 @@ "minLength": 1, "type": "string", "description": "Url to the badge", - "format": "uri", - "example": [ - "https://travis-ci.org/ITISFoundation/osparc-simcore.svg?branch=master", - "https://coveralls.io/repos/github/ITISFoundation/osparc-simcore/badge.svg?branch=master", - "https://img.shields.io/website-up-down-green-red/https/itisfoundation.github.io.svg?label=documentation" - ] + "format": "uri" }, "url": { "title": "Url", @@ -652,12 +687,7 @@ "minLength": 1, "type": "string", "description": "Link to the status", - "format": "uri", - "example": [ - "https://travis-ci.org/ITISFoundation/osparc-simcore 'State of CI: build, test and pushing images'", - "https://coveralls.io/github/ITISFoundation/osparc-simcore?branch=master 'Test coverage'", - "https://itisfoundation.github.io/" - ] + "format": "uri" } }, "additionalProperties": false @@ -685,6 +715,16 @@ "type": "boolean", "description": "if True the computation pipeline will start right away", "default": false + }, + "subgraph": { + "title": "Subgraph", + "uniqueItems": true, + "type": "array", + "items": { + "type": "string", + "format": "uuid" + }, + "description": "An optional set of nodes that must be executed, if empty the whole pipeline is executed" } } }, @@ -713,6 +753,7 @@ "required": [ "id", "state", + "pipeline", "url" ], "type": "object", @@ -736,6 +777,18 @@ "type": "string", "description": "the result of the computational task" }, + "pipeline": { + "title": "Pipeline", + "type": "object", + "additionalProperties": { + "type": "array", + "items": { + "type": "string", + "format": "uuid" + } + }, + "description": "the corresponding pipeline in terms of node uuids" + }, "url": { "title": "Url", "maxLength": 65536, @@ -772,8 +825,8 @@ "title": "HTTPValidationError", "type": "object", "properties": { - "detail": { - "title": "Detail", + "errors": { + "title": "Validation errors", "type": "array", "items": { "$ref": "#/components/schemas/ValidationError" @@ -827,39 +880,87 @@ "type": "string", "description": "An enumeration." 
}, - "RunningServiceType": { - "title": "RunningServiceType", + "RetrieveDataIn": { + "title": "RetrieveDataIn", + "required": [ + "port_keys" + ], + "type": "object", + "properties": { + "port_keys": { + "title": "Port Keys", + "type": "array", + "items": { + "type": "string", + "pattern": "^[-_a-zA-Z0-9]+$" + }, + "description": "The port keys to retrieve data from" + } + } + }, + "RetrieveDataOut": { + "title": "RetrieveDataOut", + "required": [ + "size_bytes" + ], + "type": "object", + "properties": { + "size_bytes": { + "title": "Size Bytes", + "type": "integer", + "description": "The amount of data transferred by the retrieve call" + } + } + }, + "RetrieveDataOutEnveloped": { + "title": "RetrieveDataOutEnveloped", + "required": [ + "data" + ], + "type": "object", + "properties": { + "data": { + "$ref": "#/components/schemas/RetrieveDataOut" + } + } + }, + "RunningServiceDetails": { + "title": "RunningServiceDetails", "required": [ "published_port", + "entry_point", "service_uuid", "service_key", "service_version", "service_host", - "service_port", - "service_state" + "service_basepath", + "service_state", + "service_message" ], "type": "object", "properties": { "published_port": { "title": "Published Port", + "exclusiveMaximum": 65535.0, "exclusiveMinimum": 0.0, "type": "integer", - "description": "The ports where the service provides its interface" + "description": "The ports where the service provides its interface on the docker swarm", + "deprecated": true }, "entry_point": { "title": "Entry Point", "type": "string", - "description": "The entry point where the service provides its interface if specified" + "description": "The entry point where the service provides its interface" }, "service_uuid": { "title": "Service Uuid", + "pattern": "^[0-9a-fA-F]{8}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{12}$", "type": "string", - "description": "The UUID attached to this service", - "format": "uuid4" + "description": "The node UUID attached to the service" }, "service_key": { "title": "Service Key", - "pattern": "^(simcore)/(services)/(comp|dynamic|frontend)(/[^\\s/]+)+$", + "pattern": "^(simcore)/(services)/(comp|dynamic|frontend)(/[\\w/-]+)+$", "type": "string", "description": "distinctive name for the node based on the docker registry path", "example": [ @@ -884,15 +985,16 @@ }, "service_port": { "title": "Service Port", + "exclusiveMaximum": 65535.0, "exclusiveMinimum": 0.0, "type": "integer", - "description": "port to access the service within the network" + "description": "port to access the service within the network", + "default": 80 }, "service_basepath": { "title": "Service Basepath", "type": "string", - "description": "different base path where current service is mounted otherwise defaults to root", - "default": "" + "description": "the service base entrypoint where the service serves its contents" }, "service_state": { "allOf": [ @@ -909,22 +1011,22 @@ } } }, - "RunningServicesArray": { - "title": "RunningServicesArray", + "RunningServicesDetailsArray": { + "title": "RunningServicesDetailsArray", "type": "array", "items": { - "$ref": "#/components/schemas/RunningServiceType" + "$ref": "#/components/schemas/RunningServiceDetails" } }, - "RunningServicesEnveloped": { - "title": "RunningServicesEnveloped", + "RunningServicesDetailsArrayEnveloped": { + "title": "RunningServicesDetailsArrayEnveloped", "required": [ "data" ], "type": "object", "properties": { "data": { - "$ref": "#/components/schemas/RunningServicesArray" + "$ref": 
"#/components/schemas/RunningServicesDetailsArray" } } }, @@ -1012,44 +1114,30 @@ "minLength": 1, "type": "string", "description": "url to the thumbnail", - "format": "uri", - "example": "https://user-images.githubusercontent.com/32800795/61083844-ff48fb00-a42c-11e9-8e63-fa2d709c8baf.png" + "format": "uri" }, "description": { "title": "Description", "type": "string", - "description": "human readable description of the purpose of the node", - "example": [ - "Our best node type", - "The mother of all nodes, makes your numbers shine!" - ] + "description": "human readable description of the purpose of the node" }, "key": { "title": "Key", - "pattern": "^(simcore)/(services)/(comp|dynamic|frontend)(/[^\\s/]+)+$", + "pattern": "^(simcore)/(services)/(comp|dynamic|frontend)(/[\\w/-]+)+$", "type": "string", - "description": "distinctive name for the node based on the docker registry path", - "example": [ - "simcore/services/comp/itis/sleeper", - "simcore/services/dynamic/3dviewer" - ] + "description": "distinctive name for the node based on the docker registry path" }, "version": { "title": "Version", "pattern": "^(0|[1-9]\\d*)(\\.(0|[1-9]\\d*)){2}(-(0|[1-9]\\d*|\\d*[-a-zA-Z][-\\da-zA-Z]*)(\\.(0|[1-9]\\d*|\\d*[-a-zA-Z][-\\da-zA-Z]*))*)?(\\+[-\\da-zA-Z]+(\\.[-\\da-zA-Z-]+)*)?$", "type": "string", - "description": "service version number", - "example": [ - "1.0.0", - "0.0.1" - ] + "description": "service version number" }, "integration-version": { "title": "Integration-Version", "pattern": "^(0|[1-9]\\d*)(\\.(0|[1-9]\\d*)){2}(-(0|[1-9]\\d*|\\d*[-a-zA-Z][-\\da-zA-Z]*)(\\.(0|[1-9]\\d*|\\d*[-a-zA-Z][-\\da-zA-Z]*))*)?(\\+[-\\da-zA-Z]+(\\.[-\\da-zA-Z-]+)*)?$", "type": "string", - "description": "integration version number", - "example": "1.0.0" + "description": "integration version number" }, "type": { "allOf": [ @@ -1078,10 +1166,7 @@ "title": "Contact", "type": "string", "description": "email to correspond to the authors about the node", - "format": "email", - "example": [ - "lab@net.flix" - ] + "format": "email" }, "inputs": { "title": "Inputs", @@ -1094,7 +1179,8 @@ "description": "definition of the outputs of this node" } }, - "additionalProperties": false + "additionalProperties": false, + "description": "Service base schema (used for docker labels on docker images)" }, "ServiceExtras": { "title": "ServiceExtras", @@ -1139,11 +1225,7 @@ "displayOrder": { "title": "Displayorder", "type": "number", - "description": "use this to numerically sort the properties for display", - "example": [ - 1, - -0.2 - ] + "description": "use this to numerically sort the properties for display" }, "label": { "title": "Label", @@ -1161,31 +1243,12 @@ "title": "Type", "pattern": "^(number|integer|boolean|string|data:([^/\\s,]+/[^/\\s,]+|\\[[^/\\s,]+/[^/\\s,]+(,[^/\\s]+/[^/,\\s]+)*\\]))$", "type": "string", - "description": "data type expected on this input glob matching for data type is allowed", - "example": [ - "number", - "boolean", - "data:*/*", - "data:text/*", - "data:[image/jpeg,image/png]", - "data:application/json", - "data:application/json;schema=https://my-schema/not/really/schema.json", - "data:application/vnd.ms-excel", - "data:text/plain", - "data:application/hdf5", - "data:application/edu.ucdavis@ceclancy.xyz" - ] + "description": "data type expected on this input glob matching for data type is allowed" }, "fileToKeyMap": { "title": "Filetokeymap", "type": "object", - "description": "Place the data associated with the named keys in files", - "example": [ - { - "dir/input1.txt": "key_1", - 
"dir33/input2.txt": "key2" - } - ] + "description": "Place the data associated with the named keys in files" }, "defaultValue": { "title": "Defaultvalue", @@ -1199,10 +1262,6 @@ { "type": "integer" } - ], - "example": [ - "Dog", - true ] }, "widget": { @@ -1230,11 +1289,7 @@ "displayOrder": { "title": "Displayorder", "type": "number", - "description": "use this to numerically sort the properties for display", - "example": [ - 1, - -0.2 - ] + "description": "use this to numerically sort the properties for display" }, "label": { "title": "Label", @@ -1252,31 +1307,12 @@ "title": "Type", "pattern": "^(number|integer|boolean|string|data:([^/\\s,]+/[^/\\s,]+|\\[[^/\\s,]+/[^/\\s,]+(,[^/\\s]+/[^/,\\s]+)*\\]))$", "type": "string", - "description": "data type expected on this input glob matching for data type is allowed", - "example": [ - "number", - "boolean", - "data:*/*", - "data:text/*", - "data:[image/jpeg,image/png]", - "data:application/json", - "data:application/json;schema=https://my-schema/not/really/schema.json", - "data:application/vnd.ms-excel", - "data:text/plain", - "data:application/hdf5", - "data:application/edu.ucdavis@ceclancy.xyz" - ] + "description": "data type expected on this input glob matching for data type is allowed" }, "fileToKeyMap": { "title": "Filetokeymap", "type": "object", - "description": "Place the data associated with the named keys in files", - "example": [ - { - "dir/input1.txt": "key_1", - "dir33/input2.txt": "key2" - } - ] + "description": "Place the data associated with the named keys in files" }, "defaultValue": { "title": "Defaultvalue", @@ -1290,10 +1326,6 @@ { "type": "integer" } - ], - "example": [ - "Dog", - true ] }, "widget": { diff --git a/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py b/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py index 102cc7e02b93..86124789aad5 100644 --- a/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py +++ b/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py @@ -5,6 +5,7 @@ from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException from models_library.projects import ProjectID from models_library.projects_state import RunningState +from simcore_service_director_v2.models.domains.comp_pipelines import CompPipelineAtDB from starlette import status from starlette.requests import Request from tenacity import ( @@ -32,7 +33,7 @@ is_pipeline_running, is_pipeline_stopped, ) -from ...utils.dags import create_dag_graph, find_entrypoints +from ...utils.dags import create_dag_graph, create_minimal_graph_based_on_selection from ...utils.exceptions import ProjectNotFoundError from ..dependencies.celery import CeleryClient, get_celery_client from ..dependencies.database import get_repository @@ -120,25 +121,26 @@ async def create_computation( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Project {job.project_id} is not a valid directed acyclic graph!", ) - - if job.start_pipeline: - # find the entrypoints, if not the pipeline cannot be started - entrypoints = find_entrypoints(dag_graph) - if not entrypoints: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail=f"Project {job.project_id} has no services to compute", - ) + # get a subgraph if needed + if job.subgraph: + dag_graph = create_minimal_graph_based_on_selection(dag_graph, job.subgraph) # ok so put the tasks in the db await computation_pipelines.upsert_pipeline( project.uuid, dag_graph, job.start_pipeline ) 
await computation_tasks.upsert_tasks_from_project( - project, director_client, job.start_pipeline + project, + director_client, + list(dag_graph.nodes()) if job.start_pipeline else [], ) if job.start_pipeline: + if not dag_graph.nodes(): + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=f"Project {job.project_id} has no computational services", + ) # trigger celery task = celery_client.send_computation_task(job.user_id, job.project_id) background_tasks.add_task(background_on_message, task) @@ -154,6 +156,7 @@ async def create_computation( state=RunningState.PUBLISHED if job.start_pipeline else RunningState.NOT_STARTED, + pipeline=nx.to_dict_of_lists(dag_graph), url=f"{request.url}/{job.project_id}", stop_url=f"{request.url}/{job.project_id}:stop" if job.start_pipeline @@ -175,6 +178,9 @@ async def get_computation( project_id: ProjectID, request: Request, project_repo: ProjectsRepository = Depends(get_repository(ProjectsRepository)), + computation_pipelines: CompPipelinesRepository = Depends( + get_repository(CompPipelinesRepository) + ), computation_tasks: CompTasksRepository = Depends( get_repository(CompTasksRepository) ), @@ -186,12 +192,22 @@ async def get_computation( # TODO: get a copy of the project and process it here instead! await project_repo.get_project(project_id) + # get the project pipeline + pipeline_at_db: CompPipelineAtDB = await computation_pipelines.get_pipeline( + project_id + ) + # get the project task states comp_tasks: List[CompTaskAtDB] = await computation_tasks.get_comp_tasks( project_id ) + dag_graph: nx.DiGraph = nx.from_dict_of_lists(pipeline_at_db.dag_adjacency_list) + # filter the tasks by the effective pipeline + filtered_tasks = [ + t for t in comp_tasks if str(t.node_id) in list(dag_graph.nodes()) + ] pipeline_state = get_pipeline_state_from_task_states( - comp_tasks, celery_client.settings.publication_timeout + filtered_tasks, celery_client.settings.publication_timeout ) log.debug( @@ -204,6 +220,7 @@ async def get_computation( task_out = ComputationTaskOut( id=project_id, state=pipeline_state, + pipeline=pipeline_at_db.dag_adjacency_list, url=f"{request.url.remove_query_params('user_id')}", stop_url=f"{request.url.remove_query_params('user_id')}:stop" if is_pipeline_running(pipeline_state) @@ -238,6 +255,9 @@ async def stop_computation_project( project_id: ProjectID, request: Request, project_repo: ProjectsRepository = Depends(get_repository(ProjectsRepository)), + computation_pipelines: CompPipelinesRepository = Depends( + get_repository(CompPipelinesRepository) + ), computation_tasks: CompTasksRepository = Depends( get_repository(CompTasksRepository) ), @@ -251,6 +271,10 @@ async def stop_computation_project( try: # get the project project: ProjectAtDB = await project_repo.get_project(project_id) + # get the project pipeline + pipeline_at_db: CompPipelineAtDB = await computation_pipelines.get_pipeline( + project_id + ) # check if current state allow to stop the computation comp_tasks: List[CompTaskAtDB] = await computation_tasks.get_comp_tasks( project_id @@ -266,6 +290,7 @@ async def stop_computation_project( return ComputationTaskOut( id=project_id, state=pipeline_state, + pipeline=pipeline_at_db.dag_adjacency_list, url=f"{str(request.url).rstrip(':stop')}", ) @@ -315,7 +340,7 @@ async def delete_pipeline( def return_last_value(retry_state): """return the result of the last call attempt""" return retry_state.outcome.result() - + @retry( stop=stop_after_delay(PIPELINE_ABORT_TIMEOUT_S), wait=wait_random(0, 2), diff 
--git a/services/director-v2/src/simcore_service_director_v2/models/domains/comp_pipelines.py b/services/director-v2/src/simcore_service_director_v2/models/domains/comp_pipelines.py index be14485a5a16..1f14ed3dd90a 100644 --- a/services/director-v2/src/simcore_service_director_v2/models/domains/comp_pipelines.py +++ b/services/director-v2/src/simcore_service_director_v2/models/domains/comp_pipelines.py @@ -3,6 +3,9 @@ from models_library.projects import ProjectID from models_library.projects_state import RunningState from pydantic import BaseModel, validator +from simcore_postgres_database.models.comp_pipeline import StateType + +from ...utils.db import DB_TO_RUNNING_STATE class CompPipelineAtDB(BaseModel): @@ -10,6 +13,13 @@ class CompPipelineAtDB(BaseModel): dag_adjacency_list: Dict[str, List[str]] # json serialization issue if using NodeID state: RunningState + @validator("state", pre=True) + @classmethod + def convert_state_if_needed(cls, v): + if isinstance(v, StateType): + return RunningState(DB_TO_RUNNING_STATE[StateType(v)]) + return v + @validator("dag_adjacency_list", pre=True) @classmethod def auto_convert_dag(cls, v): diff --git a/services/director-v2/src/simcore_service_director_v2/models/domains/comp_tasks.py b/services/director-v2/src/simcore_service_director_v2/models/domains/comp_tasks.py index e95fe491664b..85eb6d3e0bab 100644 --- a/services/director-v2/src/simcore_service_director_v2/models/domains/comp_tasks.py +++ b/services/director-v2/src/simcore_service_director_v2/models/domains/comp_tasks.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import Optional +from typing import Dict, List, Optional from uuid import UUID from models_library.basic_regex import VERSION_RE @@ -11,6 +11,7 @@ from pydantic.types import PositiveInt from simcore_postgres_database.models.comp_tasks import NodeClass, StateType +from ...utils.db import DB_TO_RUNNING_STATE from ..schemas.constants import UserID TaskID = UUID @@ -22,6 +23,10 @@ class ComputationTaskCreate(BaseModel): start_pipeline: Optional[bool] = Field( False, description="if True the computation pipeline will start right away" ) + subgraph: Optional[List[NodeID]] = Field( + None, + description="An optional set of nodes that must be executed, if empty the whole pipeline is executed", + ) class ComputationTaskStop(BaseModel): @@ -41,6 +46,9 @@ class ComputationTask(BaseModel): result: Optional[str] = Field( None, description="the result of the computational task" ) + pipeline: Dict[NodeID, List[NodeID]] = Field( + ..., description="the corresponding pipeline in terms of node uuids" + ) class ComputationTaskOut(ComputationTask): @@ -52,17 +60,6 @@ class ComputationTaskOut(ComputationTask): ) -DB_TO_RUNNING_STATE = { - StateType.FAILED: RunningState.FAILED, - StateType.PENDING: RunningState.PENDING, - StateType.SUCCESS: RunningState.SUCCESS, - StateType.PUBLISHED: RunningState.PUBLISHED, - StateType.NOT_STARTED: RunningState.NOT_STARTED, - StateType.RUNNING: RunningState.STARTED, - StateType.ABORTED: RunningState.ABORTED, -} - - class Image(BaseModel): name: constr(regex=KEY_RE) tag: constr(regex=VERSION_RE) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_pipelines.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_pipelines.py index 2d12056658ca..f81564141e79 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_pipelines.py +++ 
b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_pipelines.py @@ -2,10 +2,10 @@ import networkx as nx import sqlalchemy as sa -from sqlalchemy.dialects.postgresql import insert - +from aiopg.sa.result import RowProxy from models_library.projects import ProjectID from models_library.projects_state import RunningState +from sqlalchemy.dialects.postgresql import insert from ....models.domains.comp_pipelines import CompPipelineAtDB from ....utils.logging_utils import log_decorator @@ -16,6 +16,16 @@ class CompPipelinesRepository(BaseRepository): + @log_decorator(logger=logger) + async def get_pipeline(self, project_id: ProjectID) -> CompPipelineAtDB: + result = await self.connection.execute( + sa.select([comp_pipeline]).where( + comp_pipeline.c.project_id == str(project_id) + ) + ) + row: RowProxy = await result.fetchone() + return CompPipelineAtDB.from_orm(row) + @log_decorator(logger=logger) async def upsert_pipeline( self, project_id: ProjectID, dag_graph: nx.DiGraph, publish: bool diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks.py index 304ebede7506..48e18f14a88d 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks.py @@ -5,6 +5,7 @@ import sqlalchemy as sa from models_library.projects import ProjectID from models_library.projects_nodes import Node +from models_library.projects_nodes_io import NodeID from models_library.projects_state import RunningState from models_library.services import ( Author, @@ -17,9 +18,9 @@ from ....models.domains.comp_tasks import CompTaskAtDB, Image, NodeSchema from ....models.domains.projects import ProjectAtDB from ....models.schemas.services import NodeRequirement, ServiceExtras +from ....utils.async_utils import run_sequentially_in_context from ....utils.computations import to_node_class from ....utils.logging_utils import log_decorator -from ....utils.async_utils import run_sequentially_in_context from ...director_v0 import DirectorV0Client from ..tables import NodeClass, StateType, comp_tasks from ._base import BaseRepository @@ -80,7 +81,7 @@ async def _sequentially_upsert_tasks_from_project( self, project: ProjectAtDB, director_client: DirectorV0Client, - publish: bool, + published_nodes: List[NodeID], str_project_uuid: str, ) -> None: # start by removing the old tasks if they exist @@ -123,7 +124,6 @@ async def _sequentially_upsert_tasks_from_project( requires_mpi=requires_mpi, ) - comp_state = RunningState.PUBLISHED if publish else RunningState.NOT_STARTED task_db = CompTaskAtDB( project_id=project.uuid, node_id=node_id, @@ -135,8 +135,9 @@ async def _sequentially_upsert_tasks_from_project( image=image, submit=datetime.utcnow(), state=( - comp_state - if node_class == NodeClass.COMPUTATIONAL + RunningState.PUBLISHED + if node_id in published_nodes + and node_class == NodeClass.COMPUTATIONAL else RunningState.NOT_STARTED ), internal_id=internal_id, @@ -151,7 +152,10 @@ async def _sequentially_upsert_tasks_from_project( @log_decorator(logger=logger) async def upsert_tasks_from_project( - self, project: ProjectAtDB, director_client: DirectorV0Client, publish: bool + self, + project: ProjectAtDB, + director_client: DirectorV0Client, + published_nodes: List[NodeID], ) -> None: # only used by the decorator on the 
"_sequentially_upsert_tasks_from_project" @@ -166,7 +170,7 @@ async def upsert_tasks_from_project( await self._sequentially_upsert_tasks_from_project( project=project, director_client=director_client, - publish=publish, + published_nodes=published_nodes, str_project_uuid=str_project_uuid, ) diff --git a/services/director-v2/src/simcore_service_director_v2/utils/computations.py b/services/director-v2/src/simcore_service_director_v2/utils/computations.py index dd66adbb3cf5..9c09379ff2c7 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/computations.py +++ b/services/director-v2/src/simcore_service_director_v2/utils/computations.py @@ -3,8 +3,8 @@ from datetime import datetime from typing import List, Set -from models_library.services import SERVICE_KEY_RE from models_library.projects_state import RunningState +from models_library.services import SERVICE_KEY_RE from ..models.domains.comp_tasks import CompTaskAtDB from ..modules.db.tables import NodeClass diff --git a/services/director-v2/src/simcore_service_director_v2/utils/dags.py b/services/director-v2/src/simcore_service_director_v2/utils/dags.py index ede18fb67fff..639b875b2c19 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/dags.py +++ b/services/director-v2/src/simcore_service_director_v2/utils/dags.py @@ -1,27 +1,36 @@ import logging -from typing import List +from typing import List, Set import networkx as nx -from models_library.projects_nodes import NodeID from models_library.projects import Workbench +from models_library.projects_nodes import NodeID +from models_library.projects_nodes_io import PortLink -from .computations import to_node_class, NodeClass +from .computations import NodeClass, to_node_class +from .logging_utils import log_decorator -log = logging.getLogger(__file__) +logger = logging.getLogger(__file__) +@log_decorator(logger=logger) def find_entrypoints(graph: nx.DiGraph) -> List[NodeID]: entrypoints = [n for n in graph.nodes if not list(graph.predecessors(n))] - log.debug("the entrypoints of the graph are %s", entrypoints) + logger.debug("the entrypoints of the graph are %s", entrypoints) return entrypoints +@log_decorator(logger=logger) def create_dag_graph(workbench: Workbench) -> nx.DiGraph: dag_graph = nx.DiGraph() for node_id, node in workbench.items(): if to_node_class(node.key) == NodeClass.COMPUTATIONAL: dag_graph.add_node( - node_id, name=node.label, key=node.key, version=node.version + node_id, + name=node.label, + key=node.key, + version=node.version, + inputs=node.inputs, + outputs=node.outputs, ) for input_node_id in node.input_nodes: predecessor_node = workbench.get(str(input_node_id)) @@ -30,11 +39,64 @@ def create_dag_graph(workbench: Workbench) -> nx.DiGraph: and to_node_class(predecessor_node.key) == NodeClass.COMPUTATIONAL ): dag_graph.add_edge(str(input_node_id), node_id) - log.debug("created DAG graph: %s", nx.to_dict_of_lists(dag_graph)) return dag_graph +def mark_node_dirty(graph: nx.DiGraph, node_id: NodeID): + graph.nodes()[str(node_id)]["dirty"] = True + + +def is_node_dirty(graph: nx.DiGraph, node_id: NodeID) -> bool: + return graph.nodes()[str(node_id)].get("dirty", False) + + +def _node_outdated(full_dag_graph: nx.DiGraph, node_id: NodeID) -> bool: + node = full_dag_graph.nodes(data=True)[str(node_id)] + # if the node has no output it is outdated for sure + if not node["outputs"]: + return True + for output_port in node["outputs"]: + if output_port is None: + return True + # ok so we have outputs, but maybe the inputs are old? 
let's check recursively + for input_port in node["inputs"]: + if isinstance(input_port, PortLink): + if is_node_dirty(full_dag_graph, input_port.node_uuid): + return True + else: + # FIXME: here we should check if the current inputs are the ones used to generate the current outputs + # this could be done by saving the inputs as metadata together with the outputs (see blockchain) + # we should compare the current inputs with the inputs used for generating the current outputs!!! + pass + return False + + +@log_decorator(logger=logger) +def create_minimal_graph_based_on_selection( + full_dag_graph: nx.DiGraph, selected_nodes: List[NodeID] +) -> nx.DiGraph: + selected_nodes_str = [str(n) for n in selected_nodes] + # first pass, set the dirty attribute on the graph + for node in nx.topological_sort(full_dag_graph): + if node in selected_nodes_str or _node_outdated(full_dag_graph, node): + mark_node_dirty(full_dag_graph, node) + + # now we want all the outdated nodes that are in the tree from the selected nodes + minimal_selection_nodes: Set[NodeID] = set() + for node in selected_nodes: + minimal_selection_nodes.update( + set( + n + for n in nx.bfs_tree(full_dag_graph, str(node), reverse=True) + if is_node_dirty(full_dag_graph, n) + ) + ) + + return full_dag_graph.subgraph(minimal_selection_nodes) + + +@log_decorator(logger=logger) def topological_sort_grouping(dag_graph: nx.DiGraph) -> List: # copy the graph graph_copy = dag_graph.copy() diff --git a/services/director-v2/src/simcore_service_director_v2/utils/db.py b/services/director-v2/src/simcore_service_director_v2/utils/db.py new file mode 100644 index 000000000000..a72501fa3777 --- /dev/null +++ b/services/director-v2/src/simcore_service_director_v2/utils/db.py @@ -0,0 +1,12 @@ +from models_library.projects_state import RunningState +from simcore_postgres_database.models.comp_pipeline import StateType + +DB_TO_RUNNING_STATE = { + StateType.FAILED: RunningState.FAILED, + StateType.PENDING: RunningState.PENDING, + StateType.SUCCESS: RunningState.SUCCESS, + StateType.PUBLISHED: RunningState.PUBLISHED, + StateType.NOT_STARTED: RunningState.NOT_STARTED, + StateType.RUNNING: RunningState.STARTED, + StateType.ABORTED: RunningState.ABORTED, +} diff --git a/services/director-v2/tests/integration/test_computation_api.py b/services/director-v2/tests/integration/test_computation_api.py index a1a6543055f2..6e536e7886c2 100644 --- a/services/director-v2/tests/integration/test_computation_api.py +++ b/services/director-v2/tests/integration/test_computation_api.py @@ -10,6 +10,7 @@ import pytest import sqlalchemy as sa +from models_library.projects_nodes_io import NodeID from models_library.projects_state import RunningState from models_library.settings.rabbit import RabbitConfig from models_library.settings.redis import RedisConfig @@ -26,7 +27,7 @@ from yarl import URL core_services = ["director", "redis", "rabbit", "sidecar", "storage", "postgres"] -ops_services = ["minio"] +ops_services = ["minio", "adminer"] COMPUTATION_URL: str = "v2/computations" @@ -192,6 +193,107 @@ def check_pipeline_state() -> ComputationTaskOut: return task_out +@pytest.mark.parametrize( + "subgraph_elements,exp_pipeline_dag_adj_list_1st_run, exp_pipeline_dag_adj_list_2nd_run", + [ + pytest.param([0, 1], {0: [1], 1: []}, {0: [1], 1: []}, id="element 0,1"), + pytest.param( + [1, 2, 4], + {0: [1, 3], 1: [2], 2: [4], 3: [4], 4: []}, + {1: [4], 2: [], 4: []}, + id="element 1,2,4", + ), + ], +) +def test_run_partial_computation( + client: TestClient, + user_id: PositiveInt, + 
project: Callable,
+    sleepers_workbench: Dict,
+    subgraph_elements: List[int],
+    exp_pipeline_dag_adj_list_1st_run: Dict[str, List[str]],
+    exp_pipeline_dag_adj_list_2nd_run: Dict[str, List[str]],
+):
+    # send a valid project with sleepers
+    sleepers_project = project(workbench=sleepers_workbench)
+    response = client.post(
+        COMPUTATION_URL,
+        json={
+            "user_id": user_id,
+            "project_id": str(sleepers_project.uuid),
+            "start_pipeline": True,
+            "subgraph": [
+                str(node_id)
+                for index, node_id in enumerate(sleepers_project.workbench)
+                if index in subgraph_elements
+            ],
+        },
+    )
+    assert (
+        response.status_code == status.HTTP_201_CREATED
+    ), f"response code is {response.status_code}, error: {response.text}"
+
+    task_out = ComputationTaskOut.parse_obj(response.json())
+
+    assert task_out.id == sleepers_project.uuid
+    assert task_out.state == RunningState.PUBLISHED
+    assert task_out.url == f"{client.base_url}/v2/computations/{sleepers_project.uuid}"
+    assert (
+        task_out.stop_url
+        == f"{client.base_url}/v2/computations/{sleepers_project.uuid}:stop"
+    )
+    # convert the ids to the node uuids from the project
+    workbench_node_uuids = list(sleepers_project.workbench.keys())
+    expected_adj_list_1st_run: Dict[NodeID, List[NodeID]] = {}
+    expected_adj_list_2nd_run: Dict[NodeID, List[NodeID]] = {}
+    for conv, exp_adj_list in [
+        (expected_adj_list_1st_run, exp_pipeline_dag_adj_list_1st_run),
+        (expected_adj_list_2nd_run, exp_pipeline_dag_adj_list_2nd_run),
+    ]:
+        for node_key, next_nodes in exp_adj_list.items():
+            conv[NodeID(workbench_node_uuids[node_key])] = [
+                NodeID(workbench_node_uuids[n]) for n in next_nodes
+            ]
+    assert task_out.pipeline == expected_adj_list_1st_run
+
+    # now wait for the computation to finish
+    task_out = _assert_pipeline_status(
+        client, task_out.url, user_id, sleepers_project.uuid
+    )
+    assert task_out.url == f"{client.base_url}/v2/computations/{sleepers_project.uuid}"
+    assert task_out.stop_url is None
+
+    assert (
+        task_out.state == RunningState.SUCCESS
+    ), f"the pipeline completed with state {task_out.state}"
+
+    # run it a second time. the expected tasks to run might change.
+ response = client.post( + COMPUTATION_URL, + json={ + "user_id": user_id, + "project_id": str(sleepers_project.uuid), + "start_pipeline": True, + "subgraph": [ + str(node_id) + for index, node_id in enumerate(sleepers_project.workbench) + if index in subgraph_elements + ], + }, + ) + assert ( + response.status_code == status.HTTP_201_CREATED + ), f"response code is {response.status_code}, error: {response.text}" + + task_out = ComputationTaskOut.parse_obj(response.json()) + + assert task_out.id == sleepers_project.uuid + assert task_out.state == RunningState.PUBLISHED + assert task_out.url == f"{client.base_url}/v2/computations/{sleepers_project.uuid}" + assert ( + task_out.stop_url + == f"{client.base_url}/v2/computations/{sleepers_project.uuid}:stop" + ) + assert task_out.pipeline == expected_adj_list_2nd_run + + def test_run_computation( client: TestClient, user_id: PositiveInt, diff --git a/services/director-v2/tests/unit/test_dags.py b/services/director-v2/tests/unit/test_dags.py index e8f02e9b4083..4d72a79d6fb4 100644 --- a/services/director-v2/tests/unit/test_dags.py +++ b/services/director-v2/tests/unit/test_dags.py @@ -7,13 +7,16 @@ import json from pathlib import Path -from typing import Dict +from typing import Dict, Set import networkx as nx import pytest -from models_library.projects_nodes import Node from models_library.projects import Workbench -from simcore_service_director_v2.utils.dags import create_dag_graph +from models_library.projects_nodes import Node +from simcore_service_director_v2.utils.dags import ( + create_dag_graph, + create_minimal_graph_based_on_selection, +) @pytest.fixture(scope="session") @@ -39,3 +42,74 @@ def sleepers_workbench_adjacency(sleepers_workbench_adjacency_file: Path) -> Dic def test_create_dags(workbench: Workbench, sleepers_workbench_adjacency: Dict): dag: nx.DiGraph = create_dag_graph(workbench) assert nx.to_dict_of_lists(dag) == sleepers_workbench_adjacency + + +@pytest.mark.parametrize( + "subgraph, exp_dag", + [ + pytest.param( + {}, + {}, + id="no nodes", + ), + pytest.param( + { + "8902d36c-bc65-5b0d-848f-88aed72d7849", # sleeper 0 + "3a710d8b-565c-5f46-870b-b45ebe195fc7", # sleeper 1 + }, + { + "8902d36c-bc65-5b0d-848f-88aed72d7849": [ + "3a710d8b-565c-5f46-870b-b45ebe195fc7" + ], + "3a710d8b-565c-5f46-870b-b45ebe195fc7": [], + }, + id="nodes 0 and 1", + ), + pytest.param( + { + "8902d36c-bc65-5b0d-848f-88aed72d7849", # sleeper 0 + "415fefd1-d08b-53c1-adb0-16bed3a687ef", # sleeper 2 + }, + { + "8902d36c-bc65-5b0d-848f-88aed72d7849": [ + "3a710d8b-565c-5f46-870b-b45ebe195fc7" + ], + "3a710d8b-565c-5f46-870b-b45ebe195fc7": [ + "415fefd1-d08b-53c1-adb0-16bed3a687ef" + ], + "415fefd1-d08b-53c1-adb0-16bed3a687ef": [], + }, + id="node 0 and 2", + ), + pytest.param( + { + "8902d36c-bc65-5b0d-848f-88aed72d7849", # sleeper 0 + "415fefd1-d08b-53c1-adb0-16bed3a687ef", # sleeper 2 + "6ede1209-b459-5735-91fc-761aa584808d", # sleeper 4 + }, + { + "8902d36c-bc65-5b0d-848f-88aed72d7849": [ + "3a710d8b-565c-5f46-870b-b45ebe195fc7", + "e1e2ea96-ce8f-5abc-8712-b8ed312a782c", + ], + "3a710d8b-565c-5f46-870b-b45ebe195fc7": [ + "415fefd1-d08b-53c1-adb0-16bed3a687ef" + ], + "415fefd1-d08b-53c1-adb0-16bed3a687ef": [ + "6ede1209-b459-5735-91fc-761aa584808d" + ], + "e1e2ea96-ce8f-5abc-8712-b8ed312a782c": [ + "6ede1209-b459-5735-91fc-761aa584808d" + ], + "6ede1209-b459-5735-91fc-761aa584808d": [], + }, + id="node 0, 2 and 4", + ), + ], +) +def test_create_minimal_graph(workbench: Workbench, subgraph: Set[str], exp_dag): + full_dag_graph: nx.DiGraph = 
create_dag_graph(workbench) + reduced_dag: nx.DiGraph = create_minimal_graph_based_on_selection( + full_dag_graph, subgraph + ) + assert nx.to_dict_of_lists(reduced_dag) == exp_dag diff --git a/services/docker-compose-ops.yml b/services/docker-compose-ops.yml index c04a46b6f27c..bc162a867c93 100644 --- a/services/docker-compose-ops.yml +++ b/services/docker-compose-ops.yml @@ -62,7 +62,6 @@ services: redis-commander: image: rediscommander/redis-commander:latest init: true - restart: always environment: - REDIS_HOSTS=${REDIS_HOST} ports: @@ -75,7 +74,6 @@ services: # SEE https://github.com/mher/flower/issues/1029 image: mher/flower:0.9.5 init: true - restart: always environment: - CELERY_BROKER_URL=amqp://${RABBIT_USER}:${RABBIT_PASSWORD}@${RABBIT_HOST}:${RABBIT_PORT} - FLOWER_BROKER_API=http://${RABBIT_USER}:${RABBIT_PASSWORD}@${RABBIT_HOST}:15672/api/ diff --git a/services/sidecar/src/simcore_service_sidecar/celery.py b/services/sidecar/src/simcore_service_sidecar/celery.py index ab3eb19697e8..b59766235bc3 100644 --- a/services/sidecar/src/simcore_service_sidecar/celery.py +++ b/services/sidecar/src/simcore_service_sidecar/celery.py @@ -1,7 +1,9 @@ +import logging + from celery.signals import worker_ready, worker_shutting_down +from .__version__ import __version__ from .celery_configurator import create_celery_app -from .celery_log_setup import get_task_logger from .celery_task_utils import cancel_task from .cli import run_sidecar from .remote_debug import setup_remote_debugging @@ -10,7 +12,21 @@ app = create_celery_app() -log = get_task_logger(__name__) +log = logging.getLogger(__name__) + +WELCOME_MSG = r""" + .-') _ .-') _ ('-. ('-. _ .-') + ( OO ). ( ( OO) ) _( OO) ( OO ).-.( \( -O ) +(_)---\_) ,-.-') \ .'_ (,------. .-----. / . --. / ,------. +/ _ | | |OO),`'--..._) | .---' ' .--./ | \-. \ | /`. ' +\ :` `. | | \| | \ ' | | | |('-..-'-' | | | / | | + '..`''.) | |(_/| | ' |(| '--. /_) |OO )\| |_.' | | |_.' | +.-._) \ ,| |_.'| | / : | .--' || |`-'| | .-. | | . '.' +\ /(_| | | '--' / | `---.(_' '--'\ | | | | | |\ \ + `-----' `--' `-------' `------' `-----' `--' `--' `--' '--' {0} +""".format( + __version__ +) @worker_shutting_down.connect @@ -28,7 +44,7 @@ def worker_shutting_down_handler( @worker_ready.connect def worker_ready_handler(*args, **kwargs): # pylint: disable=unused-argument - log.info("!!!!!!!!!!!!!! Worker is READY now !!!!!!!!!!!!!!!!!!") + print(WELCOME_MSG, flush=True) __all__ = ["app"] diff --git a/services/sidecar/src/simcore_service_sidecar/celery_configurator.py b/services/sidecar/src/simcore_service_sidecar/celery_configurator.py index eec96e9a537f..61ff37348fc8 100644 --- a/services/sidecar/src/simcore_service_sidecar/celery_configurator.py +++ b/services/sidecar/src/simcore_service_sidecar/celery_configurator.py @@ -7,6 +7,7 @@ use a look ahead function to check the type of upcoming task and schedule it accordingly. """ +import logging from functools import wraps from typing import Callable @@ -16,12 +17,11 @@ from . 
import config from .boot_mode import BootMode, get_boot_mode, set_boot_mode -from .celery_log_setup import get_task_logger from .celery_task import entrypoint from .celery_task_utils import on_task_failure_handler, on_task_success_handler from .utils import is_gpu_node, start_as_mpi_node -log = get_task_logger(__name__) +log = logging.getLogger(__name__) CELERY_APP_CONFIGS = { @@ -65,7 +65,7 @@ def define_celery_task(app: Celery, name: str) -> None: def configure_node(bootmode: BootMode) -> Celery: - log.info("Initializing celery app in %s...", bootmode) + log.debug("Initializing celery app in %s...", bootmode) app = Celery( f"sidecar.{str(bootmode.name).lower()}.{config.SIDECAR_HOST_HOSTNAME_PATH.read_text()}", broker=config.CELERY_CONFIG.broker_url, diff --git a/services/sidecar/src/simcore_service_sidecar/celery_log_setup.py b/services/sidecar/src/simcore_service_sidecar/celery_log_setup.py index 79583fdb9759..2147ceb97ae9 100644 --- a/services/sidecar/src/simcore_service_sidecar/celery_log_setup.py +++ b/services/sidecar/src/simcore_service_sidecar/celery_log_setup.py @@ -1,9 +1,8 @@ """ setup logging formatters to fit logspout's multiline pattern "^(ERROR|WARNING|INFO|DEBUG|CRITICAL)[:]" - NOTE: import to connect signals! - SEE https://github.com/ITISFoundation/osparc-ops/blob/master/services/graylog/docker-compose.yml#L113 """ + # NOTES: # https://docs.celeryproject.org/en/latest/userguide/signals.html#setup-logging # https://www.distributedpython.com/2018/08/28/celery-logging/ @@ -11,7 +10,11 @@ from celery.app.log import TaskFormatter from celery.signals import after_setup_logger, after_setup_task_logger from celery.utils.log import get_task_logger -from servicelib.logging_utils import CustomFormatter, set_logging_handler +from servicelib.logging_utils import ( + CustomFormatter, + set_logging_handler, + config_all_loggers, +) @after_setup_logger.connect @@ -27,6 +30,7 @@ class TaskColoredFormatter(TaskFormatter, CustomFormatter): @after_setup_task_logger.connect def setup_task_logger(logger, *_args, **_kwargs): """ Customizes task loggers """ + set_logging_handler( logger, formatter_base=TaskColoredFormatter, @@ -34,8 +38,6 @@ def setup_task_logger(logger, *_args, **_kwargs): ) -# TODO: configure via command line or config file. 
Add in config.yaml +config_all_loggers() log = get_task_logger(__name__) log.info("Setting up loggers") - -__all__ = ["get_task_logger"] diff --git a/services/sidecar/src/simcore_service_sidecar/celery_task.py b/services/sidecar/src/simcore_service_sidecar/celery_task.py index 4735cf9b5fb6..95a0943beaeb 100644 --- a/services/sidecar/src/simcore_service_sidecar/celery_task.py +++ b/services/sidecar/src/simcore_service_sidecar/celery_task.py @@ -1,16 +1,16 @@ +import logging from asyncio import CancelledError from pprint import pformat from typing import Optional from celery import Celery, states -from .celery_log_setup import get_task_logger from .cli import run_sidecar from .config import CPU_QUEUE_NAME, GPU_QUEUE_NAME, MPI_QUEUE_NAME from .core import task_required_resources from .utils import wrap_async_call -log = get_task_logger(__name__) +log = logging.getLogger(__name__) def entrypoint( diff --git a/services/sidecar/src/simcore_service_sidecar/celery_task_utils.py b/services/sidecar/src/simcore_service_sidecar/celery_task_utils.py index 3b86a19c5604..c3f42cbc0ac5 100644 --- a/services/sidecar/src/simcore_service_sidecar/celery_task_utils.py +++ b/services/sidecar/src/simcore_service_sidecar/celery_task_utils.py @@ -1,10 +1,9 @@ import asyncio +import logging from pprint import pformat from typing import Callable -from .celery_log_setup import get_task_logger - -log = get_task_logger(__name__) +log = logging.getLogger(__name__) def on_task_failure_handler( diff --git a/services/sidecar/src/simcore_service_sidecar/cli.py b/services/sidecar/src/simcore_service_sidecar/cli.py index a06e1b3ddcb1..8f9cf05783e1 100644 --- a/services/sidecar/src/simcore_service_sidecar/cli.py +++ b/services/sidecar/src/simcore_service_sidecar/cli.py @@ -3,6 +3,7 @@ from typing import Callable, List, Optional import click +from servicelib.logging_utils import log_decorator from .celery_task_utils import cancel_task from .config import SIDECAR_INTERVAL_TO_CHECK_TASK_ABORTED_S @@ -32,14 +33,20 @@ def main( log.exception("Uncaught exception") +@log_decorator(logger=log, level=logging.INFO) async def perdiodicaly_check_if_aborted(is_aborted_cb: Callable[[], bool]) -> None: - log.info("Starting periodic check of task abortion...") - while await asyncio.sleep(SIDECAR_INTERVAL_TO_CHECK_TASK_ABORTED_S, result=True): - if is_aborted_cb(): - log.info("Task was aborted. Cancelling...") - asyncio.get_event_loop().call_soon(cancel_task(run_sidecar)) + try: + while await asyncio.sleep( + SIDECAR_INTERVAL_TO_CHECK_TASK_ABORTED_S, result=True + ): + if is_aborted_cb(): + log.info("Task was aborted. 
Cancelling...") + asyncio.get_event_loop().call_soon(cancel_task(run_sidecar)) + except asyncio.CancelledError: + pass +@log_decorator(logger=log, level=logging.INFO) async def run_sidecar( job_id: str, user_id: str, @@ -47,15 +54,6 @@ async def run_sidecar( node_id: Optional[str] = None, is_aborted_cb: Optional[Callable[[], bool]] = None, ) -> Optional[List[str]]: - - log.info( - "STARTING task %s processing for user %s, project %s, node %s", - job_id, - user_id, - project_id, - node_id, - ) - abortion_task = ( asyncio.get_event_loop().create_task( perdiodicaly_check_if_aborted(is_aborted_cb) @@ -69,15 +67,7 @@ async def run_sidecar( next_task_nodes: Optional[List[str]] = await inspect( db_engine, rabbit_mq, job_id, user_id, project_id, node_id=node_id ) - log.info( - "COMPLETED task %s processing for user %s, project %s, node %s", - job_id, - user_id, - project_id, - node_id, - ) return next_task_nodes - except asyncio.CancelledError: + finally: if abortion_task: abortion_task.cancel() - raise diff --git a/services/sidecar/src/simcore_service_sidecar/core.py b/services/sidecar/src/simcore_service_sidecar/core.py index 5cf2a2661945..c388202285d6 100644 --- a/services/sidecar/src/simcore_service_sidecar/core.py +++ b/services/sidecar/src/simcore_service_sidecar/core.py @@ -1,4 +1,5 @@ import asyncio +import logging import traceback from datetime import datetime from typing import Dict, List, Optional, Union @@ -6,7 +7,6 @@ import networkx as nx from aiopg.sa import Engine, SAConnection from aiopg.sa.result import RowProxy -from celery.utils.log import get_task_logger from simcore_postgres_database.sidecar_models import ( StateType, comp_pipeline, @@ -22,7 +22,7 @@ from .rabbitmq import RabbitMQ from .utils import execution_graph, find_entry_point, is_node_ready -log = get_task_logger(__name__) +log = logging.getLogger(__name__) log.setLevel(config.SIDECAR_LOGLEVEL) node_port_v2_log.setLevel(config.SIDECAR_LOGLEVEL) @@ -85,7 +85,7 @@ async def _try_get_task_from_db( return # Check if node's dependecies are there - if not await is_node_ready(task, graph, db_connection, log): + if not await is_node_ready(task, graph, db_connection): log.debug("TASK %s NOT YET READY", task.internal_id) return diff --git a/services/sidecar/src/simcore_service_sidecar/executor.py b/services/sidecar/src/simcore_service_sidecar/executor.py index a74d3354592f..8d3775e780ac 100644 --- a/services/sidecar/src/simcore_service_sidecar/executor.py +++ b/services/sidecar/src/simcore_service_sidecar/executor.py @@ -13,8 +13,8 @@ from aiodocker import Docker from aiodocker.containers import DockerContainer from aiodocker.exceptions import DockerContainerError, DockerError -from celery.utils.log import get_task_logger from packaging import version +from servicelib.logging_utils import log_decorator from servicelib.utils import fire_and_forget_task, logged_gather from simcore_sdk import node_data, node_ports_v2 from simcore_sdk.node_ports_v2 import DBManager @@ -25,7 +25,7 @@ from .rabbitmq import RabbitMQ from .utils import get_volume_mount_point -log = get_task_logger(__name__) +log = logging.getLogger(__name__) @attr.s(auto_attribs=True) @@ -81,15 +81,8 @@ class Executor: shared_folders: TaskSharedVolumes = None integration_version: version.Version = version.parse("0.0.0") + @log_decorator(logger=log) async def run(self): - log.debug( - "Running %s project:%s node:%s internal_id:%s from container", - self.task.image["name"], - self.task.project_id, - self.task.node_id, - self.task.internal_id, - ) - try: await 
self.preprocess() await self.process() @@ -110,37 +103,33 @@ async def run(self): finally: await self.cleanup() + @log_decorator(logger=log) async def preprocess(self): await self._post_messages(LogType.LOG, "[sidecar]Preprocessing...") - log.debug("Pre-Processing...") self.shared_folders = TaskSharedVolumes.from_task(self.task) self.shared_folders.create() host_name = config.SIDECAR_HOST_HOSTNAME_PATH.read_text() await self._post_messages(LogType.LOG, f"[sidecar]Running on {host_name}") results = await logged_gather(self._process_task_inputs(), self._pull_image()) await self._write_input_file(results[0]) - log.debug("Pre-Processing Pipeline DONE") + @log_decorator(logger=log) async def process(self): - log.debug("Processing...") await self._post_messages(LogType.LOG, "[sidecar]Processing...") await self._run_container() - log.debug("Processing DONE") + @log_decorator(logger=log) async def postprocess(self): - log.debug("Post-Processing...") await self._post_messages(LogType.LOG, "[sidecar]Postprocessing...") await self._process_task_output() await self._process_task_log() - log.debug("Post-Processing DONE") + @log_decorator(logger=log) async def cleanup(self): - log.debug("Cleaning...") await self._post_messages(LogType.LOG, "[sidecar]Cleaning...") if self.shared_folders: self.shared_folders.delete() await self._post_messages(LogType.LOG, "[sidecar]Cleaning completed") - log.debug("Cleaning DONE") async def _get_node_ports(self): if self.db_manager is None: @@ -148,8 +137,8 @@ async def _get_node_ports(self): self.db_manager = DBManager(self.db_engine) return await node_ports_v2.ports(self.db_manager) + @log_decorator(logger=log) async def _process_task_input(self, port: node_ports_v2.Port, input_ports: Dict): - log.debug("getting value from node ports...") port_value = await port.get() input_ports[port.key] = port_value log.debug("PROCESSING %s [%s]: %s", port.key, type(port_value), port_value) @@ -177,9 +166,8 @@ async def _process_task_input(self, port: node_ports_v2.Port, input_ports: Dict) # finally remove the zip archive os.remove(final_path) + @log_decorator(logger=log) async def _process_task_inputs(self) -> Dict: - log.debug("Inputs parsing...") - input_ports: Dict = {} try: PORTS = await self._get_node_ports() @@ -199,12 +187,11 @@ async def _process_task_inputs(self) -> Dict: for port in (await PORTS.inputs).values() ] ) - log.debug("Inputs parsing DONE") return input_ports + @log_decorator(logger=log) async def _write_input_file(self, inputs: Dict) -> None: if inputs: - log.debug("Writing input file...") stem = ( "input" if self.integration_version == version.parse("0.0.0") @@ -212,16 +199,10 @@ async def _write_input_file(self, inputs: Dict) -> None: ) file_name = self.shared_folders.input_folder / f"{stem}.json" file_name.write_text(json.dumps(inputs)) - log.debug("Writing input file DONE") + @log_decorator(logger=log) async def _pull_image(self): docker_image = f"{config.DOCKER_REGISTRY}/{self.task.image['name']}:{self.task.image['tag']}" - log.debug( - "PULLING IMAGE %s as %s with pwd %s", - docker_image, - config.DOCKER_USER, - config.DOCKER_PASSWORD, - ) async with Docker() as docker_client: await self._post_messages( LogType.LOG, @@ -245,6 +226,7 @@ async def _pull_image(self): )["integration-version"] ) + @log_decorator(logger=log) async def _create_container_config(self, docker_image: str) -> Dict: # NOTE: Env/Binds for log folder is only necessary for integraion "0" env_vars = [ @@ -292,6 +274,7 @@ async def _create_container_config(self, docker_image: str) -> Dict: 
         }
         return docker_container_config
 
+    @log_decorator(logger=log)
     async def _start_monitoring_container(
         self, container: DockerContainer
     ) -> asyncio.Future:
@@ -310,6 +293,7 @@ async def _start_monitoring_container(
         return log_processor_task
 
     # pylint: disable=too-many-statements
+    @log_decorator(logger=log)
     async def _run_container(self):
         start_time = time.perf_counter()
         docker_image = f"{config.DOCKER_REGISTRY}/{self.task.image['name']}:{self.task.image['tag']}"
@@ -420,6 +404,7 @@ async def _run_container(self):
             }
         )
 
+    @log_decorator(logger=log)
     async def _process_task_output(self):
         """There will be some files in the /output
 
@@ -429,7 +414,6 @@ async def _process_task_output(self):
         Files will be pushed to S3 with reference in db. output.json will be parsed
         and the db updated
         """
-        log.debug("Processing outputs...")
         await self._post_messages(
             LogType.LOG,
             "[sidecar]Uploading outputs...",
         )
@@ -474,10 +458,9 @@ async def _process_task_output(self):
             )
         except (OSError, IOError):
             await self._error_message_to_ui_and_logs("Could not process output")
-        log.debug("Processing outputs DONE")
 
+    @log_decorator(logger=log)
     async def _process_task_log(self):
-        log.debug("Processing Logs...")
         await self._post_messages(
             LogType.LOG,
             "[sidecar]Uploading logs...",
         )
@@ -486,7 +469,6 @@ async def _process_task_log(self):
         await node_data.data_manager.push(
             self.shared_folders.log_folder, rename_to="logs"
         )
-        log.debug("Processing Logs DONE")
 
     async def _post_messages(self, log_type: LogType, message: str):
         if log_type == LogType.LOG:
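
Note: throughout the sidecar, the paired "begin"/"DONE" debug statements are dropped in favour of servicelib's @log_decorator, which traces a coroutine's arguments on entry and its return value on exit at the configured level. A minimal sketch of the intended usage; the function name below is illustrative only, not taken from core.py:

import logging

from servicelib.logging_utils import log_decorator

log = logging.getLogger(__name__)


@log_decorator(logger=log)  # defaults to DEBUG; pass level=logging.INFO to raise verbosity
async def preprocess_task() -> None:
    # entry (arguments) and exit (return value) are logged by the decorator,
    # so no manual "Pre-Processing..." / "DONE" lines are needed in the body
    ...
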
diff --git a/services/sidecar/src/simcore_service_sidecar/log_parser.py b/services/sidecar/src/simcore_service_sidecar/log_parser.py
index 3c7f820b02fa..a9007cf9fbe8 100644
--- a/services/sidecar/src/simcore_service_sidecar/log_parser.py
+++ b/services/sidecar/src/simcore_service_sidecar/log_parser.py
@@ -10,6 +10,7 @@
 from aiodocker.containers import DockerContainer
 from aiodocker.exceptions import DockerError
 from aiofile import AIOFile, Writer
+from servicelib.logging_utils import log_decorator
 
 from . import exceptions
@@ -59,6 +60,7 @@ async def parse_line(line: str) -> Tuple[LogType, str]:
     return (LogType.LOG, f"[task] {line}")
 
 
+@log_decorator(logger=log)
 async def monitor_logs_task(
     mon_log_file_or_container: Union[Path, DockerContainer],
     log_cb: Awaitable[Callable[[LogType, str], None]],
@@ -66,10 +68,8 @@
 ) -> None:
     try:
         if isinstance(mon_log_file_or_container, Path):
-            log.debug("start monitoring log in %s", mon_log_file_or_container)
             await _monitor_log_file(mon_log_file_or_container, log_cb)
         elif isinstance(mon_log_file_or_container, DockerContainer):
-            log.debug("start monitoring docker logs of %s", mon_log_file_or_container)
             await _monitor_docker_container(
                 mon_log_file_or_container, log_cb, out_log_file
             )
@@ -77,10 +77,10 @@
             raise exceptions.SidecarException("Invalid log type")
 
     except asyncio.CancelledError:
-        # user cancels
-        log.debug("stop monitoring logs in")
+        pass
 
 
+@log_decorator(logger=log)
 async def _monitor_docker_container(
     container: DockerContainer,
     log_cb: Awaitable[Callable[[LogType, str], None]],
@@ -110,6 +110,7 @@ async def _monitor_docker_container(
         log_file.unlink()
 
 
+@log_decorator(logger=log)
 async def _monitor_log_file(
     log_file, log_cb: Awaitable[Callable[[LogType, str], None]]
 ) -> None:
@@ -122,7 +123,6 @@ async def _monitor_log_file(
             if not line:
                 asyncio.sleep(1)
                 continue
-            log.debug("log monitoring: found log %s", line)
             log_type, parsed_line = await parse_line(line)
             await log_cb(log_type, parsed_line)
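
With the decorator applied, monitor_logs_task no longer logs its own start/stop messages, and cancellation now simply swallows asyncio.CancelledError. A rough sketch of how a caller might drive it; the task wiring below is an assumption for illustration, not code from this PR:

import asyncio

# `container` is an aiodocker DockerContainer, `log_cb` the awaitable callback from the signature above
monitor = asyncio.ensure_future(monitor_logs_task(container, log_cb, out_log_file))
# ... let the computation run ...
monitor.cancel()  # CancelledError is caught inside monitor_logs_task, so the task ends quietly
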
diff --git a/services/sidecar/src/simcore_service_sidecar/rabbitmq.py b/services/sidecar/src/simcore_service_sidecar/rabbitmq.py
index 1576c0b14b64..c23db3518a59 100644
--- a/services/sidecar/src/simcore_service_sidecar/rabbitmq.py
+++ b/services/sidecar/src/simcore_service_sidecar/rabbitmq.py
@@ -8,6 +8,7 @@
 import tenacity
 from models_library.settings.celery import CeleryConfig
 from pydantic import BaseModel  # pylint: disable=no-name-in-module
+from servicelib.logging_utils import log_decorator
 from servicelib.rabbitmq_utils import RabbitMQRetryPolicyUponInitialization
 
 from . import config
@@ -49,6 +50,7 @@ class Config:
         # see https://pydantic-docs.helpmanual.io/usage/types/#arbitrary-types-allowed
         arbitrary_types_allowed = True
 
+    @log_decorator(logger=log)
     async def connect(self):
         if not self.celery_config:
             self.celery_config = config.CELERY_CONFIG
@@ -81,16 +83,15 @@ async def connect(self):
             aio_pika.ExchangeType.FANOUT,
         )
 
+    @log_decorator(logger=log)
     async def close(self):
-        log.debug("Closing channel...")
         await self.channel.close()
-        log.debug("Closing connection...")
         await self.connection.close()
 
+    @log_decorator(logger=log)
     async def _post_message(
         self, exchange: aio_pika.Exchange, data: Dict[str, Union[str, Any]]
    ):
-        log.debug("publishing message to the broker %s", data)
         await exchange.publish(
             aio_pika.Message(body=json.dumps(data).encode()), routing_key=""
         )
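
_post_message serializes the payload with json.dumps and publishes it with an empty routing key, so every queue bound to the fanout exchange receives a copy. A standalone sketch of the same aio_pika pattern; the connection URL and exchange name are placeholders, not values from this PR:

import json

import aio_pika


async def publish_event(message: dict) -> None:
    connection = await aio_pika.connect_robust("amqp://guest:guest@rabbit:5672/")
    async with connection:
        channel = await connection.channel()
        exchange = await channel.declare_exchange("sidecar_logs", aio_pika.ExchangeType.FANOUT)
        # same pattern as _post_message above: JSON body, empty routing key on a fanout exchange
        await exchange.publish(
            aio_pika.Message(body=json.dumps(message).encode()), routing_key=""
        )
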
diff --git a/services/sidecar/src/simcore_service_sidecar/utils.py b/services/sidecar/src/simcore_service_sidecar/utils.py
index e7bc9c55f2b4..3bd91ecf64df 100644
--- a/services/sidecar/src/simcore_service_sidecar/utils.py
+++ b/services/sidecar/src/simcore_service_sidecar/utils.py
@@ -8,6 +8,7 @@
 from aiodocker.volumes import DockerVolume
 from aiopg.sa import SAConnection
 from aiopg.sa.result import RowProxy
+from servicelib.logging_utils import log_decorator
 from simcore_postgres_database.sidecar_models import StateType, comp_tasks
 from sqlalchemy import and_
 
@@ -30,11 +31,11 @@ def find_entry_point(g: nx.DiGraph) -> List:
     return result
 
 
+@log_decorator(logger=logger)
 async def is_node_ready(
     task: RowProxy,
     graph: nx.DiGraph,
     db_connection: SAConnection,
-    _logger: logging.Logger,
 ) -> bool:
     query = comp_tasks.select().where(
         and_(
@@ -45,12 +46,12 @@ async def is_node_ready(
     result = await db_connection.execute(query)
     tasks = await result.fetchall()
 
-    _logger.debug("TASK %s ready? Checking ..", task.internal_id)
+    logger.debug("TASK %s ready? Checking ..", task.internal_id)
     for dep_task in tasks:
         job_id = dep_task.job_id
         if not job_id:
             return False
-        _logger.debug(
+        logger.debug(
             "TASK %s DEPENDS ON %s with stat %s",
             task.internal_id,
             dep_task.internal_id,
@@ -58,7 +59,6 @@ async def is_node_ready(
         )
         if not dep_task.state == StateType.SUCCESS:
             return False
-    _logger.debug("TASK %s is ready", task.internal_id)
     return True
 
 
@@ -79,6 +79,7 @@ def is_gpu_node() -> bool:
     """Returns True if this node has support to GPU, meaning that the
    `VRAM` label was added to it."""
 
+    @log_decorator(logger=logger, level=logging.INFO)
     async def async_is_gpu_node() -> bool:
         async with aiodocker.Docker() as docker:
             spec_config = {
@@ -109,7 +110,6 @@ async def async_is_gpu_node() -> bool:
                 return False
 
     has_gpu = wrap_async_call(async_is_gpu_node())
-    logger.info("Node gpus support result %s", has_gpu)
     return has_gpu
 
 
@@ -133,6 +133,7 @@ def start_as_mpi_node() -> bool:
     return is_mpi_node
 
 
+@log_decorator(logger=logger)
 async def get_volume_mount_point(volume_name: str) -> str:
     try:
         async with aiodocker.Docker() as docker_client:
diff --git a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml
index 7e1f645bcecb..584c7b405c87 100644
--- a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml
+++ b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml
@@ -5515,9 +5515,23 @@ paths:
             schema:
               type: string
             example: 123e4567-e89b-12d3-a456-426655440000
+      requestBody:
+        required: false
+        content:
+          application/json:
+            schema:
+              type: object
+              properties:
+                subgraph:
+                  description: The node uuids selected for running a partial pipeline
+                  type: array
+                  uniqueItems: true
+                  items:
+                    type: string
+                    format: uuid
       responses:
         '201':
-          description: Succesffully started the pipeline
+          description: Successfully started the pipeline
           content:
             application/json:
               schema:
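
For reference, a payload that matches the new optional request body of the start_pipeline operation; the uuids below are made up for illustration:

# optional JSON body for the start_pipeline request; omit the body to run the full pipeline
payload = {
    "subgraph": [
        "3fa85f64-5717-4562-b3fc-2c963f66afa6",
        "b6a0ef73-9f6c-4de5-8d2f-2a1c3d4e5f60",
    ]
}
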
diff --git a/services/web/server/src/simcore_service_webserver/computation_api.py b/services/web/server/src/simcore_service_webserver/computation_api.py
index fa96c9850b32..7ac33c65981e 100644
--- a/services/web/server/src/simcore_service_webserver/computation_api.py
+++ b/services/web/server/src/simcore_service_webserver/computation_api.py
@@ -6,12 +6,9 @@
 from aiohttp import web
 from celery import Celery
-
 from models_library.projects_state import RunningState
-from servicelib.logging_utils import log_decorator
 from simcore_postgres_database.models.comp_pipeline import StateType
-
 from .computation_config import ComputationSettings
 from .computation_config import get_settings as get_computation_settings
 
@@ -44,6 +41,5 @@ def get_celery(app: web.Application) -> Celery:
 }
 
 
-@log_decorator(logger=log)
 def convert_state_from_db(db_state: StateType) -> RunningState:
     return RunningState(DB_TO_RUNNING_STATE[StateType(db_state)])
diff --git a/services/web/server/src/simcore_service_webserver/computation_comp_tasks_listening_task.py b/services/web/server/src/simcore_service_webserver/computation_comp_tasks_listening_task.py
index 4d128fd3b605..d1302c0143be 100644
--- a/services/web/server/src/simcore_service_webserver/computation_comp_tasks_listening_task.py
+++ b/services/web/server/src/simcore_service_webserver/computation_comp_tasks_listening_task.py
@@ -104,11 +104,10 @@ async def listen(app: web.Application):
             if not task_changes:
                 log.error("no changes but still triggered: %s", pformat(payload))
+                continue
 
             project_uuid = task_data.get("project_id", None)
             node_uuid = task_data.get("node_id", None)
-            outputs = task_data.get("outputs", {})
-            state = convert_state_from_db(task_data.get("state")).value
 
             # FIXME: we do not know who triggered these changes. we assume the user had the rights to do so
             # therefore we'll use the prj_owner user id. This should be fixed when the new sidecar comes in
@@ -118,13 +117,15 @@ async def listen(app: web.Application):
                 the_project_owner = await _get_project_owner(conn, project_uuid)
 
                 if "outputs" in task_changes:
+                    new_outputs = task_data.get("outputs", {})
                     await _update_project_outputs(
-                        app, the_project_owner, project_uuid, node_uuid, outputs
+                        app, the_project_owner, project_uuid, node_uuid, new_outputs
                     )
 
                 if "state" in task_changes:
+                    new_state = convert_state_from_db(task_data["state"]).value
                     await _update_project_state(
-                        app, the_project_owner, project_uuid, node_uuid, state
+                        app, the_project_owner, project_uuid, node_uuid, new_state
                     )
 
         except projects_exceptions.ProjectNotFoundError as exc:
diff --git a/services/web/server/src/simcore_service_webserver/director_v2.py b/services/web/server/src/simcore_service_webserver/director_v2.py
index 9a82daa71869..2feca92f1fdf 100644
--- a/services/web/server/src/simcore_service_webserver/director_v2.py
+++ b/services/web/server/src/simcore_service_webserver/director_v2.py
@@ -1,16 +1,17 @@
 import logging
 from asyncio import CancelledError
-from typing import Any, Dict, List, Optional, Tuple
+from typing import Any, Dict, List, Optional, Set, Tuple
 from uuid import UUID
 
 from aiohttp import web
-from models_library.projects_state import RunningState
 from pydantic.types import PositiveInt
+from yarl import URL
+
+from models_library.projects_state import RunningState
 from servicelib.application_setup import ModuleCategory, app_module_setup
 from servicelib.logging_utils import log_decorator
 from servicelib.rest_responses import wrap_as_envelope
 from servicelib.rest_routing import iter_path_operations, map_handlers_with_operations
-from yarl import URL
 
 from .director_v2_settings import (
     CONFIG_SECTION_NAME,
@@ -122,10 +123,19 @@ async def start_pipeline(request: web.Request) -> web.Response:
     user_id = request[RQT_USERID_KEY]
     project_id = request.match_info.get("project_id", None)
 
+    subgraph: Set[str] = set()
+    if request.can_read_body:
+        body = await request.json()
+        subgraph = body.get("subgraph")
+
     backend_url = URL(f"{director2_settings.endpoint}/computations")
     log.debug("Redirecting '%s' -> '%s'", request.url, backend_url)
-    body = {"user_id": user_id, "project_id": project_id, "start_pipeline": True}
+    body = {
+        "user_id": user_id,
+        "project_id": project_id,
+        "start_pipeline": True,
+        "subgraph": list(subgraph),  # sets are not natively json serializable
+    }
 
     # request to director-v2
     try:
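
Because Python sets are not JSON serializable, the handler materializes the subgraph as a list before forwarding the request to director-v2. A compact illustration of that detail, independent of the handler code:

import json

subgraph = {"node-1", "node-2"}  # deduplicated node ids, as collected from the request body
body = {"user_id": 1, "project_id": "some-project-uuid", "start_pipeline": True}
body["subgraph"] = list(subgraph)  # json.dumps(body) would raise TypeError with the raw set
print(json.dumps(body))
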
diff --git a/services/web/server/tests/unit/with_dbs/fast/test_comp_tasks_listening_task.py b/services/web/server/tests/unit/with_dbs/fast/test_comp_tasks_listening_task.py
index 6365d801266b..f9c8d77649c8 100644
--- a/services/web/server/tests/unit/with_dbs/fast/test_comp_tasks_listening_task.py
+++ b/services/web/server/tests/unit/with_dbs/fast/test_comp_tasks_listening_task.py
@@ -92,21 +92,27 @@ async def _wait_for_call(mock_fct):
 @pytest.mark.parametrize(
     "upd_value, exp_calls",
     [
-        (
-            {"outputs": {"some new stuff": "it is new"}},
+        pytest.param(
+            {
+                "outputs": {"some new stuff": "it is new"},
+            },
             ["_get_project_owner", "_update_project_outputs"],
+            id="new output shall trigger",
         ),
-        (
+        pytest.param(
             {"state": StateType.ABORTED},
             ["_get_project_owner", "_update_project_state"],
+            id="new state shall trigger",
         ),
-        (
+        pytest.param(
             {"outputs": {"some new stuff": "it is new"}, "state": StateType.ABORTED},
             ["_get_project_owner", "_update_project_outputs", "_update_project_state"],
+            id="new output and state shall double trigger",
         ),
-        (
+        pytest.param(
             {"inputs": {"should not trigger": "right?"}},
             [],
+            id="no new output or state shall not trigger",
         ),
     ],
 )
diff --git a/services/web/server/tests/unit/with_dbs/fast/test_director_v2.py b/services/web/server/tests/unit/with_dbs/fast/test_director_v2.py
index 76482506db6d..70b70735f8c6 100644
--- a/services/web/server/tests/unit/with_dbs/fast/test_director_v2.py
+++ b/services/web/server/tests/unit/with_dbs/fast/test_director_v2.py
@@ -63,7 +63,34 @@ async def test_start_pipeline(
     expected: ExpectedResponse,
 ):
     url = client.app.router["start_pipeline"].url_for(project_id=f"{project_id}")
-    rsp = await client.post(url, json={"user_id": "some id"})
+    rsp = await client.post(url)
     data, error = await assert_status(
         rsp, web.HTTPCreated if user_role == UserRole.GUEST else expected.created
     )
+
+    if user_role != UserRole.ANONYMOUS:
+        assert not error, f"error received: {error}"
+        if data:
+            assert "pipeline_id" in data
+            assert (
+                data["pipeline_id"] == f"{project_id}"
+            ), f"received pipeline id: {data['pipeline_id']}, expected {project_id}"
+
+
+@pytest.mark.parametrize(
+    *standard_role_response(),
+)
+async def test_start_partial_pipeline(
+    client,
+    logged_user: Dict,
+    project_id: UUID,
+    user_role: UserRole,
+    expected: ExpectedResponse,
+):
+    url = client.app.router["start_pipeline"].url_for(project_id=f"{project_id}")
+    rsp = await client.post(
+        url, json={"subgraph": ["node_id1", "node_id2", "node_id498"]}
+    )
+    data, error = await assert_status(
+        rsp, web.HTTPCreated if user_role == UserRole.GUEST else expected.created
+    )
@@ -88,7 +115,7 @@ async def test_stop_pipeline(
     expected: ExpectedResponse,
 ):
     url = client.app.router["stop_pipeline"].url_for(project_id=f"{project_id}")
-    rsp = await client.post(url, json={"user_id": "some id"})
+    rsp = await client.post(url)
     await assert_status(
         rsp, web.HTTPNoContent if user_role == UserRole.GUEST else expected.no_content
     )
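
For completeness, a minimal sketch of how a caller of the aiohttp test client could exercise the partial-run path, mirroring test_start_partial_pipeline above; the node ids and the enveloped-response handling are assumptions based on the tests in this PR:

async def start_partial_run(client, project_id: str) -> dict:
    url = client.app.router["start_pipeline"].url_for(project_id=project_id)
    # send only the selected node uuids; omitting the body starts the full pipeline
    rsp = await client.post(url, json={"subgraph": ["node_id1", "node_id2"]})
    assert rsp.status == 201
    payload = await rsp.json()
    return payload["data"]  # expected to carry the pipeline_id of the started run
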