Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions examples/marker.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from simpleflow import Workflow, futures
from simpleflow.canvas import Chain


class MarkerWorkflow(Workflow):
Expand All @@ -11,5 +12,29 @@ def run(self):
m = self.submit(self.record_marker('marker 1', 'some details'))
self.submit(self.record_marker('marker 2', "2nd marker's details"))
futures.wait(m)
print('Markers: {}'.format(self.list_markers()))
print('Markers, all: {}'.format(self.list_markers(all=True)))
markers = self.list_markers()
assert 2 == len(markers)
print('Markers: {}'.format(markers))
markers = self.list_markers(all=True)
assert 3 == len(markers)
print('Markers, all: {}'.format(markers))


class MarkerInChainWorkflow(Workflow):
name = 'example'
version = 'example'
task_list = 'example'

def run(self):
chain = Chain(
self.record_marker('marker 1'),
self.record_marker('marker 1', 'some details'),
self.record_marker('marker 2', "2nd marker's details"),
)
futures.wait(self.submit(chain))
markers = self.list_markers()
assert 2 == len(markers)
print('Markers: {}'.format(markers))
markers = self.list_markers(all=True)
assert 3 == len(markers)
print('Markers, all: {}'.format(markers))
5 changes: 4 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ def read(fname):
'click',
'psutil>=3.2.1',
'pytz',
'typing',
]
if PY2:
DEPS += [
Expand Down Expand Up @@ -112,10 +113,12 @@ def read(fname):
'License :: OSI Approved :: MIT License',
'Natural Language :: English',
"Programming Language :: Python :: 2",
'Programming Language :: Python :: 2.6',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
],
test_suite='tests',
tests_require=[
Expand Down
11 changes: 9 additions & 2 deletions simpleflow/swf/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,13 +475,20 @@ def find_marker_event(self, a_task, history):
Get the event corresponding to a activity task, if any.

:param a_task:
:type a_task: Marker
:type a_task: MarkerTask
:param history:
:type history: simpleflow.history.History
:return:
:rtype: Optional[dict[str, Any]]
"""
json_details = a_task.get_json_details()
marker_list = history.markers.get(a_task.name)
if not marker_list:
return None
marker_list = filter(
lambda m: m['state'] == 'recorded' and m['details'] == json_details,
marker_list
)
return marker_list[-1] if marker_list else None

TASK_TYPE_TO_EVENT_FINDER = {
Expand Down Expand Up @@ -725,7 +732,7 @@ def _compute_priority(self, priority_set_on_submit, a_task):
if priority_set_on_submit is not PRIORITY_NOT_SET:
return priority_set_on_submit
elif (isinstance(a_task, ActivityTask) and
a_task.activity.task_priority is not PRIORITY_NOT_SET):
a_task.activity.task_priority is not PRIORITY_NOT_SET):
return a_task.activity.task_priority
elif self._workflow.task_priority is not PRIORITY_NOT_SET:
return self._workflow.task_priority
Expand Down
5 changes: 4 additions & 1 deletion simpleflow/swf/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,9 @@ def schedule(self, *args, **kwargs):
decision = swf.models.decision.MarkerDecision()
decision.record(
self.name,
json_dumps(self.details) if self.details is not None else None,
self.get_json_details(),
)
return [decision]

def get_json_details(self):
return json_dumps(self.details) if self.details is not None else None
63 changes: 63 additions & 0 deletions tests/integration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,20 @@
import os

import boto.swf
from click.testing import CliRunner
from sure import expect
from vcr import VCR

import simpleflow.command # NOQA
from tests.utils import IntegrationTestCase

from simpleflow.utils import json_dumps

if False:
from typing import List, Union
from click.testing import Result


# Default SWF parameters
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"
os.environ["SWF_DOMAIN"] = "TestDomain"
Expand Down Expand Up @@ -50,3 +59,57 @@ def conn(self):
if not hasattr(self, "_conn"):
self._conn = boto.swf.connect_to_region(self.region)
return self._conn

def get_events(self, run_id):
response = self.conn.get_workflow_execution_history(
self.domain,
run_id,
self.workflow_id,
)
events = response['events']
next_page = response.get('nextPageToken')
while next_page is not None:
response = self.conn.get_workflow_execution_history(
self.domain,
run_id,
self.workflow_id,
next_page_token=next_page,
)

events.extend(response['events'])
next_page = response.get('nextPageToken')
return events

def invoke(self, command, arguments):
# type: (str, Union(str, List[str])) -> Result
if not hasattr(self, "runner"):
self.runner = CliRunner()
if isinstance(arguments, str):
arguments = arguments.split(" ")
print('simpleflow {} {}'.format(command, ' '.join(arguments)))
return self.runner.invoke(command, arguments, catch_exceptions=False)

def run_standalone(self, workflow_name, *args, **kwargs):
input = json_dumps(dict(args=args, kwargs=kwargs))
result = self.invoke(
simpleflow.command.cli,
[
"standalone",
"--workflow-id",
str(self.workflow_id),
"--input",
input,
"--nb-deciders",
"2",
"--nb-workers",
"2",
workflow_name,
],
)
expect(result.exit_code).to.equal(0)
lines = result.output.split("\n")
start_line = [line for line in lines if line.startswith(self.workflow_id)][0]
_, run_id = start_line.split(" ", 1)

events = self.get_events(run_id)
return events
90 changes: 90 additions & 0 deletions tests/integration/cassettes/test_with_replays.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
interactions:
- request:
body: !!python/unicode '{"domain": "TestDomain", "workflowType": {"version": "example",
"name": "example"}}'
headers:
Authorization: ['AWS4-HMAC-SHA256 Credential=1234AB/20160823/us-east-1/swf/aws4_request,SignedHeaders=host;x-amz-date;x-amz-target,Signature=foobar']
Content-Encoding: [amz-1.0]
Content-Length: ['83']
Content-Type: [application/json; charset=UTF-8]
Host: [swf.us-east-1.amazonaws.com]
User-Agent: [Boto/2.46.1 Python/2.7.12+ Linux/4.8.0-38-generic]
X-Amz-Date: [20170303T103000Z]
X-Amz-Target: [com.amazonaws.swf.service.model.SimpleWorkflowService.DescribeWorkflowType]
method: POST
uri: https://swf.us-east-1.amazonaws.com/
response:
body: {string: !!python/unicode '{"configuration":{"defaultChildPolicy":"TERMINATE","defaultExecutionStartToCloseTimeout":"300","defaultTaskList":{"name":"None"},"defaultTaskStartToCloseTimeout":"300"},"typeInfo":{"creationDate":1.487851963534E9,"status":"REGISTERED","workflowType":{"name":"example","version":"example"}}}'}
headers:
content-length: ['290']
content-type: [application/json]
x-amzn-requestid: [5b64c228-fffc-11e6-bc39-2f26f01a61e7]
status: {code: 200, message: OK}
- request:
body: !!python/unicode '{"domain": "TestDomain", "taskList": {"name": "test-simpleflow-workflow-b3bc4bd5de8d48cf8908c8b00a834e79"},
"childPolicy": "TERMINATE", "input": "{\"args\":[true],\"kwargs\":{}}", "workflowType":
{"version": "example", "name": "example"}, "workflowId": "test-simpleflow-workflow"}'
headers:
Authorization: ['AWS4-HMAC-SHA256 Credential=1234AB/20160823/us-east-1/swf/aws4_request,SignedHeaders=host;x-amz-date;x-amz-target,Signature=foobar']
Content-Encoding: [amz-1.0]
Content-Length: ['280']
Content-Type: [application/json; charset=UTF-8]
Host: [swf.us-east-1.amazonaws.com]
User-Agent: [Boto/2.46.1 Python/2.7.12+ Linux/4.8.0-38-generic]
X-Amz-Date: [20170303T103001Z]
X-Amz-Target: [com.amazonaws.swf.service.model.SimpleWorkflowService.StartWorkflowExecution]
method: POST
uri: https://swf.us-east-1.amazonaws.com/
response:
body: {string: !!python/unicode '{"runId":"22whTBuUw1fjwfLHU4XuHBL+uPmpOpU2900LflTOTTVg0="}'}
headers:
content-length: ['58']
content-type: [application/json]
x-amzn-requestid: [5b9ebdef-fffc-11e6-a472-65992fd150a3]
status: {code: 200, message: OK}
- request:
body: !!python/unicode '{"domain": "TestDomain", "execution": {"workflowId": "test-simpleflow-workflow",
"runId": "22whTBuUw1fjwfLHU4XuHBL+uPmpOpU2900LflTOTTVg0="}}'
headers:
Authorization: ['AWS4-HMAC-SHA256 Credential=1234AB/20160823/us-east-1/swf/aws4_request,SignedHeaders=host;x-amz-date;x-amz-target,Signature=foobar']
Content-Encoding: [amz-1.0]
Content-Length: ['140']
Content-Type: [application/json; charset=UTF-8]
Host: [swf.us-east-1.amazonaws.com]
User-Agent: [Boto/2.46.1 Python/2.7.12+ Linux/4.8.0-38-generic]
X-Amz-Date: [20170303T103003Z]
X-Amz-Target: [com.amazonaws.swf.service.model.SimpleWorkflowService.DescribeWorkflowExecution]
method: POST
uri: https://swf.us-east-1.amazonaws.com/
response:
body: {string: !!python/unicode '{"executionConfiguration":{"childPolicy":"TERMINATE","executionStartToCloseTimeout":"300","taskList":{"name":"test-simpleflow-workflow-b3bc4bd5de8d48cf8908c8b00a834e79"},"taskStartToCloseTimeout":"300"},"executionInfo":{"cancelRequested":false,"closeStatus":"COMPLETED","closeTimestamp":1.488537003712E9,"execution":{"runId":"22whTBuUw1fjwfLHU4XuHBL+uPmpOpU2900LflTOTTVg0=","workflowId":"test-simpleflow-workflow"},"executionStatus":"CLOSED","startTimestamp":1.488537001561E9,"workflowType":{"name":"example","version":"example"}},"openCounts":{"openActivityTasks":0,"openChildWorkflowExecutions":0,"openDecisionTasks":0,"openLambdaFunctions":0,"openTimers":0}}'}
headers:
content-length: ['661']
content-type: [application/json]
x-amzn-requestid: [5d13f8a2-fffc-11e6-8d43-f7a87b88066c]
status: {code: 200, message: OK}
- request:
body: !!python/unicode '{"domain": "TestDomain", "execution": {"workflowId": "test-simpleflow-workflow",
"runId": "22whTBuUw1fjwfLHU4XuHBL+uPmpOpU2900LflTOTTVg0="}}'
headers:
Authorization: ['AWS4-HMAC-SHA256 Credential=1234AB/20160823/us-east-1/swf/aws4_request,SignedHeaders=host;x-amz-date;x-amz-target,Signature=foobar']
Content-Encoding: [amz-1.0]
Content-Length: ['140']
Content-Type: [application/json; charset=UTF-8]
Host: [swf.us-east-1.amazonaws.com]
User-Agent: [Boto/2.46.1 Python/2.7.12+ Linux/4.8.0-38-generic]
X-Amz-Date: [20170303T103104Z]
X-Amz-Target: [com.amazonaws.swf.service.model.SimpleWorkflowService.GetWorkflowExecutionHistory]
method: POST
uri: https://swf.us-east-1.amazonaws.com/
response:
body: {string: !!python/unicode '{"events":[{"eventId":1,"eventTimestamp":1.488537001561E9,"eventType":"WorkflowExecutionStarted","workflowExecutionStartedEventAttributes":{"childPolicy":"TERMINATE","executionStartToCloseTimeout":"300","input":"{\"args\":[true],\"kwargs\":{}}","parentInitiatedEventId":0,"taskList":{"name":"test-simpleflow-workflow-b3bc4bd5de8d48cf8908c8b00a834e79"},"taskStartToCloseTimeout":"300","workflowType":{"name":"example","version":"example"}}},{"decisionTaskScheduledEventAttributes":{"startToCloseTimeout":"300","taskList":{"name":"test-simpleflow-workflow-b3bc4bd5de8d48cf8908c8b00a834e79"}},"eventId":2,"eventTimestamp":1.488537001561E9,"eventType":"DecisionTaskScheduled"},{"decisionTaskStartedEventAttributes":{"identity":"{\"hostname\":\"zeb-Precision-T3610\",\"pid\":7418,\"user\":\"zeb\"}","scheduledEventId":2},"eventId":3,"eventTimestamp":1.488537001621E9,"eventType":"DecisionTaskStarted"},{"decisionTaskCompletedEventAttributes":{"scheduledEventId":2,"startedEventId":3},"eventId":4,"eventTimestamp":1.488537002044E9,"eventType":"DecisionTaskCompleted"},{"eventId":5,"eventTimestamp":1.488537002044E9,"eventType":"MarkerRecorded","markerRecordedEventAttributes":{"decisionTaskCompletedEventId":4,"markerName":"marker
1"}},{"eventId":6,"eventTimestamp":1.488537002044E9,"eventType":"TimerStarted","timerStartedEventAttributes":{"decisionTaskCompletedEventId":4,"startToFireTimeout":"0","timerId":"_simpleflow_wake_up_timer"}},{"eventId":7,"eventTimestamp":1.488537002167E9,"eventType":"TimerFired","timerFiredEventAttributes":{"startedEventId":6,"timerId":"_simpleflow_wake_up_timer"}},{"decisionTaskScheduledEventAttributes":{"startToCloseTimeout":"300","taskList":{"name":"test-simpleflow-workflow-b3bc4bd5de8d48cf8908c8b00a834e79"}},"eventId":8,"eventTimestamp":1.488537002167E9,"eventType":"DecisionTaskScheduled"},{"decisionTaskStartedEventAttributes":{"identity":"{\"hostname\":\"zeb-Precision-T3610\",\"pid\":7416,\"user\":\"zeb\"}","scheduledEventId":8},"eventId":9,"eventTimestamp":1.48853700224E9,"eventType":"DecisionTaskStarted"},{"decisionTaskCompletedEventAttributes":{"scheduledEventId":8,"startedEventId":9},"eventId":10,"eventTimestamp":1.488537002693E9,"eventType":"DecisionTaskCompleted"},{"eventId":11,"eventTimestamp":1.488537002693E9,"eventType":"MarkerRecorded","markerRecordedEventAttributes":{"decisionTaskCompletedEventId":10,"details":"\"some
details\"","markerName":"marker 1"}},{"eventId":12,"eventTimestamp":1.488537002693E9,"eventType":"TimerStarted","timerStartedEventAttributes":{"decisionTaskCompletedEventId":10,"startToFireTimeout":"0","timerId":"_simpleflow_wake_up_timer"}},{"eventId":13,"eventTimestamp":1.48853700271E9,"eventType":"TimerFired","timerFiredEventAttributes":{"startedEventId":12,"timerId":"_simpleflow_wake_up_timer"}},{"decisionTaskScheduledEventAttributes":{"startToCloseTimeout":"300","taskList":{"name":"test-simpleflow-workflow-b3bc4bd5de8d48cf8908c8b00a834e79"}},"eventId":14,"eventTimestamp":1.48853700271E9,"eventType":"DecisionTaskScheduled"},{"decisionTaskStartedEventAttributes":{"identity":"{\"hostname\":\"zeb-Precision-T3610\",\"pid\":7418,\"user\":\"zeb\"}","scheduledEventId":14},"eventId":15,"eventTimestamp":1.488537002771E9,"eventType":"DecisionTaskStarted"},{"decisionTaskCompletedEventAttributes":{"scheduledEventId":14,"startedEventId":15},"eventId":16,"eventTimestamp":1.488537003206E9,"eventType":"DecisionTaskCompleted"},{"eventId":17,"eventTimestamp":1.488537003206E9,"eventType":"MarkerRecorded","markerRecordedEventAttributes":{"decisionTaskCompletedEventId":16,"details":"\"2nd
marker''s details\"","markerName":"marker 2"}},{"eventId":18,"eventTimestamp":1.488537003206E9,"eventType":"TimerStarted","timerStartedEventAttributes":{"decisionTaskCompletedEventId":16,"startToFireTimeout":"0","timerId":"_simpleflow_wake_up_timer"}},{"eventId":19,"eventTimestamp":1.488537003233E9,"eventType":"TimerFired","timerFiredEventAttributes":{"startedEventId":18,"timerId":"_simpleflow_wake_up_timer"}},{"decisionTaskScheduledEventAttributes":{"startToCloseTimeout":"300","taskList":{"name":"test-simpleflow-workflow-b3bc4bd5de8d48cf8908c8b00a834e79"}},"eventId":20,"eventTimestamp":1.488537003233E9,"eventType":"DecisionTaskScheduled"},{"decisionTaskStartedEventAttributes":{"identity":"{\"hostname\":\"zeb-Precision-T3610\",\"pid\":7416,\"user\":\"zeb\"}","scheduledEventId":20},"eventId":21,"eventTimestamp":1.488537003291E9,"eventType":"DecisionTaskStarted"},{"decisionTaskCompletedEventAttributes":{"scheduledEventId":20,"startedEventId":21},"eventId":22,"eventTimestamp":1.488537003712E9,"eventType":"DecisionTaskCompleted"},{"eventId":23,"eventTimestamp":1.488537003712E9,"eventType":"WorkflowExecutionCompleted","workflowExecutionCompletedEventAttributes":{"decisionTaskCompletedEventId":22,"result":"null"}}]}'}
headers:
content-length: ['4796']
content-type: [application/json]
x-amzn-requestid: [816a56b1-fffc-11e6-ad40-e15570700f4f]
status: {code: 200, message: OK}
version: 1
Loading