
Add Streaming Subscription Support #1324

Open
JoshVanL opened this issue Jul 9, 2024 · 9 comments
@JoshVanL
Contributor

JoshVanL commented Jul 9, 2024

Dapr v1.14 adds support for streaming subscriptions, which allow dynamically subscribing to pubsub topics over a bi-directional gRPC stream. This API is exposed via the SubscribeTopicEventsAlpha1 gRPC method.

On first call, the client sends an InitialRequest containing the pubsub name, topic name, etc. From then on, topic event messages are sent from daprd to the client over the stream. The client reports the processing status of each message back to daprd. A message will not be considered processed until the client has responded with this status. There can be multiple in-flight topic messages. Daprd unsubscribes from the topic when the stream is closed.

Here is a reference implementation for the go-sdk.
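The stream lifecycle above can be sketched roughly as follows. This is a hedged illustration only: the message and field names approximate the Alpha1 protos and are not the exact generated .NET API.

async Task SubscribeLoopAsync(Dapr.Proto.Runtime.V1.Dapr.DaprClient client, CancellationToken ct)
{
    // Open the bi-directional gRPC stream.
    using var call = client.SubscribeTopicEventsAlpha1(cancellationToken: ct);

    // 1. The first message is the InitialRequest naming the pubsub component
    //    and topic (plus optional metadata / dead-letter topic).
    await call.RequestStream.WriteAsync(new SubscribeTopicEventsRequestAlpha1
    {
        InitialRequest = new SubscribeTopicEventsRequestInitialAlpha1
        {
            PubsubName = "pubsub",
            Topic = "orders",
        },
    });

    // 2. Daprd then delivers topic events over the response stream; each one
    //    stays in flight until the client reports its processing status.
    await foreach (var topicEvent in call.ResponseStream.ReadAllAsync(ct))
    {
        await call.RequestStream.WriteAsync(new SubscribeTopicEventsRequestAlpha1
        {
            EventProcessed = new SubscribeTopicEventsRequestProcessedAlpha1
            {
                Id = topicEvent.EventMessage.Id,
                Status = new TopicEventResponse
                {
                    Status = TopicEventResponse.Types.TopicEventResponseStatus.Success,
                },
            },
        });
    }

    // 3. Closing the stream (disposing the call) causes daprd to unsubscribe.
}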

@JoshVanL JoshVanL added the kind/enhancement New feature or request label Jul 9, 2024
@philliphoff philliphoff modified the milestones: v1.15, v1.14.x Jul 9, 2024
@WhitWaldo
Contributor

/assign

@philliphoff
Contributor

philliphoff commented Jul 10, 2024

Some initial thoughts on the shape of a possible implementation:

  • To align with the "component specific clients" approach, creation of a new Dapr.PublishSubscribe (Dapr.Messaging?) package
  • Creation of a new Dapr.DaprPublishSubscribeClient type
  • Client exposes a Subscribe() method that accepts a handler called for each topic event received
    • Subscription ends when cancellation token indicates cancellation requested
    • Subscribe() implementation may call handler concurrently but manages synchronization across gRPC channel
    • Potentially offer option to have buffered/synchronous handling
    • Method completes when either side of the gRPC channel indicates cancellation requested
    • Method throws if either side indicates an error
namespace Dapr.PublishSubscribe;

public sealed record DaprSubscriptionOptions
{
  public IReadOnlyDictionary<string, string> Metadata { get; init; } = new Dictionary<string, string>();

  public string? DeadLetterTopic { get; init; }
}

public sealed record TopicRequest
{
    public required string Id { get; init; }
    public required string Source { get; init; }
    public required string Type { get; init; }
    public required string SpecVersion { get; init; }
    public required string DataContentType { get; init; }
    public required string Topic { get; init; }
    public required string PubSubName { get; init; }
    public string? Path { get; init; }
    public object? Extensions { get; init; } // TODO: Determine what this should look like.
}

public enum TopicResponse
{
  Success,
  Error,
  Drop
}

public delegate Task<TopicResponse> TopicHandler(TopicRequest request);

public abstract class DaprPublishSubscribeClient
{
    public abstract DaprPublishSubscribeClient CreateClient();

    public abstract Task SubscribeAsync(string pubSubName, string topicName, TopicHandler handler, DaprSubscriptionOptions? options = null, CancellationToken cancellationToken = default);
}

@WhitWaldo
Contributor

Looking at the protos, the extension is implemented as a Struct, so that might be most easily implemented as a Dictionary<string,string>, though I think it'd do developers a courtesy if we could identify what all the properties are and strongly type it as an Options record, then map it out ourselves. I hate seeing a library that provides this open-ended mapping and then leaves it to me to figure out what to put in it.
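As a hypothetical sketch of that mapping (assuming Google.Protobuf.WellKnownTypes, and coercing every Value kind to its string form for simplicity), it might look like:

using Google.Protobuf.WellKnownTypes;

static IReadOnlyDictionary<string, string> ToStringMap(Struct extensions)
{
    var map = new Dictionary<string, string>();
    foreach (var (key, value) in extensions.Fields)
    {
        // Value is a tagged union (null / number / string / bool / struct / list);
        // everything here is flattened to a string.
        map[key] = value.KindCase switch
        {
            Value.KindOneofCase.StringValue => value.StringValue,
            Value.KindOneofCase.NumberValue => value.NumberValue.ToString(),
            Value.KindOneofCase.BoolValue => value.BoolValue.ToString(),
            _ => value.ToString(), // JSON-ish fallback for nested values
        };
    }
    return map;
}

A strongly typed Options record, as suggested above, would replace this open-ended dictionary once the well-known properties are identified.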

This might be worth a more substantive discussion elsewhere, but I was tentatively thinking of the following component-based naming scheme:

  • Two-part names for Dapr-provided services, e.g. Dapr.Jobs or Dapr.Workflow where there isn't really an underlying component swapped in that performs the ability.
  • Three-part names for component abstractions: Dapr.State.KeyValue, Dapr.State.Cache, Dapr.State.Relational, or here, Dapr.Messaging.PubSub (or Dapr.Messaging.PublishSubscribe).

This leaves the door open for other messaging building blocks down the road like an event bus or distributed event sender/processor under the same messaging subset.

@philliphoff
Contributor

Looking at the protos, the extension is implemented as a Struct, so that might be most easily implemented as a Dictionary<string,string>, though I think it'd do developers a courtesy if we could identify what all the properties are and strongly type it as an Options record, then map it out ourselves. I hate seeing a library that provides this open-ended mapping and then leaves it to me to figure out what to put in it.

@WhitWaldo I also prefer that, for well-known values, the APIs expose them in an appropriately typed way.

Three-part names for component abstractions: Dapr.State.KeyValue, Dapr.State.Cache, Dapr.State.Relational, or here, Dapr.Messaging.PubSub (or Dapr.Messaging.PublishSubscribe).

@WhitWaldo I feel like, from a packaging perspective, this might be too fine-grained. That said, I think it'd be fine to have a single component-category package (e.g. Dapr.State or Dapr.Messaging) that included multiple fine-grained clients (e.g. Dapr.State package contains both Dapr.State.KeyValue.DaprKeyValueClient as well as Dapr.State.Cache.DaprCacheClient).

@philliphoff
Contributor

philliphoff commented Jul 10, 2024

@WhitWaldo There's a start of my thinking in this branch:

https://github.com/dapr/dotnet-sdk/tree/philliphoff-streaming-subs

(I had started thinking about this a bit before you self-assigned the issue; feel free to borrow, build-upon, or completely ignore.)

@WhitWaldo
Contributor

WhitWaldo commented Jul 11, 2024

@philliphoff I use a mono-repo for most of my own professional stuff and just leave it to the CI/CD to deal with the individual packages, so I'll typically lean towards more specific packages over jamming everything into one. Pair that with a documentation perspective and I like the idea of saying: here are the docs for this building block, and here are the docs for this specialty block within it, with a package purpose-built for that sub-concept. Namespaces are nice within the package itself, but just as I'd prefer to install Dapr.Messaging.PublishSubscribe instead of Dapr.Client and everything that comes with it, the same holds for wanting to avoid installing Dapr.Messaging and getting a whole distributed event processing system I didn't necessarily need, so it's a consistently narrow focus all the way down. Paired with the lack of optional interfaces, it's a perfect abstraction of all the narrowed capabilities of the components it ties into.

I meandered through your approach and think it frankly looks great. I'd build a sample or two off it just to play around with the experience, but generate the client using the shared project from my Jobs proposal (so it's bundling any available api-token) and I think it's frankly pretty much there.

Now, the big question would be whether, especially come 1.15, you'd be open to shuffling out all the pubsub bits to this package or if that's worth some sort of lower-level facelift first with a fresh start in the new package (like what I'm slowly wanting to do with state)?

@philliphoff
Contributor

@WhitWaldo For me it's about finding a balance between individual purpose and repo/package maintainability. I'm not sure I'm convinced yet that having separate packages for, say, key/value store clients and cache clients is in that sweet spot of balance. (We always reserve the right to further split packages as necessary, too. Yes, it's breaking, but it's a fairly minimal break.)

I've updated my branch, switching to an IDisposable Subscribe(/* ... */) model, which is a little more aligned with some eventing frameworks (like Rx.NET).
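A usage sketch of that IDisposable shape might look like the following; the names here are illustrative rather than the final API.

// Subscribe returns a handle; disposing it closes the gRPC stream, which in
// turn causes daprd to unsubscribe from the topic.
IDisposable subscription = client.Subscribe(
    "pubsub",
    "orders",
    async request =>
    {
        Console.WriteLine($"Received {request.Id} on {request.Topic}");
        return TopicResponse.Success;
    });

// Later, when the consumer is done:
subscription.Dispose();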

I'm running into some curious behavior in testing, though, where the Dapr sidecar seems to ignore success/drop responses and keeps re-delivering messages. From the logs, it appears the responses are getting back to the sidecar so I'm not sure what's happening.

I've also done a little testing with raw payloads; that seems to result in none of the normal topic request fields being populated and the extensions field containing the actual data; I'm not sure yet how best to expose that. I'm also not sure how useful it is to manage deserialization of the data on the user's behalf vs. just having the user do it (which is fairly trivial, given that the content type is provided anyway, and which gets the Dapr SDK out of the serialization options business).

@WhitWaldo
Contributor

@philliphoff I get that - I'll see if I can think of more evidence to support one approach or the other.

That's less than ideal behavior - happy to dig in and see if I can't help out once I've got Jobs wrapped up.

Let me know if you come to a conclusion on the serialization. I'm torn on the same question with the Jobs mapping: whether it should similarly get out of the serialization game. On one hand, it'd be nice to blindly pass types to a generic endpoint and have it all Just Work. On the other hand, if it instead accepted only an array of bytes for the job payload and left serialization entirely to the developer, that would simplify the SDK mapping significantly, because we wouldn't risk a situation where the job is triggered, deserialized into the wrong payload type, and throws - it'd be nice to ensure that any runtime errors are because the developer failed to deserialize and handle their own types, instead of it being an inevitable "Dapr" problem.

@m3nax
Contributor

m3nax commented Jul 30, 2024

@philliphoff I am reading the code written for streaming support and think that an IAsyncEnumerable version of the Subscribe method might be a good addition. It could be used to create a message-processing pipeline with, for example, some chained LINQ filters.

public abstract class DaprPublishSubscribeClient
{
    public abstract DaprPublishSubscribeClient CreateClient();

    public abstract Task SubscribeAsync(string pubSubName, string topicName, TopicHandler handler, DaprSubscriptionOptions? options = null, CancellationToken cancellationToken = default);

    public abstract IAsyncEnumerable<TopicRequest> SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions? options = null, CancellationToken cancellationToken = default);
}

What do you think?
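For illustration, consuming such an overload might look like this (assuming the System.Linq.Async operators for filtering; names are illustrative):

await foreach (var request in client
    .SubscribeAsync("pubsub", "orders", cancellationToken: ct)
    .Where(r => r.Type == "com.example.order"))
{
    // Process the event. How acknowledgement (Success/Error/Drop) is reported
    // in this pull-based model is an open question - e.g. it could be a
    // method on the yielded item rather than a handler return value.
    Console.WriteLine(request.Id);
}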
