Addition of an object-oriented Layer2 to Amazon's boto.swf #1021

Merged
merged 2 commits into from Dec 14, 2012

Conversation

Projects
None yet
2 participants
@oozie
Contributor

oozie commented Sep 26, 2012

This piece of code wraps entirely around boto.swf.layer1 and boto.swf.layer1_decisions and provides a fully OO interface to Amazon's SWF.
The code takes advantage of object's shared state to save re-entering relevant information (domain, last task token, connection, etc). As a result, writing all sorts of workflows becomes extremely simple and takes just a few lines of code.

The following code bootstraps a test domain and implements two workflows: one for parallel and one for serial task execution.

#!/usr/bin/python

import boto.swf.layer2 as swf
import time

ACCESS = <ACCESS KEY>
SECRET = <SECRET KEY>
DOMAIN = 'botoTest'
WORKFLOW = DOMAIN + 'Workflow'
TASKLIST = DOMAIN + 'TaskList'
ACTIVITY = DOMAIN + 'Activity'
VERSION = 'v1'

def setup_swf():
    swf.set_default_credentials(ACCESS, SECRET) 
    try:
        print 'Registering domain', DOMAIN
        swf.Domain(name=DOMAIN).register()
    except Exception, e:
        print e
    try:
        print 'Registering activity', ACTIVITY
        swf.ActivityType(domain=DOMAIN, name=ACTIVITY, version=VERSION,
                         task_list=TASKLIST).register()
    except Exception, e:
        print e
    try:
        print 'Registering workflow', WORKFLOW
        swf.WorkflowType(domain=DOMAIN, name=WORKFLOW, version=VERSION,
                         task_list=TASKLIST).register()
    except Exception, e:
        print e

class TestDecider(swf.Decider):

    SCHED_COUNT = 5

    def run_parallel(self):
        decision_task = self.poll()
        if 'events' in decision_task:
            decisions = swf.Layer1Decisions()
            # Decision* events are irrelevant here and can be ignored.
            workflow_events = [e for e in decision_task['events'] 
                               if not e['eventType'].startswith('Decision')]
            # Record latest non-decision event.
            last_event = workflow_events[-1]
            last_event_type = last_event['eventType']
            if last_event_type == 'WorkflowExecutionStarted':
                # At start, kickoff SCHED_COUNT activities in parallel.
                for i in range(TestDecider.SCHED_COUNT):
                    decisions.schedule_activity_task('activity%i' % i, ACTIVITY, VERSION, 
                                                     task_list=TASKLIST)
            elif last_event_type == 'ActivityTaskCompleted':
                # Monitor progress. When all activities complete, complete workflow.
                completed_count = sum([1 for a in decision_task['events']
                                       if a['eventType'] == 'ActivityTaskCompleted'])
                print '%i/%i' % (completed_count, TestDecider.SCHED_COUNT)
                if completed_count == TestDecider.SCHED_COUNT:
                    decisions.complete_workflow_execution()
            self.complete(decisions=decisions)
            return True

    def run_serial(self):
        decision_task = self.poll()
        if 'events' in decision_task:
            decisions = swf.Layer1Decisions()
            completed_count = sum([1 for a in decision_task['events']
                                   if a['eventType'] == 'ActivityTaskCompleted'])
            print '%i/%i' % (completed_count, TestDecider.SCHED_COUNT)
            if completed_count == TestDecider.SCHED_COUNT:
                decisions.complete_workflow_execution()
            else:
                decisions.schedule_activity_task('activity%i' % completed_count, 
                                                 ACTIVITY, VERSION, 
                                                 task_list=TASKLIST)
            self.complete(decisions=decisions)
            return True

class TestWorker(swf.ActivityWorker):

    def run(self):
        """Report current time."""
        activity_task = self.poll()
        if 'activityId' in activity_task:
            self.complete(result=str(time.time()))
            return True

Copy+paste it into a `layer2test.py' and execute it in an interactive interpreter to bootstrap the domain, kickoff a workflow execution and launch an activity worker

$ python -i layer2test.py
>>> setup_swf()
Registering domain botoTest
Registering activity botoTestActivity
Registering workflow botoTestWorkflow
>>> wkf = swf.WorkflowType(domain=DOMAIN, name=WORKFLOW, version=VERSION, task_list=TASKLIST)
>>> wkf.start() # Kicks off workflow execution
<WorkflowExecution 'botoTestWorkflow-v1' at 0x918ecac>
>>> wrk = TestWorker(domain=DOMAIN, task_list=TASKLIST)
>>> while wrk.run(): pass  # Launches an activity worker

Now you'll need at least one more python interpreter to run the decider.

$ python -i layer2test.py
>>> swf.set_default_credentials(ACCESS, SECRET)
>>> dec = TestDecider(domain=DOMAIN, task_list=TASKLIST)

Now to run the workflow in parallel

>>> while dec.run_parallel(): pass # Runs activities in parallel.

The same procedure ending with dec.run_serial() has the activty worker(s) execute tasks serially.

In both cases the decider will print progress and hang on for 1 minute (long poll) after the last decision before it closes the connection to SWF.

@garnaat

This comment has been minimized.

Show comment Hide comment
@garnaat

garnaat Nov 14, 2012

Owner

We haven't forgotten about this. It looks good. I just need a bit more time to evaluate the proposed Layer2.

Owner

garnaat commented Nov 14, 2012

We haven't forgotten about this. It looks good. I just need a bit more time to evaluate the proposed Layer2.

@oozie

This comment has been minimized.

Show comment Hide comment
@oozie

oozie Dec 14, 2012

Contributor

Cool. I'm committing to writing a concise tutorial with a few recipes, should the code make it in.

Contributor

oozie commented Dec 14, 2012

Cool. I'm committing to writing a concise tutorial with a few recipes, should the code make it in.

garnaat added a commit that referenced this pull request Dec 14, 2012

Merge pull request #1021 from oozie/layer2
Addition of an object-oriented Layer2 to Amazon's boto.swf

@garnaat garnaat merged commit b1dfd48 into boto:develop Dec 14, 2012

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment