From 7119d38a2fe3fbe8df45103a6a30c0c4aef27b90 Mon Sep 17 00:00:00 2001 From: Chandan Bhattad Date: Fri, 9 Oct 2020 15:36:11 +0400 Subject: [PATCH 01/11] fix activity failure reason --- cadence/activity.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cadence/activity.py b/cadence/activity.py index b77f0f9..e47d29f 100644 --- a/cadence/activity.py +++ b/cadence/activity.py @@ -139,7 +139,7 @@ def complete_exceptionally(service, task_token, ex: Exception) -> Optional[Excep respond: RespondActivityTaskFailedRequest = RespondActivityTaskFailedRequest() respond.task_token = task_token respond.identity = WorkflowService.get_identity() - respond.reason = "ActivityFailureException" + respond.reason = type(ex).__name__ respond.details = serialize_exception(ex) _, error = service.respond_activity_task_failed(respond) return error From 61b9c42a42ca1e4b42d75858f07620ae589825e2 Mon Sep 17 00:00:00 2001 From: Chandan Bhattad Date: Fri, 9 Oct 2020 16:20:55 +0400 Subject: [PATCH 02/11] add optional parameter timeout in worker --- cadence/workerfactory.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/cadence/workerfactory.py b/cadence/workerfactory.py index 90b0ac8..dd9340d 100644 --- a/cadence/workerfactory.py +++ b/cadence/workerfactory.py @@ -1,6 +1,7 @@ -from dataclasses import dataclass, field from typing import List +from dataclasses import dataclass, field + from cadence.worker import Worker, WorkerOptions @@ -17,8 +18,9 @@ class WorkerFactory: options: WorkerFactoryOptions = None workers: List[Worker] = field(default_factory=list) - def new_worker(self, task_list: str, worker_options: WorkerOptions = None) -> Worker: - worker = Worker(host=self.host, port=self.port, domain=self.domain, task_list=task_list, options=worker_options) + def new_worker(self, task_list: str, timeout: int = 120, worker_options: WorkerOptions = None) -> Worker: + worker = Worker(host=self.host, port=self.port, domain=self.domain, task_list=task_list, timeout=timeout, + options=worker_options) self.workers.append(worker) return worker From 8b9823c3bab4ea743888351914ce1bf02d3fb173 Mon Sep 17 00:00:00 2001 From: Chandan Bhattad Date: Fri, 9 Oct 2020 20:20:15 +0400 Subject: [PATCH 03/11] close service --- cadence/activity_loop.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cadence/activity_loop.py b/cadence/activity_loop.py index 7c6f261..8f33333 100644 --- a/cadence/activity_loop.py +++ b/cadence/activity_loop.py @@ -38,6 +38,11 @@ def activity_task_loop(worker: Worker): return except Exception as ex: logger.error("PollForActivityTask error: %s", ex) + try: + service.close() + except: + logger.warning("service.close() failed", exc_info=1) + worker.notify_thread_stopped() raise if err: logger.error("PollForActivityTask failed: %s", err) From 142966db83cd84a0982cb769891134ec3e077d89 Mon Sep 17 00:00:00 2001 From: Chandan Bhattad Date: Fri, 23 Oct 2020 11:13:04 +0400 Subject: [PATCH 04/11] raise in case of poll decision task error --- cadence/decision_loop.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cadence/decision_loop.py b/cadence/decision_loop.py index afdf0d5..b8c5b8d 100644 --- a/cadence/decision_loop.py +++ b/cadence/decision_loop.py @@ -900,10 +900,11 @@ def poll(self) -> Optional[PollForDecisionTaskResponse]: logger.debug("PollForDecisionTask: %dms", (polling_end - polling_start).total_seconds() * 1000) except TChannelException as ex: logger.error("PollForDecisionTask error: %s", ex) - return None + raise if err: logger.error("PollForDecisionTask failed: %s", err) - return None + raise + # return None if not task.task_token: logger.debug("PollForActivityTask has no task token (expected): %s", task) return None From 0a85e77f6452b303136f88dbab5ad35fbd474a27 Mon Sep 17 00:00:00 2001 From: Chandan Bhattad Date: Fri, 23 Oct 2020 11:54:02 +0400 Subject: [PATCH 05/11] logging --- cadence/decision_loop.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cadence/decision_loop.py b/cadence/decision_loop.py index b8c5b8d..36016c7 100644 --- a/cadence/decision_loop.py +++ b/cadence/decision_loop.py @@ -880,9 +880,11 @@ def run(self): finally: # noinspection PyPep8,PyBroadException try: + logger.info(f"closing wf service") self.service.close() except: logger.warning("service.close() failed", exc_info=1) + logger.info(f"notifying worker thread stopped") self.worker.notify_thread_stopped() def poll(self) -> Optional[PollForDecisionTaskResponse]: @@ -900,6 +902,7 @@ def poll(self) -> Optional[PollForDecisionTaskResponse]: logger.debug("PollForDecisionTask: %dms", (polling_end - polling_start).total_seconds() * 1000) except TChannelException as ex: logger.error("PollForDecisionTask error: %s", ex) + logger.info(f"raising exception PollForDecisionTask {ex}") raise if err: logger.error("PollForDecisionTask failed: %s", err) From dd4dc5004473032d9c85b3829f44e60a876bf87f Mon Sep 17 00:00:00 2001 From: Chandan Bhattad Date: Wed, 11 Nov 2020 14:39:42 +0400 Subject: [PATCH 06/11] increase ttl --- cadence/connection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cadence/connection.py b/cadence/connection.py index 973e89c..f16eb3d 100644 --- a/cadence/connection.py +++ b/cadence/connection.py @@ -180,7 +180,7 @@ def create(cls, service: str, method_name: str, thrift_payload: bytes): # PollForActivityTask is hardcoded on the server to timeout at # 60 seconds, so the ttl needs to be slightly more so that it # does not fail. - o.ttl = 61000 + o.ttl = 100000 return o @staticmethod From 98120539dc86bcbd3a1b69e44b24e14ab52ee8e4 Mon Sep 17 00:00:00 2001 From: Chandan Bhattad Date: Wed, 11 Nov 2020 14:50:10 +0400 Subject: [PATCH 07/11] increase ttl --- cadence/connection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cadence/connection.py b/cadence/connection.py index f16eb3d..c1256c0 100644 --- a/cadence/connection.py +++ b/cadence/connection.py @@ -180,7 +180,7 @@ def create(cls, service: str, method_name: str, thrift_payload: bytes): # PollForActivityTask is hardcoded on the server to timeout at # 60 seconds, so the ttl needs to be slightly more so that it # does not fail. - o.ttl = 100000 + o.ttl = 140000 return o @staticmethod From 9c7015f7901515136ab64e53b644ae0f4729a85f Mon Sep 17 00:00:00 2001 From: Chandan Bhattad Date: Thu, 12 Nov 2020 13:55:18 +0400 Subject: [PATCH 08/11] logging --- cadence/activity_loop.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cadence/activity_loop.py b/cadence/activity_loop.py index 8f33333..fb22dd4 100644 --- a/cadence/activity_loop.py +++ b/cadence/activity_loop.py @@ -37,6 +37,8 @@ def activity_task_loop(worker: Worker): except StopRequestedException: return except Exception as ex: + if f'{ex}' == "timeout": + logger.info(f"LongPoll timeout -- no tasks available to execute -- exception message: {ex}") logger.error("PollForActivityTask error: %s", ex) try: service.close() From b09ee2dab472bb5c89367697faf28a02c12a0031 Mon Sep 17 00:00:00 2001 From: Chandan Bhattad Date: Thu, 12 Nov 2020 13:57:51 +0400 Subject: [PATCH 09/11] logging --- cadence/activity_loop.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cadence/activity_loop.py b/cadence/activity_loop.py index fb22dd4..3d20e5e 100644 --- a/cadence/activity_loop.py +++ b/cadence/activity_loop.py @@ -38,7 +38,7 @@ def activity_task_loop(worker: Worker): return except Exception as ex: if f'{ex}' == "timeout": - logger.info(f"LongPoll timeout -- no tasks available to execute -- exception message: {ex}") + logger.warning(f"LongPoll timeout -- no tasks available to execute -- exception message: {ex}") logger.error("PollForActivityTask error: %s", ex) try: service.close() From 66b25b13cf97f981884832977d8a97adcb188439 Mon Sep 17 00:00:00 2001 From: Chandan Bhattad Date: Fri, 13 Nov 2020 00:20:07 +0400 Subject: [PATCH 10/11] do not raise in case of long poll timeout --- cadence/activity_loop.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cadence/activity_loop.py b/cadence/activity_loop.py index 3d20e5e..7b8482f 100644 --- a/cadence/activity_loop.py +++ b/cadence/activity_loop.py @@ -39,6 +39,7 @@ def activity_task_loop(worker: Worker): except Exception as ex: if f'{ex}' == "timeout": logger.warning(f"LongPoll timeout -- no tasks available to execute -- exception message: {ex}") + continue logger.error("PollForActivityTask error: %s", ex) try: service.close() From df73b72c74f9954620919bf6424e62e7ed6b1fb1 Mon Sep 17 00:00:00 2001 From: Chandan Bhattad Date: Fri, 13 Nov 2020 00:25:27 +0400 Subject: [PATCH 11/11] do not raise in case of long poll timeout --- cadence/decision_loop.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cadence/decision_loop.py b/cadence/decision_loop.py index 36016c7..1b56bb1 100644 --- a/cadence/decision_loop.py +++ b/cadence/decision_loop.py @@ -901,6 +901,10 @@ def poll(self) -> Optional[PollForDecisionTaskResponse]: polling_end = datetime.datetime.now() logger.debug("PollForDecisionTask: %dms", (polling_end - polling_start).total_seconds() * 1000) except TChannelException as ex: + if f'{ex}' == "timeout": + logger.warning("PollForDecisionTask error: %s", ex) + logger.warning(f"LongPoll timeout -- no tasks available to execute -- exception message: {ex}") + return None logger.error("PollForDecisionTask error: %s", ex) logger.info(f"raising exception PollForDecisionTask {ex}") raise