From 9c04a952dba1198e7b94beb99d6dcef3e0653ed3 Mon Sep 17 00:00:00 2001 From: Yves Bastide Date: Fri, 3 Feb 2017 18:05:43 +0100 Subject: [PATCH 1/5] Poller._poll: use utils.retry.with_delay Partial fix for getting throttled by the SWF API. _complete should already retry (thrice, is this enough?) Signed-off-by: Yves Bastide --- simpleflow/swf/process/poller.py | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/simpleflow/swf/process/poller.py b/simpleflow/swf/process/poller.py index 1d91df076..9b9acac33 100644 --- a/simpleflow/swf/process/poller.py +++ b/simpleflow/swf/process/poller.py @@ -147,19 +147,11 @@ def _poll(self): identity = self.identity logger.debug("polling task on %s", task_list) - try: - response = self.poll( - task_list, - identity=identity, - ) - except swf.exceptions.PollTimeout: - logger.debug('{}: PollTimeout'.format(self)) - raise - except Exception as err: - logger.error( - "exception %s when polling on %s", - str(err), - task_list, - ) - raise + poll = utils.retry.with_delay( + nb_times=self.nb_retries, + delay=utils.retry.exponential, + log_with=logger.exception, + on_exceptions=swf.exceptions.ResponseError, + )(self.poll) + response = poll(task_list, identity=identity) return response From 4cf3713ac85bab21329bca4d3b723b20f28b490a Mon Sep 17 00:00:00 2001 From: Yves Bastide Date: Fri, 3 Feb 2017 18:12:49 +0100 Subject: [PATCH 2/5] Poller: rename _poll and _complete These methods are not really private: let's rename them to *_with_retry. 
Signed-off-by: Yves Bastide --- simpleflow/swf/process/decider/base.py | 2 +- simpleflow/swf/process/poller.py | 10 +++++----- simpleflow/swf/process/worker/base.py | 2 +- tests/test_simpleflow/swf/process/test_poller.py | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/simpleflow/swf/process/decider/base.py b/simpleflow/swf/process/decider/base.py index 21d4db2eb..275915acc 100644 --- a/simpleflow/swf/process/decider/base.py +++ b/simpleflow/swf/process/decider/base.py @@ -149,7 +149,7 @@ def process(self, decision_response): try: logger.info('completing decision for workflow {}'.format( self._workflow_name)) - self._complete(decision_response.token, decisions) + self.complete_with_retry(decision_response.token, decisions) except Exception as err: logger.error('cannot complete decision: {}'.format(err)) diff --git a/simpleflow/swf/process/poller.py b/simpleflow/swf/process/poller.py index 9b9acac33..2a0a493a2 100644 --- a/simpleflow/swf/process/poller.py +++ b/simpleflow/swf/process/poller.py @@ -9,10 +9,8 @@ from simpleflow.process import NamedMixin, with_state from simpleflow.swf.helpers import swf_identity - logger = logging.getLogger(__name__) - __all__ = ['Poller'] @@ -20,6 +18,7 @@ class Poller(swf.actors.Actor, NamedMixin): """Multi-processing implementation of a SWF actor. """ + def __init__(self, domain, task_list=None): self.is_alive = False self._named_mixin_properties = ["task_list"] @@ -52,6 +51,7 @@ def bind_signal_handlers(self): - SIGTERM and SIGINT lead to a graceful shutdown - other signals are not modified for now """ + # NB: Function is nested to have a reference to *self*. 
def _handle_graceful_shutdown(signum, frame): logger.info("process: caught signal signal=SIGTERM pid={}".format(os.getpid())) @@ -74,7 +74,7 @@ def start(self): self.set_process_name() while self.is_alive: try: - response = self._poll() + response = self.poll_with_retry() except swf.exceptions.PollTimeout: continue self.process(response) @@ -87,7 +87,7 @@ def stop_gracefully(self): logger.info('stopping %s', self.name) self.is_alive = False # No longer take requests. - def _complete(self, token, response): + def complete_with_retry(self, token, response): """ Complete with retry. :param token: @@ -131,7 +131,7 @@ def name(self): """ return '{}()'.format(self.__class__.__name__) - def _poll(self): + def poll_with_retry(self): """ Polls a task represented by its token and data. It uses long-polling with a timeout of one minute. diff --git a/simpleflow/swf/process/worker/base.py b/simpleflow/swf/process/worker/base.py index c4765d57d..f113bb8c2 100644 --- a/simpleflow/swf/process/worker/base.py +++ b/simpleflow/swf/process/worker/base.py @@ -145,7 +145,7 @@ def process(self, poller, token, task): return poller.fail(token, task, reason=str(err), details=tb) try: - poller._complete(token, json_dumps(result)) + poller.complete_with_retry(token, json_dumps(result)) except Exception as err: logger.exception("complete error") reason = 'cannot complete task {}: {}'.format( diff --git a/tests/test_simpleflow/swf/process/test_poller.py b/tests/test_simpleflow/swf/process/test_poller.py index 14f01d2e4..6424749f1 100644 --- a/tests/test_simpleflow/swf/process/test_poller.py +++ b/tests/test_simpleflow/swf/process/test_poller.py @@ -17,7 +17,7 @@ class FakePoller(Poller): """ This poller only waits 2 seconds then exits. """ - def _poll(self): + def poll_with_retry(self): # NB: time.sleep gets interrupted by any signal, so the following lines # are not actually as dumb as they seem to be... 
time.sleep(1) From 248dae73597c8b045efd78ba8e61945638c6db7f Mon Sep 17 00:00:00 2001 From: Yves Bastide Date: Fri, 3 Feb 2017 18:17:15 +0100 Subject: [PATCH 3/5] Docstring fixes Signed-off-by: Yves Bastide --- simpleflow/utils/retry.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/simpleflow/utils/retry.py b/simpleflow/utils/retry.py index 267ed556e..c057ee829 100644 --- a/simpleflow/utils/retry.py +++ b/simpleflow/utils/retry.py @@ -42,10 +42,12 @@ def with_delay( :type delay: callable(value: int) -> int :param on_exceptions: retry only when these exceptions raise. - :type on_exceptions: Sequence([Exception]) + :type on_exceptions: Exception | Sequence([Exception]) :param except_on: don't retry on these exceptions. :type except_on: Sequence([Exception]) + + :param log_with: logging callable to use, e.g. ``logger.info`` (the default). """ if log_with is None: log_with = logging.getLogger(__name__).info From 5205d8a2513e52cb26bedab8c9f12c70b37db9c0 Mon Sep 17 00:00:00 2001 From: Yves Bastide Date: Fri, 3 Feb 2017 18:34:24 +0100 Subject: [PATCH 4/5] Remove obsolete FIXME comment Signed-off-by: Yves Bastide --- simpleflow/swf/process/poller.py | 1 - 1 file changed, 1 deletion(-) diff --git a/simpleflow/swf/process/poller.py b/simpleflow/swf/process/poller.py index 2a0a493a2..4f46b05f5 100644 --- a/simpleflow/swf/process/poller.py +++ b/simpleflow/swf/process/poller.py @@ -97,7 +97,6 @@ def complete_with_retry(self, token, response): :return: :rtype: """ - # FIXME this is a public member try: complete = utils.retry.with_delay( nb_times=self.nb_retries, From 34b759e1c9019a8025d53c9c78d9fba250ce988f Mon Sep 17 00:00:00 2001 From: Yves Bastide Date: Fri, 3 Feb 2017 19:20:36 +0100 Subject: [PATCH 5/5] Add retry on activity worker's fail Totally untested! 
Signed-off-by: Yves Bastide --- simpleflow/swf/process/poller.py | 15 +++++++++++++++ simpleflow/swf/process/worker/base.py | 6 +++--- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/simpleflow/swf/process/poller.py b/simpleflow/swf/process/poller.py index 4f46b05f5..7b69129d4 100644 --- a/simpleflow/swf/process/poller.py +++ b/simpleflow/swf/process/poller.py @@ -154,3 +154,18 @@ def poll_with_retry(self): )(self.poll) response = poll(task_list, identity=identity) return response + + @abc.abstractmethod + def fail(self, *args, **kwargs): + """fail; only relevant for activity workers.""" + raise NotImplementedError + + def fail_with_retry(self, *args, **kwargs): + fail = utils.retry.with_delay( + nb_times=self.nb_retries, + delay=utils.retry.exponential, + log_with=logger.exception, + on_exceptions=swf.exceptions.ResponseError, + )(self.fail) + response = fail(*args, **kwargs) + return response diff --git a/simpleflow/swf/process/worker/base.py b/simpleflow/swf/process/worker/base.py index f113bb8c2..86a3ae961 100644 --- a/simpleflow/swf/process/worker/base.py +++ b/simpleflow/swf/process/worker/base.py @@ -142,7 +142,7 @@ def process(self, poller, token, task): except Exception as err: logger.exception("process error: {}".format(str(err))) tb = traceback.format_exc() - return poller.fail(token, task, reason=str(err), details=tb) + return poller.fail_with_retry(token, task, reason=str(err), details=tb) try: poller.complete_with_retry(token, json_dumps(result)) @@ -152,7 +152,7 @@ def process(self, poller, token, task): task.activity_id, err, ) - poller.fail(token, task, reason) + poller.fail_with_retry(token, task, reason) def process_task(poller, token, task): @@ -204,7 +204,7 @@ def worker_alive(): worker.pid )) if worker.exitcode != 0: - poller.fail( + poller.fail_with_retry( token, task, reason='process {} died: exit code {}'.format(