Skip to content

Commit

Permalink
Merge pull request #2233 from StackStorm/retry_policy_improvements
Browse files Browse the repository at this point in the history
Retry policy improvements
  • Loading branch information
Kami committed Nov 24, 2015
2 parents 070f264 + 1fc8c07 commit 9f0a5fa
Show file tree
Hide file tree
Showing 11 changed files with 185 additions and 50 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ in development
* Fix policy trigger for action execution cancellation. (bug fix)
* Improve error reporting for static error in ActionChain definition e.g. incorrect reference
in default etc. (improvement)
* Fix action chain so it doesn't end up in an infinite loop if an action which is part of the chain
is canceled. (bug fix)

1.1.1 - November 13, 2015
-------------------------
Expand Down
11 changes: 10 additions & 1 deletion docs/source/upgrade_notes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,17 @@ Upgrade Notes
Keep in mind that this condition is only met if action runner is running as root and / or if
action runner is running a system user (stanley) and a different user is requested when running
a command using ``user`` parameter.
* Support of default values is added to the API model. As a result, input parameters defined in
* Support of default values is added to the API model. As a result, input parameters defined in
the action metadata that is type of string no longer supports None or null.
* New ``timeout`` action execution status has been introduced. This status is a special type of
a failure and implies an action timeout.

All the existing runners (local, remote, python, http, action chain) have been updated to utilize
this new status when applicable. Previously, if an action timed out, status was set to ``failed``
and the timeout could only be inferred from the error message in the result object.

If you have code which checks for an action failure you need to update it to also check for
``timeout`` in addition to ``failed`` status.

|st2| 1.1
---------
Expand Down
6 changes: 3 additions & 3 deletions st2actions/st2actions/container/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def _do_run(self, runner, runnertype_db, action_db, liveaction_db):
except:
pass

action_completed = status in action_constants.COMPLETED_STATES
action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
if isinstance(runner, AsyncActionRunner) and not action_completed:
self._setup_async_query(liveaction_db.id, runnertype_db, context)
except:
Expand Down Expand Up @@ -151,7 +151,7 @@ def _do_run(self, runner, runnertype_db, action_db, liveaction_db):
# actions in the workflow. If the auth token is deleted here, then the actions
# in the workflow will fail with unauthorized exception.
is_async_runner = isinstance(runner, AsyncActionRunner)
action_completed = status in action_constants.COMPLETED_STATES
action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES

if not is_async_runner or (is_async_runner and action_completed):
try:
Expand Down Expand Up @@ -194,7 +194,7 @@ def _update_live_action_db(self, liveaction_id, status, result, context):
Update LiveActionDB object for the provided liveaction id.
"""
liveaction_db = get_liveaction_by_id(liveaction_id)
if status in action_constants.COMPLETED_STATES:
if status in action_constants.LIVEACTION_COMPLETED_STATES:
end_timestamp = date_utils.get_datetime_utc_now()
else:
end_timestamp = None
Expand Down
16 changes: 6 additions & 10 deletions st2actions/st2actions/notifier/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
from oslo_config import cfg

from st2common import log as logging
from st2common.constants import action as action_constants
from st2common.constants.action import LIVEACTION_STATUS_SUCCEEDED
from st2common.constants.action import LIVEACTION_FAILED_STATES
from st2common.constants.action import LIVEACTION_COMPLETED_STATES
from st2common.constants.triggers import INTERNAL_TRIGGER_TYPES
from st2common.models.api.trace import TraceContext
from st2common.models.db.liveaction import LiveActionDB
Expand All @@ -42,12 +44,6 @@

ACTIONUPDATE_WORK_Q = liveaction.get_queue('st2.notifiers.work',
routing_key=publishers.UPDATE_RK)
ACTION_COMPLETE_STATES = [
action_constants.LIVEACTION_STATUS_SUCCEEDED,
action_constants.LIVEACTION_STATUS_FAILED,
action_constants.LIVEACTION_STATUS_TIMED_OUT,
action_constants.LIVEACTION_STATUS_CANCELED
]

ACTION_SENSOR_ENABLED = cfg.CONF.action_sensor.enable
# XXX: Fix this nasty positional dependency.
Expand All @@ -73,7 +69,7 @@ def process(self, liveaction):
extra = {'live_action_db': liveaction}
LOG.debug('Processing liveaction %s', live_action_id, extra=extra)

if liveaction.status not in ACTION_COMPLETE_STATES:
if liveaction.status not in LIVEACTION_COMPLETED_STATES:
LOG.debug('Skipping processing of liveaction %s since it\'s not in a completed state' %
(live_action_id), extra=extra)
return
Expand Down Expand Up @@ -111,12 +107,12 @@ def _post_notify_triggers(self, liveaction=None, execution_id=None):
liveaction=liveaction, execution_id=execution_id,
notify_subsection=notify.on_complete,
default_message_suffix='completed.')
if liveaction.status == action_constants.LIVEACTION_STATUS_SUCCEEDED and notify.on_success:
if liveaction.status == LIVEACTION_STATUS_SUCCEEDED and notify.on_success:
self._post_notify_subsection_triggers(
liveaction=liveaction, execution_id=execution_id,
notify_subsection=notify.on_success,
default_message_suffix='succeeded.')
if liveaction.status == action_constants.LIVEACTION_STATUS_FAILED and notify.on_failure:
if liveaction.status in LIVEACTION_FAILED_STATES and notify.on_failure:
self._post_notify_subsection_triggers(
liveaction=liveaction, execution_id=execution_id,
notify_subsection=notify.on_failure,
Expand Down
4 changes: 2 additions & 2 deletions st2actions/st2actions/query/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def _query_and_save_results(self, query_context):
self._delete_state_object(query_context)
return

if status in action_constants.COMPLETED_STATES:
if status in action_constants.LIVEACTION_COMPLETED_STATES:
action_db = get_action_by_ref(liveaction_db.action)
if not action_db:
LOG.exception('Unable to invoke post run. Action %s '
Expand All @@ -128,7 +128,7 @@ def _update_action_results(self, execution_id, status, results):
liveaction_db.result = results

# Action has completed, record end_timestamp
if (liveaction_db.status in action_constants.COMPLETED_STATES and
if (liveaction_db.status in action_constants.LIVEACTION_COMPLETED_STATES and
not liveaction_db.end_timestamp):
liveaction_db.end_timestamp = date_utils.get_datetime_utc_now()

Expand Down
83 changes: 55 additions & 28 deletions st2actions/st2actions/runners/actionchainrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@
from st2actions.runners import ActionRunner
from st2common import log as logging
from st2common.constants.action import ACTION_KV_PREFIX
from st2common.constants.action import (LIVEACTION_STATUS_SUCCEEDED, LIVEACTION_STATUS_FAILED)
from st2common.constants.action import LIVEACTION_STATUS_SUCCEEDED
from st2common.constants.action import LIVEACTION_STATUS_TIMED_OUT
from st2common.constants.action import LIVEACTION_STATUS_FAILED
from st2common.constants.action import LIVEACTION_STATUS_CANCELED
from st2common.constants.action import LIVEACTION_COMPLETED_STATES
from st2common.constants.action import LIVEACTION_FAILED_STATES
from st2common.constants.system import SYSTEM_KV_PREFIX
from st2common.content.loader import MetaLoader
from st2common.exceptions.action import (ParameterRenderingFailedException,
Expand Down Expand Up @@ -291,6 +295,7 @@ def run(self, action_parameters):

while action_node:
fail = False
timeout = False
error = None
liveaction = None

Expand Down Expand Up @@ -366,36 +371,55 @@ def run(self, action_parameters):
self._stopped = action_service.is_action_canceled_or_canceling(
self.liveaction_id)

if not self._stopped:
try:
if not liveaction or liveaction.status == LIVEACTION_STATUS_FAILED:
fail = True
action_node = self.chain_holder.get_next_node(action_node.name,
condition='on-failure')
elif liveaction.status == LIVEACTION_STATUS_SUCCEEDED:
action_node = self.chain_holder.get_next_node(action_node.name,
condition='on-success')
except Exception as e:
LOG.exception('Failed to get next node "%s".', action_node.name)
if self._stopped:
LOG.info('Chain execution (%s) canceled by user.', self.liveaction_id)
status = LIVEACTION_STATUS_CANCELED
return (status, result, None)

try:
if not liveaction:
fail = True
error = ('Failed to get next node "%s". Lookup failed: %s' %
(action_node.name, str(e)))
trace = traceback.format_exc(10)
top_level_error = {
'error': error,
'traceback': trace
}
# reset action_node here so that chain breaks on failure.
action_node = None
break
else:
action_node = self.chain_holder.get_next_node(action_node.name,
condition='on-failure')
elif liveaction.status in LIVEACTION_FAILED_STATES:
if liveaction and liveaction.status == LIVEACTION_STATUS_TIMED_OUT:
timeout = True
else:
fail = True
action_node = self.chain_holder.get_next_node(action_node.name,
condition='on-failure')
elif liveaction.status == LIVEACTION_STATUS_CANCELED:
# User canceled an action (task) in the workflow - cancel the execution of
# rest of the workflow
self._stopped = True
LOG.info('Chain execution (%s) canceled by user.', self.liveaction_id)
elif liveaction.status == LIVEACTION_STATUS_SUCCEEDED:
action_node = self.chain_holder.get_next_node(action_node.name,
condition='on-success')
except Exception as e:
LOG.exception('Failed to get next node "%s".', action_node.name)

fail = True
error = ('Failed to get next node "%s". Lookup failed: %s' %
(action_node.name, str(e)))
trace = traceback.format_exc(10)
top_level_error = {
'error': error,
'traceback': trace
}
# reset action_node here so that chain breaks on failure.
action_node = None
break

if self._stopped:
LOG.info('Chain execution (%s) canceled by user.', self.liveaction_id)
status = LIVEACTION_STATUS_CANCELED
return (status, result, None)

if fail:
status = LIVEACTION_STATUS_FAILED
elif timeout:
status = LIVEACTION_STATUS_TIMED_OUT
else:
status = LIVEACTION_STATUS_SUCCEEDED

Expand Down Expand Up @@ -486,18 +510,21 @@ def _get_next_action(self, action_node, parent_context, action_params, context_r

return liveaction

def _run_action(self, liveaction, wait_for_completion=True):
def _run_action(self, liveaction, wait_for_completion=True, sleep_delay=1.0):
"""
:param sleep_delay: Number of seconds to wait during "is completed" polls.
:type sleep_delay: ``float``
"""
try:
# request return canceled
liveaction, _ = action_service.request(liveaction)
except Exception as e:
liveaction.status = LIVEACTION_STATUS_FAILED
LOG.exception('Failed to schedule liveaction.')
raise e

while (wait_for_completion and
liveaction.status != LIVEACTION_STATUS_SUCCEEDED and
liveaction.status != LIVEACTION_STATUS_FAILED):
eventlet.sleep(1)
while (wait_for_completion and liveaction.status not in LIVEACTION_COMPLETED_STATES):
eventlet.sleep(sleep_delay)
liveaction = action_db_util.get_liveaction_by_id(liveaction.id)

return liveaction
Expand Down
74 changes: 74 additions & 0 deletions st2actions/tests/unit/test_actionchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
from st2common.exceptions import actionrunner as runnerexceptions
from st2common.constants.action import LIVEACTION_STATUS_RUNNING
from st2common.constants.action import LIVEACTION_STATUS_SUCCEEDED
from st2common.constants.action import LIVEACTION_STATUS_CANCELED
from st2common.constants.action import LIVEACTION_STATUS_TIMED_OUT
from st2common.constants.action import LIVEACTION_STATUS_FAILED
from st2common.models.api.notification import NotificationsHelper
from st2common.models.db.liveaction import LiveActionDB
Expand Down Expand Up @@ -57,6 +59,8 @@ def __init__(self, status=LIVEACTION_STATUS_SUCCEEDED, result=''):

CHAIN_1_PATH = FixturesLoader().get_fixture_file_path_abs(
FIXTURES_PACK, 'actionchains', 'chain1.yaml')
CHAIN_2_PATH = FixturesLoader().get_fixture_file_path_abs(
FIXTURES_PACK, 'actionchains', 'chain2.yaml')
CHAIN_ACTION_CALL_NO_PARAMS_PATH = FixturesLoader().get_fixture_file_path_abs(
FIXTURES_PACK, 'actionchains', 'chain_action_call_no_params.yaml')
CHAIN_NO_DEFAULT = FixturesLoader().get_fixture_file_path_abs(
Expand Down Expand Up @@ -140,6 +144,76 @@ def test_chain_runner_success_path(self, request):
# based on the chain the callcount is known to be 3. Not great but works.
self.assertEqual(request.call_count, 3)

@mock.patch.object(action_db_util, 'get_action_by_ref',
mock.MagicMock(return_value=ACTION_1))
@mock.patch.object(action_service, 'request', return_value=(DummyActionExecution(), None))
def test_chain_runner_chain_second_task_times_out(self, request):
# Second task in the chain times out so the action chain status should be timeout
chain_runner = acr.get_runner()
chain_runner.entry_point = CHAIN_2_PATH
chain_runner.action = ACTION_1

original_run_action = chain_runner._run_action

def mock_run_action(*args, **kwargs):
original_live_action = args[0]
liveaction = original_run_action(*args, **kwargs)
if original_live_action.action == 'wolfpack.a2':
# Mock a timeout for second task
liveaction.status = LIVEACTION_STATUS_TIMED_OUT
return liveaction

chain_runner._run_action = mock_run_action

action_ref = ResourceReference.to_string_reference(name=ACTION_1.name,
pack=ACTION_1.pack)
chain_runner.liveaction = LiveActionDB(action=action_ref)
chain_runner.container_service = RunnerContainerService()
chain_runner.pre_run()
status, _, _ = chain_runner.run({})

self.assertEqual(status, LIVEACTION_STATUS_TIMED_OUT)
self.assertNotEqual(chain_runner.chain_holder.actionchain, None)
# based on the chain the callcount is known to be 3. Not great but works.
self.assertEqual(request.call_count, 3)

@mock.patch.object(action_db_util, 'get_action_by_ref',
mock.MagicMock(return_value=ACTION_1))
@mock.patch.object(action_service, 'request', return_value=(DummyActionExecution(), None))
def test_chain_runner_task_is_canceled_while_running(self, request):
# Second task in the action is CANCELED, make sure runner doesn't get stuck in an infinite
# loop
chain_runner = acr.get_runner()
chain_runner.entry_point = CHAIN_2_PATH
chain_runner.action = ACTION_1

original_run_action = chain_runner._run_action

def mock_run_action(*args, **kwargs):
original_live_action = args[0]
if original_live_action.action == 'wolfpack.a2':
status = LIVEACTION_STATUS_CANCELED
else:
status = LIVEACTION_STATUS_SUCCEEDED
request.return_value = (DummyActionExecution(status=status), None)
liveaction = original_run_action(*args, **kwargs)
return liveaction

chain_runner._run_action = mock_run_action

action_ref = ResourceReference.to_string_reference(name=ACTION_1.name,
pack=ACTION_1.pack)
chain_runner.liveaction = LiveActionDB(action=action_ref)
chain_runner.container_service = RunnerContainerService()
chain_runner.pre_run()
status, _, _ = chain_runner.run({})

self.assertEqual(status, LIVEACTION_STATUS_CANCELED)
self.assertNotEqual(chain_runner.chain_holder.actionchain, None)
# Chain count should be 2 since the last task doesn't get called since the second one was
# canceled
self.assertEqual(request.call_count, 2)

@mock.patch.object(action_db_util, 'get_action_by_ref',
mock.MagicMock(return_value=ACTION_1))
@mock.patch.object(action_service, 'request', return_value=(DummyActionExecution(), None))
Expand Down
4 changes: 2 additions & 2 deletions st2api/st2api/controllers/v1/actionexecutions.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from st2api.controllers.v1.executionviews import SUPPORTED_FILTERS
from st2common import log as logging
from st2common.constants.action import LIVEACTION_STATUS_CANCELED
from st2common.constants.action import CANCELABLE_STATES
from st2common.constants.action import LIVEACTION_CANCELABLE_STATES
from st2common.exceptions.trace import TraceNotFoundException
from st2common.models.api.action import LiveActionAPI
from st2common.models.api.base import jsexpose
Expand Down Expand Up @@ -381,7 +381,7 @@ def delete(self, exec_id):
if liveaction_db.status == LIVEACTION_STATUS_CANCELED:
abort(http_client.OK, 'Action is already in "canceled" state.')

if liveaction_db.status not in CANCELABLE_STATES:
if liveaction_db.status not in LIVEACTION_CANCELABLE_STATES:
abort(http_client.OK, 'Action cannot be canceled. State = %s.' % liveaction_db.status)

try:
Expand Down

0 comments on commit 9f0a5fa

Please sign in to comment.