Conversation

@filintod (Contributor) commented Aug 20, 2025

Description

Make it possible to author Python workflows with asyncio, so that code already written with async/await can be retrofitted into workflows.

Issue reference

We strive to have all PRs opened based on an issue, where the problem or feature has been discussed prior to implementation.

Please reference the issue this PR will close: #[issue number]

Checklist

Please make sure you've completed the relevant tasks for this PR, out of the following list:

  • Code compiles correctly
  • Created/updated tests
  • Extended the documentation

@filintod changed the title from "initial asyncio implemenation after durable task asyncio work" to "initial asyncio implementation after durable task asyncio work" on Aug 20, 2025
return getattr(self, name)

@staticmethod
def _coerce_env_value(default_value, env_variable: str):
Contributor
can you add a unit test for this pls
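A minimal pytest sketch of what such a test could look like, assuming `_coerce_env_value` reads the named environment variable and coerces its string value to the type of `default_value` (semantics inferred from the signature above; the `Settings` import path is assumed):

```python
# Hedged sketch: _coerce_env_value behavior inferred from its signature.
import pytest

from dapr.conf import Settings  # import path assumed


@pytest.mark.parametrize(
    'default, raw, expected',
    [
        (False, 'true', True),     # bool coercion assumed
        (120000, '60000', 60000),  # int coercion
        (2.0, '1.5', 1.5),         # float coercion
    ],
)
def test_coerce_env_value(monkeypatch, default, raw, expected):
    monkeypatch.setenv('SOME_SETTING', raw)
    assert Settings._coerce_env_value(default, 'SOME_SETTING') == expected
```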

Comment on lines 37 to 53
# gRPC keepalive (disabled by default; enable via env to help with idle debugging sessions)
DAPR_GRPC_KEEPALIVE_ENABLED: bool = False
DAPR_GRPC_KEEPALIVE_TIME_MS: int = 120000 # send keepalive pings every 120s
DAPR_GRPC_KEEPALIVE_TIMEOUT_MS: int = 20000  # wait 20s for ack before considering the connection dead
DAPR_GRPC_KEEPALIVE_PERMIT_WITHOUT_CALLS: bool = False # allow pings when there are no active calls

# gRPC retries (disabled by default; enable via env to apply channel service config)
DAPR_GRPC_RETRY_ENABLED: bool = False
DAPR_GRPC_RETRY_MAX_ATTEMPTS: int = 4
DAPR_GRPC_RETRY_INITIAL_BACKOFF_MS: int = 100
DAPR_GRPC_RETRY_MAX_BACKOFF_MS: int = 1000
DAPR_GRPC_RETRY_BACKOFF_MULTIPLIER: float = 2.0
# Comma-separated list of status codes, e.g., 'UNAVAILABLE,DEADLINE_EXCEEDED'
DAPR_GRPC_RETRY_CODES: str = 'UNAVAILABLE,DEADLINE_EXCEEDED'

Contributor
are these consts or env vars for debugging? or are they configurable somehow from an end-user POV?
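For context, a hedged sketch of how an end user might flip these, assuming each default can be overridden by an environment variable of the same name (inferred from `_coerce_env_value` above) and that the env vars are set before the SDK settings are read:

```python
# Hedged sketch: env-var override of the keepalive defaults (mechanism assumed).
import os

os.environ['DAPR_GRPC_KEEPALIVE_ENABLED'] = 'true'
os.environ['DAPR_GRPC_KEEPALIVE_TIME_MS'] = '60000'  # ping every 60s instead of 120s

from dapr.conf import settings  # module-level settings instance; path assumed

assert settings.DAPR_GRPC_KEEPALIVE_ENABLED is True
```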

Comment on lines 228 to 229
'initialBackoff': f'{int(settings.DAPR_GRPC_RETRY_INITIAL_BACKOFF_MS) / 1000.0}s',
'maxBackoff': f'{int(settings.DAPR_GRPC_RETRY_MAX_BACKOFF_MS) / 1000.0}s',
Contributor
so we take ms and then convert to s? why not just use s then? can you put a comment with the unit here pls?
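For reference, the conversion above produces the duration-string format the gRPC service config expects; with the defaults shown earlier:

```python
# With DAPR_GRPC_RETRY_INITIAL_BACKOFF_MS = 100 and DAPR_GRPC_RETRY_MAX_BACKOFF_MS = 1000:
f'{int(100) / 1000.0}s'   # -> '0.1s'
f'{int(1000) / 1000.0}s'  # -> '1.0s' (gRPC service config takes durations in seconds)
```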

Comment on lines 197 to 199
# ------------------------------
# gRPC channel options helpers
# ------------------------------
Contributor
tests for these pls

Comment on lines 6 to 7
pip>=23.0.0
coverage>=5.3
pytest
Contributor
is there a uv lock file too or not in this repo?

Contributor Author
not here, not uv native yet

Purpose
-------
- Provide pass-throughs for engine fields (``trace_parent``, ``trace_state``,
Contributor
what is trace_state for?

Contributor Author
I'll remove it (as I removed it from the durabletask PR); that is part of the tracing information from the proto

@sicoyle (Contributor) commented Nov 19, 2025
so trace_state is in the protos, as in the dapr wf protos in the durabletask protos repo? did you see it used in the go sdk? or is it a field we can add to the python sdk, and that's why you added it initially? I guess I'm not even sure what trace_state is used for tbh, so yeah, just trying to gather more context from ya :)

Comment on lines +150 to +160
retry_policy: RetryPolicy | None
Optional retry policy for the child-workflow. When provided, failures will be retried
according to the policy.
Contributor
isn't this defined within call_activity? how is this applied to only child workflows? still reading, so sorry if I'm just missing this so far

Contributor Author
child workflows (sub-orchestrators) also allow retry policy
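A short sketch of how that reads (a generator-style parent for illustration; `RetryPolicy` fields per dapr.ext.workflow, the child workflow itself is hypothetical):

```python
# Hedged sketch: retry policy applied to a child workflow (sub-orchestrator).
from datetime import timedelta

from dapr.ext.workflow import RetryPolicy

retry = RetryPolicy(
    first_retry_interval=timedelta(seconds=1),
    max_number_of_attempts=3,
    backoff_coefficient=2.0,
)


def process_order_child(ctx, order):
    # child workflow body (illustrative)
    return {'processed': True, **(order or {})}


def parent(ctx, order):
    # failures inside the child workflow are retried per `retry`
    result = yield ctx.call_child_workflow(process_order_child, input=order, retry_policy=retry)
    return result
```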

Comment on lines +1 to +2
dapr-ext-workflow-dev>=1.15.0.dev
dapr-dev>=1.15.0.dev
Contributor
is the .dev here the latest release version?

Contributor
i wonder tho if we should not pin and let it default to latest, bc that's what we do in the dapr agents requirements files for the quickstarts

Contributor Author
the problem is that if we don't pin anything, people may somehow have an older version, the requirements won't tell them to upgrade, and it fails.

These examples mirror `examples/workflow/` but author orchestrators with `async def` using the
async workflow APIs. Activities remain regular functions unless noted.
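A minimal shape of one of these, as described (decorator and `call_activity` usage assumed from this PR's async API; names are illustrative):

```python
# Hedged sketch of an async-authored orchestrator per this PR.
from dapr.ext.workflow import WorkflowRuntime

rt = WorkflowRuntime()


@rt.activity()
def say_hello(ctx, name: str) -> str:
    return f'Hello, {name}!'


@rt.workflow()  # auto-detects the coroutine and registers it as an async workflow
async def greeting_workflow(ctx, name: str):
    greeting = await ctx.call_activity(say_hello, input=name)
    return greeting
```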

How to run:
Contributor
pls add a venv reference in here too

- Ensure a Dapr sidecar is running locally. If needed, set `DURABLETASK_GRPC_ENDPOINT`, or
`DURABLETASK_GRPC_HOST/PORT`.
- Install requirements: `pip install -r requirements.txt`
- Run any example: `python simple.py`
Contributor
don't these need to run with a dapr run cmd then?

Contributor Author
not needed, but it surely will make it more predictable that daprd is running; let me think on this if the change is small
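For reference, the typical invocation would presumably be `dapr run --app-id wf-async -- python simple.py` (app id illustrative), which also manages the sidecar lifecycle for the example.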

Comment on lines +4 to +12
Example: Interceptors for context propagation (client + runtime).
This example shows how to:
- Define a small context (dict) carried via contextvars
- Implement ClientInterceptor to inject that context into outbound inputs
- Implement RuntimeInterceptor to restore the context before user code runs
- Wire interceptors into WorkflowRuntime and DaprWorkflowClient
Note: Scheduling/running requires a Dapr sidecar. This file focuses on the wiring pattern.
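A condensed sketch of the pattern this example describes; the `ClientInterceptor`/`RuntimeInterceptor` base names come from the description above, but the hook method names and import path are assumptions, not the exact API:

```python
# Hedged sketch: contextvars-based propagation via interceptors.
import contextvars

from dapr.ext.workflow import ClientInterceptor, RuntimeInterceptor  # path assumed

request_ctx: contextvars.ContextVar[dict] = contextvars.ContextVar('request_ctx', default={})


class InjectContextClientInterceptor(ClientInterceptor):
    """Inject the current context dict into outbound workflow inputs."""

    def on_schedule_workflow(self, input):  # hook name assumed
        return {'ctx': request_ctx.get(), 'payload': input}


class RestoreContextRuntimeInterceptor(RuntimeInterceptor):
    """Restore the context before user workflow/activity code runs."""

    def on_workflow_start(self, ctx, input):  # hook name assumed
        request_ctx.set(input.get('ctx', {}))
        return input
```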
Contributor
can you add a blurb here on why an end user would want to do this interceptor pls? Like for propagating trace or X, Y, Z. We also need to update docs somewhere to share that this is a new feature. Is there a doc in the python sdk somewhere for this, or do we just add it in dapr docs somewhere?

Contributor Author
not in python docs, but the README should have references to it.

Comment on lines +149 to +151
# instance_id = cli.schedule_new_workflow(workflow_example, input={'x': 1})
# print('scheduled:', instance_id)
# rt.start(); rt.wait_for_ready(); ...
Contributor
can you uncomment these so users can run and see the context injected. Do we print out the stuff that gets injected, so the value add of the interceptor is crystal clear from an end-user POV?
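Something like the following, assuming a sidecar is up; `rt`, `cli`, and `workflow_example` follow the commented lines above, while the completion call and output accessor are the usual client names (hedged):

```python
# Hedged sketch: the commented lines above, uncommented, with prints so the
# injected context is visible end to end (requires a running Dapr sidecar).
rt.start()
rt.wait_for_ready()

instance_id = cli.schedule_new_workflow(workflow_example, input={'x': 1})
print('scheduled:', instance_id)

state = cli.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
print('output (should carry the injected context):', state.serialized_output)

rt.shutdown()  # method name assumed
```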

Comment on lines +21 to +22
Example of implementing provider-specific model/tool serialization OUTSIDE the core package.
Contributor
Suggested change
- Example of implementing provider-specific model/tool serialization OUTSIDE the core package.
+ """
+ Example of implementing provider-specific model/tool serialization OUTSIDE the core package.
This demonstrates how to build and use your own contracts using the generic helpers from
Contributor
Is this from the openai stuff wrapping agents within a workflow? I'm not sure this is really an example in the same way that the other python files are example workflows, right? Why would someone use this? Again, I think it's from openai agents being wrapped within a workflow, but want to confirm pls

Contributor Author
you are right, this should not be here, or it could be split into a serialization-only PR

@@ -0,0 +1,120 @@
# -*- coding: utf-8 -*-
Contributor
can you add more context in a description at the top of this file as to why someone would want to use these, so ppl looking at all of these classes have context on the value add of the interceptor, and other use cases beyond tracing they could use them for pls?


runtime = WorkflowRuntime(
runtime_interceptors=[TracingRuntimeInterceptor(_on_span)],
workflow_outbound_interceptors=[TracingWorkflowOutboundInterceptor(_get_trace)],
Contributor
is there also an inbound interceptor somewhere?

Contributor Author
interceptors are optional, but yeah there are inbound ones, just not in this example

Comment on lines +16 to +17
ei = ctx.execution_info
# Return attempt (may be None if engine doesn't set it)
Contributor
can you add context on why this is useful at the top of the file in a comment pls? Also, maybe rename the file from e2e_execinfo.py to just execution_info.py instead? this is metadata from the runtime based on retry count of the global workflow, plus instance id, plus what else? Is attempt based just on entire workflow runs/reruns, or is it incremented based on activity retries too?

Contributor Author
I might remove this as this was added at one point but then at the end not used much

Contributor
yeah i guess lets rm to help trim things down here pls 🙏

Comment on lines 1 to 11
# dapr-ext-workflow-dev>=1.15.0.dev
# dapr-dev>=1.15.0.dev

# local development: install local packages in editable mode

# if using dev version of durabletask-python
-e ../../../durabletask-python

# if using dev version of dapr-ext-workflow
-e ../../ext/dapr-ext-workflow
-e ../..
Contributor
Suggested change
- # dapr-ext-workflow-dev>=1.15.0.dev
- # dapr-dev>=1.15.0.dev
- # local development: install local packages in editable mode
- # if using dev version of durabletask-python
- -e ../../../durabletask-python
- # if using dev version of dapr-ext-workflow
- -e ../../ext/dapr-ext-workflow
- -e ../..
+ dapr-ext-workflow-dev>=1.15.0.dev
+ dapr-dev>=1.15.0.dev
+ # local development: install local packages in editable mode
+ # if using dev version of durabletask-python
+ # -e ../../../durabletask-python
+ # if using dev version of dapr-ext-workflow
+ # -e ../../ext/dapr-ext-workflow
+ # -e ../..

@@ -0,0 +1,116 @@
## Dapr Workflow Middleware: Outbound Hooks for Context Propagation
Contributor
why is it called middleware here but interceptors elsewhere in the examples and such? Should it be named one over the other? Or is this something different?

## Dapr Workflow Middleware: Outbound Hooks for Context Propagation

Goal
- Add outbound hooks to Dapr Workflow middleware so adapters can inject tracing/context when scheduling activities/child workflows/signals without wrapping user code.
Contributor
does middleware = adapter = interceptor?

Comment on lines 8 to 9
- Current `RuntimeMiddleware` exposes only inbound lifecycle hooks (workflow/activity start/complete/error). There is no hook before scheduling activities to mutate inputs/headers.
- We currently wrap `ctx.call_activity` to inject tracing. This is effective but adapter-specific and leaks into application flow.
Contributor
so RuntimeMiddleware already exists?

Comment on lines 11 to 13
Proposed API
- Extend `dapr.ext.workflow` with additional hook points (illustrative names):

Contributor
is this proposed api implemented?

Comment on lines 16 to 24
# existing inbound hooks...
def on_workflow_start(self, ctx: Any, input: Any): ...
def on_workflow_complete(self, ctx: Any, result: Any): ...
def on_workflow_error(self, ctx: Any, error: BaseException): ...
def on_activity_start(self, ctx: Any, input: Any): ...
def on_activity_complete(self, ctx: Any, result: Any): ...
def on_activity_error(self, ctx: Any, error: BaseException): ...

# new outbound hooks (workflow outbound)
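The truncated outbound portion presumably continues along these lines (the first signature is taken from the adapter example further down; the other names are assumptions):

def on_schedule_activity(self, ctx: Any, activity: Any, input: Any, retry_policy: Any) -> Any: ...
def on_schedule_child_workflow(self, ctx: Any, workflow: Any, input: Any, retry_policy: Any) -> Any: ...  # name assumed
def on_signal_workflow(self, ctx: Any, instance_id: str, name: str, input: Any) -> Any: ...  # name assumed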
Contributor
can you pls define what inbound and outbound mean in this context?

Behavior
- Hooks run within workflow sandbox; must be deterministic and side-effect free.
Contributor
can you pls remind me what the sandbox is for and update this doc with that info too pls?

- Hooks run within workflow sandbox; must be deterministic and side-effect free.
- The engine uses the middleware’s return value as the actual input for the scheduling call.
- If multiple middlewares are installed, chain them in order (each sees the previous result).
- If a hook raises, log and continue with the last good value (non-fatal by default).
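That behavior, in sketch form (function and hook names assumed; mirrors the chaining and non-fatal-error rules above):

```python
# Hedged sketch of the chaining + non-fatal error behavior described above.
import logging


def _run_outbound_hooks(middlewares, ctx, activity, input, retry_policy):
    value = input
    for mw in middlewares:
        try:
            # each middleware sees the previous middleware's result
            value = mw.on_schedule_activity(ctx, activity, value, retry_policy)
        except Exception:
            # non-fatal by default: log and continue with the last good value
            logging.exception('outbound middleware hook failed; using last good input')
    return value
```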
Contributor
is this what temporal chose for their behavior too and that's why you went with this?

Comment on lines 63 to 75
Adapter usage (example)
```python
class TraceContextMiddleware(RuntimeMiddleware):
    def on_schedule_activity(self, ctx, activity, input, retry_policy):
        from agents_sdk.adapters.openai.tracing import serialize_trace_context

        tracing = serialize_trace_context()
        if input is None:
            return {"tracing": tracing}
        if isinstance(input, dict) and "tracing" not in input:
            return {**input, "tracing": tracing}
        return input

    # inbound restore already implemented via on_activity_start/complete/error
```
Contributor
is this specific to your open ai tracing work? Can you remind me where that is again pls?

Comment on lines 102 to 105
- Once the SDK exposes outbound hooks:
- Remove `wrap_ctx_inject_tracing` and `activity_restore_wrapper` from the adapter wiring.
- Keep inbound restoration in middleware only (already implemented).
- Simplify `AgentWorkflowRuntime` so it doesn’t need context wrappers.
Contributor
have these methods been removed? does AgentWorkflowRuntime still need cleanup?


This package supports authoring workflows with ``async def`` in addition to the existing generator-based orchestrators.

- Register async workflows using ``WorkflowRuntime.workflow`` (auto-detects coroutine) or ``async_workflow`` / ``register_async_workflow``.
Contributor
can you add when users should use async_workflow vs regular workflow pls

This package supports authoring workflows with ``async def`` in addition to the existing generator-based orchestrators.

- Register async workflows using ``WorkflowRuntime.workflow`` (auto-detects coroutine) or ``async_workflow`` / ``register_async_workflow``.
- Use ``AsyncWorkflowContext`` for deterministic operations:
Contributor
is AsyncWorkflowContext a class?

Contributor Author
yes
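For illustration, a minimal async workflow against it might look like this (the timer method name and `process_order` activity are assumptions; `call_activity` follows this PR's API):

```python
# Hedged sketch: AsyncWorkflowContext used for deterministic operations.
from datetime import timedelta

from dapr.ext.workflow import AsyncWorkflowContext  # import path assumed


async def order_workflow(ctx: AsyncWorkflowContext, order: dict):
    # deterministic timer (not asyncio.sleep) so replay stays consistent
    await ctx.create_timer(timedelta(seconds=5))  # method name assumed
    result = await ctx.call_activity(process_order, input=order)
    return result
```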

Interceptors provide a simple, composable way to apply cross-cutting behavior with a single
enter/exit per call. There are three types:

- Client interceptors: wrap outbound scheduling from the client (schedule_new_workflow).
Contributor
I understand these a bit better when referencing the example code. Can you update these descriptions to be clearer pls? You have a client interceptor with "outbound" in its description, but then there are also outbound interceptors. Are there inbound interceptors? If not, why not?

Signed-off-by: Filinto Duran <1373693+filintod@users.noreply.github.com>
@filintod marked this pull request as ready for review on November 20, 2025 06:36
@filintod requested review from a team as code owners on November 20, 2025 06:36