diff --git a/simpleflow/command.py b/simpleflow/command.py index 2b03b1b63..3220f0063 100644 --- a/simpleflow/command.py +++ b/simpleflow/command.py @@ -407,7 +407,7 @@ def start_worker(unused_workflow, domain, task_list, log_level, nb_processes, he ) -def get_task_list(workflow_id=''): +def create_unique_task_list(workflow_id=''): task_list_id = '-' + uuid4().hex overflow = 256 - len(task_list_id) - len(workflow_id) if overflow < 0: @@ -522,7 +522,7 @@ def standalone(context, else: previous_history = None - task_list = get_task_list(workflow_id) + task_list = create_unique_task_list(workflow_id) logger.info('using task list {}'.format(task_list)) decider_proc = multiprocessing.Process( target=decider.command.start, diff --git a/simpleflow/swf/process/decider/base.py b/simpleflow/swf/process/decider/base.py index 6a27b7620..21d4db2eb 100644 --- a/simpleflow/swf/process/decider/base.py +++ b/simpleflow/swf/process/decider/base.py @@ -166,7 +166,7 @@ def decide(self, decision_response): :rtype: list[swf.models.decision.base.Decision] """ worker = DeciderWorker(self.domain, self._workflow_executors) - decisions = worker.decide(decision_response) + decisions = worker.decide(decision_response, self.task_list) return decisions @@ -183,12 +183,14 @@ def __init__(self, domain, workflow_executors): self._domain = domain self._workflow_executors = workflow_executors - def decide(self, decision_response): + def decide(self, decision_response, task_list): """ Delegate the decision to the executor, loading it if needed. :param decision_response: an object wrapping the PollForDecisionTask response. :type decision_response: swf.responses.Response + :param task_list: + :type task_list: Optional[str] :returns: the decisions. :rtype: list[swf.models.decision.base.Decision] @@ -202,6 +204,7 @@ def decide(self, decision_response): workflow_executor = helpers.load_workflow_executor( self._domain, workflow_name, + task_list=task_list, ) self._workflow_executors[workflow_name] = workflow_executor try: diff --git a/simpleflow/swf/process/worker/base.py b/simpleflow/swf/process/worker/base.py index 47467728a..c4765d57d 100644 --- a/simpleflow/swf/process/worker/base.py +++ b/simpleflow/swf/process/worker/base.py @@ -33,7 +33,7 @@ class ActivityPoller(Poller, swf.actors.ActivityWorker): Polls an activity and handles it in the worker. """ - def __init__(self, domain, task_list, heartbeat=60, *args, **kwargs): + def __init__(self, domain, task_list, heartbeat=60): """ :param domain: @@ -42,10 +42,6 @@ def __init__(self, domain, task_list, heartbeat=60, *args, **kwargs): :type task_list: :param heartbeat: :type heartbeat: - :param args: - :type args: - :param kwargs: - :type kwargs: """ self.nb_retries = 3 # heartbeat=0 is a special value to disable heartbeating. We want to