Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/luc/shutdown task acknowledgement #182

Merged
18 changes: 13 additions & 5 deletions docs/source/merlin_variables.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ The directory structure of merlin output looks like this:

SPECROOT
<spec.yaml>

...

OUTPUT_PATH
Expand All @@ -24,7 +24,7 @@ The directory structure of merlin output looks like this:
<provenance_spec.yaml>
<other_step_name>.workspace
WORKSPACE


Reserved variables
------------------
Expand Down Expand Up @@ -54,7 +54,7 @@ Reserved variables
* - ``$(MERLIN_INFO)``
- Directory within ``MERLIN_WORKSPACE`` that holds a provenance spec.
Commonly used to hold ``samples.npy``.
- ``$(MERLIN_WORKSPACE)/merlin_info/``
- ``$(MERLIN_WORKSPACE)/merlin_info/``
* - ``$(MERLIN_SAMPLE_ID)``
- Sample index in an ensemble
- ``0`` ``1`` ``2`` ``3``
Expand Down Expand Up @@ -123,7 +123,7 @@ Step return variables

* - ``$(MERLIN_RESTART)``
- Run this step's ``restart`` command, or re-run ``cmd`` if ``restart``
is absent.
is absent.
-
::

Expand All @@ -148,7 +148,7 @@ Step return variables
exit $(MERLIN_RETRY)
max_retries: 23

* - ``$(merlin_soft_fail)``
* - ``$(MERLIN_SOFT_FAIL)``
- Mark this step as a failure, note in the warning log but keep going.
Unknown return codes get translated to soft fails, so that they can
be logged.
Expand All @@ -167,4 +167,12 @@ Step return variables
echo "Oh no, we've created skynet! Abort!"
exit $(MERLIN_HARD_FAIL)

* - ``$(MERLIN_STOP_WORKERS)``
- Launch a task to stop all active workers. To allow the current task to
finish and acknowledge the results to the server, will happen in 60
seconds.
-
::

# send a signal to all workers to stop
exit $(MERLIN_STOP_WORKERS)
1 change: 1 addition & 0 deletions merlin/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@

app.conf.update(
task_acks_late=True,
task_reject_on_worker_lost=True,
task_publish_retry_policy={
"interval_start": 10,
"interval_step": 10,
Expand Down
2 changes: 2 additions & 0 deletions merlin/common/abstracts/enums/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
"HARD_FAIL_VALUE",
"DRY_OK_VALUE",
"RETRY_VALUE",
"STOP_WORKERS_VALUE",
)


Expand All @@ -54,3 +55,4 @@ class ReturnCode(IntEnum):
HARD_FAIL = 102
DRY_OK = 103
RETRY = 104
STOP_WORKERS = 105
2 changes: 1 addition & 1 deletion merlin/common/sample_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def uniform_directories(num_samples=MAX_SAMPLE, bundle_size=1, level_max_dirs=10
while directory_sizes[0] < num_samples:
directory_sizes.insert(0, directory_sizes[0] * level_max_dirs)
# We've gone over the total number of samples, remove the first entry
del(directory_sizes[0])
del directory_sizes[0]
return directory_sizes


Expand Down
37 changes: 34 additions & 3 deletions merlin/common/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@

LOG = logging.getLogger(__name__)

STOP_COUNTDOWN = 60


@shared_task(bind=True, autoretry_for=retry_exceptions, retry_backoff=True)
def merlin_step(self, *args, **kwargs):
Expand Down Expand Up @@ -142,9 +144,16 @@ def merlin_step(self, *args, **kwargs):
# router.purge_tasks("celery", ?, force=True)

# stop workers TODO make this more discriminatory, stopping only the relevant workers
stop_workers("celery", None, None)
shutdown = shutdown_workers.s()
shutdown.set(queue=step.get_task_queue())
shutdown.apply_async(countdown=STOP_COUNTDOWN)

raise HardFailException
elif result == ReturnCode.STOP_WORKERS:
LOG.warning(f"*** Shutting down all workers in {STOP_COUNTDOWN} secs!")
shutdown = shutdown_workers.s()
shutdown.set(queue=step.get_task_queue())
shutdown.apply_async(countdown=STOP_COUNTDOWN)
else:
LOG.warning(
f"**** Step '{step_name}' in '{step_dir}' had unhandled exit code {result}. Continuing with workflow."
Expand Down Expand Up @@ -416,8 +425,11 @@ def expand_tasks_with_samples(

# Write a hierarchy to get the all paths string
sample_index = create_hierarchy(
len(samples), bundle_size=1, directory_sizes=directory_sizes, root="",
n_digits=len(str(level_max_dirs))
len(samples),
bundle_size=1,
directory_sizes=directory_sizes,
root="",
n_digits=len(str(level_max_dirs)),
)
sample_paths = sample_index.make_directory_string()

Expand Down Expand Up @@ -459,6 +471,25 @@ def expand_tasks_with_samples(
LOG.debug(f"simple chain task queued")


@shared_task(
bind=True,
autoretry_for=retry_exceptions,
retry_backoff=True,
acks_late=False,
reject_on_worker_lost=False,
name="merlin:shutdown_workers",
)
def shutdown_workers(*args, **kwargs):
"""
This task issues a call to shutdown workers.

It wraps the stop_celery_workers call as a task.
It is acknolwedged right away, so that it will not be requeued when
executed by a worker.
"""
return stop_workers("celery", None, None, None)


@shared_task(
autoretry_for=retry_exceptions, retry_backoff=True, name="merlin:chordfinisher"
)
Expand Down
2 changes: 1 addition & 1 deletion merlin/config/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def get_redis_connection(config_path, include_password, ssl=False):
username = CONFIG.broker.username
except (AttributeError, KeyError):
username = ""

try:
password_filepath = CONFIG.broker.password
try:
Expand Down
9 changes: 9 additions & 0 deletions merlin/examples/workflows/feature_demo/feature_demo.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,15 @@ study:
shell: /usr/bin/env python2
task_queue: pyth2_hello

- name: shutdown all workers
description: |
shutdown all workers
run:
cmd: |
exit $(MERLIN_STOP_WORKERS)
depends: [verify, python3_hello, python2_hello]
task_queue: $(VERIFY_QUEUE)

global.parameters:
X2:
values : [0.5]
Expand Down
2 changes: 2 additions & 0 deletions merlin/spec/expansion.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
"MERLIN_SOFT_FAIL",
"MERLIN_HARD_FAIL",
"MERLIN_RETRY",
"MERLIN_STOP_WORKERS",
}
MERLIN_RESERVED = STEP_AWARE | PROVENANCE_REPLACE
RESERVED = MAESTRO_RESERVED | MERLIN_RESERVED
Expand Down Expand Up @@ -187,6 +188,7 @@ def parameter_substitutions_for_cmd(glob_path, sample_paths):
substitutions.append(("$(MERLIN_SOFT_FAIL)", str(int(ReturnCode.SOFT_FAIL))))
substitutions.append(("$(MERLIN_HARD_FAIL)", str(int(ReturnCode.HARD_FAIL))))
substitutions.append(("$(MERLIN_RETRY)", str(int(ReturnCode.RETRY))))
substitutions.append(("$(MERLIN_STOP_WORKERS)", str(int(ReturnCode.STOP_WORKERS))))
return substitutions


Expand Down
2 changes: 2 additions & 0 deletions merlin/study/script_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,8 @@ def submit(self, step, path, cwd, job_map=None, env=None):
elif retcode == ReturnCode.RETRY:
LOG.debug("Execution returned status RETRY.")
step.restart = False
elif retcode == ReturnCode.STOP_WORKERS:
LOG.debug("Execution returned status STOP_WORKERS")
else:
LOG.warning(
f"Unrecognized Merlin Return code: {retcode}, returning SOFT_FAIL"
Expand Down
4 changes: 3 additions & 1 deletion tests/integration/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,9 @@ def define_tests():
"dry feature_demo": (
f"{run} {demo} --local --dry --vars OUTPUT_PATH=./{OUTPUT_DIR}",
[
StepFileExistsCond("verify", "*/verify_*.sh", "feature_demo", OUTPUT_DIR),
StepFileExistsCond(
"verify", "*/verify_*.sh", "feature_demo", OUTPUT_DIR
),
ReturnCodeCond(),
],
"local",
Expand Down