diff --git a/cadence/activity.py b/cadence/activity.py index b77f0f9..e47d29f 100644 --- a/cadence/activity.py +++ b/cadence/activity.py @@ -139,7 +139,7 @@ def complete_exceptionally(service, task_token, ex: Exception) -> Optional[Excep respond: RespondActivityTaskFailedRequest = RespondActivityTaskFailedRequest() respond.task_token = task_token respond.identity = WorkflowService.get_identity() - respond.reason = "ActivityFailureException" + respond.reason = type(ex).__name__ respond.details = serialize_exception(ex) _, error = service.respond_activity_task_failed(respond) return error diff --git a/cadence/activity_loop.py b/cadence/activity_loop.py index 7c6f261..7b8482f 100644 --- a/cadence/activity_loop.py +++ b/cadence/activity_loop.py @@ -37,7 +37,15 @@ def activity_task_loop(worker: Worker): except StopRequestedException: return except Exception as ex: + if f'{ex}' == "timeout": + logger.warning(f"LongPoll timeout -- no tasks available to execute -- exception message: {ex}") + continue logger.error("PollForActivityTask error: %s", ex) + try: + service.close() + except: + logger.warning("service.close() failed", exc_info=1) + worker.notify_thread_stopped() raise if err: logger.error("PollForActivityTask failed: %s", err) diff --git a/cadence/connection.py b/cadence/connection.py index 973e89c..c1256c0 100644 --- a/cadence/connection.py +++ b/cadence/connection.py @@ -180,7 +180,7 @@ def create(cls, service: str, method_name: str, thrift_payload: bytes): # PollForActivityTask is hardcoded on the server to timeout at # 60 seconds, so the ttl needs to be slightly more so that it # does not fail. - o.ttl = 61000 + o.ttl = 140000 return o @staticmethod diff --git a/cadence/decision_loop.py b/cadence/decision_loop.py index afdf0d5..1b56bb1 100644 --- a/cadence/decision_loop.py +++ b/cadence/decision_loop.py @@ -880,9 +880,11 @@ def run(self): finally: # noinspection PyPep8,PyBroadException try: + logger.info(f"closing wf service") self.service.close() except: logger.warning("service.close() failed", exc_info=1) + logger.info(f"notifying worker thread stopped") self.worker.notify_thread_stopped() def poll(self) -> Optional[PollForDecisionTaskResponse]: @@ -899,11 +901,17 @@ def poll(self) -> Optional[PollForDecisionTaskResponse]: polling_end = datetime.datetime.now() logger.debug("PollForDecisionTask: %dms", (polling_end - polling_start).total_seconds() * 1000) except TChannelException as ex: + if f'{ex}' == "timeout": + logger.warning("PollForDecisionTask error: %s", ex) + logger.warning(f"LongPoll timeout -- no tasks available to execute -- exception message: {ex}") + return None logger.error("PollForDecisionTask error: %s", ex) - return None + logger.info(f"raising exception PollForDecisionTask {ex}") + raise if err: logger.error("PollForDecisionTask failed: %s", err) - return None + raise + # return None if not task.task_token: logger.debug("PollForActivityTask has no task token (expected): %s", task) return None diff --git a/cadence/workerfactory.py b/cadence/workerfactory.py index 90b0ac8..dd9340d 100644 --- a/cadence/workerfactory.py +++ b/cadence/workerfactory.py @@ -1,6 +1,7 @@ -from dataclasses import dataclass, field from typing import List +from dataclasses import dataclass, field + from cadence.worker import Worker, WorkerOptions @@ -17,8 +18,9 @@ class WorkerFactory: options: WorkerFactoryOptions = None workers: List[Worker] = field(default_factory=list) - def new_worker(self, task_list: str, worker_options: WorkerOptions = None) -> Worker: - worker = Worker(host=self.host, port=self.port, domain=self.domain, task_list=task_list, options=worker_options) + def new_worker(self, task_list: str, timeout: int = 120, worker_options: WorkerOptions = None) -> Worker: + worker = Worker(host=self.host, port=self.port, domain=self.domain, task_list=task_list, timeout=timeout, + options=worker_options) self.workers.append(worker) return worker