6 changes: 0 additions & 6 deletions .flake8

This file was deleted.

13 changes: 6 additions & 7 deletions .github/workflows/pr-validation.yml
@@ -28,14 +28,13 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install flake8 pytest pytest-cov pytest-asyncio
pip install -r requirements.txt
- name: Lint with flake8
pip install .[dev]
- name: Lint with ruff
run: |
flake8 . --count --show-source --statistics --exit-zero
ruff check
- name: Pytest unit tests
run: |
pytest -m "not e2e" --verbose
tox -e py${{ matrix.python-version }}
# Sidecar for running e2e tests requires Go SDK
- name: Install Go SDK
uses: actions/setup-go@v5
@@ -46,7 +45,7 @@ jobs:
run: |
go install github.com/dapr/durabletask-go@main
durabletask-go --port 4001 &
pytest -m "e2e" --verbose
tox -e py${{ matrix.python-version }}-e2e
publish:
needs: build
if: startswith(github.ref, 'refs/tags/v')
@@ -70,4 +69,4 @@ jobs:
TWINE_PASSWORD: ${{ secrets.PYPI_UPLOAD_PASS }}
run: |
python -m build
twine upload dist/*
twine upload dist/*
2 changes: 1 addition & 1 deletion Makefile
@@ -14,7 +14,7 @@ gen-proto:
curl -o durabletask/internal/orchestrator_service.proto https://raw.githubusercontent.com/dapr/durabletask-protobuf/refs/heads/main/protos/orchestrator_service.proto
curl -H "Accept: application/vnd.github.v3+json" "https://api.github.com/repos/dapr/durabletask-protobuf/commits?path=protos/orchestrator_service.proto&sha=main&per_page=1" | jq -r '.[0].sha' > durabletask/internal/PROTO_SOURCE_COMMIT_HASH
# NOTE: remember to check/update pyproject.toml protobuf version to follow https://github.com/grpc/grpc/blob/v{{VERSION GRPC IO TOOL BELLOW}}/tools/distrib/python/grpcio_tools/setup.py
pip install grpcio-tools==1.74.0
pip install .[dev]
python3 -m grpc_tools.protoc --proto_path=. --python_out=. --pyi_out=. --grpc_python_out=. ./durabletask/internal/orchestrator_service.proto
rm durabletask/internal/*.proto

28 changes: 20 additions & 8 deletions README.md
@@ -162,33 +162,45 @@ The following is more information about how to develop this project. Note that d
### Generating protobufs

```sh
pip3 install -r dev-requirements.txt
make gen-proto
```

This will download the `orchestrator_service.proto` from the `microsoft/durabletask-protobuf` repo and compile it using `grpcio-tools`. The version of the source proto file that was downloaded can be found in the file `durabletask/internal/PROTO_SOURCE_COMMIT_HASH`.

### Running unit tests

Unit tests can be run using the following command from the project root. Unit tests _don't_ require a sidecar process to be running.
Unit tests can be run using the following command from the project root.
Unit tests _don't_ require a sidecar process to be running.

To run on a specific Python version (e.g., 3.11), run the following command from the project root:

```sh
make test-unit
tox -e py311
```

### Running E2E tests

The E2E (end-to-end) tests require a sidecar process to be running. You can use the Dapr sidecar for this or run a Durable Task test sidecar using the following command:
The E2E (end-to-end) tests require a sidecar process to be running.

For tests that don't involve multi-app activities, you can run the Durable Task test sidecar using the following command:

```sh
go install github.com/dapr/durabletask-go@main
durabletask-go --port 4001
```

To run the E2E tests, run the following command from the project root:
Certain features, such as multi-app activities, require the full Dapr runtime to be running:

```shell
dapr init || true

dapr run --app-id test-app --dapr-grpc-port 4001 --components-path ./examples/components/
```

To run the E2E tests on a specific Python version (e.g., 3.11), run the following command from the project root:

```sh
make test-e2e
tox -e py311 -- e2e
```

## Contributing
@@ -207,8 +219,8 @@ contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additio

## Trademarks

This project may contain trademarks or logos for projects, products, or services. Authorized use of Microsoft
trademarks or logos is subject to and must follow
This project may contain trademarks or logos for projects, products, or services. Authorized use of Microsoft
trademarks or logos is subject to and must follow
[Microsoft's Trademark & Brand Guidelines](https://www.microsoft.com/en-us/legal/intellectualproperty/trademarks/usage/general).
Use of Microsoft trademarks or logos in modified versions of this project must not cause confusion or imply Microsoft sponsorship.
Any use of third-party trademarks or logos are subject to those third-party's policies.
1 change: 0 additions & 1 deletion durabletask/__init__.py
@@ -3,5 +3,4 @@

"""Durable Task SDK for Python"""


PACKAGE_NAME = "durabletask"
110 changes: 66 additions & 44 deletions durabletask/aio/client.py
@@ -13,22 +13,29 @@
import durabletask.internal.orchestrator_service_pb2 as pb
import durabletask.internal.orchestrator_service_pb2_grpc as stubs
import durabletask.internal.shared as shared
from durabletask.aio.internal.shared import get_grpc_aio_channel, ClientInterceptor
from durabletask import task
from durabletask.client import OrchestrationState, OrchestrationStatus, new_orchestration_state, TInput, TOutput
from durabletask.aio.internal.grpc_interceptor import DefaultClientInterceptorImpl
from durabletask.aio.internal.shared import ClientInterceptor, get_grpc_aio_channel
from durabletask.client import (
OrchestrationState,
OrchestrationStatus,
TInput,
TOutput,
new_orchestration_state,
)


class AsyncTaskHubGrpcClient:

def __init__(self, *,
host_address: Optional[str] = None,
metadata: Optional[list[tuple[str, str]]] = None,
log_handler: Optional[logging.Handler] = None,
log_formatter: Optional[logging.Formatter] = None,
secure_channel: bool = False,
interceptors: Optional[Sequence[ClientInterceptor]] = None):

def __init__(
self,
*,
host_address: Optional[str] = None,
metadata: Optional[list[tuple[str, str]]] = None,
log_handler: Optional[logging.Handler] = None,
log_formatter: Optional[logging.Formatter] = None,
secure_channel: bool = False,
interceptors: Optional[Sequence[ClientInterceptor]] = None,
):
if interceptors is not None:
interceptors = list(interceptors)
if metadata is not None:
@@ -39,9 +46,7 @@ def __init__(self, *,
interceptors = None

channel = get_grpc_aio_channel(
host_address=host_address,
secure_channel=secure_channel,
interceptors=interceptors
host_address=host_address, secure_channel=secure_channel, interceptors=interceptors
)
self._channel = channel
self._stub = stubs.TaskHubSidecarServiceStub(channel)
@@ -57,18 +62,23 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.aclose()
return False

async def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInput, TOutput], str], *,
input: Optional[TInput] = None,
instance_id: Optional[str] = None,
start_at: Optional[datetime] = None,
reuse_id_policy: Optional[pb.OrchestrationIdReusePolicy] = None) -> str:

async def schedule_new_orchestration(
self,
orchestrator: Union[task.Orchestrator[TInput, TOutput], str],
*,
input: Optional[TInput] = None,
instance_id: Optional[str] = None,
start_at: Optional[datetime] = None,
reuse_id_policy: Optional[pb.OrchestrationIdReusePolicy] = None,
) -> str:
name = orchestrator if isinstance(orchestrator, str) else task.get_name(orchestrator)

req = pb.CreateInstanceRequest(
name=name,
instanceId=instance_id if instance_id else uuid.uuid4().hex,
input=wrappers_pb2.StringValue(value=shared.to_json(input)) if input is not None else None,
input=wrappers_pb2.StringValue(value=shared.to_json(input))
if input is not None
else None,
scheduledStartTimestamp=helpers.new_timestamp(start_at) if start_at else None,
version=helpers.get_string_value(None),
orchestrationIdReusePolicy=reuse_id_policy,
@@ -78,20 +88,25 @@ async def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator
res: pb.CreateInstanceResponse = await self._stub.StartInstance(req)
return res.instanceId

async def get_orchestration_state(self, instance_id: str, *, fetch_payloads: bool = True) -> Optional[OrchestrationState]:
async def get_orchestration_state(
self, instance_id: str, *, fetch_payloads: bool = True
) -> Optional[OrchestrationState]:
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
res: pb.GetInstanceResponse = await self._stub.GetInstance(req)
return new_orchestration_state(req.instanceId, res)

async def wait_for_orchestration_start(self, instance_id: str, *,
fetch_payloads: bool = False,
timeout: int = 0) -> Optional[OrchestrationState]:
async def wait_for_orchestration_start(
self, instance_id: str, *, fetch_payloads: bool = False, timeout: int = 0
) -> Optional[OrchestrationState]:
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
try:
grpc_timeout = None if timeout == 0 else timeout
self._logger.info(
f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to start.")
res: pb.GetInstanceResponse = await self._stub.WaitForInstanceStart(req, timeout=grpc_timeout)
f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to start."
)
res: pb.GetInstanceResponse = await self._stub.WaitForInstanceStart(
req, timeout=grpc_timeout
)
return new_orchestration_state(req.instanceId, res)
except grpc.RpcError as rpc_error:
if rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED: # type: ignore
@@ -100,22 +115,30 @@ async def wait_for_orchestration_start(self, instance_id: str, *,
else:
raise

async def wait_for_orchestration_completion(self, instance_id: str, *,
fetch_payloads: bool = True,
timeout: int = 0) -> Optional[OrchestrationState]:
async def wait_for_orchestration_completion(
self, instance_id: str, *, fetch_payloads: bool = True, timeout: int = 0
) -> Optional[OrchestrationState]:
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
try:
grpc_timeout = None if timeout == 0 else timeout
self._logger.info(
f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to complete.")
res: pb.GetInstanceResponse = await self._stub.WaitForInstanceCompletion(req, timeout=grpc_timeout)
f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to complete."
)
res: pb.GetInstanceResponse = await self._stub.WaitForInstanceCompletion(
req, timeout=grpc_timeout
)
state = new_orchestration_state(req.instanceId, res)
if not state:
return None

if state.runtime_status == OrchestrationStatus.FAILED and state.failure_details is not None:
if (
state.runtime_status == OrchestrationStatus.FAILED
and state.failure_details is not None
):
details = state.failure_details
self._logger.info(f"Instance '{instance_id}' failed: [{details.error_type}] {details.message}")
self._logger.info(
f"Instance '{instance_id}' failed: [{details.error_type}] {details.message}"
)
elif state.runtime_status == OrchestrationStatus.TERMINATED:
self._logger.info(f"Instance '{instance_id}' was terminated.")
elif state.runtime_status == OrchestrationStatus.COMPLETED:
@@ -130,26 +153,25 @@ async def wait_for_orchestration_completion(self, instance_id: str, *,
raise

async def raise_orchestration_event(
self,
instance_id: str,
event_name: str,
*,
data: Optional[Any] = None):
self, instance_id: str, event_name: str, *, data: Optional[Any] = None
):
req = pb.RaiseEventRequest(
instanceId=instance_id,
name=event_name,
input=wrappers_pb2.StringValue(value=shared.to_json(data)) if data else None)
input=wrappers_pb2.StringValue(value=shared.to_json(data)) if data else None,
)

self._logger.info(f"Raising event '{event_name}' for instance '{instance_id}'.")
await self._stub.RaiseEvent(req)

async def terminate_orchestration(self, instance_id: str, *,
output: Optional[Any] = None,
recursive: bool = True):
async def terminate_orchestration(
self, instance_id: str, *, output: Optional[Any] = None, recursive: bool = True
):
req = pb.TerminateRequest(
instanceId=instance_id,
output=wrappers_pb2.StringValue(value=shared.to_json(output)) if output else None,
recursive=recursive)
recursive=recursive,
)

self._logger.info(f"Terminating instance '{instance_id}'.")
await self._stub.TerminateInstance(req)
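A minimal usage sketch of the async client, inferred only from the method signatures visible in this diff. The import path is assumed from the file location (`durabletask/aio/client.py`), the orchestrator name and host address are placeholders, and a worker registering the orchestrator would need to be running separately.

```python
import asyncio

# Import path assumed from the file location durabletask/aio/client.py.
from durabletask.aio.client import AsyncTaskHubGrpcClient


async def main():
    # The client is an async context manager (see __aexit__ above); the
    # host address below is a placeholder for a local sidecar.
    async with AsyncTaskHubGrpcClient(host_address="localhost:4001") as client:
        # "my_orchestrator" is a placeholder name; a registered orchestrator
        # function could be passed instead of a string.
        instance_id = await client.schedule_new_orchestration(
            "my_orchestrator", input={"greeting": "hello"}
        )
        # timeout=0 would wait indefinitely, mirroring the log messages above.
        state = await client.wait_for_orchestration_completion(instance_id, timeout=60)
        if state is not None:
            print(state.runtime_status)


asyncio.run(main())
```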
24 changes: 16 additions & 8 deletions durabletask/aio/internal/grpc_interceptor.py
@@ -7,23 +7,30 @@


class _ClientCallDetails(
namedtuple(
'_ClientCallDetails',
['method', 'timeout', 'metadata', 'credentials', 'wait_for_ready', 'compression']),
grpc_aio.ClientCallDetails):
namedtuple(
"_ClientCallDetails",
["method", "timeout", "metadata", "credentials", "wait_for_ready", "compression"],
),
grpc_aio.ClientCallDetails,
):
pass


class DefaultClientInterceptorImpl(
grpc_aio.UnaryUnaryClientInterceptor, grpc_aio.UnaryStreamClientInterceptor,
grpc_aio.StreamUnaryClientInterceptor, grpc_aio.StreamStreamClientInterceptor):
grpc_aio.UnaryUnaryClientInterceptor,
grpc_aio.UnaryStreamClientInterceptor,
grpc_aio.StreamUnaryClientInterceptor,
grpc_aio.StreamStreamClientInterceptor,
):
"""Async gRPC client interceptor to add metadata to all calls."""

def __init__(self, metadata: list[tuple[str, str]]):
super().__init__()
self._metadata = metadata

def _intercept_call(self, client_call_details: _ClientCallDetails) -> grpc_aio.ClientCallDetails:
def _intercept_call(
self, client_call_details: _ClientCallDetails
) -> grpc_aio.ClientCallDetails:
if self._metadata is None:
return client_call_details

@@ -39,7 +46,8 @@ def _intercept_call(self, client_call_details: _ClientCallDetails) -> grpc_aio.C
metadata,
client_call_details.credentials,
client_call_details.wait_for_ready,
client_call_details.compression)
client_call_details.compression,
)

async def intercept_unary_unary(self, continuation, client_call_details, request):
new_client_call_details = self._intercept_call(client_call_details)
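A small sketch of how the metadata interceptor plugs into the aio channel helper, mirroring what `AsyncTaskHubGrpcClient.__init__` does in the client diff above; the header name and sidecar address are placeholder assumptions.

```python
from durabletask.aio.internal.grpc_interceptor import DefaultClientInterceptorImpl
from durabletask.aio.internal.shared import get_grpc_aio_channel

# Metadata entries that _intercept_call merges into every outgoing call.
metadata = [("x-example-header", "example-value")]  # placeholder header

channel = get_grpc_aio_channel(
    host_address="localhost:4001",  # assumed local sidecar address
    secure_channel=False,
    interceptors=[DefaultClientInterceptorImpl(metadata)],
)
```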