Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cadence/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def complete_exceptionally(service, task_token, ex: Exception) -> Optional[Excep
respond: RespondActivityTaskFailedRequest = RespondActivityTaskFailedRequest()
respond.task_token = task_token
respond.identity = WorkflowService.get_identity()
respond.reason = "ActivityFailureException"
respond.reason = type(ex).__name__
respond.details = serialize_exception(ex)
_, error = service.respond_activity_task_failed(respond)
return error
Expand Down
8 changes: 8 additions & 0 deletions cadence/activity_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,15 @@ def activity_task_loop(worker: Worker):
except StopRequestedException:
return
except Exception as ex:
if f'{ex}' == "timeout":
logger.warning(f"LongPoll timeout -- no tasks available to execute -- exception message: {ex}")
continue
logger.error("PollForActivityTask error: %s", ex)
try:
service.close()
except:
logger.warning("service.close() failed", exc_info=1)
worker.notify_thread_stopped()
raise
if err:
logger.error("PollForActivityTask failed: %s", err)
Expand Down
2 changes: 1 addition & 1 deletion cadence/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def create(cls, service: str, method_name: str, thrift_payload: bytes):
# PollForActivityTask is hardcoded on the server to timeout at
# 60 seconds, so the ttl needs to be slightly more so that it
# does not fail.
o.ttl = 61000
o.ttl = 140000
return o

@staticmethod
Expand Down
12 changes: 10 additions & 2 deletions cadence/decision_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -880,9 +880,11 @@ def run(self):
finally:
# noinspection PyPep8,PyBroadException
try:
logger.info(f"closing wf service")
self.service.close()
except:
logger.warning("service.close() failed", exc_info=1)
logger.info(f"notifying worker thread stopped")
self.worker.notify_thread_stopped()

def poll(self) -> Optional[PollForDecisionTaskResponse]:
Expand All @@ -899,11 +901,17 @@ def poll(self) -> Optional[PollForDecisionTaskResponse]:
polling_end = datetime.datetime.now()
logger.debug("PollForDecisionTask: %dms", (polling_end - polling_start).total_seconds() * 1000)
except TChannelException as ex:
if f'{ex}' == "timeout":
logger.warning("PollForDecisionTask error: %s", ex)
logger.warning(f"LongPoll timeout -- no tasks available to execute -- exception message: {ex}")
return None
logger.error("PollForDecisionTask error: %s", ex)
return None
logger.info(f"raising exception PollForDecisionTask {ex}")
raise
if err:
logger.error("PollForDecisionTask failed: %s", err)
return None
raise
# return None
if not task.task_token:
logger.debug("PollForActivityTask has no task token (expected): %s", task)
return None
Expand Down
8 changes: 5 additions & 3 deletions cadence/workerfactory.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from dataclasses import dataclass, field
from typing import List

from dataclasses import dataclass, field

from cadence.worker import Worker, WorkerOptions


Expand All @@ -17,8 +18,9 @@ class WorkerFactory:
options: WorkerFactoryOptions = None
workers: List[Worker] = field(default_factory=list)

def new_worker(self, task_list: str, worker_options: WorkerOptions = None) -> Worker:
worker = Worker(host=self.host, port=self.port, domain=self.domain, task_list=task_list, options=worker_options)
def new_worker(self, task_list: str, timeout: int = 120, worker_options: WorkerOptions = None) -> Worker:
worker = Worker(host=self.host, port=self.port, domain=self.domain, task_list=task_list, timeout=timeout,
options=worker_options)
self.workers.append(worker)
return worker

Expand Down