Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions simpleflow/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ def start_worker(unused_workflow, domain, task_list, log_level, nb_processes, he
)


def get_task_list(workflow_id=''):
def create_unique_task_list(workflow_id=''):
task_list_id = '-' + uuid4().hex
overflow = 256 - len(task_list_id) - len(workflow_id)
if overflow < 0:
Expand Down Expand Up @@ -522,7 +522,7 @@ def standalone(context,
else:
previous_history = None

task_list = get_task_list(workflow_id)
task_list = create_unique_task_list(workflow_id)
logger.info('using task list {}'.format(task_list))
decider_proc = multiprocessing.Process(
target=decider.command.start,
Expand Down
7 changes: 5 additions & 2 deletions simpleflow/swf/process/decider/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def decide(self, decision_response):
:rtype: list[swf.models.decision.base.Decision]
"""
worker = DeciderWorker(self.domain, self._workflow_executors)
decisions = worker.decide(decision_response)
decisions = worker.decide(decision_response, self.task_list)
return decisions


Expand All @@ -183,12 +183,14 @@ def __init__(self, domain, workflow_executors):
self._domain = domain
self._workflow_executors = workflow_executors

def decide(self, decision_response):
def decide(self, decision_response, task_list):
"""
Delegate the decision to the executor, loading it if needed.

:param decision_response: an object wrapping the PollForDecisionTask response.
:type decision_response: swf.responses.Response
:param task_list:
:type task_list: Optional[str]

:returns: the decisions.
:rtype: list[swf.models.decision.base.Decision]
Expand All @@ -202,6 +204,7 @@ def decide(self, decision_response):
workflow_executor = helpers.load_workflow_executor(
self._domain,
workflow_name,
task_list=task_list,
)
self._workflow_executors[workflow_name] = workflow_executor
try:
Expand Down
6 changes: 1 addition & 5 deletions simpleflow/swf/process/worker/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class ActivityPoller(Poller, swf.actors.ActivityWorker):
Polls an activity and handles it in the worker.

"""
def __init__(self, domain, task_list, heartbeat=60, *args, **kwargs):
def __init__(self, domain, task_list, heartbeat=60):
"""

:param domain:
Expand All @@ -42,10 +42,6 @@ def __init__(self, domain, task_list, heartbeat=60, *args, **kwargs):
:type task_list:
:param heartbeat:
:type heartbeat:
:param args:
:type args:
:param kwargs:
:type kwargs:
"""
self.nb_retries = 3
# heartbeat=0 is a special value to disable heartbeating. We want to
Expand Down