Conversation
Add global error handling framework to Cortex Streams Introduced a robust global error handling framework, enabling configurable strategies for retries, skipping, and graceful stream shutdowns. Added new classes (`StreamExecutionOptions`, `ErrorHandlingHelper`, etc.) and enums (`ErrorHandlingDecision`, `ErrorHandlingStrategy`) to support this functionality. Enhanced all operators to integrate error handling, including `AggregateOperator`, `FilterOperator`, `MapOperator`, and windowing operators. Introduced the `IErrorHandlingEnabled` interface for propagating error handling configurations. Updated `StreamBuilder` with a `WithErrorHandling` method to configure global error handling. Improved telemetry spans to include error handling attributes for better observability. Refactored code for consistency, improved thread safety, and added support for `.NET 10.0`. Maintained backward compatibility with existing streams and operators.
Refactor FlatMapOperator and improve error handling Refactored `FlatMapOperator` to enhance readability, improve telemetry initialization, and standardize error handling using `ErrorHandlingHelper`. Updated downstream propagation for telemetry and error handling. Reorganized `using` directives and applied consistent formatting across files. Updated test cases to align with new error handling logic, adjusted sleep durations for stability, and commented out flaky assertions in `SessionWindowOperatorTests` and `TumblingWindowOperatorTests`.
V3/feature/154
Remove windowed stream processing (window operators) All code related to windowed stream processing—including tumbling, sliding, and session windows—has been removed. This includes the deletion of TumblingWindowOperator, SlidingWindowOperator, SessionWindowOperator, and their supporting types (WindowKey, WindowState, SessionKey, SessionState). All related methods have been removed from IStreamBuilder and StreamBuilder, and references to windowing types have been eliminated. Other stream builder features remain unaffected.
Introduce TumblingWindow, SlidingWindow, and SessionWindow operators with state persistence, thread safety, and telemetry support. Extend stream builder interfaces for fluent windowing. Add WindowResult type and comprehensive unit tests. Refactor and replace old windowing logic for consistency and extensibility.
Added ITelemetryEnabled to Branch, Fork, and SinkOperatorAdapter for metrics and tracing. Updated StreamBuilder for telemetry propagation and refactored windowing methods. Introduced TelemetryTests with a mock provider to verify metrics, tracing, and thread safety. Cortex.Tests.csproj now references Cortex.Telemetry. All telemetry features remain optional.
V3/feature/105
- Add ErrorHandlingTests.cs with extensive unit tests for all error handling strategies, custom handlers, and edge cases in Cortex.Streams. - Ensure StreamExecutionOptions are correctly propagated through StreamBuilder operator chains for consistent error handling. - Remove unused using directive in FlatMapOperatorTests.cs. - Update README.md to include Cortex.Serialization.Yaml with NuGet badge.
v3/feature/NO_TICKET: Add error handling tests and propagate options
…modes Introduce advanced tumbling, sliding, and session window operators supporting custom triggers (count, time, early, composite) and state modes (accumulating, discarding, retracting). Add a flexible trigger API, window configuration builder, and emission metadata to WindowResult. Includes comprehensive tests and documentation.
v3/feature/67: Add advanced windowing with custom triggers and state
…ries Add overloads and extension methods to IMediator for sending commands and queries with automatic type inference, reducing the need for explicit type parameters. Update documentation to recommend the new API, and add comprehensive tests to ensure correctness and backward compatibility. Implementation uses reflection and caching for efficient dispatch.
v3/feature/162: Simplify Mediator API with type-inferred commands/queries
Introduce INotificationPipelineBehavior<TNotification> for notifications, enabling middleware-style behaviors (e.g., logging, error handling) around notification handlers. Update MediatorOptions and DI registration to support open and closed notification behaviors. Refactor PublishAsync to execute handlers through the pipeline. Add LoggingNotificationBehavior example and comprehensive tests for pipeline execution and integration. Brings notification handling in line with command/query pipelines.
Introduced exception handling pipeline behaviors for commands, queries, and notifications, including ExceptionHandlingCommandBehavior, ExceptionHandlingVoidCommandBehavior, ExceptionHandlingQueryBehavior, and ExceptionHandlingNotificationBehavior. Added IExceptionHandler and IExceptionHandler<TResult> interfaces for custom exception handling and fallback results. Provided default handlers, DI registration extensions, and comprehensive unit/integration tests. Updated README with usage, configuration, and code samples.
…and tests Introduces a caching pipeline behavior for queries, supporting both the [Cacheable] attribute and ICacheableQuery interface for flexible cache control. Adds ICacheKeyGenerator (with default implementation) for cache key generation and ICacheInvalidator for manual cache invalidation. Provides CachingOptions for configuration and DI extensions for easy setup. Updates README with usage docs and adds comprehensive unit/integration tests. Project files updated to include required caching packages.
Introduces IStreamQuery, streaming handlers, and pipeline behaviors for efficient IAsyncEnumerable-based queries. Adds built-in logging behavior, DI registration, and new IMediator streaming APIs. Includes docs and comprehensive tests for streaming, pipeline, and cancellation scenarios.
…iator Introduces IRequestPreProcessor and IRequestPostProcessor interfaces for running logic before and after handlers, supporting both commands/queries with and without responses. Implements corresponding pipeline behaviors for commands, queries, and notifications. Updates DI extensions for automatic processor registration and adds `.AddProcessorBehaviors()` to options. Enhances documentation with usage examples and adds comprehensive unit and integration tests. This is a non-breaking, opt-in addition.
…ments Mediator Enhancments
Added Cortex.Streams.Mediator project enabling seamless CQRS and stream processing integration. - Implemented mediator-based stream operators for commands, queries, notifications, and streaming queries. - Added extension methods for stream builders and DI registration. - Introduced stream-emitting pipeline behaviors and handlers for event sourcing/auditing. - Provided comprehensive unit tests for all new operators and behaviors. - Updated solution, project references, and documentation for new integration.
Integrate Cortex.Streams with Mediator for CQRS pipelines
- Introduce OneOf<T1..T8> and AnyOf<T1..T8> discriminated unions - Add Result<T>, Result<TValue, TError>, and ResultError types - Provide IResult interfaces and static Result factory/utilities - Implement full unit test coverage for all new types - Fix bug in OneOf3/OneOf4 Equals(object) generic arity - Add Cortex.Types project reference to test project
Add OneOf/AnyOf (up to 8) and Result types with tests
Refactor stream operators to cache operator/type names, reducing string allocations and improving error message consistency. Optimize Mediator's PublishAsync and CreateStream by materializing handler/behavior arrays only once and pre-allocating task arrays. Cache span names in BranchOperator for telemetry. Remove redundant variables and streamline code for better performance and clarity.
Optimize operator allocations and Mediator pipeline
Refactored the stream builder API to remove the second type parameter (TCurrent) from IInitialStreamBuilder and StreamBuilder. Stream creation now starts with StreamBuilder<TIn>.CreateNewStream("Name"), returning IInitialStreamBuilder<TIn>. All builder methods now operate on TIn as the initial/current type. Updated all usages, extension methods, tests, and documentation to use the new API. This change makes the API more intuitive, reduces boilerplate, and improves usability while preserving type safety and flexibility.
Simplify StreamBuilder API to use a single type parameter
Introduces StreamPerformanceOptions for configuring buffered async processing, backpressure strategies (Block, DropOldest, DropNewest, ThrowException), batching, and concurrency. Adds new IStream methods: EmitAsync, EmitBatchAsync, EmitAndForget, StopAsync, and GetBufferStatistics. Implements internal BufferedProcessor<T> using System.Threading.Channels. Maintains backward compatibility and integrates with error handling. Includes comprehensive unit tests and project file updates.
…ocessing-enhancements Add async buffering & backpressure to stream processing
Major feature release:
- Support flow style collections ([...], {...}) in parser/emitter
- Add anchors (&), aliases (*), and merge keys (<<) with resolution
- Preserve and emit comments; attach to nodes for round-trip
- Parse and emit custom tags (!tag, !!type)
- Robust quoting/escaping for scalars; support all YAML scalar styles
- Refactor scanner/parser for advanced YAML features
- Expose new settings: PreferFlowStyle, EmitComments, PreserveComments, ResolveAnchors
- Update docs and add comprehensive tests for new features
- Backward compatible: block style YAML and previous settings remain default
Add flow style, anchors, tags, comments, and quoting
Introduces LeftJoin methods to IStreamBuilder and IBranchStreamBuilder, enabling left join operations between streams and state-backed tables. Implements StreamTableLeftJoinOperator with support for telemetry, error handling, and thread safety. Includes comprehensive unit tests covering join behavior, type safety, telemetry, and error handling. This allows users to enrich stream data with optional reference data, even when reference data is missing.
Introduces StreamStreamJoinOperator for windowed, key-based joining of two unbounded streams with configurable join types (inner, left, right, outer), window size, and buffer management. Extends IStreamBuilder and IBranchStreamBuilder with JoinStream methods for pipeline integration. Adds StreamJoinType and StreamJoinConfiguration for join semantics. Includes comprehensive documentation, real-world examples, and extensive unit tests covering all join types, window expiration, buffer management, concurrency, and telemetry. Thorough XML docs and code comments provided.
V3/feature/65
All major source and sink operators now support structured logging via optional ILogger<T> injection. Console.WriteLine statements have been replaced with logger calls (LogError, LogWarning, LogInformation, LogDebug), using message templates and contextual data. Each connector project now references Microsoft.Extensions.Logging.Abstractions for NullLogger support. Logging is now fully compatible with .NET logging infrastructure, with no breaking changes for existing code. Public APIs and documentation have been updated to reflect the new logger parameter.
Added new Cortex.States.DuckDb project implementing a key-value state store for the Cortex Data Framework using DuckDB as the storage engine. Updated Cortex.sln to include the project under the "States" folder. Provided full documentation (README, Cortex.States.DuckDb.md), NuGet metadata, and project assets (icon, license). Implementation supports custom serialization, batch ops, export, checkpointing, and both in-memory and persistent modes. Added builder/factory patterns and extension methods for easy integration.
Add Cortex.States.DuckDb: DuckDB-backed state store
Deleted IUnitOfWork and related interfaces and classes, and removed UnitOfWork registration from DI setup. Transaction management via Unit of Work is no longer supported in the codebase.
Remove Unit of Work infrastructure and registration
Enhance ForkOperator to allow a continuation operator after branching, enabling the main pipeline to continue processing after a fork. Telemetry propagation now includes the continuation operator. Updated GetNextOperators and Process methods to support this flow. Added tests to verify correct processing of branches and downstream operators. Fixes bug where downstream operators after a fork were not invoked.
Support continuation after ForkOperator and fix branch bug
Introduces the FanOut feature to Cortex.Streams, allowing streams to broadcast data to multiple sinks simultaneously. Adds the IFanOutBuilder interface and FanOut method to the stream builder API, supporting per-sink filtering and transformation. Includes a new FanOutBuilder implementation, comprehensive documentation, and extensive unit and integration tests covering real-world scenarios. This enables robust, flexible multi-sink stream processing with a fluent, user-friendly API.
Add FanOut: multi-sink stream support with filters
Introduce Cortex.Mediator.Behaviors.Transactional package with pipeline behaviors for transactional command execution, supporting both result and void commands. Add configuration, DI extensions, and documentation. Include comprehensive unit tests for transactional logic, including new TransactionalVoidCommandBehaviorTests for void commands. Update solution, assets, and documentation accordingly. All changes are additive and non-breaking.
Add transactional behaviors and tests to Mediator
Introduce IErrorHandlingEnabled and propagate error handling config across all major streaming operators (Kafka, RabbitMQ, Pulsar, Azure Service Bus, AWS SQS). Implement IDisposable for proper resource cleanup and add detailed logging for disposal and connection events. Enhance operator reliability with retry logic, connection recovery, and support for advanced features (e.g., session/partition keys, authentication, publisher confirms). Improve documentation and parameter validation. Add Microsoft.Extensions.Logging.Abstractions for consistent logging.
Refactored all sink operators (Kafka, S3, HTTP, Mediator, etc.) to use a centralized, stream-level error handling mechanism via IErrorHandlingEnabled and ErrorHandlingHelper.TryExecute. Removed all per-operator error handling and retry parameters from constructors; error handling is now configured using StreamExecutionOptions at the stream level. Updated operator adapters and fan-out/fork logic to propagate error handling options to all branches. Adjusted tests and extension methods to match the new unified error handling flow. This is a breaking change that simplifies configuration, ensures consistent behavior, and improves observability across all integrations. See ISSUE-unified-error-handling-integrations.md for migration details.
V3/feature/203
Replaces per-operator error handling with a centralized, stream-level approach using StreamExecutionOptions. All sink operators now implement IErrorHandlingEnabled and use the core ErrorHandlingHelper for consistent behavior. Removes legacy error handling parameters from operator constructors. Updates adapters and fan-out operators to forward error handling options. This change ensures unified, flexible, and maintainable error handling across all messaging, storage, database, HTTP, and mediator integrations. Breaking changes require migration to the new pattern.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Release for v3 of Cortex Data Framework