ARIA-237 Support for resuming failed workflow executions
Support for resuming failed workflow executions. It is now possible to rerun
failed tasks.

Additional changes:
* When a task succeeds, its attempts_count is now incremented.
* Fixed an issue with the CLI usage of resumable workflows.
mxmrlv committed Jul 11, 2017
1 parent 4c789f9 commit 1b1e98d
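
To show how the new flag is meant to be used, here is a minimal sketch of resuming a failed execution through WorkflowRunner, assuming ARIA is installed and that the storage, plugin-manager and executor objects are prepared the same way the CLI command prepares them. The helper function below is illustrative only and is not part of this commit.

from aria.orchestrator.workflow_runner import WorkflowRunner

def resume_failed_execution(model_storage, resource_storage, plugin_manager,
                            execution_id, executor):
    # Illustrative helper (not part of this commit). The storage, plugin-manager
    # and executor arguments are assumed to be set up the same way
    # aria/cli/commands/executions.py sets them up.
    runner = WorkflowRunner(
        model_storage, resource_storage, plugin_manager,
        execution_id=execution_id,        # the execution that previously failed
        retry_failed_tasks=True,          # new in this commit: re-run failed tasks
        executor=executor,
    )
    runner.execute()                      # resumes, retrying the failed tasks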
Showing 9 changed files with 196 additions and 53 deletions.
10 changes: 3 additions & 7 deletions aria/cli/commands/executions.py
@@ -157,20 +157,17 @@ def start(workflow_name,
@executions.command(name='resume',
short_help='Resume a stopped execution')
@aria.argument('execution-id')
@aria.options.inputs(help=helptexts.EXECUTION_INPUTS)
@aria.options.dry_execution
@aria.options.task_max_attempts()
@aria.options.task_retry_interval()
@aria.options.retry_failed_tasks
@aria.options.mark_pattern()
@aria.options.verbose()
@aria.pass_model_storage
@aria.pass_resource_storage
@aria.pass_plugin_manager
@aria.pass_logger
def resume(execution_id,
retry_failed_tasks,
dry,
task_max_attempts,
task_retry_interval,
mark_pattern,
model_storage,
resource_storage,
@@ -194,8 +191,7 @@ def resume(execution_id,
workflow_runner = \
WorkflowRunner(
model_storage, resource_storage, plugin_manager,
execution_id=execution_id, executor=executor,
task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval
execution_id=execution_id, retry_failed_tasks=retry_failed_tasks, executor=executor,
)

logger.info('Resuming {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
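
For illustration only, a trimmed stand-in command shows how the new --retry-failed-tasks flag is parsed from the command line; the real resume command additionally receives the storage and plugin-manager objects through the aria.pass_* decorators. Only click is assumed here.

import click
from click.testing import CliRunner

@click.command()
@click.argument('execution-id')
@click.option('--retry-failed-tasks', is_flag=True,
              help='Retry tasks that failed in the previous execution attempt')
def resume(execution_id, retry_failed_tasks):
    # Dashed names become underscored parameters; the flag defaults to False.
    click.echo('execution {0}, retry_failed_tasks={1}'.format(
        execution_id, retry_failed_tasks))

result = CliRunner().invoke(resume, ['42', '--retry-failed-tasks'])
assert 'retry_failed_tasks=True' in result.output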
6 changes: 6 additions & 0 deletions aria/cli/core/aria.py
@@ -310,6 +310,12 @@ def __init__(self):
is_flag=True,
help=helptexts.DRY_EXECUTION)

self.retry_failed_tasks = click.option(
'--retry-failed-tasks',
is_flag=True,
help=helptexts.RETRY_FAILED_TASK
)

self.reset_config = click.option(
'--reset-config',
is_flag=True,
1 change: 1 addition & 0 deletions aria/cli/helptexts.py
@@ -46,6 +46,7 @@
"How many times should a task be attempted in case of failures [default: {0}]"
DRY_EXECUTION = "Execute a workflow dry run (prints operations information without causing side " \
"effects)"
RETRY_FAILED_TASK = "Retry tasks that failed in the previous execution attempt"
IGNORE_AVAILABLE_NODES = "Delete the service even if it has available nodes"
SORT_BY = "Key for sorting the list"
DESCENDING = "Sort list in descending order [default: False]"
4 changes: 3 additions & 1 deletion aria/modeling/orchestration.py
@@ -65,7 +65,9 @@ class ExecutionBase(mixins.ModelMixin):
PENDING: (STARTED, CANCELLED),
STARTED: END_STATES + (CANCELLING,),
CANCELLING: END_STATES,
CANCELLED: PENDING
# Retrying
CANCELLED: PENDING,
FAILED: PENDING
}

# region one_to_many relationships
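
To make the effect of the new FAILED entry concrete, here is an illustrative (non-ARIA) sketch of how such a transition table can guard status changes; the string states below are placeholders for the model's constants.

# Illustrative transition table: values are the states a given state may move to.
VALID_TRANSITIONS = {
    'pending': ('started', 'cancelled'),
    'started': ('succeeded', 'failed', 'cancelled', 'cancelling'),
    'cancelling': ('succeeded', 'failed', 'cancelled'),
    'cancelled': ('pending',),   # retrying a cancelled execution
    'failed': ('pending',),      # new: retrying a failed execution on resume
}

def is_valid_transition(current, new):
    return new in VALID_TRANSITIONS.get(current, ())

assert is_valid_transition('failed', 'pending')         # resume of a failed run
assert not is_valid_transition('succeeded', 'pending')  # succeeded runs stay final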
8 changes: 6 additions & 2 deletions aria/orchestrator/workflow_runner.py
@@ -38,7 +38,8 @@
class WorkflowRunner(object):

def __init__(self, model_storage, resource_storage, plugin_manager,
execution_id=None, service_id=None, workflow_name=None, inputs=None, executor=None,
execution_id=None, retry_failed_tasks=False,
service_id=None, workflow_name=None, inputs=None, executor=None,
task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL):
"""
@@ -62,6 +63,7 @@ def __init__(self, model_storage, resource_storage, plugin_manager,
"and service id with inputs")

self._is_resume = execution_id is not None
self._retry_failed_tasks = retry_failed_tasks

self._model_storage = model_storage
self._resource_storage = resource_storage
@@ -116,7 +118,9 @@ def service(self):
return self._model_storage.service.get(self._service_id)

def execute(self):
self._engine.execute(ctx=self._workflow_context, resuming=self._is_resume)
self._engine.execute(ctx=self._workflow_context,
resuming=self._is_resume,
retry_failed=self._retry_failed_tasks)

def cancel(self):
self._engine.cancel_execution(ctx=self._workflow_context)
9 changes: 6 additions & 3 deletions aria/orchestrator/workflows/core/engine.py
@@ -41,14 +41,15 @@ def __init__(self, executors, **kwargs):
self._executors = executors.copy()
self._executors.setdefault(StubTaskExecutor, StubTaskExecutor())

def execute(self, ctx, resuming=False):
def execute(self, ctx, resuming=False, retry_failed=False):
"""
Executes the workflow.
"""
if resuming:
events.on_resume_workflow_signal.send(ctx)
events.on_resume_workflow_signal.send(ctx, retry_failed=retry_failed)

tasks_tracker = _TasksTracker(ctx)

try:
events.start_workflow_signal.send(ctx)
while True:
@@ -124,8 +125,10 @@ def _handle_ended_tasks(task):


class _TasksTracker(object):

def __init__(self, ctx):
self._ctx = ctx

self._tasks = ctx.execution.tasks
self._executed_tasks = [task for task in self._tasks if task.has_ended()]
self._executable_tasks = list(set(self._tasks) - set(self._executed_tasks))
@@ -155,7 +158,7 @@ def ended_tasks(self):
def executable_tasks(self):
now = datetime.utcnow()
# we need both lists since retrying task are in the executing task list.
for task in self._update_tasks(self._executing_tasks + self._executable_tasks):
for task in self._update_tasks(set(self._executing_tasks + self._executable_tasks)):
if all([task.is_waiting(),
task.due_at <= now,
all(dependency in self._executed_tasks for dependency in task.dependencies)
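
A hedged restatement of the selection rule above as a standalone generator, including the set() dedup added in this commit (a retrying task appears in both the executing and the executable lists); the task objects are assumed to expose is_waiting(), due_at and dependencies as in ARIA's task model.

from datetime import datetime

def iter_executable(executing_tasks, executable_tasks, executed_tasks):
    # A retrying task can appear in both input lists, hence the set().
    now = datetime.utcnow()
    for task in set(executing_tasks + executable_tasks):
        if (task.is_waiting()
                and task.due_at <= now
                and all(dep in executed_tasks for dep in task.dependencies)):
            yield task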
10 changes: 9 additions & 1 deletion aria/orchestrator/workflows/core/events_handler.py
@@ -71,6 +71,7 @@ def _task_succeeded(ctx, *args, **kwargs):
with ctx.persist_changes:
ctx.task.ended_at = datetime.utcnow()
ctx.task.status = ctx.task.SUCCESS
ctx.task.attempts_count += 1

_update_node_state_if_necessary(ctx)

@@ -119,7 +120,7 @@ def _workflow_cancelled(workflow_context, *args, **kwargs):


@events.on_resume_workflow_signal.connect
def _workflow_resume(workflow_context, *args, **kwargs):
def _workflow_resume(workflow_context, retry_failed=False, *args, **kwargs):
with workflow_context.persist_changes:
execution = workflow_context.execution
execution.status = execution.PENDING
@@ -128,6 +129,13 @@ def _workflow_resume(workflow_context, *args, **kwargs):
if not task.has_ended():
task.status = task.PENDING

if retry_failed:
for task in execution.tasks:
if task.status == task.FAILED and not task.ignore_failure:
task.attempts_count = 0
task.status = task.PENDING



@events.on_cancelling_workflow_signal.connect
def _workflow_cancelling(workflow_context, *args, **kwargs):
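
The reset rule the resume handler applies can be summarised as a pure function over plain task records; this is a hedged sketch for reasoning about the behaviour, not ARIA's model API.

def reset_tasks_for_resume(tasks, retry_failed=False):
    """tasks: dicts with 'status', 'has_ended', 'ignore_failure', 'attempts_count'."""
    for task in tasks:
        if not task['has_ended']:
            # Anything still in flight goes back to pending, as before this commit.
            task['status'] = 'pending'
        elif (retry_failed and task['status'] == 'failed'
                and not task['ignore_failure']):
            # New in this commit: failed tasks are rewound and retried from scratch.
            task['status'] = 'pending'
            task['attempts_count'] = 0
    return tasks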
3 changes: 1 addition & 2 deletions tests/modeling/test_models.py
@@ -324,8 +324,7 @@ def create_execution(status):
Execution.STARTED: [Execution.PENDING],
Execution.CANCELLING: [Execution.PENDING,
Execution.STARTED],
Execution.FAILED: [Execution.PENDING,
Execution.STARTED,
Execution.FAILED: [Execution.STARTED,
Execution.SUCCEEDED,
Execution.CANCELLED,
Execution.CANCELLING],