Merged
42 commits
c4e9ef7
Workflow Authoring
DeepanshuA Apr 25, 2023
487e97f
Add example
DeepanshuA Apr 25, 2023
0733570
Merge branch 'master' of github.com:dapr/python-sdk into workflow_aut…
DeepanshuA Apr 25, 2023
26380ee
lint
DeepanshuA Apr 25, 2023
eb995b8
is it wheel fix
DeepanshuA Apr 25, 2023
cd044d1
fix lint
DeepanshuA Apr 25, 2023
128a1fa
Add tests and client APIs
DeepanshuA Apr 27, 2023
9361b30
lint
DeepanshuA Apr 27, 2023
f82bb8e
Merge branch 'master' of github.com:dapr/python-sdk into workflow_aut…
DeepanshuA Apr 28, 2023
ffde1df
Add dtf python dependency
DeepanshuA Apr 28, 2023
8d62488
correction
DeepanshuA Apr 28, 2023
1be2447
Remove get-pip.py
DeepanshuA May 2, 2023
a537862
Update durabletask dependency version
DeepanshuA May 2, 2023
75ae2bf
Extra line - to be deleted
DeepanshuA May 3, 2023
7a31c66
test compatible with 3.7
DeepanshuA May 3, 2023
325dedb
Merge branch 'master' into workflow_authoring
yaron2 May 4, 2023
b8bca56
Merge branch 'master' of github.com:dapr/python-sdk into workflow_aut…
DeepanshuA May 9, 2023
2f81812
Incorporate review comments
DeepanshuA May 10, 2023
3185586
Merge branch 'workflow_authoring' of https://github.com/DeepanshuA/py…
DeepanshuA May 10, 2023
2ff152e
lint
DeepanshuA May 11, 2023
347c0bf
Ut fix
DeepanshuA May 11, 2023
2ebf20c
validate demo_workflow
DeepanshuA May 11, 2023
a8fa501
App Readme
DeepanshuA May 11, 2023
05194c8
fix step md
DeepanshuA May 11, 2023
7598dd1
Validate demo workflow example
DeepanshuA May 12, 2023
74dbc9f
Remove demo actor temporarily
DeepanshuA May 12, 2023
bf69dea
Include raise event test and assertions
DeepanshuA May 16, 2023
0968b8e
Rename
DeepanshuA May 16, 2023
4886c9e
Incorporate Review comments
DeepanshuA May 18, 2023
18bc883
Lint, validate
DeepanshuA May 18, 2023
9d8ec7f
test correction
DeepanshuA May 18, 2023
49f960a
Fake class method correction
DeepanshuA May 18, 2023
c64a091
Check expected std output in validate example
DeepanshuA May 18, 2023
f3207fc
Remove extra port check
DeepanshuA May 19, 2023
2bfc44a
Merge branch 'master' of github.com:dapr/python-sdk into workflow_aut…
DeepanshuA May 19, 2023
82b33e5
Temporary - Verify Workflow Example first
DeepanshuA May 19, 2023
de33a9b
Requirements
DeepanshuA May 19, 2023
363a0df
Remove line
DeepanshuA May 19, 2023
ba2087a
Add back removed validate examples
DeepanshuA May 19, 2023
a3fb750
Update examples/demo_workflow/demo_workflow/requirements.txt
berndverst May 22, 2023
135d3d7
Change running order of wf
DeepanshuA May 22, 2023
a9be483
Commit to re-run example
DeepanshuA May 22, 2023
48 changes: 48 additions & 0 deletions examples/demo_workflow/README.md
@@ -0,0 +1,48 @@
# Example - Dapr Workflow Authoring

This document describes how to register a workflow and its activities, and how to start running it.

## Prerequisites

- [Dapr CLI and initialized environment](https://docs.dapr.io/getting-started)
- [Install Python 3.7+](https://www.python.org/downloads/)

### Install requirements

You can install the Dapr SDK package using pip:

<!-- STEP
name: Install requirements
-->

```sh
pip3 install -r demo_workflow/requirements.txt
```

<!-- END_STEP -->

<!-- STEP
name: Running this example
expected_stdout_lines:
- "== APP == New counter value is: 1!"
- "== APP == New counter value is: 11!"
- "== APP == New counter value is: 111!"
- "== APP == New counter value is: 1111!"
background: true
timeout_seconds: 30
sleep: 15
-->

```sh
dapr run --app-id orderapp --app-protocol grpc --dapr-grpc-port 4001 --components-path components --placement-host-address localhost:50005 -- python3 app.py
```

<!-- END_STEP -->

You should be able to see the following output:
```
== APP == New counter value is: 1!
== APP == New counter value is: 11!
== APP == New counter value is: 111!
== APP == New counter value is: 1111!
```
63 changes: 63 additions & 0 deletions examples/demo_workflow/app.py
@@ -0,0 +1,63 @@
# -*- coding: utf-8 -*-
# Copyright 2023 The Dapr Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from time import sleep
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowClient, DaprWorkflowContext, WorkflowActivityContext
from dapr.conf import Settings

settings = Settings()

counter = 0

def hello_world_wf(ctx: DaprWorkflowContext, input):
    print(f'{input}')
    yield ctx.call_activity(hello_act, input=1)
    yield ctx.call_activity(hello_act, input=10)
    yield ctx.wait_for_external_event("event1")
    yield ctx.call_activity(hello_act, input=100)
    yield ctx.call_activity(hello_act, input=1000)

def hello_act(ctx: WorkflowActivityContext, input):
    global counter
    counter += input
    print(f'New counter value is: {counter}!', flush=True)

def main():
    workflowRuntime = WorkflowRuntime()
    workflowRuntime.register_workflow(hello_world_wf)
    workflowRuntime.register_activity(hello_act)
    workflowRuntime.start()

    host = settings.DAPR_RUNTIME_HOST
    if host is None:
        host = "localhost"
    port = settings.DAPR_GRPC_PORT
    if port is None:
        port = "4001"

    workflow_client = DaprWorkflowClient(host, port)
    print("==========Start Counter Increase as per Input:==========")
    _id = workflow_client.schedule_new_workflow(hello_world_wf, input='Hi Counter!')
    # Sleep for a while to let the workflow run
    sleep(1)
    assert counter == 11
    sleep(10)
    workflow_client.raise_workflow_event(_id, "event1")
    # Sleep for a while to let the workflow run
    sleep(1)
    assert counter == 1111
    status = workflow_client.wait_for_workflow_completion(_id, timeout_in_seconds=6000)
    assert status.runtime_status.name == "COMPLETED"
    workflowRuntime.shutdown()

if __name__ == '__main__':
    main()
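The workflow function in `app.py` is a Python generator: each `yield` hands a task back to the runtime, which resumes the generator when the activity or external event completes. The following is a simplified, self-contained sketch of that drive loop. `FakeContext` and `drive` are hypothetical stand-ins for illustration only, not the Durable Task engine — the real runtime dispatches activities through the Dapr sidecar and replays the generator from history.

```python
# Conceptual sketch: driving a generator-style workflow to completion.
# FakeContext and drive() are hypothetical stand-ins, not SDK APIs.

counter = 0

def hello_act(ctx, input):
    # Same shape as the activity in app.py: accumulate into a global counter.
    global counter
    counter += input
    return counter

class FakeContext:
    """Stand-in for DaprWorkflowContext that runs activities inline."""
    def call_activity(self, activity, *, input=None):
        return activity(self, input)

    def wait_for_external_event(self, name):
        # Pretend the event arrives immediately with no payload.
        return None

def hello_world_wf(ctx, input):
    yield ctx.call_activity(hello_act, input=1)
    yield ctx.call_activity(hello_act, input=10)
    yield ctx.wait_for_external_event("event1")
    yield ctx.call_activity(hello_act, input=100)
    yield ctx.call_activity(hello_act, input=1000)

def drive(workflow, input=None):
    """Step the generator to completion, mimicking the runtime's loop."""
    gen = workflow(FakeContext(), input)
    try:
        while True:
            gen.send(None)
    except StopIteration:
        pass

drive(hello_world_wf, 'Hi Counter!')
print(counter)  # 1111
```

This is why the example's assertions see `counter == 11` before the event is raised and `counter == 1111` after: the generator is suspended at `wait_for_external_event("event1")` until `raise_workflow_event` delivers the event.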
15 changes: 15 additions & 0 deletions examples/demo_workflow/components/state_redis.yaml
@@ -0,0 +1,15 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore-actors
spec:
  type: state.redis
  version: v1
  initTimeout: 1m
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""
  - name: actorStateStore
    value: "true"
1 change: 1 addition & 0 deletions examples/demo_workflow/demo_workflow/requirements.txt
@@ -0,0 +1 @@
dapr-ext-workflow-dev>=0.0.1rc1.dev
1 change: 1 addition & 0 deletions ext/dapr-ext-workflow/README.rst
@@ -8,6 +8,7 @@ dapr-ext-workflow extension

This is the workflow authoring extension for Dapr Workflow


Installation
------------

12 changes: 8 additions & 4 deletions ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py
@@ -14,10 +14,14 @@
"""

# Import your main classes here
# from dapr.ext.workflow.creator import Creator, Workflow # type:ignore

from dapr.ext.workflow.workflow_runtime import WorkflowRuntime
from dapr.ext.workflow.dapr_workflow_client import DaprWorkflowClient
from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext
from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext

__all__ = [
    # 'Creator',
    # 'Workflow',
    'WorkflowRuntime',
    'DaprWorkflowClient',
    'DaprWorkflowContext',
    'WorkflowActivityContext',
]
210 changes: 210 additions & 0 deletions ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py
@@ -0,0 +1,210 @@
# -*- coding: utf-8 -*-

"""
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from __future__ import annotations
from datetime import datetime
from typing import Any, TypeVar, Union
from dapr.conf import settings

from durabletask import client
from dapr.ext.workflow.workflow_state import WorkflowState
from dapr.ext.workflow.workflow_context import Workflow

T = TypeVar('T')
TInput = TypeVar('TInput')
TOutput = TypeVar('TOutput')


class DaprWorkflowClient:
    """Defines client operations for managing Dapr Workflow instances.

    This is an alternative to the general purpose Dapr client. It uses a gRPC connection to send
    commands directly to the workflow engine, bypassing the Dapr API layer.

    This client is intended to be used by workflow applications, not by general-purpose
    applications.
    """
    def __init__(self, host: Union[str, None] = None, port: Union[str, None] = None):
        if host is None:
            host = settings.DAPR_RUNTIME_HOST
        if not host or len(host.strip()) == 0:
            host = "localhost"
        port = port or settings.DAPR_GRPC_PORT
        address = f"{host}:{port}"
        self.__obj = client.TaskHubGrpcClient(host_address=address)

    def schedule_new_workflow(self,
                              workflow: Workflow, *,
                              input: Union[TInput, None] = None,
                              instance_id: Union[str, None] = None,
                              start_at: Union[datetime, None] = None) -> str:
        """Schedules a new workflow instance for execution.

        Args:
            workflow: The workflow to schedule.
            input: The optional input to pass to the scheduled workflow instance. This must be a
                serializable value.
            instance_id: The unique ID of the workflow instance to schedule. If not specified, a
                new GUID value is used.
            start_at: The time when the workflow instance should start executing. If not
                specified or if a date-time in the past is specified, the workflow instance will
                be scheduled immediately.

        Returns:
            The ID of the scheduled workflow instance.
        """
        return self.__obj.schedule_new_orchestration(workflow.__name__,
                                                     input=input, instance_id=instance_id,
                                                     start_at=start_at)

    def get_workflow_state(self, instance_id: str, *,
                           fetch_payloads: bool = True) -> Union[WorkflowState, None]:
        """Fetches runtime state for the specified workflow instance.

        Args:
            instance_id: The unique ID of the workflow instance to fetch.
            fetch_payloads: If true, fetches the input, output payloads and custom status
                for the workflow instance. Defaults to true.

        Returns:
            The current state of the workflow instance, or None if the workflow instance does
            not exist.
        """
        state = self.__obj.get_orchestration_state(instance_id, fetch_payloads=fetch_payloads)
        return WorkflowState(state) if state else None

    def wait_for_workflow_start(self, instance_id: str, *,
                                fetch_payloads: bool = False,
                                timeout_in_seconds: int = 60) -> Union[WorkflowState, None]:
        """Waits for a workflow to start running and returns a WorkflowState object that
        contains metadata about the started workflow.

        A "started" workflow instance is any instance not in the WorkflowRuntimeStatus.Pending
        state. This method will return a completed task if the workflow has already started
        running or has already completed.

        Args:
            instance_id: The unique ID of the workflow instance to wait for.
            fetch_payloads: If true, fetches the input, output payloads and custom status for
                the workflow instance. Defaults to false.
            timeout_in_seconds: The maximum time to wait for the workflow instance to start
                running. Defaults to 60 seconds.

        Returns:
            WorkflowState record that describes the workflow instance and its execution status.
            If the specified workflow isn't found, the WorkflowState.Exists value will be false.
        """
        state = self.__obj.wait_for_orchestration_start(instance_id,
                                                        fetch_payloads=fetch_payloads,
                                                        timeout=timeout_in_seconds)
        return WorkflowState(state) if state else None

    def wait_for_workflow_completion(self, instance_id: str, *,
                                     fetch_payloads: bool = True,
                                     timeout_in_seconds: int = 60) -> Union[WorkflowState, None]:
        """Waits for a workflow to complete and returns a WorkflowState object that contains
        metadata about the completed instance.

        A "completed" workflow instance is any instance in one of the terminal states. For
        example, the WorkflowRuntimeStatus.Completed, WorkflowRuntimeStatus.Failed or
        WorkflowRuntimeStatus.Terminated states.

        Workflows are long-running and could take hours, days, or months before completing.
        Workflows can also be eternal, in which case they'll never complete unless terminated.
        In such cases, this call may block indefinitely, so care must be taken to enforce an
        appropriate timeout using the timeout_in_seconds parameter.

        If a workflow instance is already complete when this method is called, the method
        will return immediately.

        Args:
            instance_id: The unique ID of the workflow instance to wait for.
            fetch_payloads: If true, fetches the input, output payloads and custom status
                for the workflow instance. Defaults to true.
            timeout_in_seconds: The maximum time in seconds to wait for the workflow instance
                to complete. Defaults to 60 seconds.

        Returns:
            WorkflowState record that describes the workflow instance and its execution status.
        """
        state = self.__obj.wait_for_orchestration_completion(instance_id,
                                                             fetch_payloads=fetch_payloads,
                                                             timeout=timeout_in_seconds)
        return WorkflowState(state) if state else None

    def raise_workflow_event(self, instance_id: str, event_name: str, *,
                             data: Union[Any, None] = None):
        """Sends an event notification message to a waiting workflow instance.

        In order to handle the event, the target workflow instance must be waiting for an
        event with the name given by the "event_name" parameter, using the
        wait_for_external_event API. If the target workflow instance is not yet waiting for
        such an event, the event will be saved in the workflow instance state and dispatched
        immediately when the workflow calls wait_for_external_event. This event saving occurs
        even if the workflow has canceled its wait operation before the event was received.

        Workflows can wait for the same event name multiple times, so sending multiple events
        with the same name is allowed. Each external event received by a workflow will complete
        just one task returned by the wait_for_external_event method.

        Raised events for a completed or non-existent workflow instance will be silently
        discarded.

        Args:
            instance_id: The ID of the workflow instance that will handle the event.
            event_name: The name of the event. Event names are case-insensitive.
            data: The serializable data payload to include with the event.
        """
        return self.__obj.raise_orchestration_event(instance_id, event_name, data=data)

    def terminate_workflow(self, instance_id: str, *,
                           output: Union[Any, None] = None):
        """Terminates a running workflow instance and updates its runtime status to
        WorkflowRuntimeStatus.Terminated. This method internally enqueues a "terminate" message
        in the task hub. When the task hub worker processes this message, it will update the
        runtime status of the target instance to WorkflowRuntimeStatus.Terminated. You can use
        wait_for_workflow_completion to wait for the instance to reach the terminated state.

        Terminating a workflow instance has no effect on any in-flight activity function
        executions or child workflows that were started by the terminated instance. Those
        actions will continue to run without interruption. However, their results will be
        discarded. If you want to terminate child workflows, you must issue separate terminate
        commands for each child workflow instance individually.

        At the time of writing, there is no way to terminate an in-flight activity execution.

        Args:
            instance_id: The ID of the workflow instance to terminate.
            output: The optional output to set for the terminated workflow instance.
        """
        return self.__obj.terminate_orchestration(instance_id, output=output)

    def pause_workflow(self, instance_id: str):
        """Suspends a workflow instance, halting processing of it until resume_workflow is
        used to resume the workflow.

        Args:
            instance_id: The instance ID of the workflow to suspend.
        """
        return self.__obj.suspend_orchestration(instance_id)

    def resume_workflow(self, instance_id: str):
        """Resumes a workflow instance that was suspended via pause_workflow.

        Args:
            instance_id: The instance ID of the workflow to resume.
        """
        return self.__obj.resume_orchestration(instance_id)