@@ -46,28 +46,34 @@
# Some API calls establish resources, but these resources are not instantly
# available to the next API call. For testing purposes, it is necessary to
# have a short pause to avoid having tests fail for invalid reasons.
PAUSE_SECONDS = 2
PAUSE_SECONDS = 4



class SimpleWorkflowLayer1Test (unittest.TestCase):

class SimpleWorkflowLayer1TestBase(unittest.TestCase):
"""
There are at least two test cases which share this setUp/tearDown
and the class-based parameter definitions:
* SimpleWorkflowLayer1Test
* tests.swf.test_layer1_workflow_execution.SwfL1WorkflowExecutionTest
"""
# Some params used throughout the tests...
# Domain registration params...
_domain = BOTO_SWF_UNITTEST_DOMAIN
_workflow_execution_retention_period_in_days = 'NONE'
_domain_description = 'test workflow domain'
# Type registration params used for workflow type and activity type...
_version = '1'
_task_list = 'tasklist1'
# Workflow type registration params...
_workflow_type_name = 'wft1'
_workflow_type_name = 'wft1'
_workflow_type_version = '1'
_workflow_type_description = 'wft1 description'
_default_child_policy = 'REQUEST_CANCEL'
_default_execution_start_to_close_timeout = '600'
_default_task_start_to_close_timeout = '60'
# Activity type registration params...
_activity_type_name = 'at1'
_activity_type_version = '1'
_activity_type_description = 'at1 description'
_default_task_heartbeat_timeout = '30'
_default_task_schedule_to_close_timeout = '90'
@@ -95,7 +101,7 @@ def setUp(self):
# SWFTypeAlreadyExistsError.
try:
r = self.conn.register_workflow_type(self._domain,
self._workflow_type_name, self._version,
self._workflow_type_name, self._workflow_type_version,
task_list=self._task_list,
default_child_policy=self._default_child_policy,
default_execution_start_to_close_timeout=
@@ -112,7 +118,7 @@ def setUp(self):
# SWFTypeAlreadyExistsError.
try:
r = self.conn.register_activity_type(self._domain,
self._activity_type_name, self._version,
self._activity_type_name, self._activity_type_version,
task_list=self._task_list,
default_task_heartbeat_timeout=
self._default_task_heartbeat_timeout,
@@ -123,13 +129,20 @@ def setUp(self):
default_task_start_to_close_timeout=
self._default_task_start_to_close_timeout,
description=self._activity_type_description)
assert r is None
time.sleep(PAUSE_SECONDS)
except swf_exceptions.SWFTypeAlreadyExistsError:
pass

def tearDown(self):
# Delete what we can...
pass




class SimpleWorkflowLayer1Test(SimpleWorkflowLayer1TestBase):

def test_list_domains(self):
# Find the domain.
r = self.conn.list_domains('REGISTERED')
@@ -151,7 +164,7 @@ def test_list_workflow_types(self):
found = None
for info in r['typeInfos']:
if ( info['workflowType']['name'] == self._workflow_type_name and
info['workflowType']['version'] == self._version ):
info['workflowType']['version'] == self._workflow_type_version ):
found = info
break
self.assertNotEqual(found, None, 'list_workflow_types; test type not found')
@@ -169,7 +182,7 @@ def test_list_activity_types(self):
if info['activityType']['name'] == self._activity_type_name:
found = info
break
self.assertNotEqual(found, None, 'list_workflow_types; test type not found')
self.assertNotEqual(found, None, 'list_activity_types; test type not found')
# Validate some properties.
self.assertEqual(found['description'], self._activity_type_description,
'list_activity_types; description does not match')
@@ -0,0 +1,171 @@
"""
Tests for Layer1 of Simple Workflow
"""
import time
import uuid
import json
import traceback

from boto.swf.layer1_decisions import Layer1Decisions

from test_layer1 import SimpleWorkflowLayer1TestBase



class SwfL1WorkflowExecutionTest(SimpleWorkflowLayer1TestBase):
"""
test a simple workflow execution
"""
def run_decider(self):
"""
run one iteration of a simple decision engine
"""
# Poll for a decision task.
tries = 0
while 1:
dtask = self.conn.poll_for_decision_task(self._domain,
self._task_list, reverse_order=True)
if dtask.get('taskToken') is not None:
# This means a real decision task has arrived.
break
time.sleep(2)
tries += 1
if tries > 10:
# Give up if it's taking too long. Probably
# means something is broken somewhere else.
assert False, 'no decision task occurred'

# Get the most recent interesting event.
ignorable = (
'DecisionTaskScheduled',
'DecisionTaskStarted',
'DecisionTaskTimedOut',
)
event = None
for tevent in dtask['events']:
if tevent['eventType'] not in ignorable:
event = tevent
break

# Construct the decision response.
decisions = Layer1Decisions()
if event['eventType'] == 'WorkflowExecutionStarted':
activity_id = str(uuid.uuid1())
decisions.schedule_activity_task(activity_id,
self._activity_type_name, self._activity_type_version,
task_list=self._task_list,
input=event['workflowExecutionStartedEventAttributes']['input'])
elif event['eventType'] == 'ActivityTaskCompleted':
decisions.complete_workflow_execution(
result=event['activityTaskCompletedEventAttributes']['result'])
elif event['eventType'] == 'ActivityTaskFailed':
decisions.fail_workflow_execution(
reason=event['activityTaskFailedEventAttributes']['reason'],
details=event['activityTaskFailedEventAttributes']['details'])
else:
decisions.fail_workflow_execution(
reason='unhandled decision task type; %r' % (event['eventType'],))

# Send the decision response.
r = self.conn.respond_decision_task_completed(dtask['taskToken'],
decisions=decisions._data,
execution_context=None)
assert r is None


def run_worker(self):
"""
run one iteration of a simple worker engine
"""
# Poll for an activity task.
tries = 0
while 1:
atask = self.conn.poll_for_activity_task(self._domain,
self._task_list, identity='test worker')
if atask.get('activityId') is not None:
# This means a real activity task has arrived.
break
time.sleep(2)
tries += 1
if tries > 10:
# Give up if it's taking too long. Probably
# means something is broken somewhere else.
assert False, 'no activity task occurred'
# Do the work or catch a "work exception."
reason = None
try:
result = json.dumps(sum(json.loads(atask['input'])))
except:
reason = 'an exception was raised'
details = traceback.format_exc()
if reason is None:
r = self.conn.respond_activity_task_completed(
atask['taskToken'], result)
else:
r = self.conn.respond_activity_task_failed(
atask['taskToken'], reason=reason, details=details)
assert r is None


def test_workflow_execution(self):
# Start a workflow execution whose activity task will succeed.
workflow_id = 'wfid-%.2f' % (time.time(),)
r = self.conn.start_workflow_execution(self._domain,
workflow_id,
self._workflow_type_name,
self._workflow_type_version,
execution_start_to_close_timeout='20',
input='[600, 15]')
# Need the run_id to lookup the execution history later.
run_id = r['runId']

# Move the workflow execution forward by having the
# decider schedule an activity task.
self.run_decider()

# Run the worker to handle the scheduled activity task.
self.run_worker()

# Complete the workflow execution by having the
# decider close it down.
self.run_decider()

# Check that the result was stored in the execution history.
r = self.conn.get_workflow_execution_history(self._domain,
run_id, workflow_id,
reverse_order=True)['events'][0]
result = r['workflowExecutionCompletedEventAttributes']['result']
assert json.loads(result) == 615


def test_failed_workflow_execution(self):
# Start a workflow execution whose activity task will fail.
workflow_id = 'wfid-%.2f' % (time.time(),)
r = self.conn.start_workflow_execution(self._domain,
workflow_id,
self._workflow_type_name,
self._workflow_type_version,
execution_start_to_close_timeout='20',
input='[600, "s"]')
# Need the run_id to lookup the execution history later.
run_id = r['runId']

# Move the workflow execution forward by having the
# decider schedule an activity task.
self.run_decider()

# Run the worker to handle the scheduled activity task.
self.run_worker()

# Complete the workflow execution by having the
# decider close it down.
self.run_decider()

# Check that the failure was stored in the execution history.
r = self.conn.get_workflow_execution_history(self._domain,
run_id, workflow_id,
reverse_order=True)['events'][0]
reason = r['workflowExecutionFailedEventAttributes']['reason']
assert reason == 'an exception was raised'

@@ -52,6 +52,7 @@
from dynamodb.test_layer2 import DynamoDBLayer2Test
from sts.test_session_token import SessionTokenTest
from swf.test_layer1 import SimpleWorkflowLayer1Test
from swf.test_layer1_workflow_execution import SwfL1WorkflowExecutionTest

def usage():
print "test.py [-t testsuite] [-v verbosity]"
@@ -138,8 +139,10 @@ def suite(testsuite="all"):
tests.addTest(unittest.makeSuite(SessionTokenTest))
elif testsuite == "swf":
tests.addTest(unittest.makeSuite(SimpleWorkflowLayer1Test))
tests.addTest(unittest.makeSuite(SwfL1WorkflowExecutionTest))
elif testsuite == "swfL1":
tests.addTest(unittest.makeSuite(SimpleWorkflowLayer1Test))
tests.addTest(unittest.makeSuite(SwfL1WorkflowExecutionTest))
else:
raise ValueError("Invalid choice.")
return tests