Skip to content

Commit

Permalink
Merge 6ae2495 into 48ae45e
Browse files Browse the repository at this point in the history
  • Loading branch information
m4dcoder committed Jan 13, 2016
2 parents 48ae45e + 6ae2495 commit 3e1618a
Show file tree
Hide file tree
Showing 14 changed files with 628 additions and 40 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ in development
an invalid type or similar. (improvement)
* Use ``--always-copy`` option when creating virtualenv for packs from packs.setup_virtualenv
action. This is required when st2actionrunner is kicked off from python within a virtualenv.
* Fix runaway action triggers caused by state miscalculation for mistral workflow. (bug fix)
* The ``--tasks`` option in the CLI for ``st2 execution get`` and ``st2 run`` will be renamed to
``--show-tasks`` to avoid conflict with the tasks option in st2 execution re-run.
* Add option to rerun one or more tasks in mistral workflow that has errored. (new-feature)

1.2.0 - December 07, 2015
-------------------------
Expand Down
7 changes: 7 additions & 0 deletions contrib/examples/actions/mistral-test-rerun.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
name: mistral-test-rerun
description: A sample workflow used to test the rerun feature.
pack: examples
runner_type: mistral-v2
entry_point: workflows/mistral-test-rerun.yaml
enabled: true
10 changes: 10 additions & 0 deletions contrib/examples/actions/workflows/mistral-test-rerun.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
version: '2.0'

examples.mistral-test-rerun:
description: A sample workflow used to test the rerun feature.
type: direct
tasks:
task1:
action: core.local
input:
cmd: "exit `st2 key get mistral-test-rerun-switch | grep value | awk '{print $4}'`"
2 changes: 1 addition & 1 deletion pylint_plugins/api_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

# A list of class names for which we want to skip the checks
CLASS_NAME_BLACKLIST = [
'ExecutionParametersAPI'
'ExecutionSpecificationAPI'
]


Expand Down
8 changes: 8 additions & 0 deletions st2actions/st2actions/container/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@

class RunnerContainer(object):

def _get_rerun_reference(self, context):
execution_id = context.get('re-run', {}).get('ref')
return ActionExecution.get_by_id(execution_id) if execution_id else None

def _get_runner(self, runnertype_db, action_db, liveaction_db):
runner = get_runner(runnertype_db.runner_module)

Expand All @@ -63,6 +67,10 @@ def _get_runner(self, runnertype_db, action_db, liveaction_db):
runner.libs_dir_path = self._get_action_libs_abs_path(action_db.pack,
action_db.entry_point)

# For re-run, get the ActionExecutionDB in which the re-run is based on.
rerun_ref_id = runner.context.get('re-run', {}).get('ref')
runner.rerun_ex_ref = ActionExecution.get(id=rerun_ref_id) if rerun_ref_id else None

return runner

def dispatch(self, liveaction_db):
Expand Down
1 change: 1 addition & 0 deletions st2actions/st2actions/runners/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ def __init__(self, runner_id):
self.context = None
self.callback = None
self.auth_token = None
self.rerun_ex_ref = None

@abc.abstractmethod
def pre_run(self):
Expand Down
86 changes: 73 additions & 13 deletions st2actions/st2actions/runners/mistral/v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,19 +136,7 @@ def _find_default_workflow(self, def_dict):
else:
raise Exception('There are no workflows in the workbook.')

@retrying.retry(
retry_on_exception=utils.retry_on_exceptions,
wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
def run(self, action_parameters):
# Test connection
self._client.workflows.list()

# Setup inputs for the workflow execution.
inputs = self.runner_parameters.get('context', dict())
inputs.update(action_parameters)

def _construct_workflow_execution_options(self):
# This URL is used by Mistral to talk back to the API
api_url = get_mistral_api_url()
endpoint = api_url + '/actionexecutions'
Expand Down Expand Up @@ -192,6 +180,30 @@ def run(self, action_parameters):
}
}

return options

def _get_resume_options(self):
return self.context.get('re-run', {})

@retrying.retry(
retry_on_exception=utils.retry_on_exceptions,
wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
def run(self, action_parameters):
resume_options = self._get_resume_options()
tasks = resume_options.get('tasks', [])
resume = self.rerun_ex_ref and tasks
return self.resume(self.rerun_ex_ref, tasks) if resume else self.start(action_parameters)

def start(self, action_parameters):
# Test connection
self._client.workflows.list()

# Setup inputs for the workflow execution.
inputs = self.runner_parameters.get('context', dict())
inputs.update(action_parameters)

# Get workbook/workflow definition from file.
with open(self.entry_point, 'r') as def_file:
def_yaml = def_file.read()
Expand All @@ -210,6 +222,9 @@ def run(self, action_parameters):
def_dict_xformed = utils.transform_definition(def_dict)
def_yaml_xformed = yaml.safe_dump(def_dict_xformed, default_flow_style=False)

# Construct additional options for the workflow execution
options = self._construct_workflow_execution_options()

# Save workbook/workflow definition.
if is_workbook:
self._save_workbook(action_ref, def_yaml_xformed)
Expand Down Expand Up @@ -238,6 +253,51 @@ def run(self, action_parameters):

return (status, partial_results, exec_context)

def resume(self, ex_ref, task_names):
mistral_ctx = ex_ref.context.get('mistral', dict())

if not mistral_ctx.get('execution_id'):
raise Exception('Unable to rerun because mistral execution_id is missing.')

execution = self._client.executions.get(mistral_ctx.get('execution_id'))

# pylint: disable=no-member
if execution.state not in ['ERROR']:
raise Exception('Workflow execution is not in a rerunable state.')

# pylint: disable=no-member
tasks = {task.name: task.to_dict()
for task in self._client.tasks.list(workflow_execution_id=execution.id)
if task.name in task_names and task.state == 'ERROR'}

missing_tasks = list(set(task_names) - set(tasks.keys()))
if missing_tasks:
raise Exception('Only tasks in error state can be rerun. Unable to identify '
'rerunable tasks: %s. Please make sure that the task name is correct '
'and the task is in rerunable state.' % ', '.join(missing_tasks))

# Construct additional options for the workflow execution
options = self._construct_workflow_execution_options()

for task in tasks.values():
# pylint: disable=unexpected-keyword-arg
self._client.tasks.rerun(task['id'], env=options.get('env', None))

status = LIVEACTION_STATUS_RUNNING
partial_results = {'tasks': []}

# pylint: disable=no-member
current_context = {
'execution_id': str(execution.id),
'workflow_name': execution.workflow_name
}

exec_context = self.context
exec_context = self._build_mistral_context(exec_context, current_context)
LOG.info('Mistral query context is %s' % exec_context)

return (status, partial_results, exec_context)

@retrying.retry(
retry_on_exception=utils.retry_on_exceptions,
wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
Expand Down

0 comments on commit 3e1618a

Please sign in to comment.