diff --git a/swf/actors/decider.py b/swf/actors/decider.py index cbd0669..e3ece8d 100644 --- a/swf/actors/decider.py +++ b/swf/actors/decider.py @@ -44,8 +44,9 @@ def complete(self, task_token, def poll(self, task_list=None, identity=None, **kwargs): - """Polls for decision tasks to process from current - actor's instance defined ``task_list`` + """ + Polls a decision task and returns the token and the full history of the + workflow's events. :param task_list: task list to poll for decision tasks from. :type task_list: string @@ -55,22 +56,41 @@ def poll(self, task_list=None, workflow history. :type identity: string - :returns: polled decision tasks + :returns: (token, history) :type: swf.models.History + """ task_list = task_list or self.task_list - events = self.connection.poll_for_decision_task( + task = self.connection.poll_for_decision_task( self.domain.name, task_list=task_list, identity=identity, **kwargs ) - - if not 'taskToken' in events: + token = task.get('taskToken') + if token is None: raise PollTimeout("Decider poll timed out") - history = History.from_event_list(events['events']) - task_token = events['taskToken'] + events = task['events'] + + next_page = task.get('nextPageToken') + while next_page: + task = self.connection.poll_for_decision_task( + self.domain.name, + task_list=task_list, + identity=identity, + next_page_token=next_page, + **kwargs + ) + + token = task.get('taskToken') + if token is None: + raise PollTimeout("Decider poll timed out") + + events.extend(task['events']) + next_page = task.get('nextPageToken') + + history = History.from_event_list(events) - return task_token, history + return token, history diff --git a/tests/actors/test_decider.py b/tests/actors/test_decider.py new file mode 100644 index 0000000..7d00219 --- /dev/null +++ b/tests/actors/test_decider.py @@ -0,0 +1,54 @@ +import unittest + +import swf.models +import swf.models.decision +import swf.actors + + +class TestDecider(unittest.TestCase): + def setUp(self): + self.domain = swf.models.Domain("TestDomain") + self.task_list = 'test' + self.execution = None + + def tearDown(self): + if self.execution is not None: + self.execution.terminate() + + def test_poll(self): + """ + Checks :meth:`Decider.poll` retrieve all the history's pages. + + """ + domain = self.domain + task_list = self.task_list + workflow_name = 'TestDeciderPoll' + + decider = swf.actors.Decider(domain, task_list) + worker = swf.actors.ActivityWorker(domain, task_list) + + activity = swf.models.ActivityType(domain=domain, + name='task', + version='test') + + workflow = swf.models.WorkflowType(name=workflow_name, + domain=domain, + version='test') + self.execution = workflow.start_execution(workflow_name, task_list) + + for i in xrange(30): + token, history = decider.poll() + self.assertEqual(len(history), 3 + i * 6) + decision = swf.models.decision.task.ActivityTaskDecision( + 'schedule', + 'task', + activity, + task_list=task_list, + task_timeout='600', + duration_timeout='600', + schedule_timeout='600', + heartbeat_timeout='600') + decider.complete(token, [decision]) + + token, task = worker.poll() + worker.complete(token)