Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
78fe7f9
create subgraphs from pipelines
sanderegg Dec 9, 2020
3933945
refactor
sanderegg Dec 9, 2020
54c3251
added integration test to run partial pipelines
sanderegg Dec 9, 2020
f03b5af
add a test with entrypoint not being the first node
sanderegg Dec 11, 2020
28117ee
find minimal viable graph
sanderegg Dec 11, 2020
cb2eb70
remove invalid stack entries
sanderegg Dec 11, 2020
8e62d95
fix call
sanderegg Dec 11, 2020
d6c6ccf
get nodes that are dirty
sanderegg Dec 11, 2020
2a3471a
return the whole pipeline now
sanderegg Dec 13, 2020
2820eaf
do not publish when not to be run
sanderegg Dec 13, 2020
52186d0
fixed getting state of partial pipelines
sanderegg Dec 13, 2020
2bf75ed
passing test
sanderegg Dec 14, 2020
fca1e67
fixed testing dags
sanderegg Dec 14, 2020
cf0b6f6
linter
sanderegg Dec 14, 2020
714d0df
return 422 for a pipeline without computational services
sanderegg Dec 14, 2020
9c2b2f2
update API
sanderegg Dec 14, 2020
7b04943
added new API to webserver for running partial pipelines
sanderegg Dec 14, 2020
da2c3b3
adding test for partial pipelines
sanderegg Dec 14, 2020
64d52d6
return the pipeline now
sanderegg Dec 14, 2020
cfc1a99
update openapi
sanderegg Dec 14, 2020
3a234ff
transfer partial pipeline call to director v2
sanderegg Dec 14, 2020
a60b108
ensure conversion from uuid to str
sanderegg Dec 14, 2020
99cade5
set is not json serializable
sanderegg Dec 14, 2020
c43ed4a
small fixes in sidecar/logs
sanderegg Dec 14, 2020
b460fb8
fix unit tests
sanderegg Dec 15, 2020
b8f5d93
fix webserver test
sanderegg Dec 15, 2020
4e93ff2
@pcrespov review: flush
sanderegg Dec 15, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion api/specs/webserver/openapi-services.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,24 @@ paths:
operationId: start_pipeline
parameters:
- $ref: "#/components/parameters/ProjectId"
requestBody:
required: false
content:
application/json:
schema:
type: object
properties:
subgraph:
description: The node uuids selected for running a partial pipeline
type: array
uniqueItems: true
items:
type: string
format: uuid

responses:
"201":
description: Succesffully started the pipeline
description: Successfully started the pipeline
content:
application/json:
schema:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
import re
from typing import Dict, List

import pytest
from aioresponses import aioresponses
from aioresponses.core import CallbackResult
from models_library.projects_state import RunningState
from yarl import URL

# The adjacency list maps each node ID to the list of its successor node IDs.
# NOTE: successor entries must use the exact same ID strings as the keys,
# otherwise the graph references nodes that do not exist ("node_id 5" with an
# underscore never matched the key "node id 5" with a space — fixed here).
FULL_PROJECT_PIPELINE_ADJACENCY: Dict[str, List[str]] = {
    "node id 1": ["node id 2", "node id 3", "node id 4"],
    "node id 2": ["node id 5"],
    "node id 3": ["node id 5"],
    "node id 4": ["node id 5"],
    "node id 5": [],
}


def creation_cb(url, **kwargs) -> CallbackResult:

Expand All @@ -18,9 +28,19 @@ def creation_cb(url, **kwargs) -> CallbackResult:
if "start_pipeline" in body and body["start_pipeline"]
else RunningState.NOT_STARTED
)
pipeline: Dict[str, List[str]] = FULL_PROJECT_PIPELINE_ADJACENCY
if body.get("subgraph"):
# create some fake adjacency list
for node_id in body.get("subgraph"):
pipeline[node_id] = ["some node 5334", "some node 2324"]

return CallbackResult(
status=201, payload={"id": kwargs["json"]["project_id"], "state": state}
status=201,
payload={
"id": kwargs["json"]["project_id"],
"state": state,
"pipeline": pipeline,
},
)


Expand Down
17 changes: 9 additions & 8 deletions packages/service-library/src/servicelib/logging_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def set_logging_handler(


def _log_arguments(
logger_obj: logging.Logger, func: Callable, *args, **kwargs
logger_obj: logging.Logger, level: int, func: Callable, *args, **kwargs
) -> Dict[str, str]:
args_passed_in_function = [repr(a) for a in args]
kwargs_passed_in_function = [f"{k}={v!r}" for k, v in kwargs.items()]
Expand All @@ -112,7 +112,8 @@ def _log_arguments(
}

# Before to the function execution, log function details.
logger_obj.info(
logger_obj.log(
level,
"Arguments: %s - Begin function",
formatted_arguments,
extra=extra_args,
Expand All @@ -121,7 +122,7 @@ def _log_arguments(
return extra_args


def log_decorator(logger=None):
def log_decorator(logger=None, level: int = logging.DEBUG):
# Build logger object
logger_obj = logger or log

Expand All @@ -130,12 +131,12 @@ def log_decorator_info(func):

@functools.wraps(func)
async def log_decorator_wrapper(*args, **kwargs):
extra_args = _log_arguments(logger_obj, func, *args, **kwargs)
extra_args = _log_arguments(logger_obj, level, func, *args, **kwargs)
try:
# log return value from the function
value = await func(*args, **kwargs)
logger_obj.debug(
"Returned: - End function %r", value, extra=extra_args
logger_obj.log(
level, "Returned: - End function %r", value, extra=extra_args
)
except:
# log exception if occurs in function
Expand All @@ -154,8 +155,8 @@ def log_decorator_wrapper(*args, **kwargs):
try:
# log return value from the function
value = func(*args, **kwargs)
logger_obj.debug(
"Returned: - End function %r", value, extra=extra_args
logger_obj.log(
level, "Returned: - End function %r", value, extra=extra_args
)
except:
# log exception if occurs in function
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import json

# pylint: disable=too-many-arguments
import logging
import warnings
Expand All @@ -7,12 +9,11 @@

import aiofiles
from aiohttp import ClientPayloadError, ClientSession, ClientTimeout
from yarl import URL

from models_library.settings.services_common import ServicesCommonSettings
from simcore_service_storage_sdk import ApiClient, Configuration, UsersApi
from simcore_service_storage_sdk.rest import ApiException
from tqdm import tqdm
from yarl import URL

from ..config.http_clients import client_request_settings
from . import config, exceptions
Expand Down Expand Up @@ -177,7 +178,7 @@ async def _upload_file_to_link(
)
pbar.update(file_size)
# get the S3 etag from the headers
e_tag = resp.headers.get("Etag", None)
e_tag = json.loads(resp.headers.get("Etag", None))
log.debug("Uploaded %s to %s, received Etag %s", file_path, url, e_tag)
return e_tag

Expand Down
Loading