Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
ARIA-236 Resumable workflow executions
  • Loading branch information
mxmrlv committed Jun 22, 2017
1 parent a751934 commit 75112ab052c7de7162901a7a46b5e843316cc63d
Showing 13 changed files with 282 additions and 47 deletions.
@@ -134,18 +134,63 @@ def start(workflow_name,
executor = DryExecutor() if dry else None # use WorkflowRunner's default executor

workflow_runner = \
WorkflowRunner(workflow_name, service.id, inputs,
model_storage, resource_storage, plugin_manager,
executor, task_max_attempts, task_retry_interval)
WorkflowRunner(
model_storage, resource_storage, plugin_manager,
service_id=service.id, workflow_name=workflow_name, inputs=inputs, executor=executor,
task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval
)
logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))

_run_execution(workflow_runner, logger, model_storage, dry, mark_pattern)


execution_thread_name = '{0}_{1}'.format(service_name, workflow_name)
@executions.command(name='resume',
                    short_help='Resume a workflow')
@aria.argument('execution-id')
@aria.options.dry_execution
@aria.options.task_max_attempts()
@aria.options.task_retry_interval()
@aria.options.mark_pattern()
@aria.options.verbose()
@aria.pass_model_storage
@aria.pass_resource_storage
@aria.pass_plugin_manager
@aria.pass_logger
def resume(execution_id,
           dry,
           task_max_attempts,
           task_retry_interval,
           mark_pattern,
           model_storage,
           resource_storage,
           plugin_manager,
           logger):
    """
    Resume a previously stopped workflow execution identified by ``execution_id``.

    BUG FIX: the original also declared ``@aria.options.inputs`` but the function
    had no matching ``inputs`` parameter, so click passed an unexpected keyword
    argument and the command failed with ``TypeError`` on every invocation.
    A resumed execution reuses the inputs already stored on the execution model,
    so the option is removed rather than the parameter added.
    """
    # use WorkflowRunner's default executor unless this is a dry run
    executor = DryExecutor() if dry else None

    workflow_runner = \
        WorkflowRunner(
            model_storage, resource_storage, plugin_manager,
            execution_id=execution_id, executor=executor,
            task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval
        )

    logger.info('Resuming {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
    _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern)


def _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern):
execution_thread_name = '{0}_{1}'.format(workflow_runner.service.name,
workflow_runner.execution.workflow_name)
execution_thread = threading.ExceptionThread(target=workflow_runner.execute,
name=execution_thread_name)

logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
execution_thread.start()

log_iterator = cli_logger.ModelLogIterator(model_storage, workflow_runner.execution_id)
last_task_id = workflow_runner.execution.logs[-1].id if workflow_runner.execution.logs else 0
log_iterator = cli_logger.ModelLogIterator(model_storage,
workflow_runner.execution_id,
offset=last_task_id)
try:
while execution_thread.is_alive():
execution_logging.log_list(log_iterator, mark_pattern=mark_pattern)
@@ -115,8 +115,8 @@ def _configure_loggers(self, config):

class ModelLogIterator(object):

def __init__(self, model_storage, execution_id, filters=None, sort=None):
self._last_visited_id = 0
def __init__(self, model_storage, execution_id, filters=None, sort=None, offset=0):
self._last_visited_id = offset
self._model_storage = model_storage
self._execution_id = execution_id
self._additional_filters = filters or {}
@@ -68,7 +68,8 @@ class ExecutionBase(mixins.ModelMixin):
VALID_TRANSITIONS = {
PENDING: (STARTED, CANCELLED),
STARTED: END_STATES + (CANCELLING,),
CANCELLING: END_STATES
CANCELLING: END_STATES,
CANCELLED: PENDING
}

@orm.validates('status')
@@ -97,10 +97,15 @@ def nodes(self):

@property
def _graph(self):
# Constructing a graph with only not ended nodes
if self._execution_graph is None:
graph = DiGraph()
for task in self.execution.tasks:
if task.has_ended():
continue
for dependency in task.dependencies:
if dependency.has_ended():
continue
graph.add_edge(dependency, task)

self._execution_graph = graph
@@ -34,3 +34,4 @@
on_cancelled_workflow_signal = signal('on_cancelled_workflow_signal')
on_success_workflow_signal = signal('on_success_workflow_signal')
on_failure_workflow_signal = signal('on_failure_workflow_signal')
on_resume_workflow_signal = signal('on_resume_workflow_signal')
@@ -74,3 +74,10 @@ class WorkflowImplementationNotFoundError(AriaError):
Raised when attempting to import a workflow's code but the implementation is not found
"""
pass


class InvalidWorkflowRunnerParams(AriaError):
    """
    Raised when an invalid combination of arguments is passed to the workflow runner.
    """
@@ -37,9 +37,9 @@

class WorkflowRunner(object):

def __init__(self, workflow_name, service_id, inputs,
model_storage, resource_storage, plugin_manager,
executor=None, task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
def __init__(self, model_storage, resource_storage, plugin_manager,
execution_id=None, service_id=None, workflow_name=None, inputs=None, executor=None,
task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL):
"""
Manages a single workflow execution on a given service.
@@ -55,28 +55,36 @@ def __init__(self, workflow_name, service_id, inputs,
:param task_retry_interval: Retry interval in between retry attempts of a failing task
"""

# A runner either resumes an existing execution (execution_id) or starts a new
# one (workflow_name + service_id, with inputs) — anything else is invalid.
if not (execution_id or (workflow_name and service_id)):
    # BUG FIX: the original merely constructed the exception object and
    # discarded it, so the invalid-argument combination slipped through
    # unnoticed; it must be raised.
    raise exceptions.InvalidWorkflowRunnerParams(
        "Either provide execution id in order to resume a workflow or workflow name "
        "and service id with inputs")

self._is_resume = execution_id is not None

self._model_storage = model_storage
self._resource_storage = resource_storage
self._workflow_name = workflow_name

# the IDs are stored rather than the models themselves, so this module could be used
# by several threads without raising errors on model objects shared between threads
self._service_id = service_id

self._validate_workflow_exists_for_service()

workflow_fn = self._get_workflow_fn()

execution = self._create_execution_model(inputs)
self._execution_id = execution.id
if self._is_resume:
self._execution_id = execution_id
self._service_id = self.execution.service.id
self._workflow_name = model_storage.execution.get(self._execution_id).workflow_name
else:
self._service_id = service_id
self._workflow_name = workflow_name
self._validate_workflow_exists_for_service()
self._execution_id = self._create_execution_model(inputs).id

self._workflow_context = WorkflowContext(
name=self.__class__.__name__,
model_storage=self._model_storage,
resource_storage=resource_storage,
service_id=service_id,
execution_id=execution.id,
workflow_name=workflow_name,
execution_id=self._execution_id,
workflow_name=self._workflow_name,
task_max_attempts=task_max_attempts,
task_retry_interval=task_retry_interval)

@@ -86,9 +94,10 @@ def __init__(self, workflow_name, service_id, inputs,
# transforming the execution inputs to dict, to pass them to the workflow function
execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.values())

self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
compile.create_execution_tasks(
self._workflow_context, self._tasks_graph, executor.__class__)
if not self._is_resume:
workflow_fn = self._get_workflow_fn()
tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
compile.create_execution_tasks(self._workflow_context, tasks_graph, executor.__class__)

self._engine = engine.Engine(executors={executor.__class__: executor})

@@ -105,7 +114,7 @@ def service(self):
return self._model_storage.service.get(self._service_id)

def execute(self):
    """
    Run the workflow via the engine.

    When this runner was created from an existing execution (resume mode),
    ``resuming=True`` tells the engine to continue the not-yet-ended tasks
    instead of starting from scratch.
    """
    # BUG FIX: as rendered, the block invoked the engine twice — the stale
    # pre-resume call (without the ``resuming`` flag) was left above the new
    # one. A single call carrying the flag is the intended behavior.
    self._engine.execute(ctx=self._workflow_context, resuming=self._is_resume)

def cancel(self):
    """Request cancellation of the in-flight workflow execution."""
    engine = self._engine
    engine.cancel_execution(ctx=self._workflow_context)
@@ -41,11 +41,15 @@ def __init__(self, executors, **kwargs):
self._executors = executors.copy()
self._executors.setdefault(StubTaskExecutor, StubTaskExecutor())

def execute(self, ctx):
def execute(self, ctx, resuming=False):
"""
execute the workflow
"""
executing_tasks = []

if resuming:
events.on_resume_workflow_signal.send(ctx)

try:
events.start_workflow_signal.send(ctx)
while True:
@@ -121,6 +121,13 @@ def _workflow_cancelled(workflow_context, *args, **kwargs):
execution.ended_at = datetime.utcnow()


@events.on_resume_workflow_signal.connect
def _workflow_resume(workflow_context, *args, **kwargs):
    """Move a resumed execution back to PENDING so the engine may start it again."""
    with workflow_context.persist_changes:
        execution_model = workflow_context.execution
        execution_model.status = execution_model.PENDING


@events.on_cancelling_workflow_signal.connect
def _workflow_cancelling(workflow_context, *args, **kwargs):
with workflow_context.persist_changes:
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from . import models, context, topology, operations
from . import models, context, topology, operations, workflow
@@ -225,20 +225,24 @@ def create_interface_template(service_template, interface_name, operation_name,
)


def create_interface(service, interface_name, operation_name, operation_kwargs=None,
interface_kwargs=None):
the_type = service.service_template.interface_types.get_descendant('test_interface_type')

def create_operation(operation_name, operation_kwargs=None):
    """
    Build a test ``models.Operation`` named ``operation_name``.

    BUG FIX: the rendered block contained a stale ``operation = models.Operation(``
    line (superseded diff residue) immediately above the ``return`` form, leaving
    an unclosed parenthesis / syntax error; only the ``return`` form is kept.

    NOTE(review): when 'arguments' is present, the caller's ``operation_kwargs``
    dict is mutated in place — confirm callers do not reuse the dict.
    """
    if operation_kwargs and operation_kwargs.get('arguments'):
        # wrap raw argument values into Argument models, dropping None values
        operation_kwargs['arguments'] = dict(
            (argument_name, models.Argument.wrap(argument_name, argument_value))
            for argument_name, argument_value in operation_kwargs['arguments'].iteritems()
            if argument_value is not None)

    return models.Operation(
        name=operation_name,
        **(operation_kwargs or {})
    )


def create_interface(service, interface_name, operation_name, operation_kwargs=None,
interface_kwargs=None):
the_type = service.service_template.interface_types.get_descendant('test_interface_type')
operation = create_operation(operation_name, operation_kwargs)

return models.Interface(
type=the_type,
operations=_dictify(operation),
@@ -314,7 +314,7 @@ def create_execution(status):
Execution.CANCELLING],
Execution.FAILED: [Execution.FAILED],
Execution.SUCCEEDED: [Execution.SUCCEEDED],
Execution.CANCELLED: [Execution.CANCELLED]
Execution.CANCELLED: [Execution.CANCELLED, Execution.PENDING]
}

invalid_transitions = {
@@ -334,8 +334,7 @@ def create_execution(status):
Execution.FAILED,
Execution.CANCELLED,
Execution.CANCELLING],
Execution.CANCELLED: [Execution.PENDING,
Execution.STARTED,
Execution.CANCELLED: [Execution.STARTED,
Execution.FAILED,
Execution.SUCCEEDED,
Execution.CANCELLING],

0 comments on commit 75112ab

Please sign in to comment.