Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion simpleflow/swf/process/decider/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def process(self, decision_response):
try:
logger.info('completing decision for workflow {}'.format(
self._workflow_name))
self._complete(decision_response.token, decisions)
self.complete_with_retry(decision_response.token, decisions)
except Exception as err:
logger.error('cannot complete decision: {}'.format(err))

Expand Down
48 changes: 27 additions & 21 deletions simpleflow/swf/process/poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,16 @@
from simpleflow.process import NamedMixin, with_state
from simpleflow.swf.helpers import swf_identity


logger = logging.getLogger(__name__)


__all__ = ['Poller']


class Poller(swf.actors.Actor, NamedMixin):
"""Multi-processing implementation of a SWF actor.

"""

def __init__(self, domain, task_list=None):
self.is_alive = False
self._named_mixin_properties = ["task_list"]
Expand Down Expand Up @@ -52,6 +51,7 @@ def bind_signal_handlers(self):
- SIGTERM and SIGINT lead to a graceful shutdown
- other signals are not modified for now
"""

# NB: Function is nested to have a reference to *self*.
def _handle_graceful_shutdown(signum, frame):
logger.info("process: caught signal signal=SIGTERM pid={}".format(os.getpid()))
Expand All @@ -74,7 +74,7 @@ def start(self):
self.set_process_name()
while self.is_alive:
try:
response = self._poll()
response = self.poll_with_retry()
except swf.exceptions.PollTimeout:
continue
self.process(response)
Expand All @@ -87,7 +87,7 @@ def stop_gracefully(self):
logger.info('stopping %s', self.name)
self.is_alive = False # No longer take requests.

def _complete(self, token, response):
def complete_with_retry(self, token, response):
"""
Complete with retry.
:param token:
Expand All @@ -97,7 +97,6 @@ def _complete(self, token, response):
:return:
:rtype:
"""
# FIXME this is a public member
try:
complete = utils.retry.with_delay(
nb_times=self.nb_retries,
Expand Down Expand Up @@ -131,7 +130,7 @@ def name(self):
"""
return '{}()'.format(self.__class__.__name__)

def _poll(self):
def poll_with_retry(self):
"""
Polls a task represented by its token and data. It uses long-polling
with a timeout of one minute.
Expand All @@ -147,19 +146,26 @@ def _poll(self):
identity = self.identity

logger.debug("polling task on %s", task_list)
try:
response = self.poll(
task_list,
identity=identity,
)
except swf.exceptions.PollTimeout:
logger.debug('{}: PollTimeout'.format(self))
raise
except Exception as err:
logger.error(
"exception %s when polling on %s",
str(err),
task_list,
)
raise
poll = utils.retry.with_delay(
nb_times=self.nb_retries,
delay=utils.retry.exponential,
log_with=logger.exception,
on_exceptions=swf.exceptions.ResponseError,
)(self.poll)
response = poll(task_list, identity=identity)
return response

@abc.abstractmethod
def fail(self, *args, **kwargs):
    """
    Report a failure; only relevant for activity workers.

    Abstract hook: this base implementation accepts any positional and
    keyword arguments and does nothing with them.

    :raises NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError()

def fail_with_retry(self, *args, **kwargs):
    """
    Fail with retry.

    Wraps ``self.fail`` with ``utils.retry.with_delay`` and calls the
    wrapped callable with the arguments received here.

    The wrapper is configured with ``nb_times=self.nb_retries``, the
    ``utils.retry.exponential`` delay function, ``logger.exception`` as
    logging callable, and ``swf.exceptions.ResponseError`` as the
    exception to retry on.

    :param args: positional arguments forwarded to ``self.fail``.
    :param kwargs: keyword arguments forwarded to ``self.fail``.
    :return: whatever the wrapped ``self.fail`` call returns.
    """
    retrying = utils.retry.with_delay(
        nb_times=self.nb_retries,
        delay=utils.retry.exponential,
        log_with=logger.exception,
        on_exceptions=swf.exceptions.ResponseError,
    )
    return retrying(self.fail)(*args, **kwargs)
8 changes: 4 additions & 4 deletions simpleflow/swf/process/worker/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,17 +142,17 @@ def process(self, poller, token, task):
except Exception as err:
logger.exception("process error: {}".format(str(err)))
tb = traceback.format_exc()
return poller.fail(token, task, reason=str(err), details=tb)
return poller.fail_with_retry(token, task, reason=str(err), details=tb)

try:
poller._complete(token, json_dumps(result))
poller.complete_with_retry(token, json_dumps(result))
except Exception as err:
logger.exception("complete error")
reason = 'cannot complete task {}: {}'.format(
task.activity_id,
err,
)
poller.fail(token, task, reason)
poller.fail_with_retry(token, task, reason)


def process_task(poller, token, task):
Expand Down Expand Up @@ -204,7 +204,7 @@ def worker_alive():
worker.pid
))
if worker.exitcode != 0:
poller.fail(
poller.fail_with_retry(
token,
task,
reason='process {} died: exit code {}'.format(
Expand Down
4 changes: 3 additions & 1 deletion simpleflow/utils/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@ def with_delay(
:type delay: callable(value: int) -> int

:param on_exceptions: retry only when these exceptions raise.
:type on_exceptions: Sequence([Exception])
:type on_exceptions: Exception | Sequence([Exception])

:param except_on: don't retry on these exceptions.
:type except_on: Sequence([Exception])

:param log_with: logging callable to use (e.g. ``logger.info``).
"""
if log_with is None:
log_with = logging.getLogger(__name__).info
Expand Down
2 changes: 1 addition & 1 deletion tests/test_simpleflow/swf/process/test_poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class FakePoller(Poller):
"""
This poller only waits 2 seconds then exits.
"""
def _poll(self):
def poll_with_retry(self):
# NB: time.sleep gets interrupted by any signal, so the following lines
# are not actually as dumb as they seem to be...
time.sleep(1)
Expand Down