Skip to content
This repository was archived by the owner on Apr 12, 2018. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 29 additions & 9 deletions swf/actors/decider.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ def complete(self, task_token,
def poll(self, task_list=None,
identity=None,
**kwargs):
"""Polls for decision tasks to process from current
actor's instance defined ``task_list``
"""
Polls a decision task and returns the token and the full history of the
workflow's events.

:param task_list: task list to poll for decision tasks from.
:type task_list: string
Expand All @@ -55,22 +56,41 @@ def poll(self, task_list=None,
workflow history.
:type identity: string

:returns: polled decision tasks
:returns: (token, history)
:type: swf.models.History

"""
task_list = task_list or self.task_list

events = self.connection.poll_for_decision_task(
task = self.connection.poll_for_decision_task(
self.domain.name,
task_list=task_list,
identity=identity,
**kwargs
)

if not 'taskToken' in events:
token = task.get('taskToken')
if token is None:
raise PollTimeout("Decider poll timed out")

history = History.from_event_list(events['events'])
task_token = events['taskToken']
events = task['events']

next_page = task.get('nextPageToken')
while next_page:
task = self.connection.poll_for_decision_task(
self.domain.name,
task_list=task_list,
identity=identity,
next_page_token=next_page,
**kwargs
)

token = task.get('taskToken')
if token is None:
raise PollTimeout("Decider poll timed out")

events.extend(task['events'])
next_page = task.get('nextPageToken')

history = History.from_event_list(events)

return task_token, history
return token, history
54 changes: 54 additions & 0 deletions tests/actors/test_decider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import unittest

import swf.models
import swf.models.decision
import swf.actors


class TestDecider(unittest.TestCase):
def setUp(self):
self.domain = swf.models.Domain("TestDomain")
self.task_list = 'test'
self.execution = None

def tearDown(self):
if self.execution is not None:
self.execution.terminate()

def test_poll(self):
"""
Checks :meth:`Decider.poll` retrieve all the history's pages.

"""
domain = self.domain
task_list = self.task_list
workflow_name = 'TestDeciderPoll'

decider = swf.actors.Decider(domain, task_list)
worker = swf.actors.ActivityWorker(domain, task_list)

activity = swf.models.ActivityType(domain=domain,
name='task',
version='test')

workflow = swf.models.WorkflowType(name=workflow_name,
domain=domain,
version='test')
self.execution = workflow.start_execution(workflow_name, task_list)

for i in xrange(30):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test is slow because is calls the SWF API several times.

token, history = decider.poll()
self.assertEqual(len(history), 3 + i * 6)
decision = swf.models.decision.task.ActivityTaskDecision(
'schedule',
'task',
activity,
task_list=task_list,
task_timeout='600',
duration_timeout='600',
schedule_timeout='600',
heartbeat_timeout='600')
decider.complete(token, [decision])

token, task = worker.poll()
worker.complete(token)