Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
6d5a17f
Unit test for DecisionEvents.__post_init (markers).
firdaus Mar 21, 2020
97654af
Unit test for MarkerHandler set_data() and marker_replayed().
firdaus Mar 21, 2020
5ce94c2
Unit test: markers should be processed first in ReplayDecider.process…
firdaus Mar 21, 2020
3220585
Update README.md
firdaus Mar 21, 2020
7d9147c
Unit test: ClockDecisionContext.handle_marker_recorded() (version_han…
firdaus Mar 21, 2020
3c32567
Handle WorkflowExecutionCompleted.
firdaus Mar 21, 2020
2a30c6a
Unit test: query workflow after it is completed.
firdaus Mar 21, 2020
88e0c39
Merge branch 'master' of github.com:firdaus/cadence-python
firdaus Mar 21, 2020
dbac651
Update to include additional hidden field in stub instance "_retry_pa…
firdaus Mar 21, 2020
bede56e
Use a custom exception type because deserialize_exception has issues …
firdaus Mar 21, 2020
cbe9e9d
Dummy event object should be of type HistoryEvent.
firdaus Mar 21, 2020
495d07b
Fix unit test for test_first_decision_next_decision_id.
firdaus Mar 21, 2020
10aabfb
Update test_activity_task_failed - ActivityTaskFailedException is no …
firdaus Mar 21, 2020
0a4d329
Mock process_event() so that it is not explicitly executed here.
firdaus Mar 21, 2020
26dfa44
cadence-cli will pass an empty byte array when a workflow is invoked …
firdaus Mar 22, 2020
02e4662
Fix setup.py by including rest of required dependencies.
Mar 22, 2020
502cfe9
Return a string representation in order to not break unit tests.
firdaus Mar 22, 2020
1ddc94b
Remove test_query_workflow_timeout() - seems like the backend behavio…
firdaus Mar 22, 2020
c433baf
Merge pull request #13 from jakubbrzeski/fix_setup_py
firdaus Mar 22, 2020
c29310e
Without the type annotation, `replayed` isn't recognized in the const…
firdaus Mar 22, 2020
8065a00
Behavior changed, unit test was outdated. For no decisions to be crea…
firdaus Mar 22, 2020
8a8dd1d
Unit test: A decision will be created during first replay (replayed=F…
firdaus Mar 22, 2020
5d81eb1
Unit test is outdated after fixing the versioning issue: test_handle_…
firdaus Mar 22, 2020
b98d8f9
Fix: test_clock_decision_context_get_version_stored - with new workfl…
firdaus Mar 22, 2020
9f488ee
An exception is thrown in this case because we are replaying when -1 …
firdaus Mar 22, 2020
ab2a50d
Merge branch 'master' of github.com:firdaus/cadence-python
firdaus Mar 22, 2020
b3b8278
Disable unit test for Java interoperability.
firdaus Mar 23, 2020
19973ba
Update README.md
firdaus Mar 25, 2020
e80016a
Add thrift-parser.js to make the code complete.
firdaus Mar 27, 2020
1bbda58
Merge branch 'master' of github.com:firdaus/cadence-python
firdaus Mar 27, 2020
1ebfbf0
Released beta1.
firdaus Mar 27, 2020
9a64a1a
Update README.md
firdaus Mar 28, 2020
78a0fff
Update README.md
firdaus Mar 28, 2020
d2a648a
Update README.md
firdaus Mar 28, 2020
594d0d8
Update README.md
firdaus Mar 28, 2020
eca0d00
Update README.md
firdaus Mar 28, 2020
d6bbc9a
Update README.md
firdaus Mar 28, 2020
df222de
Add timeout for socket connections.
firdaus Mar 29, 2020
363a320
Improve the performance of worker.stop() by checking the is_stop_requ…
firdaus Mar 29, 2020
962d404
Update sample code.
firdaus Mar 29, 2020
f51c87f
Merge branch 'master' of github.com:firdaus/cadence-python
firdaus Mar 29, 2020
e312a8f
Make sure to close socket file descriptor.
firdaus Mar 29, 2020
0633d15
import CancelledError from asyncio
AngerM-DD Mar 30, 2020
bd82b96
Merge pull request #14 from AngerM/manger-fix-python38
firdaus Mar 30, 2020
3c8f069
Use getpass replacing os
Apr 17, 2020
9b990f1
Add support for specifying activity options in new_activity_stub().
firdaus May 1, 2020
79f8990
Released beta2.
firdaus May 1, 2020
662b36a
Merge pull request #15 from jiaxuyang/feature/docker-user-name
firdaus May 2, 2020
8977eb7
Remove unused dep: six
sevein May 4, 2020
769574a
Handle DecisionTaskFailed.
firdaus May 6, 2020
8a00373
Merge branch 'master' of github.com:firdaus/cadence-python
firdaus May 6, 2020
f5c9fa1
Add min. Py3 version required
sevein May 8, 2020
4f70eb4
Update README.md
firdaus May 25, 2020
1a29c72
Update README.md
firdaus May 25, 2020
c8c1645
Merge pull request #17 from sevein/dev/remove-six
firdaus May 29, 2020
47a8f7a
Merge branch 'master' into dev/py-min-version
firdaus May 29, 2020
fde68c1
Merge pull request #18 from sevein/dev/py-min-version
firdaus May 29, 2020
7a36c49
Improved README
syrusakbary Jun 5, 2020
b58fdf6
Merge pull request #19 from syrusakbary/patch-1
firdaus Jun 7, 2020
d58d5d7
Released b3.
firdaus Jul 8, 2020
05671bb
Update README.md
firdaus Aug 1, 2020
c429263
Add Workflow.get_workflow_id() and Workflow.get_execution_id().
firdaus Sep 1, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 86 additions & 50 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,65 +1,46 @@
# Python framework for Cadence Workflow Service
# Intro: Fault-Oblivious Stateful Python Code

[Cadence](https://github.com/uber/cadence) is a workflow engine developed at Uber Engineering. With this framework, workflows and activities managed by Cadence can be implemented in Python code.
cadence-python allows you to create Python functions that have their state (local variables etc..) implicitly saved such that if the process/machine fails the state of the function is not lost and can resume from where it left off.

## Status / TODO
This programming model is useful whenever you need to ensure that a function runs to completion. For example:

cadence-python is still under going heavy development. It should be considered EXPERIMENTAL at the moment. A production
version is targeted to be released in ~~September of 2019~~ ~~January 2020~~ March 2020.
- Business logic involving multiple micro services
- CI/CD pipelines
- Data pipelines
- RPA
- ETL
- Marketing automation / Customer journeys / Customer engagement
- Zapier/IFTTT like end user automation.
- Chat bots
- Multi-step forms
- Scheduler/Cron jobs

1.0
- [x] Tchannel implementation
- [x] Python-friendly wrapper around Cadence's Thrift API
- [x] Author activities in Python
- [x] Start workflows (synchronously)
- [x] Create workflows
- [x] Workflow execution in coroutines
- [x] Invoke activities from workflows
- [x] ActivityCompletionClient heartbeat, complete, complete_exceptionally
- [x] Activity heartbeat, getHeartbeatDetails and doNotCompleteOnReturn
- [x] Activity retry
- [x] Activity getDomain(), getTaskToken(), getWorkflowExecution()
- [x] Signals
- [x] Queries
- [x] Async workflow execution
- [x] await
- [x] now (currentTimeMillis)
- [x] Sleep
- [x] Loggers
- [x] newRandom
- [x] UUID
- [x] Workflow Versioning
- [x] WorkflowClient.newWorkflowStub(Class workflowInterface, String workflowId);
Behind the scenes, cadence-python uses [Cadence](https://github.com/uber/cadence) as its backend.

1.1
- [ ] ActivityStub and Workflow.newUntypedActivityStub
- [ ] Classes as arguments and return values to/from activity and workflow methods
- [ ] WorkflowStub and WorkflowClient.newUntypedWorkflowStub
- [ ] Custom workflow ids through start() and new_workflow_stub()
- [ ] ContinueAsNew
- [ ] Compatibility with Java client
- [ ] Compatibility with Golang client
For more information about the fault-oblivious programming model refer to the Cadence documentation [here](https://cadenceworkflow.io/docs/03_concepts/01_workflows)

2.0
- [ ] Sticky workflows
## Install Cadencce

Post 2.0:
- [ ] sideEffect/mutableSideEffect
- [ ] Parallel activity execution
- [ ] Timers
- [ ] Cancellation Scopes
- [ ] Child Workflows
- [ ] Explicit activity ids for activity invocations
```
wget https://raw.githubusercontent.com/uber/cadence/master/docker/docker-compose.yml
docker-compose up
```

## Installation
## Register `sample` domain

```
pip install cadence-client
docker run --network=host --rm ubercadence/cli:master --do sample domain register -rd 1
```

## Hello World Sample
## Installation cadence-python

```
pip install cadence-client==1.0.0b3
```

## Hello World Sample

```python
import sys
import logging
from cadence.activity_method import activity_method
Expand All @@ -82,7 +63,7 @@ class GreetingActivities:
# Activities Implementation
class GreetingActivitiesImpl:
def compose_greeting(self, greeting: str, name: str):
return greeting + " " + name + "!"
return f"{greeting} {name}!"


# Workflow Interface
Expand All @@ -99,6 +80,9 @@ class GreetingWorkflowImpl(GreetingWorkflow):
self.greeting_activities: GreetingActivities = Workflow.new_activity_stub(GreetingActivities)

async def get_greeting(self, name):
# Place any Python code here that you want to ensure is executed to completion.
# Note: code in workflow functions must be deterministic so that the same code paths
# are ran during replay.
return await self.greeting_activities.compose_greeting("Hello", name)


Expand All @@ -118,4 +102,56 @@ if __name__ == '__main__':
worker.stop()
print("Workers stopped...")
sys.exit(0)
```
```

## Status / TODO

cadence-python is still under going heavy development. It should be considered EXPERIMENTAL at the moment. A production
version is targeted to be released in ~~September of 2019~~ ~~January 2020~~ ~~March 2020~~ April 2020.

1.0
- [x] Tchannel implementation
- [x] Python-friendly wrapper around Cadence's Thrift API
- [x] Author activities in Python
- [x] Start workflows (synchronously)
- [x] Create workflows
- [x] Workflow execution in coroutines
- [x] Invoke activities from workflows
- [x] ActivityCompletionClient heartbeat, complete, complete_exceptionally
- [x] Activity heartbeat, getHeartbeatDetails and doNotCompleteOnReturn
- [x] Activity retry
- [x] Activity getDomain(), getTaskToken(), getWorkflowExecution()
- [x] Signals
- [x] Queries
- [x] Async workflow execution
- [x] await
- [x] now (currentTimeMillis)
- [x] Sleep
- [x] Loggers
- [x] newRandom
- [x] UUID
- [x] Workflow Versioning
- [x] WorkflowClient.newWorkflowStub(Class workflowInterface, String workflowId);

1.1
- [ ] ActivityStub and Workflow.newUntypedActivityStub
- [ ] Classes as arguments and return values to/from activity and workflow methods
- [ ] WorkflowStub and WorkflowClient.newUntypedWorkflowStub
- [ ] Custom workflow ids through start() and new_workflow_stub()
- [ ] ContinueAsNew
- [ ] Compatibility with Java client
- [ ] Compatibility with Golang client

2.0
- [ ] Sticky workflows

Post 2.0:
- [ ] sideEffect/mutableSideEffect
- [ ] Local activity
- [ ] Parallel activity execution
- [ ] Timers
- [ ] Cancellation Scopes
- [ ] Child Workflows
- [ ] Explicit activity ids for activity invocations


12 changes: 10 additions & 2 deletions cadence/activity_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,22 @@
from cadence.cadence_types import PollForActivityTaskRequest, TaskListMetadata, TaskList, PollForActivityTaskResponse
from cadence.conversions import json_to_args
from cadence.workflowservice import WorkflowService
from cadence.worker import Worker
from cadence.worker import Worker, StopRequestedException

logger = logging.getLogger(__name__)


def activity_task_loop(worker: Worker):
service: WorkflowService = WorkflowService.create(worker.host, worker.port)
service: WorkflowService = WorkflowService.create(worker.host, worker.port, timeout=worker.get_timeout())
worker.manage_service(service)
logger.info(f"Activity task worker started: {WorkflowService.get_identity()}")
try:
while True:
if worker.is_stop_requested():
return
try:
service.set_next_timeout_cb(worker.raise_if_stop_requested)

polling_start = datetime.datetime.now()
polling_request = PollForActivityTaskRequest()
polling_request.task_list_metadata = TaskListMetadata()
Expand All @@ -32,6 +34,8 @@ def activity_task_loop(worker: Worker):
task, err = service.poll_for_activity_task(polling_request)
polling_end = datetime.datetime.now()
logger.debug("PollForActivityTask: %dms", (polling_end - polling_start).total_seconds() * 1000)
except StopRequestedException:
return
except Exception as ex:
logger.error("PollForActivityTask error: %s", ex)
continue
Expand Down Expand Up @@ -75,4 +79,8 @@ def activity_task_loop(worker: Worker):
process_end = datetime.datetime.now()
logger.info("Process ActivityTask: %dms", (process_end - process_start).total_seconds() * 1000)
finally:
try:
service.close()
except:
logger.warning("service.close() failed", exc_info=1)
worker.notify_thread_stopped()
23 changes: 23 additions & 0 deletions cadence/activity_method.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ async def stub_activity_fn(self, *args):
assert self._decision_context
assert stub_activity_fn._execute_parameters
parameters = copy.deepcopy(stub_activity_fn._execute_parameters)
if hasattr(self, "_activity_options") and self._activity_options:
self._activity_options.fill_execute_activity_parameters(parameters)
if self._retry_parameters:
parameters.retry_parameters = self._retry_parameters
parameters.input = args_to_json(args).encode("utf-8")
Expand All @@ -80,3 +82,24 @@ async def stub_activity_fn(self, *args):
raise Exception("activity_method must be called with arguments")
else:
return wrapper


@dataclass
class ActivityOptions:
schedule_to_close_timeout_seconds: int = None
schedule_to_start_timeout_seconds: int = None
start_to_close_timeout_seconds: int = None
heartbeat_timeout_seconds: int = None
task_list: str = None

def fill_execute_activity_parameters(self, execute_parameters: ExecuteActivityParameters):
if self.schedule_to_close_timeout_seconds is not None:
execute_parameters.schedule_to_close_timeout_seconds = self.schedule_to_close_timeout_seconds
if self.schedule_to_start_timeout_seconds is not None:
execute_parameters.schedule_to_start_timeout_seconds = self.schedule_to_start_timeout_seconds
if self.start_to_close_timeout_seconds is not None:
execute_parameters.start_to_close_timeout_seconds = self.start_to_close_timeout_seconds
if self.heartbeat_timeout_seconds is not None:
execute_parameters.heartbeat_timeout_seconds = self.heartbeat_timeout_seconds
if self.task_list is not None:
execute_parameters.task_list = self.task_list
15 changes: 10 additions & 5 deletions cadence/connection.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from __future__ import annotations

import os
import getpass
import socket
from dataclasses import dataclass
from io import BytesIO
from typing import IO, List, Union, Optional, Dict
from typing import IO, List, Union, Optional, Dict, Callable

from cadence.frames import InitReqFrame, Frame, Arg, CallReqFrame, CallReqContinueFrame, CallResFrame, \
CallResContinueFrame, FrameWithArgs, CallFlags, ErrorFrame
Expand Down Expand Up @@ -194,7 +194,7 @@ def default_tchannel_headers():
@staticmethod
def default_application_headers():
return {
"user-name": os.environ.get("LOGNAME", os.getlogin()),
"user-name": getpass.getuser(),
"host-name": socket.gethostname(),
# Copied from Java client
"cadence-client-library-version": "2.2.0",
Expand Down Expand Up @@ -299,19 +299,23 @@ class TChannelConnection:
s: socket.socket

@classmethod
def open(cls, host: object, port: object) -> TChannelConnection:
def open(cls, host: object, port: object, timeout: int = None) -> TChannelConnection:
s: socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(timeout)
s.connect((host, port))
return cls(s)

def __init__(self, s: socket):
self.s = s
self.file = self.s.makefile("rwb")
self.wrapper = IOWrapper(self.file)
self.wrapper = IOWrapper(self.file, socket_=s)
self.current_id = -1

self.handshake()

def set_next_timeout_cb(self, cb: Callable):
self.wrapper.set_next_timeout_cb(cb)

def new_id(self):
self.current_id += 1
return self.current_id
Expand Down Expand Up @@ -339,6 +343,7 @@ def read_frame(self):

def close(self):
self.s.close()
self.wrapper.close()

def call_function(self, call: ThriftFunctionCall) -> ThriftFunctionResponse:
frames = call.build_frames(self.new_id())
Expand Down
4 changes: 4 additions & 0 deletions cadence/constants.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@

CODE_OK = 0x00
CODE_ERROR = 0x01

# This should be at least 60 seconds because Cadence will reply after 60 seconds when polling
# if there is nothing pending
DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
Loading