Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Flask>=1.1
# needed for auto fix
ruff===0.2.2
# needed for dapr-ext-workflow
durabletask-dapr >= 0.2.0a8
durabletask-dapr >= 0.2.0a9
# needed for .env file loading in examples
python-dotenv>=1.0.0
# needed for enhanced schema generation from function features
Expand Down
141 changes: 133 additions & 8 deletions examples/workflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pip3 install -r requirements.txt
Each of the examples in this directory can be run directly from the command line.

### Simple Workflow
This example represents a workflow that manages counters through a series of activities and child workflows.
This example represents a workflow that manages counters through a series of activities and child workflows.
It shows several Dapr Workflow features including:
- Basic activity execution with counter increments
- Retryable activities with configurable retry policies
Expand Down Expand Up @@ -57,7 +57,7 @@ timeout_seconds: 30
-->

```sh
dapr run --app-id wf-simple-example --dapr-grpc-port 50001 -- python3 simple.py
dapr run --app-id wf-simple-example -- python3 simple.py
```
<!--END_STEP-->

Expand Down Expand Up @@ -99,7 +99,7 @@ timeout_seconds: 30
-->

```sh
dapr run --app-id wfexample --dapr-grpc-port 50001 -- python3 task_chaining.py
dapr run --app-id wfexample -- python3 task_chaining.py
```
<!--END_STEP-->

Expand Down Expand Up @@ -146,7 +146,7 @@ timeout_seconds: 30
-->

```sh
dapr run --app-id wfexample --dapr-grpc-port 50001 -- python3 fan_out_fan_in.py
dapr run --app-id wfexample -- python3 fan_out_fan_in.py
```
<!--END_STEP-->

Expand Down Expand Up @@ -186,7 +186,7 @@ This example demonstrates how to use a workflow to interact with a human user. T
The Dapr CLI can be started using the following command:

```sh
dapr run --app-id wfexample --dapr-grpc-port 50001
dapr run --app-id wfexample
```

In a separate terminal window, run the following command to start the Python workflow app:
Expand Down Expand Up @@ -222,7 +222,7 @@ This example demonstrates how to eternally running workflow that polls an endpoi
The Dapr CLI can be started using the following command:

```sh
dapr run --app-id wfexample --dapr-grpc-port 50001
dapr run --app-id wfexample
```

In a separate terminal window, run the following command to start the Python workflow app:
Expand Down Expand Up @@ -254,7 +254,7 @@ This workflow runs forever or until you press `ENTER` to stop it. Starting the a
This example demonstrates how to call a child workflow. The Dapr CLI can be started using the following command:

```sh
dapr run --app-id wfexample --dapr-grpc-port 50001
dapr run --app-id wfexample
```

In a separate terminal window, run the following command to start the Python workflow app:
Expand All @@ -269,4 +269,129 @@ When you run the example, you will see output like this:
*** Calling child workflow 29a7592a1e874b07aad2bb58de309a51-child
*** Child workflow 6feadc5370184b4998e50875b20084f6 called
...
```
```


### Cross-app Workflow

This example demonstrates how to call child workflows and activities in different apps. The multiple Dapr CLI instances can be started using the following commands:

<!-- STEP
name: Run apps
expected_stdout_lines:
- '== APP == app1 - triggering app1 workflow'
- '== APP == app1 - received workflow call'
- '== APP == app1 - triggering app2 workflow'
- '== APP == app2 - received workflow call'
- '== APP == app2 - triggering app3 activity'
- '== APP == app3 - received activity call'
- '== APP == app3 - returning activity result'
- '== APP == app2 - received activity result'
- '== APP == app2 - returning workflow result'
- '== APP == app1 - received workflow result'
- '== APP == app1 - returning workflow result'
background: true
sleep: 20
-->

```sh
dapr run --app-id wfexample3 python3 cross-app3.py &
dapr run --app-id wfexample2 python3 cross-app2.py &
dapr run --app-id wfexample1 python3 cross-app1.py
```
<!-- END_STEP -->

When you run the apps, you will see output like this:
```
...
app1 - triggering app2 workflow
app2 - triggering app3 activity
...
```
among others. This shows that the workflow calls are working as expected.


#### Error handling on activity calls

This example demonstrates how the error handling works on activity calls across apps.

Error handling on activity calls across apps works as normal workflow activity calls.

In this example we run `app3` in failing mode, which makes the activity call return error constantly. The activity call from `app2` will fail after the retry policy is exhausted.

<!-- STEP
name: Run apps
expected_stdout_lines:
- '== APP == app1 - triggering app1 workflow'
- '== APP == app1 - received workflow call'
- '== APP == app1 - triggering app2 workflow'
- '== APP == app2 - received workflow call'
- '== APP == app2 - triggering app3 activity'
- '== APP == app3 - received activity call'
- '== APP == app3 - raising error in activity due to error mode being enabled'
- '== APP == app2 - received activity error from app3'
- '== APP == app2 - returning workflow result'
- '== APP == app1 - received workflow result'
- '== APP == app1 - returning workflow result'
sleep: 20
-->

```sh
export ERROR_ACTIVITY_MODE=true
dapr run --app-id wfexample3 python3 cross-app3.py &
dapr run --app-id wfexample2 python3 cross-app2.py &
dapr run --app-id wfexample1 python3 cross-app1.py
```
<!-- END_STEP -->


When you run the apps with the `ERROR_ACTIVITY_MODE` environment variable set, you will see output like this:
```
...
app3 - received activity call
app3 - raising error in activity due to error mode being enabled
app2 - received activity error from app3
...
```
among others. This shows that the activity calls are failing as expected, and they are being handled as expected too.


#### Error handling on workflow calls

This example demonstrates how the error handling works on workflow calls across apps.

Error handling on workflow calls across apps works as normal workflow calls.

In this example we run `app2` in failing mode, which makes the workflow call return error constantly. The workflow call from `app1` will fail after the retry policy is exhausted.

<!-- STEP
name: Run apps
expected_stdout_lines:
- '== APP == app1 - triggering app1 workflow'
- '== APP == app1 - received workflow call'
- '== APP == app1 - triggering app2 workflow'
- '== APP == app2 - received workflow call'
- '== APP == app2 - raising error in workflow due to error mode being enabled'
- '== APP == app1 - received workflow error from app2'
- '== APP == app1 - returning workflow result'
sleep: 20
-->

```sh
export ERROR_WORKFLOW_MODE=true
dapr run --app-id wfexample3 python3 cross-app3.py &
dapr run --app-id wfexample2 python3 cross-app2.py &
dapr run --app-id wfexample1 python3 cross-app1.py
```
<!-- END_STEP -->

When you run the apps with the `ERROR_WORKFLOW_MODE` environment variable set, you will see output like this:
```
...
app2 - received workflow call
app2 - raising error in workflow due to error mode being enabled
app1 - received workflow error from app2
...
```
among others. This shows that the workflow calls are failing as expected, and they are being handled as expected too.

58 changes: 58 additions & 0 deletions examples/workflow/cross-app1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-
# Copyright 2025 The Dapr Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import timedelta

from durabletask.task import TaskFailedError
import dapr.ext.workflow as wf
import time

wfr = wf.WorkflowRuntime()


@wfr.workflow
def app1_workflow(ctx: wf.DaprWorkflowContext):
print(f'app1 - received workflow call', flush=True)
print(f'app1 - triggering app2 workflow', flush=True)

try:
retry_policy = wf.RetryPolicy(
max_number_of_attempts=2,
first_retry_interval=timedelta(milliseconds=100),
max_retry_interval=timedelta(seconds=3),
)
yield ctx.call_child_workflow(
workflow='app2_workflow',
input=None,
app_id='wfexample2',
retry_policy=retry_policy,
)
print(f'app1 - received workflow result', flush=True)
except TaskFailedError as e:
print(f'app1 - received workflow error from app2', flush=True)

print(f'app1 - returning workflow result', flush=True)
return 1


if __name__ == '__main__':
wfr.start()
time.sleep(10) # wait for workflow runtime to start

wf_client = wf.DaprWorkflowClient()
print(f'app1 - triggering app1 workflow', flush=True)
instance_id = wf_client.schedule_new_workflow(workflow=app1_workflow)

# Wait for the workflow to complete
time.sleep(7)

wfr.shutdown()
50 changes: 50 additions & 0 deletions examples/workflow/cross-app2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# -*- coding: utf-8 -*-
# Copyright 2025 The Dapr Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import timedelta
import os

from durabletask.task import TaskFailedError
import dapr.ext.workflow as wf
import time

wfr = wf.WorkflowRuntime()


@wfr.workflow
def app2_workflow(ctx: wf.DaprWorkflowContext):
print(f'app2 - received workflow call', flush=True)
if os.getenv('ERROR_WORKFLOW_MODE', 'false') == 'true':
print(f'app2 - raising error in workflow due to error mode being enabled', flush=True)
raise ValueError('Error in workflow due to error mode being enabled')
print(f'app2 - triggering app3 activity', flush=True)
try:
retry_policy = wf.RetryPolicy(
max_number_of_attempts=2,
first_retry_interval=timedelta(milliseconds=100),
max_retry_interval=timedelta(seconds=3),
)
result = yield ctx.call_activity(
'app3_activity', input=None, app_id='wfexample3', retry_policy=retry_policy
)
print(f'app2 - received activity result', flush=True)
except TaskFailedError as e:
print(f'app2 - received activity error from app3', flush=True)

print(f'app2 - returning workflow result', flush=True)
return 2


if __name__ == '__main__':
wfr.start()
time.sleep(15) # wait for workflow runtime to start
wfr.shutdown()
32 changes: 32 additions & 0 deletions examples/workflow/cross-app3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
# Copyright 2025 The Dapr Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import dapr.ext.workflow as wf
import time

wfr = wf.WorkflowRuntime()


@wfr.activity
def app3_activity(ctx: wf.DaprWorkflowContext) -> int:
print(f'app3 - received activity call', flush=True)
if os.getenv('ERROR_ACTIVITY_MODE', 'false') == 'true':
print(f'app3 - raising error in activity due to error mode being enabled', flush=True)
raise ValueError('Error in activity due to error mode being enabled')
print(f'app3 - returning activity result', flush=True)
return 3


if __name__ == '__main__':
wfr.start()
time.sleep(15) # wait for workflow runtime to start
wfr.shutdown()
Loading