From 6960c22321b5f585c5cac756f6687e7c1ef07420 Mon Sep 17 00:00:00 2001 From: Luc Peterson Date: Thu, 12 Mar 2020 12:43:18 -0700 Subject: [PATCH 01/11] Adding a STOP_WORKERS exit code --- docs/source/merlin_variables.rst | 18 +++++++++++++----- merlin/celery.py | 1 + merlin/common/abstracts/enums/__init__.py | 2 ++ merlin/common/tasks.py | 18 ++++++++++++++++++ merlin/spec/expansion.py | 2 ++ 5 files changed, 36 insertions(+), 5 deletions(-) diff --git a/docs/source/merlin_variables.rst b/docs/source/merlin_variables.rst index 37593e5c9..a379deac4 100644 --- a/docs/source/merlin_variables.rst +++ b/docs/source/merlin_variables.rst @@ -15,7 +15,7 @@ The directory structure of merlin output looks like this: SPECROOT - + ... OUTPUT_PATH @@ -24,7 +24,7 @@ The directory structure of merlin output looks like this: .workspace WORKSPACE - + Reserved variables ------------------ @@ -54,7 +54,7 @@ Reserved variables * - ``$(MERLIN_INFO)`` - Directory within ``MERLIN_WORKSPACE`` that holds a provenance spec. Commonly used to hold ``samples.npy``. - - ``$(MERLIN_WORKSPACE)/merlin_info/`` + - ``$(MERLIN_WORKSPACE)/merlin_info/`` * - ``$(MERLIN_SAMPLE_ID)`` - Sample index in an ensemble - ``0`` ``1`` ``2`` ``3`` @@ -123,7 +123,7 @@ Step return variables * - ``$(MERLIN_RESTART)`` - Run this step's ``restart`` command, or re-run ``cmd`` if ``restart`` - is absent. + is absent. - :: @@ -148,7 +148,7 @@ Step return variables exit $(MERLIN_RETRY) max_retries: 23 - * - ``$(merlin_soft_fail)`` + * - ``$(MERLIN_SOFT_FAIL)`` - Mark this step as a failure, note in the warning log but keep going. Unknown return codes get translated to soft fails, so that they can be logged. @@ -167,4 +167,12 @@ Step return variables echo "Oh no, we've created skynet! Abort!" exit $(MERLIN_HARD_FAIL) + * - ``$(MERLIN_STOP_WORKERS)`` + - Launch a task to stop all active workers. To allow the current task to + finish and acknowledge the results to the server, will happen in 60 + seconds. + - + :: + # send a signal to all workers to stop + exit $(MERLIN_STOP_WORKERS) diff --git a/merlin/celery.py b/merlin/celery.py index 4ae6daf24..513657941 100644 --- a/merlin/celery.py +++ b/merlin/celery.py @@ -89,6 +89,7 @@ app.conf.update( task_acks_late=True, + task_reject_on_worker_lost=True, task_publish_retry_policy={ "interval_start": 10, "interval_step": 10, diff --git a/merlin/common/abstracts/enums/__init__.py b/merlin/common/abstracts/enums/__init__.py index 9e72b8dbc..472606e21 100644 --- a/merlin/common/abstracts/enums/__init__.py +++ b/merlin/common/abstracts/enums/__init__.py @@ -40,6 +40,7 @@ "HARD_FAIL_VALUE", "DRY_OK_VALUE", "RETRY_VALUE", + "STOP_WORKERS_VALUE", ) @@ -54,3 +55,4 @@ class ReturnCode(IntEnum): HARD_FAIL = 102 DRY_OK = 103 RETRY = 104 + STOP_WORKERS = 105 diff --git a/merlin/common/tasks.py b/merlin/common/tasks.py index 12f2c3799..ecf651994 100644 --- a/merlin/common/tasks.py +++ b/merlin/common/tasks.py @@ -78,6 +78,7 @@ LOG = logging.getLogger(__name__) +STOP_COUNTDOWN=60 @shared_task(bind=True, autoretry_for=retry_exceptions, retry_backoff=True) def merlin_step(self, *args, **kwargs): @@ -145,6 +146,11 @@ def merlin_step(self, *args, **kwargs): stop_workers("celery", None, None) raise HardFailException + elif result == ReturnCode.STOP_WORKERS: + LOG.warning(f"*** Shutting down all workers in {STOP_COUNTDOWN} secs!") + shutdown = shutdown_workers.s() + shutdown.set(queue=step.get_task_queue()) + shutdown.apply_async(countdown=STOP_COUNTDOWN) else: LOG.warning( f"**** Step '{step_name}' in '{step_dir}' had unhandled exit code {result}. Continuing with workflow." @@ -459,6 +465,18 @@ def expand_tasks_with_samples( LOG.debug(f"simple chain task queued") +@shared_task(bind=True, autoretry_for=retry_exceptions, retry_backoff=True, + acks_late=False, reject_on_worker_lost=False, name="merlin:shutdown_workers") +def shutdown_workers(*args, **kwargs): + """ + This task issues a call to shutdown workers. + + It wraps the stop_celery_workers call as a task. + It is acknolwedged right away, so that it will not be requeued when + executed by a worker. + """ + return stop_celery_workers('celery', None, None) + @shared_task( autoretry_for=retry_exceptions, retry_backoff=True, name="merlin:chordfinisher" ) diff --git a/merlin/spec/expansion.py b/merlin/spec/expansion.py index 9aae786ab..6f60d5096 100644 --- a/merlin/spec/expansion.py +++ b/merlin/spec/expansion.py @@ -59,6 +59,7 @@ "MERLIN_SOFT_FAIL", "MERLIN_HARD_FAIL", "MERLIN_RETRY", + "MERLIN_STOP_WORKERS", } MERLIN_RESERVED = STEP_AWARE | PROVENANCE_REPLACE RESERVED = MAESTRO_RESERVED | MERLIN_RESERVED @@ -187,6 +188,7 @@ def parameter_substitutions_for_cmd(glob_path, sample_paths): substitutions.append(("$(MERLIN_SOFT_FAIL)", str(int(ReturnCode.SOFT_FAIL)))) substitutions.append(("$(MERLIN_HARD_FAIL)", str(int(ReturnCode.HARD_FAIL)))) substitutions.append(("$(MERLIN_RETRY)", str(int(ReturnCode.RETRY)))) + substitutions.append(("$(MERLIN_STOP_WORKERS)", str(int(ReturnCode.STOP_WORKERS)))) return substitutions From ed45bf910a3e41ced912c278b75c3531552b4160 Mon Sep 17 00:00:00 2001 From: Luc Peterson Date: Thu, 12 Mar 2020 14:05:44 -0700 Subject: [PATCH 02/11] Remote worker shutdown exit code works! --- merlin/common/tasks.py | 6 ++++-- merlin/study/script_adapter.py | 2 ++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/merlin/common/tasks.py b/merlin/common/tasks.py index ecf651994..98a7253d7 100644 --- a/merlin/common/tasks.py +++ b/merlin/common/tasks.py @@ -143,7 +143,9 @@ def merlin_step(self, *args, **kwargs): # router.purge_tasks("celery", ?, force=True) # stop workers TODO make this more discriminatory, stopping only the relevant workers - stop_workers("celery", None, None) + shutdown = shutdown_workers.s() + shutdown.set(queue=step.get_task_queue()) + shutdown.apply_async(countdown=STOP_COUNTDOWN) raise HardFailException elif result == ReturnCode.STOP_WORKERS: @@ -475,7 +477,7 @@ def shutdown_workers(*args, **kwargs): It is acknolwedged right away, so that it will not be requeued when executed by a worker. """ - return stop_celery_workers('celery', None, None) + return stop_workers('celery', None, None, None) @shared_task( autoretry_for=retry_exceptions, retry_backoff=True, name="merlin:chordfinisher" diff --git a/merlin/study/script_adapter.py b/merlin/study/script_adapter.py index 138ed510b..0fe0c5e55 100644 --- a/merlin/study/script_adapter.py +++ b/merlin/study/script_adapter.py @@ -404,6 +404,8 @@ def submit(self, step, path, cwd, job_map=None, env=None): elif retcode == ReturnCode.RETRY: LOG.debug("Execution returned status RETRY.") step.restart = False + elif retcode == ReturnCode.STOP_WORKERS: + LOG.debug("Execution returned status STOP_WORKERS") else: LOG.warning( f"Unrecognized Merlin Return code: {retcode}, returning SOFT_FAIL" From 8f2bd5d6a95cf6871927ebd6d4fb09934fe222cb Mon Sep 17 00:00:00 2001 From: Luc Peterson Date: Thu, 12 Mar 2020 14:46:20 -0700 Subject: [PATCH 03/11] Run Black --- merlin/common/sample_index.py | 2 +- merlin/common/tasks.py | 23 +++++++++++++++++------ merlin/config/broker.py | 2 +- tests/integration/run_tests.py | 4 +++- 4 files changed, 22 insertions(+), 9 deletions(-) diff --git a/merlin/common/sample_index.py b/merlin/common/sample_index.py index c9a1c543e..1d5d724ce 100644 --- a/merlin/common/sample_index.py +++ b/merlin/common/sample_index.py @@ -54,7 +54,7 @@ def uniform_directories(num_samples=MAX_SAMPLE, bundle_size=1, level_max_dirs=10 while directory_sizes[0] < num_samples: directory_sizes.insert(0, directory_sizes[0] * level_max_dirs) # We've gone over the total number of samples, remove the first entry - del(directory_sizes[0]) + del directory_sizes[0] return directory_sizes diff --git a/merlin/common/tasks.py b/merlin/common/tasks.py index 98a7253d7..799b9d91c 100644 --- a/merlin/common/tasks.py +++ b/merlin/common/tasks.py @@ -78,7 +78,8 @@ LOG = logging.getLogger(__name__) -STOP_COUNTDOWN=60 +STOP_COUNTDOWN = 60 + @shared_task(bind=True, autoretry_for=retry_exceptions, retry_backoff=True) def merlin_step(self, *args, **kwargs): @@ -424,8 +425,11 @@ def expand_tasks_with_samples( # Write a hierarchy to get the all paths string sample_index = create_hierarchy( - len(samples), bundle_size=1, directory_sizes=directory_sizes, root="", - n_digits=len(str(level_max_dirs)) + len(samples), + bundle_size=1, + directory_sizes=directory_sizes, + root="", + n_digits=len(str(level_max_dirs)), ) sample_paths = sample_index.make_directory_string() @@ -467,8 +471,14 @@ def expand_tasks_with_samples( LOG.debug(f"simple chain task queued") -@shared_task(bind=True, autoretry_for=retry_exceptions, retry_backoff=True, - acks_late=False, reject_on_worker_lost=False, name="merlin:shutdown_workers") +@shared_task( + bind=True, + autoretry_for=retry_exceptions, + retry_backoff=True, + acks_late=False, + reject_on_worker_lost=False, + name="merlin:shutdown_workers", +) def shutdown_workers(*args, **kwargs): """ This task issues a call to shutdown workers. @@ -477,7 +487,8 @@ def shutdown_workers(*args, **kwargs): It is acknolwedged right away, so that it will not be requeued when executed by a worker. """ - return stop_workers('celery', None, None, None) + return stop_workers("celery", None, None, None) + @shared_task( autoretry_for=retry_exceptions, retry_backoff=True, name="merlin:chordfinisher" diff --git a/merlin/config/broker.py b/merlin/config/broker.py index 898d2f2f0..74f0efabe 100644 --- a/merlin/config/broker.py +++ b/merlin/config/broker.py @@ -166,7 +166,7 @@ def get_redis_connection(config_path, include_password, ssl=False): username = CONFIG.broker.username except (AttributeError, KeyError): username = "" - + try: password_filepath = CONFIG.broker.password try: diff --git a/tests/integration/run_tests.py b/tests/integration/run_tests.py index be296a853..32cd019cd 100644 --- a/tests/integration/run_tests.py +++ b/tests/integration/run_tests.py @@ -438,7 +438,9 @@ def define_tests(): "dry feature_demo": ( f"{run} {demo} --local --dry --vars OUTPUT_PATH=./{OUTPUT_DIR}", [ - StepFileExistsCond("verify", "*/verify_*.sh", "feature_demo", OUTPUT_DIR), + StepFileExistsCond( + "verify", "*/verify_*.sh", "feature_demo", OUTPUT_DIR + ), ReturnCodeCond(), ], "local", From f3b8b6329b8362c46847782e5fe60699fb263ca7 Mon Sep 17 00:00:00 2001 From: Luc Peterson Date: Thu, 12 Mar 2020 14:55:21 -0700 Subject: [PATCH 04/11] Add shutdown to feature_demo.yaml; this should fix dsitributed integration test --- merlin/examples/workflows/feature_demo/feature_demo.yaml | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/merlin/examples/workflows/feature_demo/feature_demo.yaml b/merlin/examples/workflows/feature_demo/feature_demo.yaml index f3884d7c0..a555055fc 100644 --- a/merlin/examples/workflows/feature_demo/feature_demo.yaml +++ b/merlin/examples/workflows/feature_demo/feature_demo.yaml @@ -109,6 +109,15 @@ study: shell: /usr/bin/env python2 task_queue: pyth2_hello + - name: shutdown all workers + description: | + shutdown all workers + run: + cmd: | + exit $(MERLIN_STOP_WORKERS) + depends: [verify, python3_hello, python2_hello] + task_queue: $(VERIFY_QUEUE) + global.parameters: X2: values : [0.5] From 7b2246b0886eef94142033e8ef30a0f47f93392f Mon Sep 17 00:00:00 2001 From: Luc Peterson Date: Fri, 13 Mar 2020 08:53:44 -0700 Subject: [PATCH 05/11] MerlinScriptAdapter uses its own submission function --- CHANGELOG.md | 2 + merlin/common/abstracts/enums/__init__.py | 2 + merlin/study/script_adapter.py | 56 +++++++++++++++++++++-- 3 files changed, 57 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a2be7359..d6c66daff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,9 +16,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Bug that prevented an empty username for results backend and broker when using redis. - Bug that prevented `OUTPUT_PATH` from being an integer. - Bug that always had sample directory tree start with "0" +- "Error" message whenever a non-zero return code is given ### Changed - Updated docs from `pip3 install merlinwf` to `pip3 install merlin`. +- Scipt launching uses Merlin submission instead of subclassing maestro submit ## [1.4.1] [2020-03-06] diff --git a/merlin/common/abstracts/enums/__init__.py b/merlin/common/abstracts/enums/__init__.py index 472606e21..78347e77d 100644 --- a/merlin/common/abstracts/enums/__init__.py +++ b/merlin/common/abstracts/enums/__init__.py @@ -35,6 +35,7 @@ __all__ = ( "ReturnCode", "OK_VALUE", + "ERROR_VALUE", "RESTART_VALUE", "SOFT_FAIL_VALUE", "HARD_FAIL_VALUE", @@ -50,6 +51,7 @@ class ReturnCode(IntEnum): """ OK = 0 + ERROR = 1 RESTART = 100 SOFT_FAIL = 101 HARD_FAIL = 102 diff --git a/merlin/study/script_adapter.py b/merlin/study/script_adapter.py index 0fe0c5e55..8300461db 100644 --- a/merlin/study/script_adapter.py +++ b/merlin/study/script_adapter.py @@ -33,9 +33,12 @@ """ import logging +import os +from maestrowf.interfaces.script import SubmissionRecord from maestrowf.interfaces.script.localscriptadapter import LocalScriptAdapter from maestrowf.interfaces.script.slurmscriptadapter import SlurmScriptAdapter +from maestrowf.utils import start_process from merlin.common.abstracts.enums import ReturnCode @@ -388,9 +391,7 @@ def submit(self, step, path, cwd, job_map=None, env=None): LOG.debug("cwd = %s", cwd) LOG.debug("Script to execute: %s", path) LOG.debug("starting process %s in cwd %s" % (path, cwd)) - submission_record = super(MerlinScriptAdapter, self).submit( - step, path, cwd, job_map, env - ) + submission_record = self._execute_subprocess(step.name, path, cwd, env, False) retcode = submission_record.return_code if retcode == ReturnCode.OK: LOG.debug("Execution returned status OK.") @@ -420,6 +421,55 @@ def submit(self, step, path, cwd, job_map=None, env=None): return submission_record + def _execute_subprocess( + self, output_name, script_path, cwd, env=None, join_output=False + ): + """ + Execute the subprocess script locally. + If cwd is specified, the submit method will operate outside of the path + specified by the 'cwd' parameter. + If env is specified, the submit method will set the environment + variables for submission to the specified values. The 'env' parameter + should be a dictionary of environment variables. + + :param output_name: Output name for stdout and stderr (output_name.out). If None, don't write. + :param script_path: Path to the script to be executed. + :param cwd: Path to the current working directory. + :param env: A dict containing a modified environment for execution. + :param join_output: If True, append stderr to stdout + :returns: The return code of the submission command and job identifier (SubmissionRecord). + """ + p = start_process(script_path, shell=False, cwd=cwd, env=env) + pid = p.pid + output, err = p.communicate() + retcode = p.wait() + + # This allows us to save on iNodes by not writing the output, + # or by appending error to output + if output_name is not None: + o_path = os.path.join(cwd, "{}.out".format(output_name)) + if join_output: + e_path = o_path + else: + e_path = os.path.join(cwd, "{}.err".format(output_name)) + + with open(o_path, "w") as out: + out.write(output) + + if join_output: + out.write("\n####### stderr follows #######\n") + + with open(e_path, "w") as out: + out.write(err) + + if retcode == 0: + LOG.info("Execution returned status OK.") + return SubmissionRecord(ReturnCode.OK, retcode, pid) + else: + _record = SubmissionRecord(ReturnCode.ERROR, retcode, pid) + _record.add_info("stderr", str(err)) + return _record + class MerlinScriptAdapterFactory(object): factories = { From 7449a94667249835c047e47d800b40cd30c06dba Mon Sep 17 00:00:00 2001 From: Luc Peterson Date: Fri, 13 Mar 2020 11:20:48 -0700 Subject: [PATCH 06/11] For scalability, only open the stderr file if you actually want it. Fewer file system calls. --- merlin/study/script_adapter.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/merlin/study/script_adapter.py b/merlin/study/script_adapter.py index 8300461db..70bb73cae 100644 --- a/merlin/study/script_adapter.py +++ b/merlin/study/script_adapter.py @@ -448,19 +448,18 @@ def _execute_subprocess( # or by appending error to output if output_name is not None: o_path = os.path.join(cwd, "{}.out".format(output_name)) - if join_output: - e_path = o_path - else: - e_path = os.path.join(cwd, "{}.err".format(output_name)) with open(o_path, "w") as out: out.write(output) if join_output: out.write("\n####### stderr follows #######\n") + out.write(err) - with open(e_path, "w") as out: - out.write(err) + if not join_output: + e_path = os.path.join(cwd, "{}.err".format(output_name)) + with open(e_path, "w") as out: + out.write(err) if retcode == 0: LOG.info("Execution returned status OK.") From 28d6af70e2a294423b389e141c7cb3b616ee58ed Mon Sep 17 00:00:00 2001 From: Luc Peterson Date: Fri, 13 Mar 2020 12:35:03 -0700 Subject: [PATCH 07/11] Modify HARD_FAIL to shutdown only the workers connected to that step --- docs/source/faq.rst | 21 ++++++++++++++++++++- docs/source/merlin_variables.rst | 10 +++++++++- merlin/common/tasks.py | 22 ++++++++++++++-------- 3 files changed, 43 insertions(+), 10 deletions(-) diff --git a/docs/source/faq.rst b/docs/source/faq.rst index a496a4d9a..9d01bb8dc 100644 --- a/docs/source/faq.rst +++ b/docs/source/faq.rst @@ -146,11 +146,15 @@ How do I mark a step failure? Each step is ultimately designated as: * a success ``$(MERLIN_SUCCESS)`` -- writes a ``MERLIN_FINISHED`` file to the step's workspace directory * a soft failure ``$(MERLIN_SOFT_FAIL)`` -- allows the workflow to continue -* a hard failure ``$(MERLIN_HARD_FAIL)`` -- stops the whole workflow +* a hard failure ``$(MERLIN_HARD_FAIL)`` -- stops the whole workflow by shutting down all workers on that step Normally this happens behinds the scenes, so you don't need to worry about it. To hard-code this into your step logic, use a shell command such as ``exit $(MERLIN_HARD_FAIL)``. +.. note:: HARD_FAIL + The ``$(MERLIN_HARD_FAIL)`` exit code will shutdown all workers connected to the queue associated with the failed step. + To shutdown *all* workers use the ``$(MERLIN_STOP_WORKERS)`` exit code + To rerun all failed steps in a workflow, see :ref:`restart`. If you really want a previously successful step to be re-run, you can first manually remove the ``MERLIN_FINISHED`` file. @@ -218,10 +222,25 @@ How do I see what workers are connected? How do I stop workers? ~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Interactively outside of a workflow (e.g. at the command line), you can do this with + .. code:: bash $ merlin stop-workers +This give you fine control over which kinds of workers to stop, for instance via +a regex on their name, or the queue names you'd like to stop. + +From within a step, you can exit with the ``$(MERLIN_STOP_WORKERS)`` code, which will +issue a time-delayed call to stop all of the workers, or with the ``$(MERLIN_HARD_FAIL)`` +directive, which will stop all workers connected to the current step. This helps prevent +the *suicide race condition* where a worker could kill itself before removing the step +from the workflow, causing the command to be left there for the next worker and creating +a really bad loop. + +You can of course call ``merlin stop-workers`` from within a step, but be careful to make +sure the worker executing it won't be stopped too. + For more tricks, see :ref:`stop-workers`. .. _restart: diff --git a/docs/source/merlin_variables.rst b/docs/source/merlin_variables.rst index a379deac4..050846b21 100644 --- a/docs/source/merlin_variables.rst +++ b/docs/source/merlin_variables.rst @@ -160,7 +160,15 @@ Step return variables * - ``$(MERLIN_HARD_FAIL)`` - Something went terribly wrong and I need to stop the whole workflow. - Raises a ``HardFailException``. + Raises a ``HardFailException`` and stops all workers connected to that + step. Workers will stop after a 60 second delay to allow the step to + be acknowledged by the server. + + .. note:: + Workers in isolated parts of the + workflow not consuming from the bad step will continue. You can stop + all workers with ``$(MERLIN_STOP_WORKERS)``. + - :: diff --git a/merlin/common/tasks.py b/merlin/common/tasks.py index 799b9d91c..460fdcd40 100644 --- a/merlin/common/tasks.py +++ b/merlin/common/tasks.py @@ -137,15 +137,19 @@ def merlin_step(self, *args, **kwargs): f"*** Step '{step_name}' in '{step_dir}' soft failed. Continuing with workflow." ) elif result == ReturnCode.HARD_FAIL: - LOG.error( - f"*** Step '{step_name}' in '{step_dir}' hard failed. Quitting workflow." - ) # TODO purge queues? function requires maestro_spec # router.purge_tasks("celery", ?, force=True) - # stop workers TODO make this more discriminatory, stopping only the relevant workers - shutdown = shutdown_workers.s() - shutdown.set(queue=step.get_task_queue()) + # stop all workers attached to this queue + step_queue = step.get_task_queue() + LOG.error( + f"*** Step '{step_name}' in '{step_dir}' hard failed. Quitting workflow." + ) + LOG.error( + f"*** Shutting down all workers connected to this queue ({step_queue}) in {STOP_COUNTDOWN} secs!" + ) + shutdown = shutdown_workers.s(queues=[step_queue]) + shutdown.set(queue=step_queue) shutdown.apply_async(countdown=STOP_COUNTDOWN) raise HardFailException @@ -479,15 +483,17 @@ def expand_tasks_with_samples( reject_on_worker_lost=False, name="merlin:shutdown_workers", ) -def shutdown_workers(*args, **kwargs): +def shutdown_workers(queues=None): """ This task issues a call to shutdown workers. It wraps the stop_celery_workers call as a task. It is acknolwedged right away, so that it will not be requeued when executed by a worker. + + :param: queues: The specific queues to shutdown """ - return stop_workers("celery", None, None, None) + return stop_workers("celery", None, queues, None) @shared_task( From f1130daf05238a168e15ba9b8ed1ad7756bb42ab Mon Sep 17 00:00:00 2001 From: Luc Peterson Date: Fri, 13 Mar 2020 12:38:31 -0700 Subject: [PATCH 08/11] Update changelog --- CHANGELOG.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d6c66daff..29968bf20 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - A url keyword in the app.yaml file to override the entire broker or results backend configuration. - The `all` option to `batch.nodes`. - Auto zero-padding of sample directories, e.g. 00/00, 00/01 .. 10/10 +- `$(MERLIN_STOP_WORKERS)` exit code that shuts down all workers ### Fixed - Bug that prevented an empty username for results backend and broker when using redis. @@ -20,7 +21,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - Updated docs from `pip3 install merlinwf` to `pip3 install merlin`. -- Scipt launching uses Merlin submission instead of subclassing maestro submit +- Script launching uses Merlin submission instead of subclassing maestro submit +- `$(MERLIN_HARD_FAIL)` now shuts down only workers connected to the bad step's queue ## [1.4.1] [2020-03-06] From 0e70b5896c26b51cff100fbfd19d94319245c719 Mon Sep 17 00:00:00 2001 From: Luc Peterson Date: Fri, 13 Mar 2020 12:41:13 -0700 Subject: [PATCH 09/11] Doc tweaks --- docs/source/faq.rst | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/source/faq.rst b/docs/source/faq.rst index 9d01bb8dc..9a400b652 100644 --- a/docs/source/faq.rst +++ b/docs/source/faq.rst @@ -151,9 +151,9 @@ Each step is ultimately designated as: Normally this happens behinds the scenes, so you don't need to worry about it. To hard-code this into your step logic, use a shell command such as ``exit $(MERLIN_HARD_FAIL)``. -.. note:: HARD_FAIL - The ``$(MERLIN_HARD_FAIL)`` exit code will shutdown all workers connected to the queue associated with the failed step. - To shutdown *all* workers use the ``$(MERLIN_STOP_WORKERS)`` exit code +.. note:: ``$(MERLIN_HARD_FAIL)`` + The ``$(MERLIN_HARD_FAIL)`` exit code will shutdown all workers connected to the queue associated + with the failed step. To shutdown *all* workers use the ``$(MERLIN_STOP_WORKERS)`` exit code To rerun all failed steps in a workflow, see :ref:`restart`. If you really want a previously successful step to be re-run, you can first manually remove the ``MERLIN_FINISHED`` file. From de1c894f0a1830e7b1e9728f52a03cd6993f960f Mon Sep 17 00:00:00 2001 From: Luc Peterson Date: Fri, 13 Mar 2020 14:14:34 -0700 Subject: [PATCH 10/11] Fix MERLIN_HARD_FAIL. Change feature demo to use this instead of shutting down everything --- docs/source/faq.rst | 2 +- merlin/common/tasks.py | 14 +++++++++----- .../workflows/feature_demo/feature_demo.yaml | 7 ++++--- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/docs/source/faq.rst b/docs/source/faq.rst index 9a400b652..60d76eba9 100644 --- a/docs/source/faq.rst +++ b/docs/source/faq.rst @@ -228,7 +228,7 @@ Interactively outside of a workflow (e.g. at the command line), you can do this $ merlin stop-workers -This give you fine control over which kinds of workers to stop, for instance via +This gives you fine control over which kinds of workers to stop, for instance via a regex on their name, or the queue names you'd like to stop. From within a step, you can exit with the ``$(MERLIN_STOP_WORKERS)`` code, which will diff --git a/merlin/common/tasks.py b/merlin/common/tasks.py index 460fdcd40..96bf6025c 100644 --- a/merlin/common/tasks.py +++ b/merlin/common/tasks.py @@ -148,14 +148,14 @@ def merlin_step(self, *args, **kwargs): LOG.error( f"*** Shutting down all workers connected to this queue ({step_queue}) in {STOP_COUNTDOWN} secs!" ) - shutdown = shutdown_workers.s(queues=[step_queue]) + shutdown = shutdown_workers.s([step_queue]) shutdown.set(queue=step_queue) shutdown.apply_async(countdown=STOP_COUNTDOWN) raise HardFailException elif result == ReturnCode.STOP_WORKERS: LOG.warning(f"*** Shutting down all workers in {STOP_COUNTDOWN} secs!") - shutdown = shutdown_workers.s() + shutdown = shutdown_workers.s(None) shutdown.set(queue=step.get_task_queue()) shutdown.apply_async(countdown=STOP_COUNTDOWN) else: @@ -483,7 +483,7 @@ def expand_tasks_with_samples( reject_on_worker_lost=False, name="merlin:shutdown_workers", ) -def shutdown_workers(queues=None): +def shutdown_workers(self, shutdown_queues): """ This task issues a call to shutdown workers. @@ -491,9 +491,13 @@ def shutdown_workers(queues=None): It is acknolwedged right away, so that it will not be requeued when executed by a worker. - :param: queues: The specific queues to shutdown + :param: shutdown_queues: The specific queues to shutdown (list) """ - return stop_workers("celery", None, queues, None) + if shutdown_queues is not None: + LOG.warning(f"Shutting down workers in queues {shutdown_queues}!") + else: + LOG.warning(f"Shutting down workers in all queues!") + return stop_workers("celery", None, shutdown_queues, None) @shared_task( diff --git a/merlin/examples/workflows/feature_demo/feature_demo.yaml b/merlin/examples/workflows/feature_demo/feature_demo.yaml index a555055fc..a548619dc 100644 --- a/merlin/examples/workflows/feature_demo/feature_demo.yaml +++ b/merlin/examples/workflows/feature_demo/feature_demo.yaml @@ -109,12 +109,13 @@ study: shell: /usr/bin/env python2 task_queue: pyth2_hello - - name: shutdown all workers + - name: shutdown description: | - shutdown all workers + shutdown all workers attached to this queue run: cmd: | - exit $(MERLIN_STOP_WORKERS) + exit $(MERLIN_HARD_FAIL) + #exit $(MERLIN_STOP_WORKERS) depends: [verify, python3_hello, python2_hello] task_queue: $(VERIFY_QUEUE) From a138d3f0395e870d71df56c4ccbe0f6248012d4d Mon Sep 17 00:00:00 2001 From: Luc Peterson Date: Fri, 13 Mar 2020 15:02:04 -0700 Subject: [PATCH 11/11] Removed purge comment todo. Add comment in feature demo about HARD_FAIL vs STOP_WORKERS --- merlin/common/tasks.py | 2 -- merlin/examples/workflows/feature_demo/feature_demo.yaml | 5 +++++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/merlin/common/tasks.py b/merlin/common/tasks.py index 96bf6025c..29e987bff 100644 --- a/merlin/common/tasks.py +++ b/merlin/common/tasks.py @@ -137,8 +137,6 @@ def merlin_step(self, *args, **kwargs): f"*** Step '{step_name}' in '{step_dir}' soft failed. Continuing with workflow." ) elif result == ReturnCode.HARD_FAIL: - # TODO purge queues? function requires maestro_spec - # router.purge_tasks("celery", ?, force=True) # stop all workers attached to this queue step_queue = step.get_task_queue() diff --git a/merlin/examples/workflows/feature_demo/feature_demo.yaml b/merlin/examples/workflows/feature_demo/feature_demo.yaml index a548619dc..2a623e4f4 100644 --- a/merlin/examples/workflows/feature_demo/feature_demo.yaml +++ b/merlin/examples/workflows/feature_demo/feature_demo.yaml @@ -112,9 +112,14 @@ study: - name: shutdown description: | shutdown all workers attached to this queue + this is actually -all- workers on this workflow, since there's only one + defined worker and it is attached to this queue run: cmd: | + # Stop only workers attached to $(VERIFY_QUEUE) exit $(MERLIN_HARD_FAIL) + + # To stop all workers -everywhere- #exit $(MERLIN_STOP_WORKERS) depends: [verify, python3_hello, python2_hello] task_queue: $(VERIFY_QUEUE)