Skip to content

Commit

Permalink
Merge pull request #106 from conductor-sdk/feature/worker-as-python-c…
Browse files Browse the repository at this point in the history
…allable

[feature] Implement workers as functions
  • Loading branch information
gardusig committed Jul 25, 2022
2 parents 2fc089b + 678c793 commit 88b7b8a
Show file tree
Hide file tree
Showing 31 changed files with 352 additions and 247 deletions.
26 changes: 26 additions & 0 deletions .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
name: Tests

on: pull_request

jobs:
run-tests:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v3
with:
python-version: "3.x"
- name: Instal package from local files
run: python3 -m pip install .
env:
CONDUCTOR_PYTHON_VERSION: v1.0.0
- name: Run Unit Tests
run: python3 -m unittest discover --verbose --start-directory=./tests/unit
- name: Run Integration Tests
run: python3 ./tests/main.py
env:
PYTHON_INTEGRATION_TESTS_SERVER_API_URL: https://pg-staging.orkesconductor.com/api
PYTHON_INTEGRATION_TESTS_SERVER_KEY_ID: 88659b04-acab-47a8-a876-2b19619a2587
PYTHON_INTEGRATION_TESTS_SERVER_KEY_SECRET: 6LSlOPLSh8bQFxR26kjGt6j6OM6zGynlHuQ6N5qzLenmH39i
33 changes: 26 additions & 7 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,47 @@ on:
types:
- released
- prereleased
push:
tags:
- "*"

jobs:
build-and-publish:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Set up release version
run: |
export VERSION=${{github.ref_name}}
export REL_VER=`echo ${VERSION:1}`
echo "Release version is $REL_VER"
echo "RELEASE_VERSION=$REL_VER" >> $GITHUB_ENV
- name: Set up Python
uses: actions/setup-python@v3
with:
python-version: "3.x"
- name: Install build dependencies
run: python3 -m pip install -U setuptools wheel build
- name: Build
run: CONDUCTOR_PYTHON_VERSION=$RELEASE_VERSION python3 -m build
run: python3 -m build
env:
CONDUCTOR_PYTHON_VERSION: ${{ github.ref_name }}
- name: Install publish dependencies
run: python3 -m pip install -U twine
- name: Publish at pypi
run: python3 -m twine upload dist/* -u ${{ secrets.pypi_user }} -p ${{ secrets.pypi_pass }}
validate-release:
needs: build-and-publish
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v3
with:
python-version: "3.x"
- name: Install package from remote files
run: python3 -m pip install conductor-python==$CONDUCTOR_PYTHON_VERSION
env:
CONDUCTOR_PYTHON_VERSION: ${{ github.ref_name }}
- name: Run Integration Tests
run: python3 ./tests/main.py
env:
PYTHON_INTEGRATION_TESTS_SERVER_API_URL: https://pg-staging.orkesconductor.com/api
PYTHON_INTEGRATION_TESTS_SERVER_KEY_ID: 88659b04-acab-47a8-a876-2b19619a2587
PYTHON_INTEGRATION_TESTS_SERVER_KEY_SECRET: 6LSlOPLSh8bQFxR26kjGt6j6OM6zGynlHuQ6N5qzLenmH39i
103 changes: 68 additions & 35 deletions docs/worker/README.md
Original file line number Diff line number Diff line change
@@ -1,59 +1,92 @@
## Write a simple worker
# Worker

Considering real use cases, the goal is to run multiple workers in parallel. Due to some limitations with Python, a multiprocessing architecture was chosen in order to enable real parallelization.

You should basically write your workers and append them to a list. The `TaskHandler` class will spawn a unique and independent process for each worker, making sure it will behave as expected, by running an infinite loop like this:
* Poll for a `Task` at Conductor Server
* Generate `TaskResult` from given `Task`
* Update given `Task` with `TaskResult` at Conductor Server

## Write workers

Currently, there are two ways of writing a Python worker:
1. [Worker as a function](#worker-as-a-function)
2. [Worker as a class](#worker-as-a-class)


### Worker as a function

The function must receive a `Task` as input and produce `TaskResult` as output, example:

```python
from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_interface import WorkerInterface

def execute(task: Task) -> TaskResult:
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id='your_custom_id'
)
task_result.add_output_data('worker_style', 'function')
task_result.status = TaskResultStatus.COMPLETED
return task_result
```

### Worker as a class

The class must implement `WorkerInterface` class, which requires `execute` method. The remaining ones are inherited, but can be easily overridden. Example:

```python
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_interface import WorkerInterface

class SimplePythonWorker(WorkerInterface):
def execute(self, task: Task) -> TaskResult:
task_result = self.get_task_result_from_task(task)
task_result.add_output_data('key1', 'value')
task_result.add_output_data('key2', 42)
task_result.add_output_data('key3', False)
task_result.add_output_data('worker_style', 'class')
task_result.add_output_data('secret_number', 1234)
task_result.add_output_data('is_it_true', False)
task_result.status = TaskResultStatus.COMPLETED
return task_result

def get_polling_interval_in_seconds(self) -> float:
return 1

# poll every 500ms
return 0.5
```

def main():
# Point to the Conductor Server
configuration = Configuration(
server_api_url='https://play.orkes.io/api',
debug=True,
authentication_settings=AuthenticationSettings( # Optional if you are using a server that requires authentication
key_id='KEY',
key_secret='SECRET'
)
)
## Run Workers

# Add three workers
workers = [
SimplePythonWorker('python_task_example'),
]
Now you can use your workers by calling a `TaskHandler`, example:

# Start the worker processes and wait for their completion
with TaskHandler(workers, configuration) as task_handler:
task_handler.start_processes()
task_handler.join_processes()
```python
from conductor.client.automator.task_handler import TaskHandler
from conductor.client.worker.worker import Worker

workers = [
SimplePythonWorker(
task_definition_name='python_task_example'
),
Worker(
task_definition_name='python_task_example',
execute_function=execute,
poll_interval=0.25,
)
]

if __name__ == '__main__':
main()
with TaskHandler(workers, configuration) as task_handler:
task_handler.start_processes()
task_handler.join_processes()
```

Start polling for the work

```shell
python main.py
python3 main.py
```

See [Using Conductor Playground](https://orkes.io/content/docs/getting-started/playground/using-conductor-playground)
for more details on how to use Playground environment for testing.
See [Using Conductor Playground](https://orkes.io/content/docs/getting-started/playground/using-conductor-playground) for more details on how to use Playground environment for testing.

Check out more examples, like this general usage: [main.py](../../src/example/main/main.py)


## C/C++ Support
Expand Down
17 changes: 0 additions & 17 deletions src/conductor/client/__main__.py

This file was deleted.

2 changes: 1 addition & 1 deletion src/conductor/client/http/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ def __deserialize(self, data, klass):
:return: object.
"""
if data is None:
if data is b'' or data is None:
return None

if type(klass) == str:
Expand Down
18 changes: 0 additions & 18 deletions src/conductor/client/worker/task_worker.py

This file was deleted.

48 changes: 48 additions & 0 deletions src/conductor/client/worker/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from conductor.client.http.models.task import Task
from conductor.client.http.models.task_result import TaskResult
from conductor.client.worker.worker_interface import WorkerInterface
from typing import Any, Callable
from typing_extensions import Self
import inspect

ExecuteTaskFunction = Callable[[Any], Any]


def is_callable_input_parameter_a_task(execute_function: ExecuteTaskFunction) -> bool:
parameters = inspect.signature(execute_function).parameters
if len(parameters) != 1:
return False
parameter = parameters[list(parameters.keys())[0]]
return parameter.annotation == Task


def is_callable_return_value_a_task_result(execute_function: ExecuteTaskFunction) -> bool:
return_annotation = inspect.signature(execute_function).return_annotation
return return_annotation == TaskResult


class Worker(WorkerInterface):
def __init__(self, task_definition_name: str, execute_function: ExecuteTaskFunction, poll_interval: float) -> Self:
super().__init__(task_definition_name)
self.poll_interval = poll_interval
self.execute_function = execute_function

def execute(self, task: Task) -> TaskResult:
return self.execute_function(task)

def get_polling_interval_in_seconds(self) -> float:
return self.poll_interval

@property
def execute_function(self) -> ExecuteTaskFunction:
return self._execute_function

@execute_function.setter
def execute_function(self, execute_function: ExecuteTaskFunction) -> None:
self._execute_function = execute_function
self._is_execute_function_input_parameter_a_task = is_callable_input_parameter_a_task(
execute_function
)
self._is_execute_function_return_value_a_task_result = is_callable_return_value_a_task_result(
execute_function
)
Loading

0 comments on commit 88b7b8a

Please sign in to comment.