Skip to content

feat: Streaming query support via IStreamQuery<TResponse> and IAsyncEnumerable #128

@samtrion

Description

@samtrion

User Story

As a developer building query-heavy features over large data sets, I want streaming query support via IAsyncEnumerable<TResponse>, so that I can return results incrementally without buffering the entire result set in memory.


Background

The current IQuery<TResponse> pattern returns a single response object. Scenarios like paginated exports, report generation, or real-time feeds require returning potentially thousands of items. Using IAsyncEnumerable allows callers to process items as they arrive, reducing memory pressure.


Requirements

  • Define IStreamQuery<TResponse> in NetEvolve.Pulse.Extensibility:
    public interface IStreamQuery<TResponse> : IRequest<IAsyncEnumerable<TResponse>> { }
  • Define IStreamQueryHandler<TQuery, TResponse> in NetEvolve.Pulse.Extensibility:
    public interface IStreamQueryHandler<TQuery, TResponse>
        where TQuery : IStreamQuery<TResponse>
    {
        IAsyncEnumerable<TResponse> HandleAsync(TQuery query, CancellationToken ct);
    }
  • Extend IMediator with:
    IAsyncEnumerable<TResponse> StreamAsync<TQuery, TResponse>(TQuery query, CancellationToken ct = default)
        where TQuery : IStreamQuery<TResponse>;
  • Implement StreamAsync in PulseMediator following the same single-handler enforcement as QueryAsync.
  • IStreamQueryHandler supports one handler per query type (same as IQueryHandler).
  • Interceptors (IQueryInterceptor) do not apply to streaming queries in the initial implementation; streaming queries are executed directly.
  • Handler registration follows the existing HandlerRegistrationExtensions patterns.

Acceptance Criteria

  • IStreamQuery<TResponse> and IStreamQueryHandler<TQuery, TResponse> are defined in NetEvolve.Pulse.Extensibility.
  • IMediator.StreamAsync is implemented in PulseMediator.
  • Registering more than one handler for the same IStreamQueryHandler<TQuery, TResponse> throws InvalidOperationException at dispatch time.
  • Dispatching a streaming query with no registered handler throws InvalidOperationException with a descriptive message.
  • Cancellation via CancellationToken stops iteration without exception leaking to the caller.
  • Unit tests cover: normal streaming, cancellation, missing handler, and duplicate handler scenarios.
  • Integration tests verify that a handler returning yield return items is consumed correctly end-to-end.
  • XML documentation is provided for all new public members.

Out of Scope

  • Interceptor pipeline support for streaming queries (planned for a follow-up).
  • Streaming commands or streaming events.
  • Server-Sent Events or SignalR integration.

Metadata

Metadata

Assignees

No one assigned

    Labels

    type:featureIndicates a new feature or enhancement to be added.

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions