diff --git a/examples/marker.py b/examples/marker.py index 3561e7349..57ab43eb8 100644 --- a/examples/marker.py +++ b/examples/marker.py @@ -1,4 +1,5 @@ from simpleflow import Workflow, futures +from simpleflow.canvas import Chain class MarkerWorkflow(Workflow): @@ -11,5 +12,29 @@ def run(self): m = self.submit(self.record_marker('marker 1', 'some details')) self.submit(self.record_marker('marker 2', "2nd marker's details")) futures.wait(m) - print('Markers: {}'.format(self.list_markers())) - print('Markers, all: {}'.format(self.list_markers(all=True))) + markers = self.list_markers() + assert 2 == len(markers) + print('Markers: {}'.format(markers)) + markers = self.list_markers(all=True) + assert 3 == len(markers) + print('Markers, all: {}'.format(markers)) + + +class MarkerInChainWorkflow(Workflow): + name = 'example' + version = 'example' + task_list = 'example' + + def run(self): + chain = Chain( + self.record_marker('marker 1'), + self.record_marker('marker 1', 'some details'), + self.record_marker('marker 2', "2nd marker's details"), + ) + futures.wait(self.submit(chain)) + markers = self.list_markers() + assert 2 == len(markers) + print('Markers: {}'.format(markers)) + markers = self.list_markers(all=True) + assert 3 == len(markers) + print('Markers, all: {}'.format(markers)) diff --git a/setup.py b/setup.py index 3a6d57a46..79d780ec6 100755 --- a/setup.py +++ b/setup.py @@ -80,6 +80,7 @@ def read(fname): 'click', 'psutil>=3.2.1', 'pytz', + 'typing', ] if PY2: DEPS += [ @@ -112,10 +113,12 @@ def read(fname): 'License :: OSI Approved :: MIT License', 'Natural Language :: English', "Programming Language :: Python :: 2", - 'Programming Language :: Python :: 2.6', 'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3.3', + 'Programming Language :: Python :: 3.4', + 'Programming Language :: Python :: 3.5', + 'Programming Language :: Python :: 3.6', ], test_suite='tests', tests_require=[ diff --git a/simpleflow/swf/executor.py b/simpleflow/swf/executor.py index a750652aa..e4ca7ed6e 100644 --- a/simpleflow/swf/executor.py +++ b/simpleflow/swf/executor.py @@ -475,13 +475,20 @@ def find_marker_event(self, a_task, history): Get the event corresponding to a activity task, if any. :param a_task: - :type a_task: Marker + :type a_task: MarkerTask :param history: :type history: simpleflow.history.History :return: :rtype: Optional[dict[str, Any]] """ + json_details = a_task.get_json_details() marker_list = history.markers.get(a_task.name) + if not marker_list: + return None + marker_list = filter( + lambda m: m['state'] == 'recorded' and m['details'] == json_details, + marker_list + ) return marker_list[-1] if marker_list else None TASK_TYPE_TO_EVENT_FINDER = { @@ -725,7 +732,7 @@ def _compute_priority(self, priority_set_on_submit, a_task): if priority_set_on_submit is not PRIORITY_NOT_SET: return priority_set_on_submit elif (isinstance(a_task, ActivityTask) and - a_task.activity.task_priority is not PRIORITY_NOT_SET): + a_task.activity.task_priority is not PRIORITY_NOT_SET): return a_task.activity.task_priority elif self._workflow.task_priority is not PRIORITY_NOT_SET: return self._workflow.task_priority diff --git a/simpleflow/swf/task.py b/simpleflow/swf/task.py index aa54e0b68..3389b4ff4 100644 --- a/simpleflow/swf/task.py +++ b/simpleflow/swf/task.py @@ -242,6 +242,9 @@ def schedule(self, *args, **kwargs): decision = swf.models.decision.MarkerDecision() decision.record( self.name, - json_dumps(self.details) if self.details is not None else None, + self.get_json_details(), ) return [decision] + + def get_json_details(self): + return json_dumps(self.details) if self.details is not None else None diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index 3ec45f50e..cec49c270 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -2,11 +2,20 @@ import os import boto.swf +from click.testing import CliRunner +from sure import expect from vcr import VCR import simpleflow.command # NOQA from tests.utils import IntegrationTestCase +from simpleflow.utils import json_dumps + +if False: + from typing import List, Union + from click.testing import Result + + # Default SWF parameters os.environ["AWS_DEFAULT_REGION"] = "us-east-1" os.environ["SWF_DOMAIN"] = "TestDomain" @@ -50,3 +59,57 @@ def conn(self): if not hasattr(self, "_conn"): self._conn = boto.swf.connect_to_region(self.region) return self._conn + + def get_events(self, run_id): + response = self.conn.get_workflow_execution_history( + self.domain, + run_id, + self.workflow_id, + ) + events = response['events'] + next_page = response.get('nextPageToken') + while next_page is not None: + response = self.conn.get_workflow_execution_history( + self.domain, + run_id, + self.workflow_id, + next_page_token=next_page, + ) + + events.extend(response['events']) + next_page = response.get('nextPageToken') + return events + + def invoke(self, command, arguments): + # type: (str, Union(str, List[str])) -> Result + if not hasattr(self, "runner"): + self.runner = CliRunner() + if isinstance(arguments, str): + arguments = arguments.split(" ") + print('simpleflow {} {}'.format(command, ' '.join(arguments))) + return self.runner.invoke(command, arguments, catch_exceptions=False) + + def run_standalone(self, workflow_name, *args, **kwargs): + input = json_dumps(dict(args=args, kwargs=kwargs)) + result = self.invoke( + simpleflow.command.cli, + [ + "standalone", + "--workflow-id", + str(self.workflow_id), + "--input", + input, + "--nb-deciders", + "2", + "--nb-workers", + "2", + workflow_name, + ], + ) + expect(result.exit_code).to.equal(0) + lines = result.output.split("\n") + start_line = [line for line in lines if line.startswith(self.workflow_id)][0] + _, run_id = start_line.split(" ", 1) + + events = self.get_events(run_id) + return events diff --git a/tests/integration/cassettes/test_with_replays.yaml b/tests/integration/cassettes/test_with_replays.yaml new file mode 100644 index 000000000..2afa95ab9 --- /dev/null +++ b/tests/integration/cassettes/test_with_replays.yaml @@ -0,0 +1,90 @@ +interactions: +- request: + body: !!python/unicode '{"domain": "TestDomain", "workflowType": {"version": "example", + "name": "example"}}' + headers: + Authorization: ['AWS4-HMAC-SHA256 Credential=1234AB/20160823/us-east-1/swf/aws4_request,SignedHeaders=host;x-amz-date;x-amz-target,Signature=foobar'] + Content-Encoding: [amz-1.0] + Content-Length: ['83'] + Content-Type: [application/json; charset=UTF-8] + Host: [swf.us-east-1.amazonaws.com] + User-Agent: [Boto/2.46.1 Python/2.7.12+ Linux/4.8.0-38-generic] + X-Amz-Date: [20170303T103000Z] + X-Amz-Target: [com.amazonaws.swf.service.model.SimpleWorkflowService.DescribeWorkflowType] + method: POST + uri: https://swf.us-east-1.amazonaws.com/ + response: + body: {string: !!python/unicode '{"configuration":{"defaultChildPolicy":"TERMINATE","defaultExecutionStartToCloseTimeout":"300","defaultTaskList":{"name":"None"},"defaultTaskStartToCloseTimeout":"300"},"typeInfo":{"creationDate":1.487851963534E9,"status":"REGISTERED","workflowType":{"name":"example","version":"example"}}}'} + headers: + content-length: ['290'] + content-type: [application/json] + x-amzn-requestid: [5b64c228-fffc-11e6-bc39-2f26f01a61e7] + status: {code: 200, message: OK} +- request: + body: !!python/unicode '{"domain": "TestDomain", "taskList": {"name": "test-simpleflow-workflow-b3bc4bd5de8d48cf8908c8b00a834e79"}, + "childPolicy": "TERMINATE", "input": "{\"args\":[true],\"kwargs\":{}}", "workflowType": + {"version": "example", "name": "example"}, "workflowId": "test-simpleflow-workflow"}' + headers: + Authorization: ['AWS4-HMAC-SHA256 Credential=1234AB/20160823/us-east-1/swf/aws4_request,SignedHeaders=host;x-amz-date;x-amz-target,Signature=foobar'] + Content-Encoding: [amz-1.0] + Content-Length: ['280'] + Content-Type: [application/json; charset=UTF-8] + Host: [swf.us-east-1.amazonaws.com] + User-Agent: [Boto/2.46.1 Python/2.7.12+ Linux/4.8.0-38-generic] + X-Amz-Date: [20170303T103001Z] + X-Amz-Target: [com.amazonaws.swf.service.model.SimpleWorkflowService.StartWorkflowExecution] + method: POST + uri: https://swf.us-east-1.amazonaws.com/ + response: + body: {string: !!python/unicode '{"runId":"22whTBuUw1fjwfLHU4XuHBL+uPmpOpU2900LflTOTTVg0="}'} + headers: + content-length: ['58'] + content-type: [application/json] + x-amzn-requestid: [5b9ebdef-fffc-11e6-a472-65992fd150a3] + status: {code: 200, message: OK} +- request: + body: !!python/unicode '{"domain": "TestDomain", "execution": {"workflowId": "test-simpleflow-workflow", + "runId": "22whTBuUw1fjwfLHU4XuHBL+uPmpOpU2900LflTOTTVg0="}}' + headers: + Authorization: ['AWS4-HMAC-SHA256 Credential=1234AB/20160823/us-east-1/swf/aws4_request,SignedHeaders=host;x-amz-date;x-amz-target,Signature=foobar'] + Content-Encoding: [amz-1.0] + Content-Length: ['140'] + Content-Type: [application/json; charset=UTF-8] + Host: [swf.us-east-1.amazonaws.com] + User-Agent: [Boto/2.46.1 Python/2.7.12+ Linux/4.8.0-38-generic] + X-Amz-Date: [20170303T103003Z] + X-Amz-Target: [com.amazonaws.swf.service.model.SimpleWorkflowService.DescribeWorkflowExecution] + method: POST + uri: https://swf.us-east-1.amazonaws.com/ + response: + body: {string: !!python/unicode '{"executionConfiguration":{"childPolicy":"TERMINATE","executionStartToCloseTimeout":"300","taskList":{"name":"test-simpleflow-workflow-b3bc4bd5de8d48cf8908c8b00a834e79"},"taskStartToCloseTimeout":"300"},"executionInfo":{"cancelRequested":false,"closeStatus":"COMPLETED","closeTimestamp":1.488537003712E9,"execution":{"runId":"22whTBuUw1fjwfLHU4XuHBL+uPmpOpU2900LflTOTTVg0=","workflowId":"test-simpleflow-workflow"},"executionStatus":"CLOSED","startTimestamp":1.488537001561E9,"workflowType":{"name":"example","version":"example"}},"openCounts":{"openActivityTasks":0,"openChildWorkflowExecutions":0,"openDecisionTasks":0,"openLambdaFunctions":0,"openTimers":0}}'} + headers: + content-length: ['661'] + content-type: [application/json] + x-amzn-requestid: [5d13f8a2-fffc-11e6-8d43-f7a87b88066c] + status: {code: 200, message: OK} +- request: + body: !!python/unicode '{"domain": "TestDomain", "execution": {"workflowId": "test-simpleflow-workflow", + "runId": "22whTBuUw1fjwfLHU4XuHBL+uPmpOpU2900LflTOTTVg0="}}' + headers: + Authorization: ['AWS4-HMAC-SHA256 Credential=1234AB/20160823/us-east-1/swf/aws4_request,SignedHeaders=host;x-amz-date;x-amz-target,Signature=foobar'] + Content-Encoding: [amz-1.0] + Content-Length: ['140'] + Content-Type: [application/json; charset=UTF-8] + Host: [swf.us-east-1.amazonaws.com] + User-Agent: [Boto/2.46.1 Python/2.7.12+ Linux/4.8.0-38-generic] + X-Amz-Date: [20170303T103104Z] + X-Amz-Target: [com.amazonaws.swf.service.model.SimpleWorkflowService.GetWorkflowExecutionHistory] + method: POST + uri: https://swf.us-east-1.amazonaws.com/ + response: + body: {string: !!python/unicode '{"events":[{"eventId":1,"eventTimestamp":1.488537001561E9,"eventType":"WorkflowExecutionStarted","workflowExecutionStartedEventAttributes":{"childPolicy":"TERMINATE","executionStartToCloseTimeout":"300","input":"{\"args\":[true],\"kwargs\":{}}","parentInitiatedEventId":0,"taskList":{"name":"test-simpleflow-workflow-b3bc4bd5de8d48cf8908c8b00a834e79"},"taskStartToCloseTimeout":"300","workflowType":{"name":"example","version":"example"}}},{"decisionTaskScheduledEventAttributes":{"startToCloseTimeout":"300","taskList":{"name":"test-simpleflow-workflow-b3bc4bd5de8d48cf8908c8b00a834e79"}},"eventId":2,"eventTimestamp":1.488537001561E9,"eventType":"DecisionTaskScheduled"},{"decisionTaskStartedEventAttributes":{"identity":"{\"hostname\":\"zeb-Precision-T3610\",\"pid\":7418,\"user\":\"zeb\"}","scheduledEventId":2},"eventId":3,"eventTimestamp":1.488537001621E9,"eventType":"DecisionTaskStarted"},{"decisionTaskCompletedEventAttributes":{"scheduledEventId":2,"startedEventId":3},"eventId":4,"eventTimestamp":1.488537002044E9,"eventType":"DecisionTaskCompleted"},{"eventId":5,"eventTimestamp":1.488537002044E9,"eventType":"MarkerRecorded","markerRecordedEventAttributes":{"decisionTaskCompletedEventId":4,"markerName":"marker + 1"}},{"eventId":6,"eventTimestamp":1.488537002044E9,"eventType":"TimerStarted","timerStartedEventAttributes":{"decisionTaskCompletedEventId":4,"startToFireTimeout":"0","timerId":"_simpleflow_wake_up_timer"}},{"eventId":7,"eventTimestamp":1.488537002167E9,"eventType":"TimerFired","timerFiredEventAttributes":{"startedEventId":6,"timerId":"_simpleflow_wake_up_timer"}},{"decisionTaskScheduledEventAttributes":{"startToCloseTimeout":"300","taskList":{"name":"test-simpleflow-workflow-b3bc4bd5de8d48cf8908c8b00a834e79"}},"eventId":8,"eventTimestamp":1.488537002167E9,"eventType":"DecisionTaskScheduled"},{"decisionTaskStartedEventAttributes":{"identity":"{\"hostname\":\"zeb-Precision-T3610\",\"pid\":7416,\"user\":\"zeb\"}","scheduledEventId":8},"eventId":9,"eventTimestamp":1.48853700224E9,"eventType":"DecisionTaskStarted"},{"decisionTaskCompletedEventAttributes":{"scheduledEventId":8,"startedEventId":9},"eventId":10,"eventTimestamp":1.488537002693E9,"eventType":"DecisionTaskCompleted"},{"eventId":11,"eventTimestamp":1.488537002693E9,"eventType":"MarkerRecorded","markerRecordedEventAttributes":{"decisionTaskCompletedEventId":10,"details":"\"some + details\"","markerName":"marker 1"}},{"eventId":12,"eventTimestamp":1.488537002693E9,"eventType":"TimerStarted","timerStartedEventAttributes":{"decisionTaskCompletedEventId":10,"startToFireTimeout":"0","timerId":"_simpleflow_wake_up_timer"}},{"eventId":13,"eventTimestamp":1.48853700271E9,"eventType":"TimerFired","timerFiredEventAttributes":{"startedEventId":12,"timerId":"_simpleflow_wake_up_timer"}},{"decisionTaskScheduledEventAttributes":{"startToCloseTimeout":"300","taskList":{"name":"test-simpleflow-workflow-b3bc4bd5de8d48cf8908c8b00a834e79"}},"eventId":14,"eventTimestamp":1.48853700271E9,"eventType":"DecisionTaskScheduled"},{"decisionTaskStartedEventAttributes":{"identity":"{\"hostname\":\"zeb-Precision-T3610\",\"pid\":7418,\"user\":\"zeb\"}","scheduledEventId":14},"eventId":15,"eventTimestamp":1.488537002771E9,"eventType":"DecisionTaskStarted"},{"decisionTaskCompletedEventAttributes":{"scheduledEventId":14,"startedEventId":15},"eventId":16,"eventTimestamp":1.488537003206E9,"eventType":"DecisionTaskCompleted"},{"eventId":17,"eventTimestamp":1.488537003206E9,"eventType":"MarkerRecorded","markerRecordedEventAttributes":{"decisionTaskCompletedEventId":16,"details":"\"2nd + marker''s details\"","markerName":"marker 2"}},{"eventId":18,"eventTimestamp":1.488537003206E9,"eventType":"TimerStarted","timerStartedEventAttributes":{"decisionTaskCompletedEventId":16,"startToFireTimeout":"0","timerId":"_simpleflow_wake_up_timer"}},{"eventId":19,"eventTimestamp":1.488537003233E9,"eventType":"TimerFired","timerFiredEventAttributes":{"startedEventId":18,"timerId":"_simpleflow_wake_up_timer"}},{"decisionTaskScheduledEventAttributes":{"startToCloseTimeout":"300","taskList":{"name":"test-simpleflow-workflow-b3bc4bd5de8d48cf8908c8b00a834e79"}},"eventId":20,"eventTimestamp":1.488537003233E9,"eventType":"DecisionTaskScheduled"},{"decisionTaskStartedEventAttributes":{"identity":"{\"hostname\":\"zeb-Precision-T3610\",\"pid\":7416,\"user\":\"zeb\"}","scheduledEventId":20},"eventId":21,"eventTimestamp":1.488537003291E9,"eventType":"DecisionTaskStarted"},{"decisionTaskCompletedEventAttributes":{"scheduledEventId":20,"startedEventId":21},"eventId":22,"eventTimestamp":1.488537003712E9,"eventType":"DecisionTaskCompleted"},{"eventId":23,"eventTimestamp":1.488537003712E9,"eventType":"WorkflowExecutionCompleted","workflowExecutionCompletedEventAttributes":{"decisionTaskCompletedEventId":22,"result":"null"}}]}'} + headers: + content-length: ['4796'] + content-type: [application/json] + x-amzn-requestid: [816a56b1-fffc-11e6-ad40-e15570700f4f] + status: {code: 200, message: OK} +version: 1 diff --git a/tests/integration/cassettes/test_without_replays.yaml b/tests/integration/cassettes/test_without_replays.yaml new file mode 100644 index 000000000..4533594b8 --- /dev/null +++ b/tests/integration/cassettes/test_without_replays.yaml @@ -0,0 +1,90 @@ +interactions: +- request: + body: !!python/unicode '{"domain": "TestDomain", "workflowType": {"version": "example", + "name": "example"}}' + headers: + Authorization: ['AWS4-HMAC-SHA256 Credential=1234AB/20160823/us-east-1/swf/aws4_request,SignedHeaders=host;x-amz-date;x-amz-target,Signature=foobar'] + Content-Encoding: [amz-1.0] + Content-Length: ['83'] + Content-Type: [application/json; charset=UTF-8] + Host: [swf.us-east-1.amazonaws.com] + User-Agent: [Boto/2.46.1 Python/2.7.12+ Linux/4.8.0-38-generic] + X-Amz-Date: [20170303T103105Z] + X-Amz-Target: [com.amazonaws.swf.service.model.SimpleWorkflowService.DescribeWorkflowType] + method: POST + uri: https://swf.us-east-1.amazonaws.com/ + response: + body: {string: !!python/unicode '{"configuration":{"defaultChildPolicy":"TERMINATE","defaultExecutionStartToCloseTimeout":"300","defaultTaskList":{"name":"None"},"defaultTaskStartToCloseTimeout":"300"},"typeInfo":{"creationDate":1.487851963534E9,"status":"REGISTERED","workflowType":{"name":"example","version":"example"}}}'} + headers: + content-length: ['290'] + content-type: [application/json] + x-amzn-requestid: [81cbb03e-fffc-11e6-b185-d14df3ef102b] + status: {code: 200, message: OK} +- request: + body: !!python/unicode '{"domain": "TestDomain", "taskList": {"name": "test-simpleflow-workflow-7ac796d690e44e95b3e5255093f5dd9e"}, + "childPolicy": "TERMINATE", "input": "{\"args\":[false],\"kwargs\":{}}", "workflowType": + {"version": "example", "name": "example"}, "workflowId": "test-simpleflow-workflow"}' + headers: + Authorization: ['AWS4-HMAC-SHA256 Credential=1234AB/20160823/us-east-1/swf/aws4_request,SignedHeaders=host;x-amz-date;x-amz-target,Signature=foobar'] + Content-Encoding: [amz-1.0] + Content-Length: ['281'] + Content-Type: [application/json; charset=UTF-8] + Host: [swf.us-east-1.amazonaws.com] + User-Agent: [Boto/2.46.1 Python/2.7.12+ Linux/4.8.0-38-generic] + X-Amz-Date: [20170303T103105Z] + X-Amz-Target: [com.amazonaws.swf.service.model.SimpleWorkflowService.StartWorkflowExecution] + method: POST + uri: https://swf.us-east-1.amazonaws.com/ + response: + body: {string: !!python/unicode '{"runId":"22+pRaR6QEN8eXvOv5TdgnTHjXNwgmKLmnSL60QB0F2gc="}'} + headers: + content-length: ['58'] + content-type: [application/json] + x-amzn-requestid: [82078139-fffc-11e6-89c8-a95ed3fb3064] + status: {code: 200, message: OK} +- request: + body: !!python/unicode '{"domain": "TestDomain", "execution": {"workflowId": "test-simpleflow-workflow", + "runId": "22+pRaR6QEN8eXvOv5TdgnTHjXNwgmKLmnSL60QB0F2gc="}}' + headers: + Authorization: ['AWS4-HMAC-SHA256 Credential=1234AB/20160823/us-east-1/swf/aws4_request,SignedHeaders=host;x-amz-date;x-amz-target,Signature=foobar'] + Content-Encoding: [amz-1.0] + Content-Length: ['140'] + Content-Type: [application/json; charset=UTF-8] + Host: [swf.us-east-1.amazonaws.com] + User-Agent: [Boto/2.46.1 Python/2.7.12+ Linux/4.8.0-38-generic] + X-Amz-Date: [20170303T103108Z] + X-Amz-Target: [com.amazonaws.swf.service.model.SimpleWorkflowService.DescribeWorkflowExecution] + method: POST + uri: https://swf.us-east-1.amazonaws.com/ + response: + body: {string: !!python/unicode '{"executionConfiguration":{"childPolicy":"TERMINATE","executionStartToCloseTimeout":"300","taskList":{"name":"test-simpleflow-workflow-7ac796d690e44e95b3e5255093f5dd9e"},"taskStartToCloseTimeout":"300"},"executionInfo":{"cancelRequested":false,"closeStatus":"COMPLETED","closeTimestamp":1.488537067133E9,"execution":{"runId":"22+pRaR6QEN8eXvOv5TdgnTHjXNwgmKLmnSL60QB0F2gc=","workflowId":"test-simpleflow-workflow"},"executionStatus":"CLOSED","startTimestamp":1.488537066006E9,"workflowType":{"name":"example","version":"example"}},"openCounts":{"openActivityTasks":0,"openChildWorkflowExecutions":0,"openDecisionTasks":0,"openLambdaFunctions":0,"openTimers":0}}'} + headers: + content-length: ['661'] + content-type: [application/json] + x-amzn-requestid: [837dcd8f-fffc-11e6-b96c-a3a53c1bebac] + status: {code: 200, message: OK} +- request: + body: !!python/unicode '{"domain": "TestDomain", "execution": {"workflowId": "test-simpleflow-workflow", + "runId": "22+pRaR6QEN8eXvOv5TdgnTHjXNwgmKLmnSL60QB0F2gc="}}' + headers: + Authorization: ['AWS4-HMAC-SHA256 Credential=1234AB/20160823/us-east-1/swf/aws4_request,SignedHeaders=host;x-amz-date;x-amz-target,Signature=foobar'] + Content-Encoding: [amz-1.0] + Content-Length: ['140'] + Content-Type: [application/json; charset=UTF-8] + Host: [swf.us-east-1.amazonaws.com] + User-Agent: [Boto/2.46.1 Python/2.7.12+ Linux/4.8.0-38-generic] + X-Amz-Date: [20170303T103207Z] + X-Amz-Target: [com.amazonaws.swf.service.model.SimpleWorkflowService.GetWorkflowExecutionHistory] + method: POST + uri: https://swf.us-east-1.amazonaws.com/ + response: + body: {string: !!python/unicode '{"events":[{"eventId":1,"eventTimestamp":1.488537066006E9,"eventType":"WorkflowExecutionStarted","workflowExecutionStartedEventAttributes":{"childPolicy":"TERMINATE","executionStartToCloseTimeout":"300","input":"{\"args\":[false],\"kwargs\":{}}","parentInitiatedEventId":0,"taskList":{"name":"test-simpleflow-workflow-7ac796d690e44e95b3e5255093f5dd9e"},"taskStartToCloseTimeout":"300","workflowType":{"name":"example","version":"example"}}},{"decisionTaskScheduledEventAttributes":{"startToCloseTimeout":"300","taskList":{"name":"test-simpleflow-workflow-7ac796d690e44e95b3e5255093f5dd9e"}},"eventId":2,"eventTimestamp":1.488537066006E9,"eventType":"DecisionTaskScheduled"},{"decisionTaskStartedEventAttributes":{"identity":"{\"hostname\":\"zeb-Precision-T3610\",\"pid\":7432,\"user\":\"zeb\"}","scheduledEventId":2},"eventId":3,"eventTimestamp":1.48853706613E9,"eventType":"DecisionTaskStarted"},{"decisionTaskCompletedEventAttributes":{"scheduledEventId":2,"startedEventId":3},"eventId":4,"eventTimestamp":1.488537066545E9,"eventType":"DecisionTaskCompleted"},{"eventId":5,"eventTimestamp":1.488537066545E9,"eventType":"MarkerRecorded","markerRecordedEventAttributes":{"decisionTaskCompletedEventId":4,"markerName":"marker + 1"}},{"eventId":6,"eventTimestamp":1.488537066545E9,"eventType":"MarkerRecorded","markerRecordedEventAttributes":{"decisionTaskCompletedEventId":4,"details":"\"some + details\"","markerName":"marker 1"}},{"eventId":7,"eventTimestamp":1.488537066545E9,"eventType":"MarkerRecorded","markerRecordedEventAttributes":{"decisionTaskCompletedEventId":4,"details":"\"2nd + marker''s details\"","markerName":"marker 2"}},{"eventId":8,"eventTimestamp":1.488537066545E9,"eventType":"TimerStarted","timerStartedEventAttributes":{"decisionTaskCompletedEventId":4,"startToFireTimeout":"0","timerId":"_simpleflow_wake_up_timer"}},{"eventId":9,"eventTimestamp":1.488537066568E9,"eventType":"TimerFired","timerFiredEventAttributes":{"startedEventId":8,"timerId":"_simpleflow_wake_up_timer"}},{"decisionTaskScheduledEventAttributes":{"startToCloseTimeout":"300","taskList":{"name":"test-simpleflow-workflow-7ac796d690e44e95b3e5255093f5dd9e"}},"eventId":10,"eventTimestamp":1.488537066568E9,"eventType":"DecisionTaskScheduled"},{"decisionTaskStartedEventAttributes":{"identity":"{\"hostname\":\"zeb-Precision-T3610\",\"pid\":7431,\"user\":\"zeb\"}","scheduledEventId":10},"eventId":11,"eventTimestamp":1.488537066699E9,"eventType":"DecisionTaskStarted"},{"decisionTaskCompletedEventAttributes":{"scheduledEventId":10,"startedEventId":11},"eventId":12,"eventTimestamp":1.488537067133E9,"eventType":"DecisionTaskCompleted"},{"eventId":13,"eventTimestamp":1.488537067133E9,"eventType":"WorkflowExecutionCompleted","workflowExecutionCompletedEventAttributes":{"decisionTaskCompletedEventId":12,"result":"null"}}]}'} + headers: + content-length: ['2810'] + content-type: [application/json] + x-amzn-requestid: [a70a250a-fffc-11e6-829c-39b40098ba3b] + status: {code: 200, message: OK} +version: 1 diff --git a/tests/integration/test_commands.py b/tests/integration/test_commands.py index 6ae4482c2..a05c74c45 100644 --- a/tests/integration/test_commands.py +++ b/tests/integration/test_commands.py @@ -5,7 +5,6 @@ from sure import expect import simpleflow.command - from . import vcr, VCRIntegrationTest @@ -115,24 +114,7 @@ def test_simpleflow_idempotent(self): start_line = [line for line in lines if line.startswith(self.workflow_id)][0] _, run_id = start_line.split(" ", 1) - response = self.conn.get_workflow_execution_history( - self.domain, - run_id, - self.workflow_id, - ) - - events = response['events'] - next_page = response.get('nextPageToken') - while next_page is not None: - response = self.conn.get_workflow_execution_history( - self.domain, - run_id, - self.workflow_id, - next_page_token=next_page, - ) - - events.extend(response['events']) - next_page = response.get('nextPageToken') + events = self.get_events(run_id) activities = [ e['activityTaskScheduledEventAttributes']['activityId'] diff --git a/tests/integration/test_markers.py b/tests/integration/test_markers.py new file mode 100644 index 000000000..56bcd3f9a --- /dev/null +++ b/tests/integration/test_markers.py @@ -0,0 +1,39 @@ +from sure import expect + +from tests.integration import VCRIntegrationTest, vcr + + +class TestMarkers(VCRIntegrationTest): + @vcr.use_cassette + def test_without_replays(self): + events = self.run_standalone('tests.integration.workflow.MarkerWorkflow', False) + marker_recorded = list(filter( + lambda e: e['eventType'] == 'MarkerRecorded', + events + )) + expect(len(marker_recorded)).to.equal(3) # 3 markers + marker_details = [ + e['markerRecordedEventAttributes'].get('details') for e in marker_recorded + ] + expect(marker_details).to.equal([None, '"some details"', '"2nd marker\'s details"']) + decision_task_completed_event_id = set([ + e['markerRecordedEventAttributes']['decisionTaskCompletedEventId'] for e in marker_recorded + ]) + expect(len(decision_task_completed_event_id)).to.equal(1) # sent in 1 decision + + @vcr.use_cassette + def test_with_replays(self): + events = self.run_standalone('tests.integration.workflow.MarkerWorkflow', True) + marker_recorded = list(filter( + lambda e: e['eventType'] == 'MarkerRecorded', + events + )) + expect(len((marker_recorded))).to.equal(3) # 3 markers + marker_details = [ + e['markerRecordedEventAttributes'].get('details') for e in marker_recorded + ] + expect(marker_details).to.equal([None, '"some details"', '"2nd marker\'s details"']) + decision_task_completed_event_id = set([ + e['markerRecordedEventAttributes']['decisionTaskCompletedEventId'] for e in marker_recorded + ]) + expect(len(decision_task_completed_event_id)).to.equal(3) # sent in different decisions diff --git a/tests/integration/workflow.py b/tests/integration/workflow.py index c9e92273f..66bbad2cf 100644 --- a/tests/integration/workflow.py +++ b/tests/integration/workflow.py @@ -7,6 +7,7 @@ Workflow, futures, ) +from simpleflow.canvas import Chain @activity.with_attributes(task_list='quickstart', version='example', @@ -49,6 +50,30 @@ def run(self): assert results[0].result != results[-1].result +class MarkerWorkflow(Workflow): + name = 'example' + version = 'example' + task_list = 'example' + decision_tasks_timeout = '300' + execution_timeout = '3600' + + def run(self, use_chain): + m1 = (self.record_marker('marker 1')) + m2 = (self.record_marker('marker 1', 'some details')) + m3 = self.record_marker('marker 2', "2nd marker's details") + if use_chain: + # Markers will be submitted in 3 replays + future = self.submit(Chain( + m1, m2, m3 + )) + else: + # Markers will be submitted as one decision + future = self.submit(m1) + self.submit(m2) + self.submit(m3) + futures.wait(future) + + @activity.with_attributes(task_list='quickstart', version='example', idempotent=True) def get_uuid(unused=None): return str(uuid.uuid4())