Conversation

@cicoyle (Contributor) commented Mar 25, 2025

This PR introduces event streaming capabilities to Dapr, enabling it to stream system events to configured endpoints. The initial implementation focuses on Workflow and Actor events, with support for additional APIs planned for future releases.

Key changes:

  • Added a new OnEventAlpha1 endpoint for sinks to receive event streams
  • Added configuration support for event sinks in both Kubernetes (CRD) and standalone modes (a configuration sketch follows this list)
    • Supports local and remote event collection
    • Configurable event filtering
    • mTLS security for remote collectors
  • Added events.proto with the core event streaming service and message definitions
  • Added workflows.proto leveraging event definitions from dapr/durabletask-protobuf
    • Note: Import paths for durabletask-protobuf may need adjustment
  • Added actors.proto with actor lifecycle and interaction events; these can and should be expanded on
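
For illustration, here is a rough sketch of what a standalone/CRD sink configuration could look like. This is only a sketch: the field names (`events`, `sinks`, `security`, `filters`) mirror the examples discussed later in this thread and are assumptions, not the final schema shipped by this PR.

```yaml
# Illustrative sketch only - not the schema defined by this PR.
apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
  name: eventstreaming
spec:
  events:
    sinks:
      - name: audit-collector
        address: "audit-collector:8080"       # local or remote event collector
        security:
          mtls: true                          # mTLS for remote collectors
          identity: "spiffe://cluster.local/ns/cassie-app/sa/collector"
        filters:                              # configurable event filtering
          include:
            - type: "dapr.io.workflows"
              specversion: "1.0"
```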

Issue reference

  1. Closes #8565 (event streaming)
     • Follow-up issues should be opened after the initial implementation lands in Dapr v1.16.
  2. Closes #6072 ([workflow] introduce workflow lifecycle events firehose)

Additional information

The workflow events leverage existing definitions from dapr/durabletask-protobuf. The import paths in workflows.proto may need adjustment to correctly resolve these dependencies.

Signed-off-by: Cassandra Coyle <cassie@diagrid.io>
@cicoyle self-assigned this Mar 25, 2025

@WhitWaldo left a comment

The only change I would suggest is that, instead of creating another format out of whole cloth, we repurpose the extensible specs we already take advantage of across the rest of Dapr (e.g. CloudEvents).

Also, per my proposal, I'm in favor of swapping out CEL for the CloudEvents filtering spec (for consistency). However that pans out, I would suggest we adopt the same approach here as well, instead of introducing yet another filtering mechanism.

That way, the work we'll already be doing to support and optimize broad filtering for Actor PubSub can be trivially extended here as well, and the investment developers make in learning one approach carries over.

#### Delivery Model
Transactional delivery was considered, but given the additional complexity and blocking nature of implementing acknowledgements and retries, best-effort delivery was chosen. If there is a strong need for guaranteed delivery, it can be added later with retry logic and acknowledgements, but it would introduce latency and potential blocking in the event processing pipeline.

#### Event Structure

As we've already embraced the CNCF CloudEvents spec for PubSub, why not extend its use here? It would future-proof this proposal down the road if PubSub is added (as those messages would need to be wrapped anyway), and each of these event types would be distinctly identified by a dedicated type prefix, with optional extensions per type as needed.

We could then trivially lean on CloudEvents' filtering logic, so as to avoid having three types of event-based filters across Dapr (a sketch of the CloudEvents filter dialect follows this list):

  • CEL expressions for existing PubSub
  • CloudEvent expressions for PubSub actors (proposed, though currently assuming CEL expressions)
  • Streaming filters for system events proposed here
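
For reference, a minimal sketch (not part of this proposal) of how the CloudEvents Subscriptions API filter dialects could apply to the event types discussed in this thread; the types shown are illustrative assumptions:

```yaml
# Illustrative only: CloudEvents Subscriptions API filter dialects applied
# to hypothetical Dapr system event types.
filters:
  - any:                                     # OR over the nested expressions
      - exact:
          type: "io.dapr.workflows.failed"   # match a single event type exactly
      - prefix:
          type: "io.dapr.actors."            # match any actor lifecycle event
```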

@cicoyle (Contributor Author)

While CloudEvents is used in our PubSub API, protocol buffers make more sense here because we're exclusively using gRPC transport: protocol buffers offer better performance and native gRPC integration, while maintaining consistency with most of Dapr's APIs, which use protobufs.

Re: filtering - you raise a good point about having different filtering mechanisms. We could align our filtering syntax with the CloudEvents spec while keeping the protobuf transport, giving us both performance benefits and filtering consistency across Dapr. Would this address your concern about differing filter approaches?

```yaml
filters: # Overrides global filters
  include:
    - type: "dapr.io.workflows"
      specversion: "1.0"
```

This could then be expanded to `dapr.io.workflows.failed` in the future.

@oising Jun 20, 2025

Why not use CloudEvents with protobuf encoding? It's a formal standard, and building a bridge to PubSub would then be trivial.

- etc
- Future expansion to:
- Filter by specific event types within an API domain, example of how this could be done:
- `workflows.failed.dapr.io/v1` // Only workflow failure events

Circling back to the above comment, this would instead be represented in CloudEvent form as:

```json
{
  "specversion": "1.0",
  "type": "io.dapr.workflows.failed",
  "source": "http://localhost:<appIdPort>/workflows/dapr/<workflowName>",
  "subject": "<workflowInstanceId>",
  "id": "75aa1a29-dfb5-49e7-848f-6b8aac130f00",
  "time": "2025-03-25T17:31:00Z",
  "datacontenttype": "text/plain",
  "data": "<exception message>"
}
```

For inspiration, this is the schema of the Azure Durable Functions lifecycle event (an illustrative instance follows the list):

  • id: Unique identifier for the Event Grid event.
  • subject: Path to the event subject. durable/orchestrator/{orchestrationRuntimeStatus}. {orchestrationRuntimeStatus} will be Running, Completed, Failed, and Terminated.
  • data: Durable Functions Specific Parameters.
    • hubName: TaskHub name.
    • functionName: Orchestrator function name.
    • instanceId: Durable Functions instanceId.
    • reason: Additional data associated with the tracking event. For more information, see Diagnostics in Durable Functions (Azure Functions)
    • runtimeStatus: Orchestration Runtime Status. Running, Completed, Failed, Canceled.
  • eventType: "orchestratorEvent"
  • eventTime: Event time (UTC).
  • dataVersion: Version of the lifecycle event schema.
  • metadataVersion: Version of the metadata.
  • topic: Event Grid topic resource.
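
As a concrete (and purely hypothetical) instance of that schema, such a lifecycle event might look roughly like this; every value below is made up for illustration:

```yaml
# Hypothetical Azure Durable Functions lifecycle event, shown only to make
# the schema above concrete; all values are illustrative.
id: "054fe385-c017-4ce3-b38a-052ac970c39d"
subject: "durable/orchestrator/Failed"
data:
  hubName: "DurableFunctionsHub"
  functionName: "OrderOrchestrator"
  instanceId: "abc123"
  reason: "Orchestrator function 'OrderOrchestrator' failed: ..."
  runtimeStatus: "Failed"
eventType: "orchestratorEvent"
eventTime: "2025-03-25T17:31:00Z"
dataVersion: "1.1"
metadataVersion: "1"
topic: "/subscriptions/<subscription-id>/resourceGroups/<resource-group>/providers/Microsoft.EventGrid/topics/<topic-name>"
```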

- The current design allows for easy addition of new event types while maintaining type safety
- Delivery options could be added to support both `best-effort` and `transactional` delivery by adding a `delivery` field, configurable per sink:
```yaml
spec:
```

Assuming the CloudEvents subscription spec, this would be modified to put the delivery and security attributes under a "config" key and apply filters on a per-sink basis:

```yaml
events:
  subscriptions:
  - id: abc123
    sink: "audit-collector:8080"
    protocol: "http"
    config:
    - delivery: "transactional" # or "best-effort" (default)
      security:
      - mtls: true
        identity: "spiffe://cluster.local/ns/cassie-app/sa/collector"
    filters: |
      exact:
      - type: io.dapr.workflows.failed
```

@olitomlinson

I generally support this Proposal.

However, I believe PubSub transport should be prioritised ahead of gRPC direct transport.

Given the volume of events that are coming from the various Event Sources, it's highly likely that an end-user is going to want to persist these to a queue for async / offline processing. Dapr PubSub would be perfect for this and follows a similar pattern to how Outbox uses internal PubSub Topics. I would prioritise guaranteed delivery over low latency.

I do not want to be temporally coupled to an external collector service (that I've had to write myself!) just to receive these events. That is the whole point of putting queues between services: they are decoupled, and value can still be realised even if the collector service is temporarily offline, which is entirely to be expected in a distributed system.

FWIW, Azure Durable Functions publishes its events directly to EventGrid, which offers many options for sync and async sinking to a target system. Publishing to Dapr PubSub would be the equivalent (to EventGrid) in this Dapr context.

@WhitWaldo

I'd also prefer to see this implemented against PubSub first, with an external endpoint in a later release, as it would also allow these events to be more readily consumed by interested Dapr resources (especially monitoring actors, per my actor PubSub proposal).

Signed-off-by: Cassandra Coyle <cassie@diagrid.io>
@cicoyle (Contributor Author) commented Mar 31, 2025

> I generally support this Proposal.
>
> However, I believe PubSub transport should be prioritised ahead of gRPC direct transport.
>
> Given the volume of events that are coming from the various Event Sources, it's highly likely that an end-user is going to want to persist these to a queue for async / offline processing. Dapr PubSub would be perfect for this and follows a similar pattern to how Outbox uses internal PubSub Topics. I would prioritise guaranteed delivery over low latency.
>
> I do not want to be temporally coupled to an external collector service (that I've had to write myself!) just to receive these events. That is the whole point of putting queues between services: they are decoupled, and value can still be realised even if the collector service is temporarily offline, which is entirely to be expected in a distributed system.
>
> FWIW, Azure Durable Functions publishes its events directly to EventGrid, which offers many options for sync and async sinking to a target system. Publishing to Dapr PubSub would be the equivalent (to EventGrid) in this Dapr context.

Thanks for the thoughtful feedback here :)

You make excellent points about the benefits of PubSub for decoupling and guaranteed delivery. While the PubSub approach is a great solution, it comes with additional costs and infrastructure prerequisites. That is the reasoning behind the gRPC-first implementation: a simpler initial implementation path, a lower barrier to entry with no additional infrastructure or costs, and real-time monitoring with minimal latency.

PubSub support is definitely planned as a future enhancement (see the future considerations section). With this design, we are building a low-barrier-to-entry solution that can be expanded upon to support PubSub in the future, giving users the flexibility to choose between direct streaming and queued delivery based on their needs.

It's also worth noting that users can achieve PubSub functionality by having their collector application publish received events to a Dapr PubSub topic. This gives them the flexibility to implement queuing and persistence patterns while we work on native PubSub support.
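
To make that future direction concrete, one possible (entirely hypothetical) shape for a queued-delivery sink is sketched below; the `protocol: pubsub`, `pubsubName`, and `topic` fields are assumptions for illustration and are not part of the current proposal:

```yaml
# Hypothetical future PubSub-backed sink, shown only to illustrate the
# "direct streaming vs queued delivery" choice; field names are assumptions.
events:
  subscriptions:
    - id: workflow-events-to-queue
      protocol: "pubsub"              # queued delivery via a Dapr PubSub component
      pubsubName: "eventbus"          # an existing pubsub component name
      topic: "dapr-workflow-events"
      filters: |
        exact:
        - type: io.dapr.workflows.failed
```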

@olitomlinson commented Apr 6, 2025

Thanks @cicoyle, a few remarks:

> While the PubSub approach is a great solution, it comes with additional costs and infrastructure prerequisites.

These are costs that most people are comfortable with when it comes to guaranteed processing.

> With this design, we are building a low-barrier-to-entry solution that can be expanded upon to support PubSub in the future, giving users the flexibility to choose between direct streaming and queued delivery based on their needs.

Working with gRPC streams is not something that the majority of developers are doing. Most are still using HTTP. Therefore, taking this approach initially alienates users and actually raises the barrier to entry. Personally speaking, I don't do anything with gRPC day to day, so this would be a huge learning curve for me and not one that I would want to take just to receive some events.


Don't get me wrong, I fully appreciate that having a gRPC implementation is useful (particularly for enterprise Dapr vendors who would benefit from a gRPC event stream straight into their control plane, without any PubSub overhead), but I implore the maintainers to prioritise the needs of actual end-users first and deliver a Dapr PubSub integration before all else.

@olitomlinson

At KubeCon EU, I received lots of feedback from users who were interested in seeing aggregates of how many workflows were in a certain state at the current moment in time. This is naturally a hard problem to solve given the decentralised nature of Dapr Workflows and Dapr Actors.

However, I'm pretty confident that this proposal will form the basis of that data generation, such that a centralised component could be deployed to retain aggregates for consumption by users or other value-add services (maybe consider this a new, optional analytics Dapr control plane service).

Signed-off-by: Cassandra Coyle <cassie@diagrid.io>
@oising commented Jun 13, 2025

@olitomlinson

Given the increasing outcry for more management and operational tooling for Workflows, we must prioritise this for 1.17, as a vehicle for delivering dapr/dapr#8837

@cicoyle @yaron2
