# Simple Workflow Flow Sample

This implements the sample workflow from the SWF tutorial

In [None]:
import boto3
from botocore.client import Config
import uuid
botoConfig = Config(connect_timeout=50, read_timeout=70)
client = boto3.client('swf', config=botoConfig)
sns = boto3.client('sns')

In [None]:
workflowDomain = 'TutorialWorkflowDomain2'
workflow = 'TutorialWorkflow'
tasklist = 'TutorialTasks'

# In this example I've defined and registered the activity types ahead of time. With SWF it is possible to
# receive an activity that has not been registerd with the domain - if appropriate they can be registered
# at that point. For this example we assume only the following activities in the domain.
activities = ['get_contact_activity','subscribe_topic_activity','wait_for_confirmation_activity',
             'send_result_activity']

In [None]:
############################################
## DEFINITION
############################################

In [None]:
# To me master of your domain, you must first register it.
response = client.register_domain(
    name=workflowDomain,
    description='Tutorial workflow domain',
    workflowExecutionRetentionPeriodInDays='1'
)

print response

In [None]:
response = client.register_workflow_type(
    domain=workflowDomain,
    name=workflow,
    version='1',
    description='Tutorial workflow',
    defaultTaskStartToCloseTimeout='NONE',
    defaultExecutionStartToCloseTimeout='900',
    defaultTaskList={
        'name': tasklist
    },
    defaultChildPolicy='TERMINATE'
)

print response

In [None]:


for activity in activities:
    response = client.register_activity_type(
        domain=workflowDomain,
        name=activity,
        version='1',
        defaultTaskList={
            'name': tasklist
        }
    )
    
    print response

In [None]:
############################################
## EXECUTION
############################################

In [None]:
def get_activity_type_name(event):
    name = 'not found'
    
    if event['eventType'] == 'ActivityTaskScheduled':
        name = event['activityTaskScheduledEventAttributes']['activityType']['name']
    
    return name

def get_result(event):
    result = ''
    
    if event['eventType'] == 'ActivityTaskCompleted':
        result = event['activityTaskCompletedEventAttributes']['result']
        
    return result


def get_email_from_task_completion(task):
    scheduled_event=-1
    started_event=-1
    contact_email = ''
    
    events = task['events']
    for e in events:
        if scheduled_event == -1 and e['eventType'] == 'ActivityTaskScheduled':
            activity_name = get_activity_type_name(e)
            scheduled_event=e['eventId']
        elif scheduled_event != -1 and started_event== -1 and e['eventType'] == 'ActivityTaskStarted':
            started_event = e['eventId']
        elif scheduled_event != -1 and started_event != -1 and e['eventType'] == 'ActivityTaskCompleted':
            contact_email = get_result(e)
        
            
    return contact_email
    
    

def get_input_for_activity(task, activity_name):
    if activity_name == 'subscribe_topic_activity':
        return get_email_from_task_completion(task)
    else:
        return 'i dunno'

In [None]:

def schedule_activity(task, activity_name, activity_id):
    task_input = get_input_for_activity(task, activity_name)
    
    response = client.respond_decision_task_completed(
        taskToken=task['taskToken'],
        decisions=[
          {
            'decisionType': 'ScheduleActivityTask',
            'scheduleActivityTaskDecisionAttributes': {
                'activityType':{
                    'name': activity_name,
                    'version': '1'
                    },
                'activityId': activity_id,
                'input': task_input,
                'scheduleToCloseTimeout': 'NONE',
                'scheduleToStartTimeout': 'NONE',
                'startToCloseTimeout': 'NONE',
                'heartbeatTimeout': 'NONE',
                'taskList': {'name': tasklist},
            }
          }
        ]
      )

    print response

In [None]:
def complete_get_contact_activity(task_token):
    print 'get-contact-activity'
    contact_info = {}
    email = raw_input("Enter your email address: ")
    client.respond_activity_task_completed(
        taskToken=task_token,
        result=email
    )

    

In [None]:
def complete_subscribe_topic_activity(task):
    print task
    response = client.describe_workflow_execution(
        domain=workflowDomain,
        execution={
            'workflowId': task['workflowExecution']['workflowId'],
            'runId': task['workflowExecution']['runId']
        }
    )
    print response
    generic_complete_activity_ok(task['taskToken'])

In [None]:
def generic_complete_activity_ok(task_token):
    print 'generic-activity'
    client.respond_activity_task_completed(
        taskToken=task_token,
        result='success'
    )

In [None]:
def poll_and_complete_task():
    
    task = client.poll_for_activity_task(
        domain=workflowDomain,
        taskList={'name': tasklist},
        identity='worker-1'
    )

    if 'taskToken' not in task:
        print "Poll timed out, no new task.  Repoll"
        return
    else:
        print "New task arrived"
        
    task_token = task['taskToken']
    activity_name = task['activityType']['name']
    if activity_name == 'get_contact_activity':
        complete_get_contact_activity(task_token)
    elif activity_name == 'subscribe_topic_activity':
        complete_subscribe_topic_activity(task)
    else:
        generic_complete_activity_ok(task_token)


    print "Task Done"

In [None]:
def print_pending_activity_task_count():
    response = client.count_pending_activity_tasks(
        domain=workflowDomain,
        taskList={
            'name': tasklist
        }
    )

    print 'Pending activity task count: {}'.format(response['count'])


In [None]:
def print_pending_descision_task_count():
    response = client.count_pending_decision_tasks(
        domain=workflowDomain,
        taskList={
            'name': tasklist
        }
    )

    print 'Pending decision task count: {}'.format(response['count'])

In [None]:
def get_decision_task(decider_id):
    newTask = client.poll_for_decision_task(
        domain=workflowDomain,
        taskList={'name': tasklist},
        identity=decider_id,
        reverseOrder=False)
    
    if 'taskToken' not in newTask:
        print 'No task available'
    
    return newTask

In [None]:
# Create topic
response = sns.create_topic(
    Name='swf-tutorial-topic'
)
topic_arn = response['TopicArn']
print 'topic arn: {}'.format(topic_arn)

In [None]:
# Arbitrary choice for decider id
decider_id = 'decider-1'

In [None]:
# Is there a task available?
newTask = get_decision_task(decider_id)

In [None]:
# Instantiate the workflow
response = client.start_workflow_execution(
  domain=workflowDomain,
  workflowId='wf-1',
  workflowType={
    "name": 'TutorialWorkflow',
    "version": '1'
  },
  taskList={
      'name': tasklist
  },
  input='initial input'
)

print response

In [None]:
# Is there a task available?
newTask = get_decision_task(decider_id)

print newTask

In [None]:
events = newTask['events']
print events

In [None]:
eventHistory = [evt for evt in newTask['events'] if not evt['eventType'].startswith('Decision')]
print eventHistory

print len(eventHistory)

if eventHistory[0] != 'WorkflowExecutionStarted':
    print 'Expected freshly started workflow'
else:
    print 'Freshly started workflow'

In [None]:
# First decision is to start the first activity
activity_id = 'activityid-' + str(uuid.uuid4())

schedule_activity(newTask, activities[0], activity_id)

In [None]:
print_pending_descision_task_count()
print_pending_activity_task_count()

In [None]:
poll_and_complete_task()

In [None]:
# Is there a task available for the decider?
newTask = get_decision_task(decider_id)
print newTask

In [None]:
task_input = get_input_for_activity(newTask, 'subscribe_topic_activity')
print 'task input ==> {}'.format(task_input)

In [None]:
# Second decision is to start the second activity
activity_id = 'activityid-' + str(uuid.uuid4())

schedule_activity(newTask, activities[1], activity_id)

In [None]:
poll_and_complete_task()

In [None]:
print_pending_descision_task_count()

In [None]:
# Is there a task available for the decider?
newTask = get_decision_task(decider_id)

In [None]:
# Schedule and complete third task
activity_id = 'activityid-' + str(uuid.uuid4())
schedule_activity(newTask, activities[2], activity_id)


In [None]:
print_pending_activity_task_count()
poll_and_complete_task()
print_pending_descision_task_count()

In [None]:
print_pending_descision_task_count()

In [None]:
# Is there a task available for the decider?
newTask = get_decision_task(decider_id)

# Schedule the next activity - again note that we're just deciding to schedule the next activity.
# For a real process we'd probably check the event history and use that context to decide what to
# do next.
activity_id = 'activityid-' + str(uuid.uuid4())
schedule_activity(newTask, activities[3], activity_id)

In [None]:
print_pending_activity_task_count()

In [None]:
poll_and_complete_task()
print_pending_descision_task_count()

In [None]:
newTask = get_decision_task(decider_id)

In [None]:
response = client.respond_decision_task_completed(
        taskToken=newTask['taskToken'],
        decisions=[
          {
            'decisionType': 'CompleteWorkflowExecution',
            'completeWorkflowExecutionDecisionAttributes': {
              'result': 'success'
            }
          }
        ]
      )

print response

In [None]:
print_pending_descision_task_count()
print_pending_activity_task_count()

In [None]:
eventHistory = [evt for evt in newTask['events'] if not evt['eventType'].startswith('Decision')]

for event in eventHistory:
    print event
    print

In [None]:
## Delete Topic
response = sns.delete_topic(
    TopicArn=topic_arn
)

print response

In [None]:
import json

# convert to string
orig_dict = {}
orig_dict['id'] = 'foo'
print orig_dict['id']
thing = json.dumps({'id': 'foo' })


# load to dict
my_dict = json.loads(thing) 

print my_dict['id']

In [None]:
######## Topic Clean Up