From 6d5a17fcd172076240ec1adc80271399d484e62c Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sat, 21 Mar 2020 17:20:06 +0800 Subject: [PATCH 01/50] Unit test for DecisionEvents.__post_init (markers). --- cadence/tests/test_decision_events.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/cadence/tests/test_decision_events.py b/cadence/tests/test_decision_events.py index 81f888b..8ce36c5 100644 --- a/cadence/tests/test_decision_events.py +++ b/cadence/tests/test_decision_events.py @@ -1,5 +1,6 @@ import pytest +from cadence.cadence_types import HistoryEvent, EventType from cadence.decision_loop import DecisionEvents @@ -31,3 +32,16 @@ def test_get_optional_event_negative(decision_events): def test_get_optional_event_too_large(decision_events): e = decision_events.get_optional_decision_event(25) assert e is None + + +def test_markers(): + marker = HistoryEvent(event_type=EventType.MarkerRecorded) + events = [ + HistoryEvent(event_type=EventType.WorkflowExecutionStarted), + HistoryEvent(event_type=EventType.ActivityTaskScheduled), + HistoryEvent(event_type=EventType.ActivityTaskCanceled), + marker + ] + decision_events = DecisionEvents(events=[], decision_events=events, replay=True, replay_current_time_milliseconds=0, next_decision_event_id=10) + assert len(decision_events.markers) == 1 + assert id(decision_events.markers[0]) == id(marker) From 97654af40cb70ebcb5ab2de5bd391de8a326c45b Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sat, 21 Mar 2020 17:33:34 +0800 Subject: [PATCH 02/50] Unit test for MarkerHandler set_data() and marker_replayed(). --- cadence/tests/test_marker.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/cadence/tests/test_marker.py b/cadence/tests/test_marker.py index ee17fc8..68bb0bc 100644 --- a/cadence/tests/test_marker.py +++ b/cadence/tests/test_marker.py @@ -5,6 +5,7 @@ from cadence.cadence_types import WorkflowType, DecisionType, MarkerRecordedEventAttributes, Header, HistoryEvent, \ EventType +from cadence.clock_decision_context import VERSION_MARKER_NAME from cadence.decision_loop import ReplayDecider, DecisionContext, DecisionEvents from cadence.decisions import DecisionId, DecisionTarget from cadence.marker import MarkerData, MUTABLE_MARKER_HEADER_KEY, MarkerHandler, MarkerInterface, PlainMarkerData, \ @@ -189,3 +190,17 @@ def callback(stored): ret = handler.handle("the-id", callback) assert ret == b'123' assert len(decision_context.decider.decisions) == 0 + + +def test_marker_handler_set_data(): + handler = MarkerHandler(decision_context=Mock(), marker_name=VERSION_MARKER_NAME) + handler.set_data("abc", b"stuff") + assert "abc" in handler.mutable_marker_results + assert handler.mutable_marker_results["abc"].data == b"stuff" + + +def test_marker_handler_mark_replayed(): + handler = MarkerHandler(decision_context=Mock(), marker_name=VERSION_MARKER_NAME) + handler.set_data("abc", b"stuff") + handler.mark_replayed("abc") + assert handler.mutable_marker_results["abc"].replayed From 5ce94c2805c6f362c10b3f16c610b8837c61525d Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sat, 21 Mar 2020 17:58:41 +0800 Subject: [PATCH 03/50] Unit test: markers should be processed first in ReplayDecider.process_decision_events() --- cadence/tests/test_decision_loop.py | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/cadence/tests/test_decision_loop.py b/cadence/tests/test_decision_loop.py index 56d4a24..5d713a7 100644 --- a/cadence/tests/test_decision_loop.py +++ b/cadence/tests/test_decision_loop.py @@ -6,7 +6,8 @@ from cadence.cadence_types import HistoryEvent, EventType, PollForDecisionTaskResponse, \ ScheduleActivityTaskDecisionAttributes, WorkflowExecutionStartedEventAttributes, Decision, \ - ActivityTaskStartedEventAttributes + ActivityTaskStartedEventAttributes, MarkerRecordedEventAttributes +from cadence.clock_decision_context import VERSION_MARKER_NAME from cadence.decision_loop import HistoryHelper, is_decision_event, DecisionTaskLoop, ReplayDecider, DecisionEvents, \ nano_to_milli from cadence.decisions import DecisionId, DecisionTarget @@ -312,6 +313,27 @@ def test_process_decision_events_notifies_when_replay(self): self.decider.process_decision_events(decision_events) self.decider.notify_decision_sent.assert_called_once() + def test_process_decision_events_markers_first(self): + self.decider.event_loop = Mock() + marker_event = HistoryEvent(event_type=EventType.MarkerRecorded) + marker_event.marker_recorded_event_attributes = MarkerRecordedEventAttributes() + marker_event.marker_recorded_event_attributes.marker_name = VERSION_MARKER_NAME + events = [ + HistoryEvent(event_type=EventType.WorkflowExecutionStarted, + workflow_execution_started_event_attributes=WorkflowExecutionStartedEventAttributes()), + HistoryEvent(event_type=EventType.DecisionTaskScheduled), + marker_event + ] + decision_events = DecisionEvents([], events, replay=True, + replay_current_time_milliseconds=0, + next_decision_event_id=5) + self.decider.process_event = Mock() + self.decider.process_decision_events(decision_events) + self.decider.process_event.assert_called() + assert len(self.decider.process_event.call_args_list ) == 4 + args, kwargs = self.decider.process_event.call_args_list[0] + assert id(args[0]) == id(marker_event) + def test_activity_task_closed(self): state_machine: DecisionStateMachine = Mock() state_machine.is_done = MagicMock(return_value=True) From 3220585e49b5dab26d545a26e786554274ef8c77 Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sat, 21 Mar 2020 18:02:02 +0800 Subject: [PATCH 04/50] Update README.md --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 9ce6522..098fab0 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,7 @@ version is targeted to be released in ~~September of 2019~~ ~~January 2020~~ Mar Post 2.0: - [ ] sideEffect/mutableSideEffect +- [ ] Local activity - [ ] Parallel activity execution - [ ] Timers - [ ] Cancellation Scopes From 7d9147c43b5403f7a1332a89ec48fd89d48f103b Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sat, 21 Mar 2020 18:47:17 +0800 Subject: [PATCH 05/50] Unit test: ClockDecisionContext.handle_marker_recorded() (version_handler). --- cadence/tests/test_clock_decision_context.py | 23 +++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/cadence/tests/test_clock_decision_context.py b/cadence/tests/test_clock_decision_context.py index dda19c9..2ebe24c 100644 --- a/cadence/tests/test_clock_decision_context.py +++ b/cadence/tests/test_clock_decision_context.py @@ -1,12 +1,14 @@ +import json from typing import Callable from unittest.mock import MagicMock, Mock import pytest from cadence.cadence_types import StartTimerDecisionAttributes, TimerFiredEventAttributes, HistoryEvent, \ - TimerCanceledEventAttributes -from cadence.clock_decision_context import ClockDecisionContext, TimerCancellationHandler + TimerCanceledEventAttributes, EventType, MarkerRecordedEventAttributes, Header +from cadence.clock_decision_context import ClockDecisionContext, TimerCancellationHandler, VERSION_MARKER_NAME from cadence.exceptions import CancellationException +from cadence.marker import MUTABLE_MARKER_HEADER_KEY from cadence.util import OpenRequestInfo TIMER_ID = 20 @@ -26,7 +28,7 @@ def decider(): @pytest.fixture def clock_decision_context(decider): - context = ClockDecisionContext(decider=decider) + context = ClockDecisionContext(decider=decider, decision_context=MagicMock()) context.set_replay_current_time_milliseconds(REPLAY_CURRENT_TIME_MS) return context @@ -146,3 +148,18 @@ def test_handle_timer_canceled(clock_decision_context, decider, request_info): assert args[0] is None assert isinstance(args[1], Exception) + +def test_handle_marker_recorded_version(clock_decision_context): + event = HistoryEvent(event_type=EventType.MarkerRecorded) + event.marker_recorded_event_attributes = MarkerRecordedEventAttributes() + event.marker_recorded_event_attributes.details = "Blahh" + event.marker_recorded_event_attributes.marker_name = VERSION_MARKER_NAME + event.marker_recorded_event_attributes.header = Header() + event.marker_recorded_event_attributes.header.fields[MUTABLE_MARKER_HEADER_KEY] = bytes(json.dumps({ + "id": "the-id", + "eventId": 20, + "accessCount": 0 + }), "utf-8") + clock_decision_context.handle_marker_recorded(event) + assert "the-id" in clock_decision_context.version_handler.mutable_marker_results + assert clock_decision_context.version_handler.mutable_marker_results["the-id"].data == "Blahh" From 3c325674789c22f344169347a2d52b8eb17ec2b4 Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sat, 21 Mar 2020 18:55:35 +0800 Subject: [PATCH 06/50] Handle WorkflowExecutionCompleted. --- cadence/decision_loop.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cadence/decision_loop.py b/cadence/decision_loop.py index 351496f..aa5bb8a 100644 --- a/cadence/decision_loop.py +++ b/cadence/decision_loop.py @@ -811,6 +811,7 @@ def on_timer_canceled(self: ReplayDecider, event: HistoryEvent): event_handlers = { EventType.WorkflowExecutionStarted: ReplayDecider.handle_workflow_execution_started, EventType.WorkflowExecutionCancelRequested: ReplayDecider.handle_workflow_execution_cancel_requested, + EventType.WorkflowExecutionCompleted: noop, EventType.DecisionTaskScheduled: noop, EventType.DecisionTaskStarted: noop, # Filtered by HistoryHelper EventType.DecisionTaskTimedOut: noop, # TODO: check From 2a30c6ac86423baab2b2f2a5a3e31897e6b2dfac Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sat, 21 Mar 2020 18:56:15 +0800 Subject: [PATCH 07/50] Unit test: query workflow after it is completed. --- cadence/tests/test_query_workflow.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cadence/tests/test_query_workflow.py b/cadence/tests/test_query_workflow.py index 180989f..944568b 100644 --- a/cadence/tests/test_query_workflow.py +++ b/cadence/tests/test_query_workflow.py @@ -60,7 +60,7 @@ def test_query_workflow(): client = WorkflowClient.new_client(domain=DOMAIN) workflow: TestQueryWorkflow = client.new_workflow_stub(TestQueryWorkflow) - WorkflowClient.start(workflow.get_greetings) + workflow_ec = WorkflowClient.start(workflow.get_greetings) assert workflow.get_message() == "initial-message" workflow.put_message("second-message") @@ -73,5 +73,9 @@ def test_query_workflow(): workflow.put_message("done") + client.wait_for_close(workflow_ec) + + assert workflow.get_message() == "done" + print("Stopping workers") worker.stop() From dbac65158fdbf0262fd895118c6858d13ea55448 Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sun, 22 Mar 2020 01:10:10 +0800 Subject: [PATCH 08/50] Update to include additional hidden field in stub instance "_retry_parameters". --- cadence/tests/test_activity_method.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cadence/tests/test_activity_method.py b/cadence/tests/test_activity_method.py index af0a8b7..df10790 100644 --- a/cadence/tests/test_activity_method.py +++ b/cadence/tests/test_activity_method.py @@ -80,6 +80,7 @@ def hello(self): stub = HelloActivities() stub._decision_context = self.decision_context + stub._retry_parameters = None async def fn(): await stub.hello() @@ -100,6 +101,7 @@ def hello(self, arg1): stub = HelloActivities() stub._decision_context = self.decision_context + stub._retry_parameters = None async def fn(): await stub.hello(1) @@ -120,6 +122,7 @@ def hello(self, arg1, arg2): stub = HelloActivities() stub._decision_context = self.decision_context + stub._retry_parameters = None async def fn(): await stub.hello(1, "one") From bede56e5d62a7c179d8a103e194cb0f8fd6350fc Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sun, 22 Mar 2020 01:12:53 +0800 Subject: [PATCH 09/50] Use a custom exception type because deserialize_exception has issues with built-in Exception types. --- cadence/exception_handling.py | 3 +++ cadence/tests/test_decision_context.py | 11 ++++++++--- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/cadence/exception_handling.py b/cadence/exception_handling.py index 9d53b0a..b4f5eb4 100644 --- a/cadence/exception_handling.py +++ b/cadence/exception_handling.py @@ -49,6 +49,9 @@ def serialize_exception(ex: Exception): def deserialize_exception(details) -> Exception: + """ + TODO: Support built-in types like Exception + """ exception: Exception = None details_dict = json.loads(details) source = details_dict.get("source") diff --git a/cadence/tests/test_decision_context.py b/cadence/tests/test_decision_context.py index 7a3d6b0..292f067 100644 --- a/cadence/tests/test_decision_context.py +++ b/cadence/tests/test_decision_context.py @@ -11,7 +11,7 @@ TimeoutType from cadence.decision_loop import DecisionContext, ReplayDecider from cadence.exceptions import NonDeterministicWorkflowException, ActivityTaskFailedException, \ - ActivityTaskTimeoutException + ActivityTaskTimeoutException, ActivityFailureException def run_once(loop): @@ -87,18 +87,23 @@ def test_raise_exception(self): self.assertFalse(self.task.done()) future = self.context.scheduled_activities[20] - exception = Exception("thrown by activity") + exception = DummyUserLevelException("thrown by activity") future.set_exception(exception) run_once(self.event_loop) self.assertTrue(self.task.done()) raised_exception = self.task.exception() - self.assertEqual(exception, raised_exception) + self.assertIsInstance(raised_exception, ActivityFailureException) + self.assertEqual(repr(exception), repr(raised_exception.get_cause())) def tearDown(self) -> None: self.task.cancel() +class DummyUserLevelException(Exception): + pass + + class TestHandleActivityTaskEvents(TestCase): def setUp(self) -> None: self.decider: ReplayDecider = Mock() From cbe9e9d5026b9e4d1ee57a5220108655f6aee413 Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sun, 22 Mar 2020 01:28:30 +0800 Subject: [PATCH 10/50] Dummy event object should be of type HistoryEvent. --- cadence/tests/test_decision_events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cadence/tests/test_decision_events.py b/cadence/tests/test_decision_events.py index 8ce36c5..652dba9 100644 --- a/cadence/tests/test_decision_events.py +++ b/cadence/tests/test_decision_events.py @@ -6,7 +6,7 @@ @pytest.fixture() def event_object(): - return object() + return HistoryEvent() @pytest.fixture() From 495d07b510af8f4182db94d7cd95bcc045e3520f Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sun, 22 Mar 2020 01:34:57 +0800 Subject: [PATCH 11/50] Fix unit test for test_first_decision_next_decision_id. --- cadence/tests/test_decision_loop.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cadence/tests/test_decision_loop.py b/cadence/tests/test_decision_loop.py index 5d713a7..10d8c71 100644 --- a/cadence/tests/test_decision_loop.py +++ b/cadence/tests/test_decision_loop.py @@ -237,6 +237,7 @@ def setUp(self) -> None: worker.get_workflow_method = MagicMock(return_value=(DummyWorkflow, lambda *args: None)) self.decider = ReplayDecider(execution_id="", workflow_type=Mock(), worker=worker) self.decider.event_loop = Mock() + self.decider.process_event = Mock() def test_first_decision_next_decision_id(self): self.decider.process_decision_events(self.decision_events) From 10aabfbd99e15cea803d4e210590bcf6f73ab04e Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sun, 22 Mar 2020 01:52:56 +0800 Subject: [PATCH 12/50] Update test_activity_task_failed - ActivityTaskFailedException is no longer used. --- cadence/tests/test_decision_context.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/cadence/tests/test_decision_context.py b/cadence/tests/test_decision_context.py index 292f067..fda0c09 100644 --- a/cadence/tests/test_decision_context.py +++ b/cadence/tests/test_decision_context.py @@ -10,6 +10,7 @@ ActivityTaskCompletedEventAttributes, ActivityTaskFailedEventAttributes, ActivityTaskTimedOutEventAttributes, \ TimeoutType from cadence.decision_loop import DecisionContext, ReplayDecider +from cadence.exception_handling import ExternalException, serialize_exception from cadence.exceptions import NonDeterministicWorkflowException, ActivityTaskFailedException, \ ActivityTaskTimeoutException, ActivityFailureException @@ -140,13 +141,16 @@ def test_activity_task_failed(self): attr.scheduled_event_id = 20 event.activity_task_failed_event_attributes = attr attr.reason = "the-reason" - attr.details = bytes("details", "utf-8") + ex = None + try: + raise DummyUserLevelException("abc") + except Exception as e: + ex = e + attr.details = serialize_exception(ex) self.context.handle_activity_task_failed(event) self.assertTrue(self.future.done()) exception = self.future.exception() - self.assertIsInstance(exception, ActivityTaskFailedException) - self.assertEqual(attr.reason, exception.reason) - self.assertEqual(attr.details, exception.details) + self.assertIsInstance(exception, DummyUserLevelException) self.assertEqual(0, len(self.context.scheduled_activities)) def test_activity_task_timed_out(self): From 0a4d329e26a643105d8ff250e0adf88d8430286c Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sun, 22 Mar 2020 02:04:48 +0800 Subject: [PATCH 13/50] Mock process_event() so that it is not explicitly executed here. --- cadence/tests/test_decision_loop.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cadence/tests/test_decision_loop.py b/cadence/tests/test_decision_loop.py index 10d8c71..2d4ba10 100644 --- a/cadence/tests/test_decision_loop.py +++ b/cadence/tests/test_decision_loop.py @@ -311,6 +311,7 @@ def test_process_decision_events_notifies_when_replay(self): replay_current_time_milliseconds=0, next_decision_event_id=5) self.decider.notify_decision_sent = MagicMock() + self.decider.process_event = Mock() self.decider.process_decision_events(decision_events) self.decider.notify_decision_sent.assert_called_once() From 26dfa447bfc118fb4cf2bbba321f6115dba73c1d Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sun, 22 Mar 2020 18:11:00 +0800 Subject: [PATCH 14/50] cadence-cli will pass an empty byte array when a workflow is invoked with no arguments. --- cadence/decision_loop.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cadence/decision_loop.py b/cadence/decision_loop.py index aa5bb8a..b45736c 100644 --- a/cadence/decision_loop.py +++ b/cadence/decision_loop.py @@ -581,7 +581,7 @@ def process_event(self, event: HistoryEvent): def handle_workflow_execution_started(self, event: HistoryEvent): start_event_attributes = event.workflow_execution_started_event_attributes self.decision_context.set_current_run_id(start_event_attributes.original_execution_run_id) - if start_event_attributes.input is None: + if start_event_attributes.input is None or start_event_attributes.input == b'': workflow_input = [] else: workflow_input = json_to_args(start_event_attributes.input) From 02e4662d13e05e633fe835a876b02bb4af2f40e9 Mon Sep 17 00:00:00 2001 From: Jakub Brzeski Date: Sun, 22 Mar 2020 14:18:52 +0100 Subject: [PATCH 15/50] Fix setup.py by including rest of required dependencies. --- setup.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index d0ac9e5..bb54439 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,14 @@ long_description_content_type="text/markdown", url="https://github.com/firdaus/cadence-python", packages=setuptools.find_packages(exclude=["cadence.tests", "cadence.spikes"]), - install_requires=["more-itertools>=7.0.0", "thriftrw>=1.7.2"], + install_requires=[ + "dataclasses-json>=0.3.8", + "more-itertools>=7.0.0", + "ply>=3.11", + "six>=1.12.0", + "tblib>=1.6.0", + "thriftrw>=1.7.2", + ], classifiers=[ "Programming Language :: Python :: 3", "License :: OSI Approved :: MIT License", From 502cfe9d5f2f164d9c1b1c0d4013409c200e3afd Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sun, 22 Mar 2020 22:41:35 +0800 Subject: [PATCH 16/50] Return a string representation in order to not break unit tests. --- cadence/errors.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cadence/errors.py b/cadence/errors.py index f4ecf17..424698d 100644 --- a/cadence/errors.py +++ b/cadence/errors.py @@ -8,6 +8,9 @@ class BadRequestError(Exception): message: str + def __str__(self): + return self.message + @dataclass class InternalServiceError(Exception): @@ -38,6 +41,9 @@ def run_id(self): class EntityNotExistsError(Exception): message: str + def __str__(self): + return self.message + @dataclass class ServiceBusyError(Exception): From 1ddc94b3d316035456819ad6165b84e62d06742a Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sun, 22 Mar 2020 22:43:50 +0800 Subject: [PATCH 17/50] Remove test_query_workflow_timeout() - seems like the backend behavior changed, it now returns "workflow must handle at least one decision task before it can be queried". --- cadence/tests/test_workflowservice.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/cadence/tests/test_workflowservice.py b/cadence/tests/test_workflowservice.py index d957221..d0b2bfd 100644 --- a/cadence/tests/test_workflowservice.py +++ b/cadence/tests/test_workflowservice.py @@ -341,19 +341,6 @@ def test_reset_sticky_task_list(self): self.assertIsNone(err) self.assertIsNotNone(response) - def test_query_workflow_timeout(self): - start_response, _ = self.service.start_workflow(self.request) - request = QueryWorkflowRequest() - request.domain = "test-domain" - request.execution = WorkflowExecution() - request.execution.workflow_id = self.request.workflow_id - request.execution.run_id = start_response.run_id - request.query = WorkflowQuery() - request.query.query_type = "getDummy" - request.query.query_args = None - with self.assertRaisesRegex(TChannelException, "timeout") as context: - self.service.query_workflow(request) - def test_describe_workflow_execution(self): start_response, _ = self.service.start_workflow(self.request) request = DescribeWorkflowExecutionRequest() From c29310e44a00b7f0849325bcbb42b8b36824b759 Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sun, 22 Mar 2020 22:59:53 +0800 Subject: [PATCH 18/50] Without the type annotation, `replayed` isn't recognized in the constructor. --- cadence/marker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cadence/marker.py b/cadence/marker.py index 1259dfa..6de6940 100644 --- a/cadence/marker.py +++ b/cadence/marker.py @@ -70,7 +70,7 @@ def get_id(self) -> str: class MarkerResult: data: bytes = None access_count: int = 0 - replayed = False + replayed: bool = False @dataclass From 8065a007daf637f22c2a4e83e4c7797377fac2f9 Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sun, 22 Mar 2020 23:02:32 +0800 Subject: [PATCH 19/50] Behavior changed, unit test was outdated. For no decisions to be created (i.e. len(decisions() == 0), we must have already replayed the marker already. --- cadence/tests/test_marker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cadence/tests/test_marker.py b/cadence/tests/test_marker.py index 68bb0bc..ae24e5f 100644 --- a/cadence/tests/test_marker.py +++ b/cadence/tests/test_marker.py @@ -145,12 +145,12 @@ def test_get_marker_data_lower_access_count(marker_recorded_event, decision_cont assert data is None -def test_handle_replaying_get_from_history(decision_context): +def test_handle_replaying_get_from_history_before_replay(decision_context): def callback(stored): raise Exception("Should not be executed") handler = MarkerHandler(decision_context=decision_context, marker_name="the-marker-name") - handler.mutable_marker_results["the-id"] = MarkerResult(data=b'123', access_count=35) + handler.mutable_marker_results["the-id"] = MarkerResult(data=b'123', access_count=35, replayed=True) ret = handler.handle("the-id", callback) assert ret == b'123' assert len(decision_context.decider.decisions) == 0 From 8a8dd1d29f278f9e2eeccc54138f886c87f1612c Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sun, 22 Mar 2020 23:03:31 +0800 Subject: [PATCH 20/50] Unit test: A decision will be created during first replay (replayed=False). --- cadence/tests/test_marker.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/cadence/tests/test_marker.py b/cadence/tests/test_marker.py index ae24e5f..e187978 100644 --- a/cadence/tests/test_marker.py +++ b/cadence/tests/test_marker.py @@ -156,6 +156,17 @@ def callback(stored): assert len(decision_context.decider.decisions) == 0 +def test_handle_replaying_get_from_history_after_replay(decision_context): + def callback(stored): + raise Exception("Should not be executed") + + handler = MarkerHandler(decision_context=decision_context, marker_name="the-marker-name") + handler.mutable_marker_results["the-id"] = MarkerResult(data=b'123', access_count=35, replayed=False) + ret = handler.handle("the-id", callback) + assert ret == b'123' + assert len(decision_context.decider.decisions) == 1 + + def test_handle_replaying_no_history(decision_context): def callback(stored): raise Exception("Should not be executed") From 5d81eb1388b4d58cc6d26d7f3c3ee0e460dea3a7 Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sun, 22 Mar 2020 23:06:20 +0800 Subject: [PATCH 21/50] Unit test is outdated after fixing the versioning issue: test_handle_replaying_no_history. --- cadence/tests/test_marker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cadence/tests/test_marker.py b/cadence/tests/test_marker.py index e187978..9a8f304 100644 --- a/cadence/tests/test_marker.py +++ b/cadence/tests/test_marker.py @@ -176,7 +176,7 @@ def callback(stored): handler.mutable_marker_results["the-id"] = MarkerResult(data=b'123', access_count=35) ret = handler.handle("the-id", callback) assert ret == b'123' - assert len(decision_context.decider.decisions) == 0 + assert len(decision_context.decider.decisions) == 1 def test_handle_not_replaying_callback_returns_not_none(decision_context): From b98d8f99ba5a56a7a270c80fa267b5b3167a760e Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sun, 22 Mar 2020 23:16:42 +0800 Subject: [PATCH 22/50] Fix: test_clock_decision_context_get_version_stored - with new workflow versioning behavior. --- cadence/tests/test_version.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cadence/tests/test_version.py b/cadence/tests/test_version.py index 1693a13..d28833c 100644 --- a/cadence/tests/test_version.py +++ b/cadence/tests/test_version.py @@ -50,10 +50,12 @@ def test_clock_decision_context_get_version(decision_context): def test_clock_decision_context_get_version_stored(decision_context): + # is_replaying=True decision_context.workflow_clock.version_handler.mutable_marker_results["abc"] = MarkerResult(data="3") version = decision_context.workflow_clock.get_version("abc", 1, 5) assert version == 3 - assert len(decision_context.decider.decisions) == 0 + # decision needs to be emitted during replay to make things deterministic + assert len(decision_context.decider.decisions) == 1 @pytest.fixture() From 9f488eebff27050e9f5ecf0f95d5e8a64196cc00 Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sun, 22 Mar 2020 23:17:46 +0800 Subject: [PATCH 23/50] An exception is thrown in this case because we are replaying when -1 (DEFAULT_VERSION) isn't supported. --- cadence/tests/test_version.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cadence/tests/test_version.py b/cadence/tests/test_version.py index d28833c..c09a092 100644 --- a/cadence/tests/test_version.py +++ b/cadence/tests/test_version.py @@ -88,10 +88,10 @@ def version_decision_context(version_marker_recorded_event): def test_clock_decision_context_from_replay(version_decision_context): - version_decision_context.workflow_clock.set_replaying(True) - version = version_decision_context.workflow_clock.get_version("abc", 1, 5) - assert version == -1 - assert len(version_decision_context.decider.decisions) == 0 + with pytest.raises(Exception) as exc_info: + version_decision_context.workflow_clock.set_replaying(True) + version = version_decision_context.workflow_clock.get_version("abc", 1, 5) + assert "Version -1 of changeID abc is not supported. Supported version is between 1 and 5" in str(exc_info.value) def test_validate_version(version_decision_context): From b3b827867711f0dc596e531b9c1e11cc8a3d5d16 Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Mon, 23 Mar 2020 21:49:02 +0800 Subject: [PATCH 24/50] Disable unit test for Java interoperability. --- cadence/tests/test_java.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cadence/tests/test_java.py b/cadence/tests/test_java.py index 62cd469..fa014d7 100644 --- a/cadence/tests/test_java.py +++ b/cadence/tests/test_java.py @@ -1,3 +1,5 @@ +# TODO: Re-enable once interoperability is a design goal again +""" from unittest import TestCase import logging.config from uuid import uuid4 @@ -39,4 +41,4 @@ def test(self): greeting = stub.getGreeting("World") self.assertEqual("Hello World", greeting) - +""" From 19973ba92f3c5ac9bf17c9bb048b44db0fa09bf2 Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Wed, 25 Mar 2020 17:57:08 +0800 Subject: [PATCH 25/50] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 098fab0..e95742e 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ ## Status / TODO cadence-python is still under going heavy development. It should be considered EXPERIMENTAL at the moment. A production -version is targeted to be released in ~~September of 2019~~ ~~January 2020~~ March 2020. +version is targeted to be released in ~~September of 2019~~ ~~January 2020~~ ~~March 2020~~ April 2020. 1.0 - [x] Tchannel implementation From e80016aca1b7c79590bf24910592e8ef61c26cf2 Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Fri, 27 Mar 2020 22:57:33 +0800 Subject: [PATCH 26/50] Add thrift-parser.js to make the code complete. --- cadence/thrift/thrift-parser.js | 641 ++++++++++++++++++++++++ cadence/thrift/thrift-parser.js-LICENSE | 27 + 2 files changed, 668 insertions(+) create mode 100644 cadence/thrift/thrift-parser.js create mode 100644 cadence/thrift/thrift-parser.js-LICENSE diff --git a/cadence/thrift/thrift-parser.js b/cadence/thrift/thrift-parser.js new file mode 100644 index 0000000..471c791 --- /dev/null +++ b/cadence/thrift/thrift-parser.js @@ -0,0 +1,641 @@ +class ThriftFileParsingError extends SyntaxError { + constructor({ message, context, line }) { + super(message); + this.context = context; + this.line = line; + this.name = 'THRIFT_FILE_PARSING_ERROR'; + } +} + +module.exports = (source, offset = 0) => { + + source += ''; + + let nCount = 0; + let rCount = 0; + + let stack = []; + + const record = char => { + if (char === '\r') rCount++; + else if (char === '\n') nCount++; + }; + const save = () => stack.push({ offset, nCount, rCount }); + const restore = () => ({ offset, nCount, rCount } = stack[stack.length - 1]); + const drop = () => stack.pop(); + + const readAnyOne = (...args) => { + save(); + for (let i = 0; i < args.length; i++) { + try { + let result = args[i](); + drop(); + return result; + } catch (ignore) { + restore(); + continue; + } + } + drop(); + throw 'Unexcepted Token'; + }; + + const readUntilThrow = (transaction, key) => { + let receiver = key ? {} : []; + for (;;) { + try { + save(); + let result = transaction(); + key ? receiver[result[key]] = result : receiver.push(result); + } catch (ignore) { + restore(); + return receiver; + } finally { + drop(); + } + } + }; + + const readKeyword = word => { + for (let i = 0; i < word.length; i++) { + if (source[offset + i] !== word[i]) { + throw 'Unexpected token "' + word + '"'; + } + } + offset += word.length; + readSpace(); + return word; + }; + + const readChar = (char) => { + if (source[offset] !== char) throw 'Unexpected char "' + char + '"'; + offset++; + readSpace(); + return char; + }; + + const readNoop = () => {}; + + const readCommentMultiple = () => { + let i = 0; + if (source[offset + i++] !== '/' || source[offset + i++] !== '*') return false; + do { + record(source[offset + i]); + while (offset + i < source.length && source[offset + i++] !== '*') { + record(source[offset + i]); + } + } while (offset + i < source.length && source[offset + i] !== '/'); + offset += i + 1; + return true; + }; + + const readCommentSharp = () => { + let i = 0; + if (source[offset + i++] !== '#') return false; + while (source[offset + i] !== '\n' && source[offset + i] !== '\r') offset++; + offset += i; + return true; + }; + + const readCommentDoubleSlash = () => { + let i = 0; + if (source[offset + i++] !== '/' || source[offset + i++] !== '/') return false; + while (source[offset + i] !== '\n' && source[offset + i] !== '\r') offset++; + offset += i; + return true; + }; + + const readSpace = () => { + for (;;) { + let byte = source[offset]; + record(byte); + if (byte === '\n' || byte === '\r' || byte === ' ' || byte === '\t') { + offset++; + } else { + if (!readCommentMultiple() && !readCommentSharp() && !readCommentDoubleSlash()) return; + } + } + }; + + const readComma = () => { + if (source[offset] === ',' || source[offset] === ';') { + offset++; + readSpace(); + return ','; + } + }; + + const readTypedef = () => { + let subject = readKeyword('typedef'); + let type = readType(); + let name = readName(); + readComma(); + return {subject, type, name}; + }; + + const readType = () => readAnyOne(readTypeMap, readTypeList, readTypeNormal); + + const readTypeMap = () => { + let name = readKeyword('map'); + readChar('<'); + let keyType = readType(); + readComma(); + let valueType = readType(); + readChar('>'); + return {name, keyType, valueType}; + }; + + const readTypeList = () => { + let name = readAnyOne(() => readKeyword('list'), () => readKeyword('set')); + readChar('<'); + let valueType = readType(); + readChar('>'); + return {name, valueType}; + }; + + const readTypeNormal = () => readName(); + + const readName = () => { + let i = 0; + let byte = source[offset]; + while ( + (byte >= 'a' && byte <= 'z') || + byte === '.' || + byte === '_' || + (byte >= 'A' && byte <= 'Z') || + (byte >= '0' && byte <= '9') + ) byte = source[offset + ++i]; + if (i === 0) throw 'Unexpected token on readName'; + let value = source.slice(offset, offset += i); + readSpace(); + return value; + }; + + const readScope = () => { + let i = 0; + let byte = source[offset]; + while ( + (byte >= 'a' && byte <= 'z') || + byte === '_' || + (byte >= 'A' && byte <= 'Z') || + (byte >= '0' && byte <= '9') || + (byte === '*') || + (byte === '.') + ) byte = source[offset + ++i]; + if (i === 0) throw 'Unexpected token on readScope'; + let value = source.slice(offset, offset += i); + readSpace(); + return value; + }; + + const readNumberSign = () => { + let result; + if (source[offset] === '+' || source[offset] === '-') { + result = source[offset]; + offset++; + } + return result; + }; + + const readIntegerValue = () => { + let result = []; + let sign = readNumberSign(); + if (sign !== void 0) result.push(sign); + + for (; ;) { + let byte = source[offset]; + if ((byte >= '0' && byte <= '9')) { + offset++; + result.push(byte); + } else if ( + byte === 'E' || byte === 'e' || + byte === 'X' || byte === 'x' || + byte === '.' + ) { + throw `Unexpected token ${byte} for integer value`; + } else { + if (result.length) { + readSpace(); + return +result.join(''); + } else { + throw 'Unexpected token ' + byte; + } + } + } + }; + + const readDecimalValue = () => { + let result = []; + let sign = readNumberSign(); + if (sign !== void 0) result.push(sign); + + for (;;) { + let byte = source[offset]; + if ((byte >= '0' && byte <= '9') || byte === '.') { + offset++; + result.push(byte); + } else { + if (result.length) { + readSpace(); + return +result.join(''); + } else { + throw 'Unexpected token ' + byte; + } + } + } + }; + + const readEnotationValue = () => { + let result = []; + if (source[offset] === '-') { + result.push(source[offset]); + offset++; + } + + for (;;) { + let byte = source[offset]; + if ((byte >= '0' && byte <= '9') || byte === '.') { + result.push(byte); + offset++; + } else { + break; + } + } + + if (source[offset] !== 'e' && source[offset] !== 'E') throw 'Unexpected token'; + result.push(source[offset]); + offset++; + + for (;;) { + let byte = source[offset]; + if (byte >= '0' && byte <= '9') { + offset++; + result.push(byte); + } else { + if (result.length) { + readSpace(); + return +result.join(''); + } else { + throw 'Unexpected token ' + byte; + } + } + } + }; + + const readHexadecimalValue = () => { + let result = []; + if (source[offset] === '-') { + result.push(source[offset]); + offset++; + } + + if (source[offset] !== '0') throw 'Unexpected token'; + result.push(source[offset]); + offset++; + + if (source[offset] !== 'x' && source[offset] !== 'X') throw 'Unexpected token'; + result.push(source[offset]); + offset++; + + for (;;) { + let byte = source[offset]; + if ( + (byte >= '0' && byte <= '9') || + (byte >= 'A' && byte <= 'F') || + (byte >= 'a' && byte <= 'f') + ) { + offset++; + result.push(byte); + } else { + if (result.length) { + readSpace(); + return +result.join(''); + } else { + throw 'Unexpected token ' + byte; + } + } + } + }; + + const readBooleanValue = () => JSON.parse(readAnyOne(() => readKeyword('true'), () => readKeyword('false'))); + + const readRefValue = () => { + let list = [readName()]; + readUntilThrow(() => { + readChar('.'); + list.push(readName()); + }); + return {'=': list}; + }; + + const readStringValue = () => { + let receiver = []; + let start; + while (source[offset] != null) { + let byte = source[offset++]; + if (receiver.length) { + if (byte === start) { + receiver.push(byte); + readSpace(); + return receiver.slice(1, -1).join(''); + } else if (byte === '\\') { + receiver.push(byte); + offset++; + receiver.push(source[offset++]); + } else { + receiver.push(byte); + } + } else { + if (byte === '"' || byte === '\'') { + start = byte; + receiver.push(byte); + } else { + throw 'Unexpected token ILLEGAL'; + } + } + } + throw 'Unterminated string value'; + }; + + const readListValue = () => { + readChar('['); + let list = readUntilThrow(() => { + let value = readValue(); + readComma(); + return value; + }); + readChar(']'); + return list; + }; + + const readMapValue = () => { + readChar('{'); + let list = readUntilThrow(() => { + let key = readValue(); + readChar(':'); + let value = readValue(); + readComma(); + return {key, value}; + }); + readChar('}'); + return list; + }; + + const readValue = () => readAnyOne( + readHexadecimalValue, // This coming before readNumberValue is important, unfortunately + readEnotationValue, // This also needs to come before readNumberValue + readDecimalValue, + readIntegerValue, + readStringValue, + readBooleanValue, + readListValue, + readMapValue, + readRefValue + ); + + const readConst = () => { + let subject = readKeyword('const'); + let type = readType(); + let name = readName(); + readChar('='); + let value = readValue(); + readComma(); + return {subject, type, name, value}; + }; + + const readEnum = () => { + let subject = readKeyword('enum'); + let name = readName(); + let items = readEnumBlock(); + return {subject, name, items}; + }; + + const readEnumBlock = () => { + readChar('{'); + let receiver = readUntilThrow(readEnumItem); + readChar('}'); + return receiver; + }; + + const readEnumItem = () => { + let name = readName(); + let value = readEnumValue(); + readComma(); + let result = {name}; + if (value !== void 0) result.value = value; + return result; + }; + + const readEnumValue = () => { + let beginning = offset; + try { + readChar('='); + } catch (ignore) { + offset = beginning; + return; + } + return readAnyOne(readHexadecimalValue, readIntegerValue); + }; + + const readAssign = () => { + try { + save(); + readChar('='); + return readValue(); + } catch (ignore) { + restore(); + } finally { + drop(); + } + }; + + const readStruct = () => { + let subject = readKeyword('struct'); + let name = readName(); + let items = readStructLikeBlock(); + return {subject, name, items}; + }; + + const readStructLikeBlock = () => { + readChar('{'); + let receiver = readUntilThrow(readStructLikeItem); + readChar('}'); + return receiver; + }; + + const readStructLikeItem = () => { + let id; + try { + id = readAnyOne(readHexadecimalValue, readIntegerValue); + readChar(':'); + } catch (err) { + + } + + let option = readAnyOne(() => readKeyword('required'), () => readKeyword('optional'), readNoop); + let type = readType(); + let name = readName(); + let defaultValue = readAssign(); + readComma(); + let result = {type, name}; + if (id !== void 0) result.id = id; + if (option !== void 0) result.option = option; + if (defaultValue !== void 0) result.defaultValue = defaultValue; + return result; + }; + + const readUnion = () => { + let subject = readKeyword('union'); + let name = readName(); + let items = readStructLikeBlock(); + return {subject, name, items}; + }; + + const readException = () => { + let subject = readKeyword('exception'); + let name = readName(); + let items = readStructLikeBlock(); + return {subject, name, items}; + }; + + const readExtends = () => { + try { + save(); + readKeyword('extends'); + let name = readRefValue()['='].join('.'); + return name; + } catch (ignore) { + restore(); + return; + } finally { + drop(); + } + }; + + const readService = () => { + let subject = readKeyword('service'); + let name = readName(); + let extend = readExtends(); // extends is a reserved keyword + let functions = readServiceBlock(); + let result = {subject, name}; + if (extend !== void 0) result.extends = extend; + if (functions !== void 0) result.functions = functions; + return result; + }; + + const readNamespace = () => { + let subject = readKeyword('namespace'); + let name = readScope(); + let serviceName = readRefValue()['='].join('.'); + return {subject, name, serviceName}; + }; + + const readInclude = () => { + let subject = readKeyword('include'); + readSpace(); + let includePath = readQuotation(); + let name = includePath.replace(/^.*?([^/\\]*?)(?:\.thrift)?$/, '$1'); + readSpace(); + return {subject, name, path: includePath}; + }; + + const readQuotation = () => { + let quoteMatch; + if (source[offset] === '"' || source[offset] === '\'') { + quoteMatch = source[offset]; + offset++; + } else { + throw 'include error'; + } + let i = offset; + // Read until it finds a matching quote or end-of-file + while (source[i] !== quoteMatch && source[i] != null) { + i++; + } + if (source[i] === quoteMatch) { + let value = source.slice(offset, i); + offset = i + 1; + return value; + } else { + throw 'include error'; + } + }; + + const readServiceBlock = () => { + readChar('{'); + let receiver = readUntilThrow(readServiceItem, 'name'); + readChar('}'); + return receiver; + }; + + const readOneway = () => readKeyword('oneway'); + + const readServiceItem = () => { + let oneway = !!readAnyOne(readOneway, readNoop); + let type = readType(); + let name = readName(); + let args = readServiceArgs(); + let throws = readServiceThrow(); + readComma(); + return {type, name, args, throws, oneway}; + }; + + const readServiceArgs = () => { + readChar('('); + let receiver = readUntilThrow(readStructLikeItem); + readChar(')'); + readSpace(); + return receiver; + }; + + const readServiceThrow = () => { + try { + save(); + readKeyword('throws'); + return readServiceArgs(); + } catch (ignore) { + restore(); + return []; + } finally { + drop(); + } + }; + + const readSubject = () => { + return readAnyOne(readTypedef, readConst, readEnum, readStruct, readUnion, readException, readService, readNamespace, readInclude); + }; + + const readThrift = () => { + readSpace(); + let storage = {}; + for (;;) { + try { + let block = readSubject(); + let {subject, name} = block; + if (!storage[subject]) storage[subject] = {}; + delete block.subject; + delete block.name; + switch (subject) { + case 'exception': + case 'struct': + case 'union': + storage[subject][name] = block.items; + break; + default: + storage[subject][name] = block; + } + } catch (message) { + let context = source.slice(offset, offset + 50); + let line = Math.max(nCount, rCount) + 1; + console.log("message=" + message + " context=" + context + " line=" + line); + throw new ThriftFileParsingError({ message, context, line }); + } finally { + if (source.length === offset) break; + } + } + return storage; + }; + + return readThrift(); + +}; diff --git a/cadence/thrift/thrift-parser.js-LICENSE b/cadence/thrift/thrift-parser.js-LICENSE new file mode 100644 index 0000000..eaa1b1c --- /dev/null +++ b/cadence/thrift/thrift-parser.js-LICENSE @@ -0,0 +1,27 @@ +I took thrift-parser.js from here: + +https://github.com/eleme/thrift-parser + +The license for which is: + +The MIT License (MIT) + +Copyright (c) <2016-2017> + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. From 1ebfbf05d5a25dc38865f70aec81cf2e7781dbb3 Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Fri, 27 Mar 2020 23:26:44 +0800 Subject: [PATCH 27/50] Released beta1. --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index bb54439..a9f6f79 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setuptools.setup( name="cadence-client", - version="0.0.4", + version="1.0.0-beta1", author="Mohammed Firdaus", author_email="firdaus.halim@gmail.com", description="Python framework for Cadence Workflow Service", From 9a64a1a2ec616f6312c02a319fc12c9426aae088 Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sat, 28 Mar 2020 19:05:40 +0800 Subject: [PATCH 28/50] Update README.md --- README.md | 116 +++++++++++++++++++++++++++++++----------------------- 1 file changed, 66 insertions(+), 50 deletions(-) diff --git a/README.md b/README.md index e95742e..2e6fa3e 100644 --- a/README.md +++ b/README.md @@ -1,61 +1,25 @@ -# Python framework for Cadence Workflow Service +# Fault-Oblivious Stateful Python Code -[Cadence](https://github.com/uber/cadence) is a workflow engine developed at Uber Engineering. With this framework, workflows and activities managed by Cadence can be implemented in Python code. +cadence-python allows you to create Python functions that have their state (local variables etc..) implicitly saved such that if the process/machine fails the state of the function is not lost and can resume from where it left off. -## Status / TODO - -cadence-python is still under going heavy development. It should be considered EXPERIMENTAL at the moment. A production -version is targeted to be released in ~~September of 2019~~ ~~January 2020~~ ~~March 2020~~ April 2020. - -1.0 -- [x] Tchannel implementation -- [x] Python-friendly wrapper around Cadence's Thrift API -- [x] Author activities in Python -- [x] Start workflows (synchronously) -- [x] Create workflows -- [x] Workflow execution in coroutines -- [x] Invoke activities from workflows -- [x] ActivityCompletionClient heartbeat, complete, complete_exceptionally -- [x] Activity heartbeat, getHeartbeatDetails and doNotCompleteOnReturn -- [x] Activity retry -- [x] Activity getDomain(), getTaskToken(), getWorkflowExecution() -- [x] Signals -- [x] Queries -- [x] Async workflow execution -- [x] await -- [x] now (currentTimeMillis) -- [x] Sleep -- [x] Loggers -- [x] newRandom -- [x] UUID -- [x] Workflow Versioning -- [x] WorkflowClient.newWorkflowStub(Class workflowInterface, String workflowId); +This programming model is useful whenever you need to ensure that a function runs to completion. For example: -1.1 -- [ ] ActivityStub and Workflow.newUntypedActivityStub -- [ ] Classes as arguments and return values to/from activity and workflow methods -- [ ] WorkflowStub and WorkflowClient.newUntypedWorkflowStub -- [ ] Custom workflow ids through start() and new_workflow_stub() -- [ ] ContinueAsNew -- [ ] Compatibility with Java client -- [ ] Compatibility with Golang client +- Business logic involving multiple micro services +- CI/CD pipelines +- Data pipelines +- RPA +- Marketing automation / Customer journeys / Customer engagement +- Zapier/IFTTT like end user automation. +- Chat bots -2.0 -- [ ] Sticky workflows +Behind the scenes, cadence-python uses the [Cadence](https://github.com/uber/cadence) as its backend. -Post 2.0: -- [ ] sideEffect/mutableSideEffect -- [ ] Local activity -- [ ] Parallel activity execution -- [ ] Timers -- [ ] Cancellation Scopes -- [ ] Child Workflows -- [ ] Explicit activity ids for activity invocations +For more information about the fault-oblivious programming model refer to the Cadence documentation [here](https://cadenceworkflow.io/docs/03_concepts/01_workflows) ## Installation ``` -pip install cadence-client +pip install cadence-client==1.0.0b1 ``` ## Hello World Sample @@ -119,4 +83,56 @@ if __name__ == '__main__': worker.stop() print("Workers stopped...") sys.exit(0) -``` +``` + +## Status / TODO + +cadence-python is still under going heavy development. It should be considered EXPERIMENTAL at the moment. A production +version is targeted to be released in ~~September of 2019~~ ~~January 2020~~ ~~March 2020~~ April 2020. + +1.0 +- [x] Tchannel implementation +- [x] Python-friendly wrapper around Cadence's Thrift API +- [x] Author activities in Python +- [x] Start workflows (synchronously) +- [x] Create workflows +- [x] Workflow execution in coroutines +- [x] Invoke activities from workflows +- [x] ActivityCompletionClient heartbeat, complete, complete_exceptionally +- [x] Activity heartbeat, getHeartbeatDetails and doNotCompleteOnReturn +- [x] Activity retry +- [x] Activity getDomain(), getTaskToken(), getWorkflowExecution() +- [x] Signals +- [x] Queries +- [x] Async workflow execution +- [x] await +- [x] now (currentTimeMillis) +- [x] Sleep +- [x] Loggers +- [x] newRandom +- [x] UUID +- [x] Workflow Versioning +- [x] WorkflowClient.newWorkflowStub(Class workflowInterface, String workflowId); + +1.1 +- [ ] ActivityStub and Workflow.newUntypedActivityStub +- [ ] Classes as arguments and return values to/from activity and workflow methods +- [ ] WorkflowStub and WorkflowClient.newUntypedWorkflowStub +- [ ] Custom workflow ids through start() and new_workflow_stub() +- [ ] ContinueAsNew +- [ ] Compatibility with Java client +- [ ] Compatibility with Golang client + +2.0 +- [ ] Sticky workflows + +Post 2.0: +- [ ] sideEffect/mutableSideEffect +- [ ] Local activity +- [ ] Parallel activity execution +- [ ] Timers +- [ ] Cancellation Scopes +- [ ] Child Workflows +- [ ] Explicit activity ids for activity invocations + + From 78a0fff69c930500767ed0d1408152d6b07e0046 Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sat, 28 Mar 2020 19:33:18 +0800 Subject: [PATCH 29/50] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 2e6fa3e..8bfc73d 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ This programming model is useful whenever you need to ensure that a function run - Zapier/IFTTT like end user automation. - Chat bots -Behind the scenes, cadence-python uses the [Cadence](https://github.com/uber/cadence) as its backend. +Behind the scenes, cadence-python uses [Cadence](https://github.com/uber/cadence) as its backend. For more information about the fault-oblivious programming model refer to the Cadence documentation [here](https://cadenceworkflow.io/docs/03_concepts/01_workflows) From d2a648a8d5583e9dcf67df58b6b7fc9f2cf664c3 Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sat, 28 Mar 2020 19:47:01 +0800 Subject: [PATCH 30/50] Update README.md --- README.md | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 8bfc73d..e6c3d4d 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,20 @@ Behind the scenes, cadence-python uses [Cadence](https://github.com/uber/cadence For more information about the fault-oblivious programming model refer to the Cadence documentation [here](https://cadenceworkflow.io/docs/03_concepts/01_workflows) -## Installation +## Install Cadencce + +``` +wget https://raw.githubusercontent.com/uber/cadence/master/docker/docker-compose.yml +docker-compose up +``` + +## Register `sample` domain + +``` +docker run --network=host --rm ubercadence/cli:master --do sample domain register -rd 1 +``` + +## Installation cadence-python ``` pip install cadence-client==1.0.0b1 From 594d0d8084c14cee2694c621dd3bf6ce60454430 Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sat, 28 Mar 2020 19:53:28 +0800 Subject: [PATCH 31/50] Update README.md --- README.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index e6c3d4d..2294642 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Fault-Oblivious Stateful Python Code +# Intro: Fault-Oblivious Stateful Python Code cadence-python allows you to create Python functions that have their state (local variables etc..) implicitly saved such that if the process/machine fails the state of the function is not lost and can resume from where it left off. @@ -77,6 +77,9 @@ class GreetingWorkflowImpl(GreetingWorkflow): self.greeting_activities: GreetingActivities = Workflow.new_activity_stub(GreetingActivities) async def get_greeting(self, name): + # Place any Python code here that you want to ensure is executed to completion. + # Note: code in workflow functions must be deterministic so that the same code paths + # are ran during replay. return await self.greeting_activities.compose_greeting("Hello", name) From eca0d0026de58aecf1fe8a3f023d1659da52d798 Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sun, 29 Mar 2020 00:31:01 +0800 Subject: [PATCH 32/50] Update README.md --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 2294642..8f7941f 100644 --- a/README.md +++ b/README.md @@ -8,9 +8,11 @@ This programming model is useful whenever you need to ensure that a function run - CI/CD pipelines - Data pipelines - RPA +- ETL - Marketing automation / Customer journeys / Customer engagement - Zapier/IFTTT like end user automation. - Chat bots +- Multi-step forms Behind the scenes, cadence-python uses [Cadence](https://github.com/uber/cadence) as its backend. From d6bbc9aafac5caf717ccb735c645c4067f37d427 Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sun, 29 Mar 2020 01:14:28 +0800 Subject: [PATCH 33/50] Update README.md --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 8f7941f..2d2dd2c 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ This programming model is useful whenever you need to ensure that a function run - Zapier/IFTTT like end user automation. - Chat bots - Multi-step forms +- Scheduler/Cron jobs Behind the scenes, cadence-python uses [Cadence](https://github.com/uber/cadence) as its backend. From df222de11ce5b0d139f48cc4f1d4ed0fab0a80c5 Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sun, 29 Mar 2020 17:07:34 +0800 Subject: [PATCH 34/50] Add timeout for socket connections. --- cadence/activity_loop.py | 2 +- cadence/connection.py | 5 +++-- cadence/constants.py | 4 ++++ cadence/decision_loop.py | 2 +- cadence/worker.py | 7 +++++++ cadence/workflow.py | 5 +++-- 6 files changed, 19 insertions(+), 6 deletions(-) diff --git a/cadence/activity_loop.py b/cadence/activity_loop.py index e7394cd..53a1ce2 100644 --- a/cadence/activity_loop.py +++ b/cadence/activity_loop.py @@ -12,7 +12,7 @@ def activity_task_loop(worker: Worker): - service: WorkflowService = WorkflowService.create(worker.host, worker.port) + service: WorkflowService = WorkflowService.create(worker.host, worker.port, timeout=worker.get_timeout()) worker.manage_service(service) logger.info(f"Activity task worker started: {WorkflowService.get_identity()}") try: diff --git a/cadence/connection.py b/cadence/connection.py index 7afd942..aa45f28 100644 --- a/cadence/connection.py +++ b/cadence/connection.py @@ -299,15 +299,16 @@ class TChannelConnection: s: socket.socket @classmethod - def open(cls, host: object, port: object) -> TChannelConnection: + def open(cls, host: object, port: object, timeout: int = None) -> TChannelConnection: s: socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.settimeout(timeout) s.connect((host, port)) return cls(s) def __init__(self, s: socket): self.s = s self.file = self.s.makefile("rwb") - self.wrapper = IOWrapper(self.file) + self.wrapper = IOWrapper(self.file, socket_=s) self.current_id = -1 self.handshake() diff --git a/cadence/constants.py b/cadence/constants.py index 24a63ce..55ae6a1 100644 --- a/cadence/constants.py +++ b/cadence/constants.py @@ -1,3 +1,7 @@ CODE_OK = 0x00 CODE_ERROR = 0x01 + +# This should be at least 60 seconds because Cadence will reply after 60 seconds when polling +# if there is nothing pending +DEFAULT_SOCKET_TIMEOUT_SECONDS = 120 diff --git a/cadence/decision_loop.py b/cadence/decision_loop.py index b45736c..ed80461 100644 --- a/cadence/decision_loop.py +++ b/cadence/decision_loop.py @@ -847,7 +847,7 @@ def run(self): logger.info(f"Decision task worker started: {WorkflowService.get_identity()}") event_loop = asyncio.new_event_loop() asyncio.set_event_loop(event_loop) - self.service = WorkflowService.create(self.worker.host, self.worker.port) + self.service = WorkflowService.create(self.worker.host, self.worker.port, timeout=self.worker.get_timeout()) self.worker.manage_service(self.service) while True: if self.worker.is_stop_requested(): diff --git a/cadence/worker.py b/cadence/worker.py index 31f4caf..37ae7b7 100644 --- a/cadence/worker.py +++ b/cadence/worker.py @@ -5,6 +5,7 @@ import logging import time +from cadence.constants import DEFAULT_SOCKET_TIMEOUT_SECONDS from cadence.conversions import camel_to_snake, snake_to_camel from cadence.workflow import WorkflowMethod, SignalMethod, QueryMethod from cadence.workflowservice import WorkflowService @@ -69,6 +70,7 @@ class Worker: threads_stopped: int = 0 stop_requested: bool = False service_instances: List[WorkflowService] = field(default_factory=list) + timeout: int = DEFAULT_SOCKET_TIMEOUT_SECONDS def register_activities_implementation(self, activities_instance: object, activities_cls_name: str = None): cls_name = activities_cls_name if activities_cls_name else type(activities_instance).__name__ @@ -149,4 +151,9 @@ def get_workflow_method(self, workflow_type_name: str) -> Tuple[type, Callable]: def manage_service(self, service: WorkflowService): self.service_instances.append(service) + def set_timeout(self, timeout): + self.timeout = timeout + + def get_timeout(self): + return self.timeout diff --git a/cadence/workflow.py b/cadence/workflow.py index 08ad63c..d2f9183 100644 --- a/cadence/workflow.py +++ b/cadence/workflow.py @@ -16,6 +16,7 @@ GetWorkflowExecutionHistoryRequest, WorkflowExecution, HistoryEventFilterType, EventType, HistoryEvent, \ StartWorkflowExecutionResponse, SignalWorkflowExecutionRequest, QueryWorkflowRequest, WorkflowQuery, \ QueryWorkflowResponse +from cadence.constants import DEFAULT_SOCKET_TIMEOUT_SECONDS from cadence.conversions import args_to_json, json_to_args from cadence.errors import QueryFailedError from cadence.exception_handling import deserialize_exception @@ -107,8 +108,8 @@ class WorkflowClient: @classmethod def new_client(cls, host: str = "localhost", port: int = 7933, domain: str = "", - options: WorkflowClientOptions = None) -> WorkflowClient: - service = WorkflowService.create(host, port) + options: WorkflowClientOptions = None, timeout: int = DEFAULT_SOCKET_TIMEOUT_SECONDS) -> WorkflowClient: + service = WorkflowService.create(host, port, timeout=timeout) return cls(service=service, domain=domain, options=options) @classmethod From 363a320ba056f10737952945bdd8c3e719c90677 Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sun, 29 Mar 2020 17:23:22 +0800 Subject: [PATCH 35/50] Improve the performance of worker.stop() by checking the is_stop_requested() more often during polling. --- cadence/activity_loop.py | 6 +++++- cadence/connection.py | 5 ++++- cadence/decision_loop.py | 37 +++++++++++++++++++++---------------- cadence/ioutils.py | 27 ++++++++++++++++++++++----- cadence/worker.py | 7 +++++++ cadence/workflowservice.py | 9 ++++++--- 6 files changed, 65 insertions(+), 26 deletions(-) diff --git a/cadence/activity_loop.py b/cadence/activity_loop.py index 53a1ce2..cd4f16b 100644 --- a/cadence/activity_loop.py +++ b/cadence/activity_loop.py @@ -6,7 +6,7 @@ from cadence.cadence_types import PollForActivityTaskRequest, TaskListMetadata, TaskList, PollForActivityTaskResponse from cadence.conversions import json_to_args from cadence.workflowservice import WorkflowService -from cadence.worker import Worker +from cadence.worker import Worker, StopRequestedException logger = logging.getLogger(__name__) @@ -20,6 +20,8 @@ def activity_task_loop(worker: Worker): if worker.is_stop_requested(): return try: + service.set_next_timeout_cb(worker.raise_if_stop_requested) + polling_start = datetime.datetime.now() polling_request = PollForActivityTaskRequest() polling_request.task_list_metadata = TaskListMetadata() @@ -32,6 +34,8 @@ def activity_task_loop(worker: Worker): task, err = service.poll_for_activity_task(polling_request) polling_end = datetime.datetime.now() logger.debug("PollForActivityTask: %dms", (polling_end - polling_start).total_seconds() * 1000) + except StopRequestedException: + return except Exception as ex: logger.error("PollForActivityTask error: %s", ex) continue diff --git a/cadence/connection.py b/cadence/connection.py index aa45f28..1e1a0d3 100644 --- a/cadence/connection.py +++ b/cadence/connection.py @@ -4,7 +4,7 @@ import socket from dataclasses import dataclass from io import BytesIO -from typing import IO, List, Union, Optional, Dict +from typing import IO, List, Union, Optional, Dict, Callable from cadence.frames import InitReqFrame, Frame, Arg, CallReqFrame, CallReqContinueFrame, CallResFrame, \ CallResContinueFrame, FrameWithArgs, CallFlags, ErrorFrame @@ -313,6 +313,9 @@ def __init__(self, s: socket): self.handshake() + def set_next_timeout_cb(self, cb: Callable): + self.wrapper.set_next_timeout_cb(cb) + def new_id(self): self.current_id += 1 return self.current_id diff --git a/cadence/decision_loop.py b/cadence/decision_loop.py index ed80461..3e17953 100644 --- a/cadence/decision_loop.py +++ b/cadence/decision_loop.py @@ -4,6 +4,7 @@ import contextvars import datetime import json +import socket import uuid import random import logging @@ -35,7 +36,7 @@ from cadence.state_machines import ActivityDecisionStateMachine, DecisionStateMachine, CompleteWorkflowStateMachine, \ TimerDecisionStateMachine, MarkerDecisionStateMachine from cadence.tchannel import TChannelException -from cadence.worker import Worker +from cadence.worker import Worker, StopRequestedException from cadence.workflow import QueryMethod from cadence.workflowservice import WorkflowService @@ -850,23 +851,27 @@ def run(self): self.service = WorkflowService.create(self.worker.host, self.worker.port, timeout=self.worker.get_timeout()) self.worker.manage_service(self.service) while True: - if self.worker.is_stop_requested(): + try: + if self.worker.is_stop_requested(): + return + self.service.set_next_timeout_cb(self.worker.raise_if_stop_requested) + decision_task: PollForDecisionTaskResponse = self.poll() + if not decision_task: + continue + if decision_task.query: + try: + result = self.process_query(decision_task) + self.respond_query(decision_task.task_token, result, None) + except Exception as ex: + logger.error("Error") + self.respond_query(decision_task.task_token, None, serialize_exception(ex)) + else: + decisions = self.process_task(decision_task) + self.respond_decisions(decision_task.task_token, decisions) + except StopRequestedException: return - decision_task: PollForDecisionTaskResponse = self.poll() - if not decision_task: - continue - if decision_task.query: - try: - result = self.process_query(decision_task) - self.respond_query(decision_task.task_token, result, None) - except Exception as ex: - logger.error("Error") - self.respond_query(decision_task.task_token, None, serialize_exception(ex)) - else: - decisions = self.process_task(decision_task) - self.respond_decisions(decision_task.task_token, decisions) finally: - # noinspection PyPep8,PyBroadException + # noinspection PyPep8,PyBroadException try: self.service.close() except: diff --git a/cadence/ioutils.py b/cadence/ioutils.py index 3c65e9b..977d775 100644 --- a/cadence/ioutils.py +++ b/cadence/ioutils.py @@ -1,13 +1,30 @@ -from typing import IO +from select import select +from socket import socket +from typing import IO, Callable class IOWrapper: - io_stream: IO - - def __init__(self, io_stream: IO): + def __init__(self, io_stream: IO, socket_: socket = None): self.io_stream = io_stream + self.socket = socket_ + self.next_timeout_cb = None + + def set_next_timeout_cb(self, cb: Callable): + self.next_timeout_cb = cb def read_or_eof(self, size, field): + if self.next_timeout_cb and self.socket: + timeout = self.socket.gettimeout() + self.socket.setblocking(False) + while True: + ready_to_read, _, _ = select([self.socket], [], [], 1) + if ready_to_read: + break + else: + self.next_timeout_cb() + self.next_timeout_cb = None + self.socket.setblocking(True) + self.socket.settimeout(timeout) buf: bytes = self.io_stream.read(size) if len(buf) != size: raise EOFError(field) @@ -31,7 +48,7 @@ def read_string(self, n: int, field: str) -> str: return str(buf, "utf-8") def write_short(self, v: int): - self.io_stream.write(v.to_bytes(2, byteorder='big', signed= False)) + self.io_stream.write(v.to_bytes(2, byteorder='big', signed=False)) def write_long(self, v: int): self.io_stream.write(v.to_bytes(4, byteorder='big', signed=False)) diff --git a/cadence/worker.py b/cadence/worker.py index 37ae7b7..75ea9e1 100644 --- a/cadence/worker.py +++ b/cadence/worker.py @@ -157,3 +157,10 @@ def set_timeout(self, timeout): def get_timeout(self): return self.timeout + def raise_if_stop_requested(self): + if self.is_stop_requested(): + raise StopRequestedException() + + +class StopRequestedException(Exception): + pass diff --git a/cadence/workflowservice.py b/cadence/workflowservice.py index c094437..84a329d 100644 --- a/cadence/workflowservice.py +++ b/cadence/workflowservice.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Tuple +from typing import Tuple, Callable from uuid import uuid4 import os @@ -31,8 +31,8 @@ class WorkflowService: @classmethod - def create(cls, host: str, port: int): - connection = TChannelConnection.open(host, port) + def create(cls, host: str, port: int, timeout: int = 0): + connection = TChannelConnection.open(host, port, timeout=timeout) return cls(connection) @classmethod @@ -170,3 +170,6 @@ def describe_task_list(self, request) -> Tuple[DescribeTaskListResponse, object] def close(self): self.connection.close() + + def set_next_timeout_cb(self, cb: Callable): + self.connection.set_next_timeout_cb(cb) From 962d404c1c9af912ca1de8571c57bbd3f82ebe63 Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sun, 29 Mar 2020 17:23:43 +0800 Subject: [PATCH 36/50] Update sample code. --- cadence/samples/hello_activity_async.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cadence/samples/hello_activity_async.py b/cadence/samples/hello_activity_async.py index 6243fb0..9e0dc4a 100644 --- a/cadence/samples/hello_activity_async.py +++ b/cadence/samples/hello_activity_async.py @@ -53,7 +53,7 @@ async def get_greeting(self, name): client = WorkflowClient.new_client(domain=DOMAIN) greeting_workflow: GreetingWorkflow = client.new_workflow_stub(GreetingWorkflow) execution = WorkflowClient.start(greeting_workflow.get_greeting, "Python") - print("Started: workflow_id={} run_id={}".format(execution.workflow_id, execution.run_id)) + print("Started: workflow_id={} run_id={}".format(execution.workflow_execution.workflow_id, execution.workflow_execution.run_id)) print("Result: " + str(client.wait_for_close(execution))) print("Stopping workers....") From e312a8f3772fa7d6081fe905b8842fca4a0cdfdf Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sun, 29 Mar 2020 22:49:46 +0800 Subject: [PATCH 37/50] Make sure to close socket file descriptor. --- cadence/activity_loop.py | 4 ++++ cadence/connection.py | 1 + cadence/ioutils.py | 3 +++ cadence/workflowservice.py | 2 +- 4 files changed, 9 insertions(+), 1 deletion(-) diff --git a/cadence/activity_loop.py b/cadence/activity_loop.py index cd4f16b..b0cdcce 100644 --- a/cadence/activity_loop.py +++ b/cadence/activity_loop.py @@ -79,4 +79,8 @@ def activity_task_loop(worker: Worker): process_end = datetime.datetime.now() logger.info("Process ActivityTask: %dms", (process_end - process_start).total_seconds() * 1000) finally: + try: + service.close() + except: + logger.warning("service.close() failed", exc_info=1) worker.notify_thread_stopped() diff --git a/cadence/connection.py b/cadence/connection.py index 1e1a0d3..73b9b45 100644 --- a/cadence/connection.py +++ b/cadence/connection.py @@ -343,6 +343,7 @@ def read_frame(self): def close(self): self.s.close() + self.wrapper.close() def call_function(self, call: ThriftFunctionCall) -> ThriftFunctionResponse: frames = call.build_frames(self.new_id()) diff --git a/cadence/ioutils.py b/cadence/ioutils.py index 977d775..a96eb2f 100644 --- a/cadence/ioutils.py +++ b/cadence/ioutils.py @@ -64,3 +64,6 @@ def write_string(self, s: str): def flush(self): self.io_stream.flush() + + def close(self): + self.io_stream.close() diff --git a/cadence/workflowservice.py b/cadence/workflowservice.py index 84a329d..54759d4 100644 --- a/cadence/workflowservice.py +++ b/cadence/workflowservice.py @@ -31,7 +31,7 @@ class WorkflowService: @classmethod - def create(cls, host: str, port: int, timeout: int = 0): + def create(cls, host: str, port: int, timeout: int = None): connection = TChannelConnection.open(host, port, timeout=timeout) return cls(connection) From 0633d15d189b0637f7cd04fa8e68ef177486e4ca Mon Sep 17 00:00:00 2001 From: Matt Anger Date: Mon, 30 Mar 2020 09:11:30 -0700 Subject: [PATCH 38/50] import CancelledError from asyncio --- cadence/decision_loop.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cadence/decision_loop.py b/cadence/decision_loop.py index 3e17953..954ca7c 100644 --- a/cadence/decision_loop.py +++ b/cadence/decision_loop.py @@ -9,7 +9,7 @@ import random import logging import threading -from asyncio.base_futures import CancelledError +from asyncio import CancelledError from asyncio.events import AbstractEventLoop from asyncio.futures import Future from asyncio.tasks import Task From 3c8f069bec31baf2e240a3253f9e099c33410c83 Mon Sep 17 00:00:00 2001 From: xuyang Date: Fri, 17 Apr 2020 14:15:17 +0800 Subject: [PATCH 39/50] Use getpass replacing os --- cadence/connection.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cadence/connection.py b/cadence/connection.py index 73b9b45..973e89c 100644 --- a/cadence/connection.py +++ b/cadence/connection.py @@ -1,6 +1,6 @@ from __future__ import annotations -import os +import getpass import socket from dataclasses import dataclass from io import BytesIO @@ -194,7 +194,7 @@ def default_tchannel_headers(): @staticmethod def default_application_headers(): return { - "user-name": os.environ.get("LOGNAME", os.getlogin()), + "user-name": getpass.getuser(), "host-name": socket.gethostname(), # Copied from Java client "cadence-client-library-version": "2.2.0", From 9b990f128219a3d825c3ca19b9a01a91096d645e Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Fri, 1 May 2020 18:59:35 +0800 Subject: [PATCH 40/50] Add support for specifying activity options in new_activity_stub(). --- cadence/activity_method.py | 23 +++++++++++++++++++ cadence/tests/test_activity_method.py | 33 ++++++++++++++++++++++++++- cadence/workflow.py | 6 +++-- 3 files changed, 59 insertions(+), 3 deletions(-) diff --git a/cadence/activity_method.py b/cadence/activity_method.py index 4357e2c..ba5d813 100644 --- a/cadence/activity_method.py +++ b/cadence/activity_method.py @@ -54,6 +54,8 @@ async def stub_activity_fn(self, *args): assert self._decision_context assert stub_activity_fn._execute_parameters parameters = copy.deepcopy(stub_activity_fn._execute_parameters) + if hasattr(self, "_activity_options") and self._activity_options: + self._activity_options.fill_execute_activity_parameters(parameters) if self._retry_parameters: parameters.retry_parameters = self._retry_parameters parameters.input = args_to_json(args).encode("utf-8") @@ -80,3 +82,24 @@ async def stub_activity_fn(self, *args): raise Exception("activity_method must be called with arguments") else: return wrapper + + +@dataclass +class ActivityOptions: + schedule_to_close_timeout_seconds: int = None + schedule_to_start_timeout_seconds: int = None + start_to_close_timeout_seconds: int = None + heartbeat_timeout_seconds: int = None + task_list: str = None + + def fill_execute_activity_parameters(self, execute_parameters: ExecuteActivityParameters): + if self.schedule_to_close_timeout_seconds is not None: + execute_parameters.schedule_to_close_timeout_seconds = self.schedule_to_close_timeout_seconds + if self.schedule_to_start_timeout_seconds is not None: + execute_parameters.schedule_to_start_timeout_seconds = self.schedule_to_start_timeout_seconds + if self.start_to_close_timeout_seconds is not None: + execute_parameters.start_to_close_timeout_seconds = self.start_to_close_timeout_seconds + if self.heartbeat_timeout_seconds is not None: + execute_parameters.heartbeat_timeout_seconds = self.heartbeat_timeout_seconds + if self.task_list is not None: + execute_parameters.task_list = self.task_list diff --git a/cadence/tests/test_activity_method.py b/cadence/tests/test_activity_method.py index df10790..5c3c739 100644 --- a/cadence/tests/test_activity_method.py +++ b/cadence/tests/test_activity_method.py @@ -2,7 +2,7 @@ from unittest import TestCase from unittest.mock import Mock, MagicMock -from cadence.activity_method import activity_method, ExecuteActivityParameters +from cadence.activity_method import activity_method, ExecuteActivityParameters, ActivityOptions from cadence.decision_loop import DecisionContext from cadence.tests.test_decision_context import run_once @@ -134,3 +134,34 @@ async def fn(): self.decision_context.schedule_activity_task.assert_called_once() args, kwargs = self.decision_context.schedule_activity_task.call_args_list[0] self.assertEqual(b'[1, "one"]', kwargs["parameters"].input) + + def test_invoke_with_activity_options(self): + class HelloActivities: + @activity_method(task_list="test-tasklist") + def hello(self, arg1, arg2): + pass + + stub = HelloActivities() + stub._decision_context = self.decision_context + stub._retry_parameters = None + stub._activity_options = ActivityOptions(schedule_to_close_timeout_seconds=50, + schedule_to_start_timeout_seconds=100, + start_to_close_timeout_seconds=150, + heartbeat_timeout_seconds=200, + task_list="tasklist-tasklist-tasklist") + + async def fn(): + await stub.hello(1, "one") + + loop = get_event_loop() + self.task = loop.create_task(fn()) + run_once(loop) + + self.decision_context.schedule_activity_task.assert_called_once() + args, kwargs = self.decision_context.schedule_activity_task.call_args_list[0] + parameters: ExecuteActivityParameters = kwargs["parameters"] + self.assertEquals(parameters.schedule_to_close_timeout_seconds, 50) + self.assertEquals(parameters.schedule_to_start_timeout_seconds, 100) + self.assertEquals(parameters.start_to_close_timeout_seconds, 150) + self.assertEquals(parameters.heartbeat_timeout_seconds, 200) + self.assertEquals(parameters.task_list, "tasklist-tasklist-tasklist") diff --git a/cadence/workflow.py b/cadence/workflow.py index d2f9183..ea3ecc6 100644 --- a/cadence/workflow.py +++ b/cadence/workflow.py @@ -11,7 +11,7 @@ from six import reraise from cadence.activity import ActivityCompletionClient -from cadence.activity_method import RetryParameters +from cadence.activity_method import RetryParameters, ActivityOptions from cadence.cadence_types import WorkflowIdReusePolicy, StartWorkflowExecutionRequest, TaskList, WorkflowType, \ GetWorkflowExecutionHistoryRequest, WorkflowExecution, HistoryEventFilterType, EventType, HistoryEvent, \ StartWorkflowExecutionResponse, SignalWorkflowExecutionRequest, QueryWorkflowRequest, WorkflowQuery, \ @@ -28,13 +28,14 @@ class Workflow: @staticmethod - def new_activity_stub(activities_cls, retry_parameters: RetryParameters = None): + def new_activity_stub(activities_cls, retry_parameters: RetryParameters = None, activity_options: ActivityOptions = None): from cadence.decision_loop import ITask task: ITask = ITask.current() assert task cls = activities_cls() cls._decision_context = task.decider.decision_context cls._retry_parameters = retry_parameters + cls._activity_options = activity_options return cls @staticmethod @@ -428,3 +429,4 @@ class WorkflowExecutionTerminatedException(Exception): def __str__(self) -> str: return self.reason + From 79f89900f367b71ea42bb171b5bb2bf5546abb59 Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Fri, 1 May 2020 20:44:47 +0800 Subject: [PATCH 41/50] Released beta2. --- README.md | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 2d2dd2c..99c6226 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,7 @@ docker run --network=host --rm ubercadence/cli:master --do sample domain registe ## Installation cadence-python ``` -pip install cadence-client==1.0.0b1 +pip install cadence-client==1.0.0b2 ``` ## Hello World Sample diff --git a/setup.py b/setup.py index a9f6f79..299424c 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setuptools.setup( name="cadence-client", - version="1.0.0-beta1", + version="1.0.0-beta2", author="Mohammed Firdaus", author_email="firdaus.halim@gmail.com", description="Python framework for Cadence Workflow Service", From 8977eb7f05ada70800b5c7e71c7c7d9d9235fd4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Garc=C3=ADa=20Crespo?= Date: Mon, 4 May 2020 11:08:12 -0700 Subject: [PATCH 42/50] Remove unused dep: six --- cadence/workflow.py | 1 - requirements.txt | 1 - setup.py | 1 - 3 files changed, 3 deletions(-) diff --git a/cadence/workflow.py b/cadence/workflow.py index ea3ecc6..98ddb84 100644 --- a/cadence/workflow.py +++ b/cadence/workflow.py @@ -8,7 +8,6 @@ from typing import Callable, List, Type, Dict, Tuple from uuid import uuid4 -from six import reraise from cadence.activity import ActivityCompletionClient from cadence.activity_method import RetryParameters, ActivityOptions diff --git a/requirements.txt b/requirements.txt index 9d6fbd7..5b30432 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,5 @@ more-itertools==7.0.0 ply==3.11 -six==1.12.0 tblib==1.6.0 thriftrw==1.7.2 dataclasses-json==0.3.8 diff --git a/setup.py b/setup.py index 299424c..febac75 100644 --- a/setup.py +++ b/setup.py @@ -17,7 +17,6 @@ "dataclasses-json>=0.3.8", "more-itertools>=7.0.0", "ply>=3.11", - "six>=1.12.0", "tblib>=1.6.0", "thriftrw>=1.7.2", ], From 769574a235890361ecac86c02b8e954ac953f7e6 Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Thu, 7 May 2020 01:42:54 +0800 Subject: [PATCH 43/50] Handle DecisionTaskFailed. --- cadence/decision_loop.py | 8 +++++++- cadence/tests/test_decision_loop.py | 15 ++++++++++++++- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/cadence/decision_loop.py b/cadence/decision_loop.py index 954ca7c..48d7731 100644 --- a/cadence/decision_loop.py +++ b/cadence/decision_loop.py @@ -27,7 +27,7 @@ HistoryEvent, EventType, WorkflowType, ScheduleActivityTaskDecisionAttributes, \ CancelWorkflowExecutionDecisionAttributes, StartTimerDecisionAttributes, TimerFiredEventAttributes, \ FailWorkflowExecutionDecisionAttributes, RecordMarkerDecisionAttributes, Header, WorkflowQuery, \ - RespondQueryTaskCompletedRequest, QueryTaskCompletedType, QueryWorkflowResponse + RespondQueryTaskCompletedRequest, QueryTaskCompletedType, QueryWorkflowResponse, DecisionTaskFailedCause from cadence.conversions import json_to_args, args_to_json from cadence.decisions import DecisionId, DecisionTarget from cadence.exception_handling import serialize_exception, deserialize_exception @@ -674,6 +674,11 @@ def handle_activity_task_failed(self, event: HistoryEvent): def handle_activity_task_timed_out(self, event: HistoryEvent): self.decision_context.handle_activity_task_timed_out(event) + def handle_decision_task_failed(self, event: HistoryEvent): + attr = event.decision_task_failed_event_attributes + if attr and attr.cause == DecisionTaskFailedCause.RESET_WORKFLOW: + self.decision_context.set_current_run_id(attr.new_run_id) + def handle_workflow_execution_signaled(self, event: HistoryEvent): signaled_event_attributes = event.workflow_execution_signaled_event_attributes signal_input = signaled_event_attributes.input @@ -816,6 +821,7 @@ def on_timer_canceled(self: ReplayDecider, event: HistoryEvent): EventType.DecisionTaskScheduled: noop, EventType.DecisionTaskStarted: noop, # Filtered by HistoryHelper EventType.DecisionTaskTimedOut: noop, # TODO: check + EventType.DecisionTaskFailed: ReplayDecider.handle_decision_task_failed, EventType.ActivityTaskScheduled: ReplayDecider.handle_activity_task_scheduled, EventType.ActivityTaskStarted: ReplayDecider.handle_activity_task_started, EventType.ActivityTaskCompleted: ReplayDecider.handle_activity_task_completed, diff --git a/cadence/tests/test_decision_loop.py b/cadence/tests/test_decision_loop.py index 2d4ba10..255225c 100644 --- a/cadence/tests/test_decision_loop.py +++ b/cadence/tests/test_decision_loop.py @@ -6,7 +6,8 @@ from cadence.cadence_types import HistoryEvent, EventType, PollForDecisionTaskResponse, \ ScheduleActivityTaskDecisionAttributes, WorkflowExecutionStartedEventAttributes, Decision, \ - ActivityTaskStartedEventAttributes, MarkerRecordedEventAttributes + ActivityTaskStartedEventAttributes, MarkerRecordedEventAttributes, DecisionTaskFailedEventAttributes, \ + DecisionTaskFailedCause from cadence.clock_decision_context import VERSION_MARKER_NAME from cadence.decision_loop import HistoryHelper, is_decision_event, DecisionTaskLoop, ReplayDecider, DecisionEvents, \ nano_to_milli @@ -365,6 +366,18 @@ def test_handle_activity_task_started(self): args, kwargs = state_machine.handle_started_event.call_args_list[0] self.assertIn(event, args) + def test_handle_decision_task_failed(self): + event = HistoryEvent(event_id=15) + event.event_type = EventType.DecisionTaskFailed + event.decision_task_failed_event_attributes = DecisionTaskFailedEventAttributes() + event.decision_task_failed_event_attributes.cause = DecisionTaskFailedCause.RESET_WORKFLOW + event.decision_task_failed_event_attributes.new_run_id = "the-new-run-id" + self.decider.decision_context = decision_context = MagicMock() + self.decider.handle_decision_task_failed(event) + decision_context.set_current_run_id.assert_called() + args, kwargs = decision_context.set_current_run_id.call_args_list[0] + assert args[0] == "the-new-run-id" + def tearDown(self) -> None: self.decider.destroy() From f5c9fa1f80eefce3932b33b66e6eade38b8cc2ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Garc=C3=ADa=20Crespo?= Date: Fri, 8 May 2020 11:02:06 -0700 Subject: [PATCH 44/50] Add min. Py3 version required --- README.md | 4 ++++ setup.py | 2 ++ 2 files changed, 6 insertions(+) diff --git a/README.md b/README.md index 99c6226..bece74b 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,7 @@ +

+PyPI - Python Version +

+ # Intro: Fault-Oblivious Stateful Python Code cadence-python allows you to create Python functions that have their state (local variables etc..) implicitly saved such that if the process/machine fails the state of the function is not lost and can resume from where it left off. diff --git a/setup.py b/setup.py index 299424c..a4e51f3 100644 --- a/setup.py +++ b/setup.py @@ -23,6 +23,8 @@ ], classifiers=[ "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", "License :: OSI Approved :: MIT License", "Operating System :: OS Independent", ], From 4f70eb427eb18ae320b84517a6b6766a39d69be6 Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Tue, 26 May 2020 01:52:47 +0800 Subject: [PATCH 45/50] Update README.md --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index 99c6226..e4e6e6a 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,6 @@ +> I'm doing a survey to figure out the way forward for this library - https://airtable.com/shrJWfd63yBWSJKoI +> Your feedback is very much appreciated! + # Intro: Fault-Oblivious Stateful Python Code cadence-python allows you to create Python functions that have their state (local variables etc..) implicitly saved such that if the process/machine fails the state of the function is not lost and can resume from where it left off. From 1a29c72d1d984f769dab59ab55395e0ad29313dc Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Tue, 26 May 2020 01:58:32 +0800 Subject: [PATCH 46/50] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index e4e6e6a..fc954a5 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -> I'm doing a survey to figure out the way forward for this library - https://airtable.com/shrJWfd63yBWSJKoI +> I'm doing a survey to figure out the way forward for this library - https://airtable.com/shrJWfd63yBWSJKoI
> Your feedback is very much appreciated! # Intro: Fault-Oblivious Stateful Python Code From 7a36c4924e430359419162d469bed312b4e14a40 Mon Sep 17 00:00:00 2001 From: Syrus Akbary Date: Fri, 5 Jun 2020 16:39:13 -0700 Subject: [PATCH 47/50] Improved README --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index e656486..5a7a65a 100644 --- a/README.md +++ b/README.md @@ -47,7 +47,7 @@ pip install cadence-client==1.0.0b2 ## Hello World Sample -``` +```python import sys import logging from cadence.activity_method import activity_method @@ -70,7 +70,7 @@ class GreetingActivities: # Activities Implementation class GreetingActivitiesImpl: def compose_greeting(self, greeting: str, name: str): - return greeting + " " + name + "!" + return f"{greeting} {name}!" # Workflow Interface From d58d5d7fc9eea2703c450ab583bec2eb8dcf5fea Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Wed, 8 Jul 2020 23:25:44 +0800 Subject: [PATCH 48/50] Released b3. --- README.md | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 5a7a65a..b3d14e8 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,7 @@ docker run --network=host --rm ubercadence/cli:master --do sample domain registe ## Installation cadence-python ``` -pip install cadence-client==1.0.0b2 +pip install cadence-client==1.0.0b3 ``` ## Hello World Sample diff --git a/setup.py b/setup.py index 31db30d..ea3d576 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setuptools.setup( name="cadence-client", - version="1.0.0-beta2", + version="1.0.0-beta3", author="Mohammed Firdaus", author_email="firdaus.halim@gmail.com", description="Python framework for Cadence Workflow Service", From 05671bb0959f8370a2f80e67090f1d741a00a379 Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Sat, 1 Aug 2020 16:47:35 +0800 Subject: [PATCH 49/50] Update README.md --- README.md | 7 ------- 1 file changed, 7 deletions(-) diff --git a/README.md b/README.md index b3d14e8..7c8541b 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,3 @@ -> I'm doing a survey to figure out the way forward for this library - https://airtable.com/shrJWfd63yBWSJKoI
-> Your feedback is very much appreciated! - -

-PyPI - Python Version -

- # Intro: Fault-Oblivious Stateful Python Code cadence-python allows you to create Python functions that have their state (local variables etc..) implicitly saved such that if the process/machine fails the state of the function is not lost and can resume from where it left off. From c429263c30fbd9da96f6f8ef92dd678d7b0074d5 Mon Sep 17 00:00:00 2001 From: Mohammed Firdaus Date: Tue, 1 Sep 2020 16:05:46 +0800 Subject: [PATCH 50/50] Add Workflow.get_workflow_id() and Workflow.get_execution_id(). --- cadence/decision_loop.py | 7 +++++-- cadence/workflow.py | 13 +++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/cadence/decision_loop.py b/cadence/decision_loop.py index 48d7731..afdf0d5 100644 --- a/cadence/decision_loop.py +++ b/cadence/decision_loop.py @@ -537,6 +537,7 @@ class ReplayDecider: decision_events: DecisionEvents = None decisions: OrderedDict[DecisionId, DecisionStateMachine] = field(default_factory=OrderedDict) decision_context: DecisionContext = None + workflow_id: str = None activity_id_to_scheduled_event_id: Dict[str, int] = field(default_factory=dict) @@ -910,14 +911,16 @@ def poll(self) -> Optional[PollForDecisionTaskResponse]: def process_task(self, decision_task: PollForDecisionTaskResponse) -> List[Decision]: execution_id = str(decision_task.workflow_execution) - decider = ReplayDecider(execution_id, decision_task.workflow_type, self.worker) + decider = ReplayDecider(execution_id, decision_task.workflow_type, self.worker, + workflow_id=decision_task.workflow_execution.workflow_id) decisions: List[Decision] = decider.decide(decision_task.history.events) decider.destroy() return decisions def process_query(self, decision_task: PollForDecisionTaskResponse) -> bytes: execution_id = str(decision_task.workflow_execution) - decider = ReplayDecider(execution_id, decision_task.workflow_type, self.worker) + decider = ReplayDecider(execution_id, decision_task.workflow_type, self.worker, + workflow_id=decision_task.workflow_execution.workflow_id) decider.decide(decision_task.history.events) try: result = decider.query(decision_task, decision_task.query) diff --git a/cadence/workflow.py b/cadence/workflow.py index 98ddb84..be570a8 100644 --- a/cadence/workflow.py +++ b/cadence/workflow.py @@ -89,6 +89,19 @@ def get_logger(name): task: ITask = ITask.current() return task.decider.decision_context.get_logger(name) + @staticmethod + def get_workflow_id(): + from cadence.decision_loop import ITask + task: ITask = ITask.current() + return task.decider.workflow_id + + @staticmethod + def get_execution_id(): + from cadence.decision_loop import ITask + task: ITask = ITask.current() + return task.decider.execution_id + + class WorkflowStub: pass