From c4bafa963833bdf068432994fcde0c618dc70f2b Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Fri, 30 Jan 2026 13:35:40 +0100 Subject: [PATCH 1/2] Refactor IStream to single generic parameter interface Refactored IStream and related classes to use a single generic parameter (IStream), removing the internal TCurrent type from the public API. Updated all builder interfaces, handlers, and usages to match the new signature. Introduced IBranchInfo for branch metadata, and updated GetBranches() to return IBranchInfo instances. This simplifies the stream abstraction and improves API encapsulation. --- .../Behaviors/StreamEmittingCommandBehavior.cs | 4 ++-- .../StreamEmittingNotificationBehavior.cs | 4 ++-- .../ServiceCollectionExtensions.cs | 4 ++-- .../Handlers/StreamBackedStreamQueryHandler.cs | 4 ++-- .../StreamEmittingNotificationHandler.cs | 8 ++++---- src/Cortex.Streams/Abstractions/IBranchInfo.cs | 15 +++++++++++++++ .../Abstractions/IFanOutBuilder.cs | 2 +- src/Cortex.Streams/Abstractions/ISinkBuilder.cs | 4 ++-- src/Cortex.Streams/Abstractions/IStream.cs | 6 +++--- .../Abstractions/IStreamBuilder.cs | 2 +- src/Cortex.Streams/FanOutBuilder.cs | 2 +- src/Cortex.Streams/Operators/BranchOperator.cs | 5 +++-- src/Cortex.Streams/SinkBuilder.cs | 2 +- src/Cortex.Streams/Stream.cs | 9 +++++---- src/Cortex.Streams/StreamBuilder.cs | 2 +- .../Tests/StreamEmittingCommandBehaviorTests.cs | 12 ++++++------ .../Tests/StreamEmittingHandlerTests.cs | 16 ++++++++-------- 17 files changed, 59 insertions(+), 42 deletions(-) create mode 100644 src/Cortex.Streams/Abstractions/IBranchInfo.cs diff --git a/src/Cortex.Streams.Mediator/Behaviors/StreamEmittingCommandBehavior.cs b/src/Cortex.Streams.Mediator/Behaviors/StreamEmittingCommandBehavior.cs index 9b850b0..192bf3f 100644 --- a/src/Cortex.Streams.Mediator/Behaviors/StreamEmittingCommandBehavior.cs +++ b/src/Cortex.Streams.Mediator/Behaviors/StreamEmittingCommandBehavior.cs @@ -14,7 +14,7 @@ namespace Cortex.Streams.Mediator.Behaviors public class StreamEmittingCommandBehavior : ICommandPipelineBehavior where TCommand : ICommand { - private readonly IStream, CommandExecutionEvent> _stream; + private readonly IStream> _stream; private readonly bool _emitBeforeExecution; private readonly bool _emitAfterExecution; @@ -25,7 +25,7 @@ public class StreamEmittingCommandBehavior : ICommandPipeline /// If true, emit an event before command execution. /// If true, emit an event after command execution. public StreamEmittingCommandBehavior( - IStream, CommandExecutionEvent> stream, + IStream> stream, bool emitBeforeExecution = false, bool emitAfterExecution = true) { diff --git a/src/Cortex.Streams.Mediator/Behaviors/StreamEmittingNotificationBehavior.cs b/src/Cortex.Streams.Mediator/Behaviors/StreamEmittingNotificationBehavior.cs index 5c585b5..75cf796 100644 --- a/src/Cortex.Streams.Mediator/Behaviors/StreamEmittingNotificationBehavior.cs +++ b/src/Cortex.Streams.Mediator/Behaviors/StreamEmittingNotificationBehavior.cs @@ -18,7 +18,7 @@ namespace Cortex.Streams.Mediator.Behaviors public class StreamEmittingNotificationBehavior : INotificationPipelineBehavior where TNotification : INotification { - private readonly IStream, NotificationEvent> _stream; + private readonly IStream> _stream; private readonly bool _emitBeforeHandling; private readonly bool _emitAfterHandling; @@ -29,7 +29,7 @@ public class StreamEmittingNotificationBehavior : INotificationPi /// If true, emit an event before notification handling. /// If true, emit an event after notification handling. public StreamEmittingNotificationBehavior( - IStream, NotificationEvent> stream, + IStream> stream, bool emitBeforeHandling = false, bool emitAfterHandling = true) { diff --git a/src/Cortex.Streams.Mediator/DependencyInjection/ServiceCollectionExtensions.cs b/src/Cortex.Streams.Mediator/DependencyInjection/ServiceCollectionExtensions.cs index 6a24dae..243fe66 100644 --- a/src/Cortex.Streams.Mediator/DependencyInjection/ServiceCollectionExtensions.cs +++ b/src/Cortex.Streams.Mediator/DependencyInjection/ServiceCollectionExtensions.cs @@ -22,7 +22,7 @@ public static class ServiceCollectionExtensions /// The service collection for chaining. public static IServiceCollection AddStreamEmittingNotificationHandler( this IServiceCollection services, - Func> streamFactory, + Func> streamFactory, Action errorHandler = null) where TNotification : INotification { @@ -47,7 +47,7 @@ public static IServiceCollection AddStreamEmittingNotificationHandlerThe service collection for chaining. public static IServiceCollection AddTransformingStreamNotificationHandler( this IServiceCollection services, - Func> streamFactory, + Func> streamFactory, Func transformer, Action errorHandler = null) where TNotification : INotification diff --git a/src/Cortex.Streams.Mediator/Handlers/StreamBackedStreamQueryHandler.cs b/src/Cortex.Streams.Mediator/Handlers/StreamBackedStreamQueryHandler.cs index 3f6a313..52e572e 100644 --- a/src/Cortex.Streams.Mediator/Handlers/StreamBackedStreamQueryHandler.cs +++ b/src/Cortex.Streams.Mediator/Handlers/StreamBackedStreamQueryHandler.cs @@ -17,7 +17,7 @@ namespace Cortex.Streams.Mediator.Handlers public abstract class StreamBackedStreamQueryHandler : IStreamQueryHandler where TQuery : IStreamQuery { - private readonly IStream _stream; + private readonly IStream _stream; private readonly int _channelCapacity; /// @@ -25,7 +25,7 @@ public abstract class StreamBackedStreamQueryHandler : IStreamQ /// /// The Cortex Stream to read data from. /// The capacity of the internal channel buffer. Default is 100. - protected StreamBackedStreamQueryHandler(IStream stream, int channelCapacity = 100) + protected StreamBackedStreamQueryHandler(IStream stream, int channelCapacity = 100) { _stream = stream ?? throw new ArgumentNullException(nameof(stream)); _channelCapacity = channelCapacity; diff --git a/src/Cortex.Streams.Mediator/Handlers/StreamEmittingNotificationHandler.cs b/src/Cortex.Streams.Mediator/Handlers/StreamEmittingNotificationHandler.cs index 6a0c2c9..f6dfe01 100644 --- a/src/Cortex.Streams.Mediator/Handlers/StreamEmittingNotificationHandler.cs +++ b/src/Cortex.Streams.Mediator/Handlers/StreamEmittingNotificationHandler.cs @@ -13,7 +13,7 @@ namespace Cortex.Streams.Mediator.Handlers public class StreamEmittingNotificationHandler : INotificationHandler where TNotification : INotification { - private readonly IStream _stream; + private readonly IStream _stream; private readonly Action _errorHandler; /// @@ -22,7 +22,7 @@ public class StreamEmittingNotificationHandler : INotificationHan /// The stream to emit notifications to. /// Optional handler for errors during emission. public StreamEmittingNotificationHandler( - IStream stream, + IStream stream, Action errorHandler = null) { _stream = stream ?? throw new ArgumentNullException(nameof(stream)); @@ -62,7 +62,7 @@ public async Task Handle(TNotification notification, CancellationToken cancellat public class TransformingStreamNotificationHandler : INotificationHandler where TNotification : INotification { - private readonly IStream _stream; + private readonly IStream _stream; private readonly Func _transformer; private readonly Action _errorHandler; @@ -73,7 +73,7 @@ public class TransformingStreamNotificationHandler /// A function to transform notifications into stream input. /// Optional handler for errors during emission. public TransformingStreamNotificationHandler( - IStream stream, + IStream stream, Func transformer, Action errorHandler = null) { diff --git a/src/Cortex.Streams/Abstractions/IBranchInfo.cs b/src/Cortex.Streams/Abstractions/IBranchInfo.cs new file mode 100644 index 0000000..d801ce4 --- /dev/null +++ b/src/Cortex.Streams/Abstractions/IBranchInfo.cs @@ -0,0 +1,15 @@ +namespace Cortex.Streams.Abstractions +{ + /// + /// Provides information about a branch in the stream processing pipeline. + /// This is a non-generic interface that allows accessing branch metadata + /// without exposing internal type parameters. + /// + public interface IBranchInfo + { + /// + /// Gets the name of the branch. + /// + string BranchName { get; } + } +} diff --git a/src/Cortex.Streams/Abstractions/IFanOutBuilder.cs b/src/Cortex.Streams/Abstractions/IFanOutBuilder.cs index 031b802..0dfd645 100644 --- a/src/Cortex.Streams/Abstractions/IFanOutBuilder.cs +++ b/src/Cortex.Streams/Abstractions/IFanOutBuilder.cs @@ -126,6 +126,6 @@ public interface IFanOutBuilder /// /// The built stream instance ready to be started. /// Thrown when no sinks have been configured. - IStream Build(); + IStream Build(); } } diff --git a/src/Cortex.Streams/Abstractions/ISinkBuilder.cs b/src/Cortex.Streams/Abstractions/ISinkBuilder.cs index 54e35dd..3dab53c 100644 --- a/src/Cortex.Streams/Abstractions/ISinkBuilder.cs +++ b/src/Cortex.Streams/Abstractions/ISinkBuilder.cs @@ -4,13 +4,13 @@ /// Provides a method to build the stream after adding a sink. /// /// The type of the initial input to the stream. - /// The current type of data in the stream. + /// The current type of data in the stream (internal use only). public interface ISinkBuilder { /// /// Builds the stream and returns a stream instance that can be started and stopped. /// /// A stream instance. - IStream Build(); + IStream Build(); } } diff --git a/src/Cortex.Streams/Abstractions/IStream.cs b/src/Cortex.Streams/Abstractions/IStream.cs index cb96d83..8e3523c 100644 --- a/src/Cortex.Streams/Abstractions/IStream.cs +++ b/src/Cortex.Streams/Abstractions/IStream.cs @@ -1,5 +1,5 @@ using Cortex.States; -using Cortex.Streams.Operators; +using Cortex.Streams.Abstractions; using Cortex.Streams.Performance; using System.Collections.Generic; using System.Threading; @@ -7,7 +7,7 @@ namespace Cortex.Streams { - public interface IStream + public interface IStream { /// /// Start the stream processing. @@ -66,7 +66,7 @@ public interface IStream StreamStatuses GetStatus(); - IReadOnlyDictionary> GetBranches(); + IReadOnlyDictionary GetBranches(); TStateStore GetStateStoreByName(string name) where TStateStore : IDataStore; IEnumerable GetStateStoresByType() where TStateStore : IDataStore; diff --git a/src/Cortex.Streams/Abstractions/IStreamBuilder.cs b/src/Cortex.Streams/Abstractions/IStreamBuilder.cs index 0b14ffc..aae8d7a 100644 --- a/src/Cortex.Streams/Abstractions/IStreamBuilder.cs +++ b/src/Cortex.Streams/Abstractions/IStreamBuilder.cs @@ -107,7 +107,7 @@ public interface IStreamBuilder /// Builds the stream /// /// - IStream Build(); + IStream Build(); /// diff --git a/src/Cortex.Streams/FanOutBuilder.cs b/src/Cortex.Streams/FanOutBuilder.cs index 2507f34..620cd07 100644 --- a/src/Cortex.Streams/FanOutBuilder.cs +++ b/src/Cortex.Streams/FanOutBuilder.cs @@ -166,7 +166,7 @@ public IFanOutBuilder ToWithTransform(string name, Func< } /// - public IStream Build() + public IStream Build() { if (_branchOperators.Count == 0) { diff --git a/src/Cortex.Streams/Operators/BranchOperator.cs b/src/Cortex.Streams/Operators/BranchOperator.cs index d10f02a..4395427 100644 --- a/src/Cortex.Streams/Operators/BranchOperator.cs +++ b/src/Cortex.Streams/Operators/BranchOperator.cs @@ -1,4 +1,5 @@ -using Cortex.Streams.ErrorHandling; +using Cortex.Streams.Abstractions; +using Cortex.Streams.ErrorHandling; using Cortex.Telemetry; using System; using System.Collections.Generic; @@ -10,7 +11,7 @@ namespace Cortex.Streams.Operators /// Represents a branch in a fan-out pattern that processes data independently. /// Forwards telemetry and error handling configuration to the branch's operator chain. /// - public class BranchOperator : IOperator, IHasNextOperators, ITelemetryEnabled, IErrorHandlingEnabled + public class BranchOperator : IOperator, IHasNextOperators, ITelemetryEnabled, IErrorHandlingEnabled, IBranchInfo { private readonly string _branchName; private readonly IOperator _branchOperator; diff --git a/src/Cortex.Streams/SinkBuilder.cs b/src/Cortex.Streams/SinkBuilder.cs index 0e4df77..785e17d 100644 --- a/src/Cortex.Streams/SinkBuilder.cs +++ b/src/Cortex.Streams/SinkBuilder.cs @@ -43,7 +43,7 @@ public SinkBuilder( /// Builds the stream and returns a stream instance. /// /// A stream instance. - public IStream Build() + public IStream Build() { return new Stream(_name, _firstOperator, _branchOperators, _telemetryProvider, _executionOptions, _performanceOptions); } diff --git a/src/Cortex.Streams/Stream.cs b/src/Cortex.Streams/Stream.cs index 8f41cd1..8aebf6f 100644 --- a/src/Cortex.Streams/Stream.cs +++ b/src/Cortex.Streams/Stream.cs @@ -1,5 +1,6 @@ using Cortex.States; using Cortex.States.Operators; +using Cortex.Streams.Abstractions; using Cortex.Streams.ErrorHandling; using Cortex.Streams.Operators; using Cortex.Streams.Performance; @@ -16,8 +17,8 @@ namespace Cortex.Streams /// Represents a built stream that can be started and stopped. /// /// The type of the initial input to the stream. - /// The current type of data in the stream. - public class Stream : IStream, IStatefulOperator, IDisposable + /// The current type of data in the stream (internal use only). + public class Stream : IStream, IStatefulOperator, IDisposable { private readonly string _name; private readonly IOperator _operatorChain; @@ -362,9 +363,9 @@ public BufferStatistics GetBufferStatistics() }; } - public IReadOnlyDictionary> GetBranches() + public IReadOnlyDictionary GetBranches() { - var branchDict = new Dictionary>(); + var branchDict = new Dictionary(); foreach (var branchOperator in _branchOperators) { branchDict[branchOperator.BranchName] = branchOperator; diff --git a/src/Cortex.Streams/StreamBuilder.cs b/src/Cortex.Streams/StreamBuilder.cs index 6d6b1f0..0836773 100644 --- a/src/Cortex.Streams/StreamBuilder.cs +++ b/src/Cortex.Streams/StreamBuilder.cs @@ -215,7 +215,7 @@ IStreamBuilder IInitialStreamBuilder.Stream() } - public IStream Build() + public IStream Build() { //return new Stream(_name, _firstOperator, _branchOperators); return new Stream(_name, _firstOperator, _branchOperators, _telemetryProvider, _executionOptions, _performanceOptions); diff --git a/src/Cortex.Tests/StreamsMediator/Tests/StreamEmittingCommandBehaviorTests.cs b/src/Cortex.Tests/StreamsMediator/Tests/StreamEmittingCommandBehaviorTests.cs index ca92b57..eb2588b 100644 --- a/src/Cortex.Tests/StreamsMediator/Tests/StreamEmittingCommandBehaviorTests.cs +++ b/src/Cortex.Tests/StreamsMediator/Tests/StreamEmittingCommandBehaviorTests.cs @@ -34,7 +34,7 @@ public void Constructor_ThrowsArgumentNullException_WhenStreamIsNull() public async Task Handle_EmitsAfterExecutionEvent_WhenConfigured() { // Arrange - var mockStream = new Mock, CommandExecutionEvent>>(); + var mockStream = new Mock>>(); CommandExecutionEvent? capturedEvent = null; mockStream @@ -68,7 +68,7 @@ public async Task Handle_EmitsAfterExecutionEvent_WhenConfigured() public async Task Handle_EmitsBeforeExecutionEvent_WhenConfigured() { // Arrange - var mockStream = new Mock, CommandExecutionEvent>>(); + var mockStream = new Mock>>(); var capturedEvents = new List>(); mockStream @@ -97,7 +97,7 @@ public async Task Handle_EmitsBeforeExecutionEvent_WhenConfigured() public async Task Handle_EmitsFailedEvent_WhenCommandThrows() { // Arrange - var mockStream = new Mock, CommandExecutionEvent>>(); + var mockStream = new Mock>>(); CommandExecutionEvent? capturedEvent = null; mockStream @@ -128,7 +128,7 @@ await Assert.ThrowsAsync(() => public async Task Handle_DoesNotEmit_WhenBothFlagsAreFalse() { // Arrange - var mockStream = new Mock, CommandExecutionEvent>>(); + var mockStream = new Mock>>(); var behavior = new StreamEmittingCommandBehavior( mockStream.Object, @@ -151,7 +151,7 @@ public async Task Handle_DoesNotEmit_WhenBothFlagsAreFalse() public async Task Handle_IncludesDuration_InAfterEvent() { // Arrange - var mockStream = new Mock, CommandExecutionEvent>>(); + var mockStream = new Mock>>(); CommandExecutionEvent? capturedEvent = null; mockStream @@ -184,7 +184,7 @@ public async Task Handle_IncludesDuration_InAfterEvent() public async Task Handle_PropagatesResultCorrectly() { // Arrange - var mockStream = new Mock, CommandExecutionEvent>>(); + var mockStream = new Mock>>(); mockStream .Setup(s => s.EmitAsync(It.IsAny>(), It.IsAny())) .Returns(Task.CompletedTask); diff --git a/src/Cortex.Tests/StreamsMediator/Tests/StreamEmittingHandlerTests.cs b/src/Cortex.Tests/StreamsMediator/Tests/StreamEmittingHandlerTests.cs index f04cddd..9f384de 100644 --- a/src/Cortex.Tests/StreamsMediator/Tests/StreamEmittingHandlerTests.cs +++ b/src/Cortex.Tests/StreamsMediator/Tests/StreamEmittingHandlerTests.cs @@ -35,7 +35,7 @@ public void Constructor_ThrowsArgumentNullException_WhenStreamIsNull() public async Task Handle_EmitsNotificationToStream() { // Arrange - var mockStream = new Mock>(); + var mockStream = new Mock>(); OrderCreatedNotification? capturedNotification = null; mockStream @@ -63,7 +63,7 @@ public async Task Handle_EmitsNotificationToStream() public async Task Handle_InvokesErrorHandler_WhenStreamEmitFails() { // Arrange - var mockStream = new Mock>(); + var mockStream = new Mock>(); OrderCreatedNotification? capturedNotification = null; Exception? capturedException = null; @@ -95,7 +95,7 @@ public async Task Handle_InvokesErrorHandler_WhenStreamEmitFails() public async Task Handle_ThrowsException_WhenNoErrorHandlerAndStreamFails() { // Arrange - var mockStream = new Mock>(); + var mockStream = new Mock>(); mockStream .Setup(s => s.EmitAsync(It.IsAny(), It.IsAny())) @@ -113,7 +113,7 @@ await Assert.ThrowsAsync(() => public async Task Handle_PassesCancellationToken_ToStream() { // Arrange - var mockStream = new Mock>(); + var mockStream = new Mock>(); CancellationToken capturedToken = default; mockStream @@ -149,7 +149,7 @@ public void Constructor_ThrowsArgumentNullException_WhenStreamIsNull() public void Constructor_ThrowsArgumentNullException_WhenTransformerIsNull() { // Arrange - var mockStream = new Mock>(); + var mockStream = new Mock>(); // Act & Assert Assert.Throws(() => @@ -162,7 +162,7 @@ public void Constructor_ThrowsArgumentNullException_WhenTransformerIsNull() public async Task Handle_TransformsAndEmitsNotificationToStream() { // Arrange - var mockStream = new Mock>(); + var mockStream = new Mock>(); OrderStreamData? capturedData = null; mockStream @@ -197,7 +197,7 @@ public async Task Handle_TransformsAndEmitsNotificationToStream() public async Task Handle_InvokesErrorHandler_WhenTransformFails() { // Arrange - var mockStream = new Mock>(); + var mockStream = new Mock>(); Exception? capturedException = null; var handler = new TransformingStreamNotificationHandler( @@ -220,7 +220,7 @@ public async Task Handle_InvokesErrorHandler_WhenTransformFails() public async Task Handle_InvokesErrorHandler_WhenStreamEmitFails() { // Arrange - var mockStream = new Mock>(); + var mockStream = new Mock>(); Exception? capturedException = null; mockStream From 40037de1ce48b37b77d0821ee9166ca4396ccf8b Mon Sep 17 00:00:00 2001 From: Enes Hoxha Date: Fri, 6 Feb 2026 20:01:46 +0100 Subject: [PATCH 2/2] v3/bug/209 : Improve error handling and diagnostics in Streams/Kafka - Add robust error handling to Source/SinkOperatorAdapters using ErrorHandlingHelper.TryExecute and propagate StreamExecutionOptions. - Implement IErrorHandlingEnabled in SourceOperatorAdapter and ensure error handling is set on downstream operators. - Add comprehensive integration and diagnostic tests for KafkaSinkOperator, including scenarios for Start(), error strategies, and custom configs. - Introduce KafkaSinkDiagnostic console tool for troubleshooting Kafka sink usage. - Add extensive unit tests for source operator error handling (skip, stop, retry, custom handler, no hangs/loops, FlatMap). - Reference Cortex.Streams.Kafka in test project for integration coverage. --- .../Operators/SinkOperatorAdapter.cs | 34 +- .../Operators/SourceOperatorAdapter.cs | 51 +- src/Cortex.Tests/Cortex.Tests.csproj | 1 + .../KafkaSinkOperatorIntegrationTests.cs | 368 +++++++++++++ .../Tests/SourceOperatorErrorHandlingTests.cs | 504 ++++++++++++++++++ tools/KafkaSinkDiagnostic.cs | 145 +++++ 6 files changed, 1094 insertions(+), 9 deletions(-) create mode 100644 src/Cortex.Tests/Streams.Kafka/KafkaSinkOperatorIntegrationTests.cs create mode 100644 src/Cortex.Tests/Streams/Tests/SourceOperatorErrorHandlingTests.cs create mode 100644 tools/KafkaSinkDiagnostic.cs diff --git a/src/Cortex.Streams/Operators/SinkOperatorAdapter.cs b/src/Cortex.Streams/Operators/SinkOperatorAdapter.cs index 3492a53..92adde6 100644 --- a/src/Cortex.Streams/Operators/SinkOperatorAdapter.cs +++ b/src/Cortex.Streams/Operators/SinkOperatorAdapter.cs @@ -14,6 +14,9 @@ public class SinkOperatorAdapter : IOperator, IHasNextOperators, ITeleme { private readonly ISinkOperator _sinkOperator; + // Cached operator name to avoid string allocation on hot path + private static readonly string OperatorName = $"SinkOperatorAdapter<{typeof(TInput).Name}>"; + // Telemetry fields private ITelemetryProvider _telemetryProvider; private ICounter _processedCounter; @@ -22,6 +25,9 @@ public class SinkOperatorAdapter : IOperator, IHasNextOperators, ITeleme private Action _incrementProcessedCounter; private Action _recordProcessingTime; + // Error handling fields + private StreamExecutionOptions _executionOptions = StreamExecutionOptions.Default; + public SinkOperatorAdapter(ISinkOperator sinkOperator) { _sinkOperator = sinkOperator ?? throw new ArgumentNullException(nameof(sinkOperator)); @@ -51,9 +57,12 @@ public void SetTelemetryProvider(ITelemetryProvider telemetryProvider) /// /// Forwards error handling configuration to the wrapped sink operator if it implements IErrorHandlingEnabled. + /// Also stores the options for use in this adapter's error handling. /// public void SetErrorHandling(StreamExecutionOptions options) { + _executionOptions = options ?? StreamExecutionOptions.Default; + // Forward error handling to the wrapped sink operator if it supports it if (_sinkOperator is IErrorHandlingEnabled errorHandlingEnabled) { @@ -71,8 +80,19 @@ public void Process(object input) { try { - _sinkOperator.Process((TInput)input); - span.SetAttribute("status", "success"); + var executed = ErrorHandlingHelper.TryExecute( + _executionOptions, + OperatorName, + input, + item => _sinkOperator.Process(item)); + + span.SetAttribute("status", executed ? "success" : "skipped"); + } + catch (StreamStoppedException ex) + { + span.SetAttribute("status", "stopped"); + span.SetAttribute("exception", ex.ToString()); + throw; } catch (Exception ex) { @@ -83,14 +103,18 @@ public void Process(object input) finally { stopwatch.Stop(); - _recordProcessingTime(stopwatch.Elapsed.TotalMilliseconds); - _incrementProcessedCounter(); + _recordProcessingTime?.Invoke(stopwatch.Elapsed.TotalMilliseconds); + _incrementProcessedCounter?.Invoke(); } } } else { - _sinkOperator.Process((TInput)input); + ErrorHandlingHelper.TryExecute( + _executionOptions, + OperatorName, + input, + item => _sinkOperator.Process(item)); } } diff --git a/src/Cortex.Streams/Operators/SourceOperatorAdapter.cs b/src/Cortex.Streams/Operators/SourceOperatorAdapter.cs index e43e33c..3cafe6b 100644 --- a/src/Cortex.Streams/Operators/SourceOperatorAdapter.cs +++ b/src/Cortex.Streams/Operators/SourceOperatorAdapter.cs @@ -6,11 +6,18 @@ namespace Cortex.Streams.Operators { - public class SourceOperatorAdapter : IOperator, IHasNextOperators, ITelemetryEnabled + /// + /// Adapter that wraps an ISourceOperator to work within the operator chain. + /// Handles telemetry, error handling, and lifecycle management for source operators. + /// + public class SourceOperatorAdapter : IOperator, IHasNextOperators, ITelemetryEnabled, IErrorHandlingEnabled { private readonly ISourceOperator _sourceOperator; private IOperator _nextOperator; + // Cached operator name to avoid string allocation on hot path + private static readonly string OperatorName = $"SourceOperatorAdapter<{typeof(TOutput).Name}>"; + // Telemetry fields private ITelemetryProvider _telemetryProvider; private ICounter _emittedCounter; @@ -19,11 +26,28 @@ public class SourceOperatorAdapter : IOperator, IHasNextOperators, ITel private Action _incrementEmittedCounter; private Action _recordEmissionTime; + // Error handling fields + private StreamExecutionOptions _executionOptions = StreamExecutionOptions.Default; + public SourceOperatorAdapter(ISourceOperator sourceOperator) { _sourceOperator = sourceOperator; } + /// + /// Sets the error handling options for this operator and propagates to next operators. + /// + public void SetErrorHandling(StreamExecutionOptions options) + { + _executionOptions = options ?? StreamExecutionOptions.Default; + + // Propagate to the next operator if it supports error handling + if (_nextOperator is IErrorHandlingEnabled nextWithErrorHandling) + { + nextWithErrorHandling.SetErrorHandling(_executionOptions); + } + } + public void SetTelemetryProvider(ITelemetryProvider telemetryProvider) { _telemetryProvider = telemetryProvider; @@ -67,6 +91,12 @@ public void SetNext(IOperator nextOperator) nextTelemetryEnabled.SetTelemetryProvider(_telemetryProvider); } + // Propagate error handling to the next operator + if (_nextOperator is IErrorHandlingEnabled nextWithErrorHandling && _executionOptions != null) + { + nextWithErrorHandling.SetErrorHandling(_executionOptions); + } + // Start the source operator Start(); } @@ -84,8 +114,15 @@ private void Start() try { _incrementEmittedCounter?.Invoke(); - _nextOperator?.Process(output); - span.SetAttribute("status", "success"); + + // Use error handling helper to properly handle errors according to stream configuration + var executed = ErrorHandlingHelper.TryExecute( + _executionOptions, + OperatorName, + output, + item => _nextOperator?.Process(item)); + + span.SetAttribute("status", executed ? "success" : "skipped"); } catch (StreamStoppedException ex) { @@ -99,6 +136,7 @@ private void Start() { span.SetAttribute("status", "error"); span.SetAttribute("exception", ex.ToString()); + // Re-throw to let the source operator handle it (e.g., logging, stopping) throw; } finally @@ -112,7 +150,12 @@ private void Start() { try { - _nextOperator?.Process(output); + // Use error handling helper to properly handle errors according to stream configuration + ErrorHandlingHelper.TryExecute( + _executionOptions, + OperatorName, + output, + item => _nextOperator?.Process(item)); } catch (StreamStoppedException) { diff --git a/src/Cortex.Tests/Cortex.Tests.csproj b/src/Cortex.Tests/Cortex.Tests.csproj index 067a52f..24f83f5 100644 --- a/src/Cortex.Tests/Cortex.Tests.csproj +++ b/src/Cortex.Tests/Cortex.Tests.csproj @@ -30,6 +30,7 @@ + diff --git a/src/Cortex.Tests/Streams.Kafka/KafkaSinkOperatorIntegrationTests.cs b/src/Cortex.Tests/Streams.Kafka/KafkaSinkOperatorIntegrationTests.cs new file mode 100644 index 0000000..306c6d1 --- /dev/null +++ b/src/Cortex.Tests/Streams.Kafka/KafkaSinkOperatorIntegrationTests.cs @@ -0,0 +1,368 @@ +using Confluent.Kafka; +using Cortex.Streams.ErrorHandling; +using Cortex.Streams.Kafka; +using Microsoft.Extensions.Logging; +using Moq; +using System; +using System.Threading; +using Xunit; +using Xunit.Abstractions; + +namespace Cortex.Tests.Streams.Kafka +{ + /// + /// Integration tests for KafkaSinkOperator to diagnose message production issues. + /// + public class KafkaSinkOperatorIntegrationTests + { + private readonly ITestOutputHelper _output; + private readonly string _bootstrapServers = "145.239.0.42:29092"; + private readonly string _topic = "cortex.events_tests"; + + public KafkaSinkOperatorIntegrationTests(ITestOutputHelper output) + { + _output = output; + } + + [Fact(Skip = "Integration test - requires live Kafka")] + public void Process_ProducesMessage_WhenStartIsCalled() + { + // Arrange + var logger = new TestLogger>(_output); + var sinkOperator = new KafkaSinkOperator( + bootstrapServers: _bootstrapServers, + topic: _topic, + logger: logger); + + // Act - Start is REQUIRED before processing + sinkOperator.Start(); + + var testMessage = $"Test message at {DateTime.UtcNow:O}"; + _output.WriteLine($"Sending message: {testMessage}"); + + sinkOperator.Process(testMessage); + + // Give Kafka time to produce + Thread.Sleep(2000); + + // Stop to flush pending messages + sinkOperator.Stop(); + sinkOperator.Dispose(); + + // Assert + _output.WriteLine("Message should be produced to Kafka"); + } + + [Fact] + public void Process_DoesNotProduceMessage_WhenStartIsNotCalled() + { + // Arrange + var mockLogger = new Mock>>(); + var sinkOperator = new KafkaSinkOperator( + bootstrapServers: _bootstrapServers, + topic: _topic, + logger: mockLogger.Object); + + // Act - DO NOT call Start() + var testMessage = "This message should not be produced"; + sinkOperator.Process(testMessage); + + // Assert - Should log warning + mockLogger.Verify( + x => x.Log( + LogLevel.Warning, + It.IsAny(), + It.Is((v, t) => v.ToString().Contains("not running")), + It.IsAny(), + It.IsAny>()), + Times.Once); + + sinkOperator.Dispose(); + } + + [Fact(Skip = "Integration test - requires live Kafka")] + public void Process_WithErrorHandling_ProducesMessages() + { + // Arrange + var logger = new TestLogger>(_output); + var sinkOperator = new KafkaSinkOperator( + bootstrapServers: _bootstrapServers, + topic: _topic, + logger: logger); + + // Configure error handling + var executionOptions = new StreamExecutionOptions + { + ErrorHandlingStrategy = ErrorHandlingStrategy.Retry, + MaxRetries = 3, + RetryDelay = TimeSpan.FromMilliseconds(100), + OnError = ctx => + { + _output.WriteLine($"Error in {ctx.OperatorName}: {ctx.Exception.Message}"); + return ErrorHandlingDecision.Retry; + } + }; + sinkOperator.SetErrorHandling(executionOptions); + + // Act + sinkOperator.Start(); + + for (int i = 0; i < 5; i++) + { + var message = $"Message {i} at {DateTime.UtcNow:O}"; + _output.WriteLine($"Producing: {message}"); + sinkOperator.Process(message); + } + + // Give time for async production + Thread.Sleep(3000); + + sinkOperator.Stop(); + sinkOperator.Dispose(); + + // Assert + _output.WriteLine("All 5 messages should be produced to Kafka"); + } + + [Fact(Skip = "Integration test - requires live Kafka")] + public void Process_WithNullMessages_SkipsNulls() + { + // Arrange + var logger = new TestLogger>(_output); + var sinkOperator = new KafkaSinkOperator( + bootstrapServers: _bootstrapServers, + topic: _topic, + logger: logger); + + // Act + sinkOperator.Start(); + + sinkOperator.Process("Valid message 1"); + sinkOperator.Process(null); // Should skip + sinkOperator.Process("Valid message 2"); + + Thread.Sleep(2000); + + sinkOperator.Stop(); + sinkOperator.Dispose(); + + // Assert + _output.WriteLine("Only 2 messages should be produced (nulls are skipped)"); + } + + [Fact(Skip = "Integration test - requires live Kafka")] + public void Process_WithCustomConfig_ProducesMessages() + { + // Arrange + var config = new ProducerConfig + { + BootstrapServers = _bootstrapServers, + Acks = Acks.All, + EnableIdempotence = true, + MaxInFlight = 5, + LingerMs = 0, // Send immediately for testing + CompressionType = CompressionType.None + }; + + var logger = new TestLogger>(_output); + var sinkOperator = new KafkaSinkOperator( + bootstrapServers: _bootstrapServers, + topic: _topic, + config: config, + logger: logger); + + // Act + sinkOperator.Start(); + + var message = $"Custom config test at {DateTime.UtcNow:O}"; + _output.WriteLine($"Producing: {message}"); + sinkOperator.Process(message); + + Thread.Sleep(2000); + + sinkOperator.Stop(); + sinkOperator.Dispose(); + + // Assert + _output.WriteLine("Message should be produced with custom config"); + } + + [Fact] + public void Start_CanBeCalledMultipleTimes_WithoutError() + { + // Arrange + var sinkOperator = new KafkaSinkOperator( + bootstrapServers: _bootstrapServers, + topic: _topic); + + // Act - Multiple starts should be safe + sinkOperator.Start(); + sinkOperator.Start(); // Should not throw + + // Assert + sinkOperator.Process("test"); + + sinkOperator.Stop(); + sinkOperator.Dispose(); + } + + [Fact] + public void Dispose_AfterStart_FlushesMessages() + { + // Arrange + var logger = new TestLogger>(_output); + var sinkOperator = new KafkaSinkOperator( + bootstrapServers: _bootstrapServers, + topic: _topic, + logger: logger); + + // Act + sinkOperator.Start(); + sinkOperator.Process("Message before dispose"); + + // Dispose should flush + sinkOperator.Dispose(); + + // Assert - Should not throw + Assert.Throws(() => sinkOperator.Process("After dispose")); + } + + [Fact(Skip = "Integration test - requires live Kafka")] + public void FullStreamTest_WithKafkaSink() + { + // Arrange + var logger = new TestLogger>(_output); + var sinkOperator = new KafkaSinkOperator( + bootstrapServers: _bootstrapServers, + topic: _topic, + logger: logger); + + var executionOptions = new StreamExecutionOptions + { + ErrorHandlingStrategy = ErrorHandlingStrategy.Skip, + OnError = ctx => + { + _output.WriteLine($"ERROR: {ctx.Exception.Message}"); + return ErrorHandlingDecision.Skip; + } + }; + + sinkOperator.SetErrorHandling(executionOptions); + + // Act - Simulate stream processing + sinkOperator.Start(); + + _output.WriteLine("=== Starting message production ==="); + + for (int i = 0; i < 10; i++) + { + var message = $"Stream message {i} at {DateTime.UtcNow:O}"; + _output.WriteLine($"Processing: {message}"); + sinkOperator.Process(message); + Thread.Sleep(100); // Small delay between messages + } + + _output.WriteLine("=== Stopping and flushing ==="); + sinkOperator.Stop(); + + _output.WriteLine("=== Disposing ==="); + sinkOperator.Dispose(); + + _output.WriteLine("=== Test complete ==="); + } + + [Fact(Skip = "Integration test - requires live Kafka")] + public void DiagnosticTest_CheckWhatIsHappening() + { + // This test helps diagnose exactly what's happening with the new implementation + var logger = new TestLogger>(_output); + + _output.WriteLine("=== Creating KafkaSinkOperator ==="); + var sinkOperator = new KafkaSinkOperator( + bootstrapServers: _bootstrapServers, + topic: _topic, + logger: logger); + + _output.WriteLine("=== Setting Error Handling ==="); + var executionOptions = new StreamExecutionOptions + { + ErrorHandlingStrategy = ErrorHandlingStrategy.Retry, + MaxRetries = 3, + RetryDelay = TimeSpan.FromSeconds(1), + OnError = ctx => + { + _output.WriteLine($"[ERROR HANDLER] Operator: {ctx.OperatorName}"); + _output.WriteLine($"[ERROR HANDLER] Exception: {ctx.Exception?.Message}"); + _output.WriteLine($"[ERROR HANDLER] Attempt: {ctx.Attempt}"); + return ErrorHandlingDecision.Retry; + } + }; + sinkOperator.SetErrorHandling(executionOptions); + + _output.WriteLine("=== Starting Operator (THIS IS CRITICAL!) ==="); + sinkOperator.Start(); + + _output.WriteLine("=== Processing Test Message ==="); + var testMessage = $"Diagnostic test at {DateTime.UtcNow:O}"; + _output.WriteLine($"Message content: {testMessage}"); + + try + { + sinkOperator.Process(testMessage); + _output.WriteLine("Process() completed without exception"); + } + catch (Exception ex) + { + _output.WriteLine($"Process() threw exception: {ex.GetType().Name}: {ex.Message}"); + } + + _output.WriteLine("=== Waiting 5 seconds for async production ==="); + Thread.Sleep(5000); + + _output.WriteLine("=== Stopping (flushes pending messages) ==="); + try + { + sinkOperator.Stop(); + _output.WriteLine("Stop() completed"); + } + catch (Exception ex) + { + _output.WriteLine($"Stop() threw exception: {ex.GetType().Name}: {ex.Message}"); + } + + _output.WriteLine("=== Disposing ==="); + sinkOperator.Dispose(); + + _output.WriteLine("=== Test Complete ==="); + _output.WriteLine(""); + _output.WriteLine("Check Kafka topic 'cortex.events_tests' for the message."); + } + } + + /// + /// Test logger that outputs to xUnit test output. + /// + internal class TestLogger : ILogger + { + private readonly ITestOutputHelper _output; + + public TestLogger(ITestOutputHelper output) + { + _output = output; + } + + public IDisposable BeginScope(TState state) => null; + + public bool IsEnabled(LogLevel logLevel) => true; + + public void Log(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func formatter) + { + var message = formatter(state, exception); + _output.WriteLine($"[{logLevel}] {message}"); + if (exception != null) + { + _output.WriteLine($"Exception: {exception}"); + } + } + } +} diff --git a/src/Cortex.Tests/Streams/Tests/SourceOperatorErrorHandlingTests.cs b/src/Cortex.Tests/Streams/Tests/SourceOperatorErrorHandlingTests.cs new file mode 100644 index 0000000..b6c6a15 --- /dev/null +++ b/src/Cortex.Tests/Streams/Tests/SourceOperatorErrorHandlingTests.cs @@ -0,0 +1,504 @@ +using Cortex.Streams; +using Cortex.Streams.ErrorHandling; +using Cortex.Streams.Operators; +using System.Collections.Concurrent; + +namespace Cortex.Streams.Tests +{ + /// + /// Tests for error handling with source operators. + /// These tests verify that: + /// - Source operators properly propagate error handling to downstream operators + /// - Skip strategy allows source operators to continue emitting after downstream errors + /// - Stop strategy properly stops source operators when errors occur + /// - Retry strategy works with source operators + /// - Source operators don't hang or loop when errors occur + /// + public class SourceOperatorErrorHandlingTests + { + #region Mock Source Operator + + /// + /// A test source operator that emits values on demand and tracks its state. + /// + private class TestSourceOperator : ISourceOperator + { + private Action _emit; + private bool _isRunning; + private readonly ConcurrentQueue _pendingItems = new(); + private readonly ManualResetEventSlim _emitEvent = new(false); + private Task _emitTask; + private CancellationTokenSource _cts; + + public bool IsRunning => _isRunning; + public int EmitCount { get; private set; } + public bool WasStoppedExplicitly { get; private set; } + + public void Start(Action emit) + { + _emit = emit ?? throw new ArgumentNullException(nameof(emit)); + _isRunning = true; + _cts = new CancellationTokenSource(); + + // Process pending items in background + _emitTask = Task.Run(() => + { + while (!_cts.Token.IsCancellationRequested) + { + _emitEvent.Wait(_cts.Token); + _emitEvent.Reset(); + + while (_pendingItems.TryDequeue(out var item) && !_cts.Token.IsCancellationRequested) + { + try + { + _emit(item); + EmitCount++; + } + catch (Exception) + { + // Source operators typically catch exceptions from emit + // to prevent the consume loop from crashing + } + } + } + }, _cts.Token); + } + + public void Stop() + { + WasStoppedExplicitly = true; + _isRunning = false; + _cts?.Cancel(); + _emitEvent.Set(); // Wake up the task to exit + } + + /// + /// Enqueue an item to be emitted by the source operator. + /// + public void EnqueueItem(T item) + { + _pendingItems.Enqueue(item); + _emitEvent.Set(); + } + + /// + /// Wait for all pending items to be processed. + /// + public async Task WaitForProcessingAsync(TimeSpan timeout) + { + var deadline = DateTime.UtcNow + timeout; + while (_pendingItems.Count > 0 && DateTime.UtcNow < deadline) + { + await Task.Delay(10); + } + } + } + + /// + /// A simpler synchronous test source operator for immediate testing. + /// + private class SynchronousTestSourceOperator : ISourceOperator + { + private Action _emit; + public bool IsRunning { get; private set; } + public bool WasStoppedExplicitly { get; private set; } + public int EmitCount { get; private set; } + + public void Start(Action emit) + { + _emit = emit; + IsRunning = true; + } + + public void Stop() + { + WasStoppedExplicitly = true; + IsRunning = false; + } + + /// + /// Emit an item synchronously through the source operator. + /// + public void EmitItem(T item) + { + _emit(item); + EmitCount++; + } + } + + #endregion + + #region Skip Strategy Tests + + [Fact] + public void SourceOperator_WithSkipStrategy_ContinuesAfterDownstreamError() + { + // Arrange + var processedItems = new List(); + var sourceOperator = new SynchronousTestSourceOperator(); + var executionOptions = new StreamExecutionOptions + { + ErrorHandlingStrategy = ErrorHandlingStrategy.Skip + }; + + var stream = StreamBuilder + .CreateNewStream("SourceSkipTest") + .WithErrorHandling(executionOptions) + .Stream(sourceOperator) + .Map(x => + { + if (x == 2) throw new InvalidOperationException("Error on item 2"); + return x * 10; + }) + .Sink(x => processedItems.Add(x)) + .Build(); + + stream.Start(); + + // Act + sourceOperator.EmitItem(1); + sourceOperator.EmitItem(2); // This should be skipped + sourceOperator.EmitItem(3); + sourceOperator.EmitItem(4); + + stream.Stop(); + + // Assert + Assert.Equal(new[] { 10, 30, 40 }, processedItems); + Assert.Equal(4, sourceOperator.EmitCount); // All items were emitted + Assert.False(sourceOperator.WasStoppedExplicitly || !sourceOperator.IsRunning == false); // Source was stopped by stream.Stop(), not by error + } + + [Fact] + public void SourceOperator_WithSkipStrategy_SkipsSinkErrors() + { + // Arrange + var processedItems = new List(); + var sourceOperator = new SynchronousTestSourceOperator(); + var executionOptions = new StreamExecutionOptions + { + ErrorHandlingStrategy = ErrorHandlingStrategy.Skip + }; + + var stream = StreamBuilder + .CreateNewStream("SourceSkipSinkTest") + .WithErrorHandling(executionOptions) + .Stream(sourceOperator) + .Map(x => x * 10) + .Sink(x => + { + if (x == 20) throw new InvalidOperationException("Sink error on 20"); + processedItems.Add(x); + }) + .Build(); + + stream.Start(); + + // Act + sourceOperator.EmitItem(1); + sourceOperator.EmitItem(2); // Sink will error on 20 + sourceOperator.EmitItem(3); + + stream.Stop(); + + // Assert + Assert.Equal(new[] { 10, 30 }, processedItems); + } + + #endregion + + #region Stop Strategy Tests + + [Fact] + public void SourceOperator_WithStopStrategy_StopsAfterError() + { + // Arrange + var processedItems = new List(); + var sourceOperator = new SynchronousTestSourceOperator(); + var executionOptions = new StreamExecutionOptions + { + ErrorHandlingStrategy = ErrorHandlingStrategy.Stop + }; + + var stream = StreamBuilder + .CreateNewStream("SourceStopTest") + .WithErrorHandling(executionOptions) + .Stream(sourceOperator) + .Map(x => + { + if (x == 2) throw new InvalidOperationException("Error on item 2"); + return x * 10; + }) + .Sink(x => processedItems.Add(x)) + .Build(); + + stream.Start(); + + // Act + sourceOperator.EmitItem(1); + sourceOperator.EmitItem(2); // This should trigger stop + + // Assert + Assert.Equal(new[] { 10 }, processedItems); + Assert.True(sourceOperator.WasStoppedExplicitly); + } + + #endregion + + #region Retry Strategy Tests + + [Fact] + public void SourceOperator_WithRetryStrategy_RetriesAndSucceeds() + { + // Arrange + var attemptCount = 0; + var processedItems = new List(); + var sourceOperator = new SynchronousTestSourceOperator(); + var executionOptions = new StreamExecutionOptions + { + ErrorHandlingStrategy = ErrorHandlingStrategy.Retry, + MaxRetries = 3, + RetryDelay = TimeSpan.Zero + }; + + var stream = StreamBuilder + .CreateNewStream("SourceRetryTest") + .WithErrorHandling(executionOptions) + .Stream(sourceOperator) + .Map(x => + { + attemptCount++; + if (attemptCount < 3 && x == 1) + throw new InvalidOperationException("Transient error"); + return x * 10; + }) + .Sink(x => processedItems.Add(x)) + .Build(); + + stream.Start(); + + // Act + sourceOperator.EmitItem(1); + + stream.Stop(); + + // Assert + Assert.Equal(3, attemptCount); + Assert.Equal(new[] { 10 }, processedItems); + } + + [Fact] + public void SourceOperator_WithRetryStrategy_StopsWhenMaxRetriesExceeded() + { + // Arrange + var attemptCount = 0; + var sourceOperator = new SynchronousTestSourceOperator(); + var executionOptions = new StreamExecutionOptions + { + ErrorHandlingStrategy = ErrorHandlingStrategy.Retry, + MaxRetries = 2, + RetryDelay = TimeSpan.Zero + }; + + var stream = StreamBuilder + .CreateNewStream("SourceRetryExceededTest") + .WithErrorHandling(executionOptions) + .Stream(sourceOperator) + .Map(x => + { + attemptCount++; + throw new InvalidOperationException("Always fails"); +#pragma warning disable CS0162 + return x; +#pragma warning restore CS0162 + }) + .Sink(x => { }) + .Build(); + + stream.Start(); + + // Act + sourceOperator.EmitItem(1); + + // Assert - Source should be stopped after max retries exceeded + Assert.True(sourceOperator.WasStoppedExplicitly); + Assert.Equal(2, attemptCount); + } + + #endregion + + #region Custom Error Handler Tests + + [Fact] + public void SourceOperator_WithCustomErrorHandler_ReceivesCorrectContext() + { + // Arrange + var capturedContexts = new List(); + var sourceOperator = new SynchronousTestSourceOperator(); + var executionOptions = new StreamExecutionOptions + { + OnError = ctx => + { + capturedContexts.Add(ctx); + return ErrorHandlingDecision.Skip; + } + }; + + var stream = StreamBuilder + .CreateNewStream("SourceCustomHandlerTest") + .WithErrorHandling(executionOptions) + .Stream(sourceOperator) + .Map(x => + { + if (x == 2) throw new InvalidOperationException("Test error"); + return x * 10; + }) + .Sink(x => { }) + .Build(); + + stream.Start(); + + // Act + sourceOperator.EmitItem(1); + sourceOperator.EmitItem(2); // Should trigger error handler + + stream.Stop(); + + // Assert + Assert.Single(capturedContexts); + Assert.Equal("SourceCustomHandlerTest", capturedContexts[0].StreamName); + Assert.Equal(2, capturedContexts[0].Input); + Assert.IsType(capturedContexts[0].Exception); + Assert.Equal(1, capturedContexts[0].Attempt); + } + + #endregion + + #region No Hanging/Looping Tests + + [Fact] + public async Task SourceOperator_WithErrors_DoesNotHang() + { + // Arrange + var sourceOperator = new TestSourceOperator(); + var executionOptions = new StreamExecutionOptions + { + ErrorHandlingStrategy = ErrorHandlingStrategy.Skip + }; + + var stream = StreamBuilder + .CreateNewStream("SourceNoHangTest") + .WithErrorHandling(executionOptions) + .Stream(sourceOperator) + .Map(x => + { + if (x % 2 == 0) throw new InvalidOperationException("Even number error"); + return x * 10; + }) + .Sink(x => { }) + .Build(); + + stream.Start(); + + // Act - Emit many items including errors + for (int i = 1; i <= 100; i++) + { + sourceOperator.EnqueueItem(i); + } + + // Wait for processing with timeout - this should not hang + var completed = await Task.Run(async () => + { + await sourceOperator.WaitForProcessingAsync(TimeSpan.FromSeconds(5)); + return true; + }).WaitAsync(TimeSpan.FromSeconds(10)); + + stream.Stop(); + + // Assert + Assert.True(completed, "Processing should complete without hanging"); + } + + [Fact] + public async Task SourceOperator_WithStopStrategy_DoesNotLoop() + { + // Arrange + var emitCount = 0; + var sourceOperator = new SynchronousTestSourceOperator(); + var executionOptions = new StreamExecutionOptions + { + ErrorHandlingStrategy = ErrorHandlingStrategy.Stop + }; + + var stream = StreamBuilder + .CreateNewStream("SourceNoLoopTest") + .WithErrorHandling(executionOptions) + .Stream(sourceOperator) + .Map(x => + { + Interlocked.Increment(ref emitCount); + throw new InvalidOperationException("Always fails"); +#pragma warning disable CS0162 + return x; +#pragma warning restore CS0162 + }) + .Sink(x => { }) + .Build(); + + stream.Start(); + + // Act + sourceOperator.EmitItem(1); + + // Small delay to ensure no looping + await Task.Delay(100); + + // Assert - Should only process once, not loop + Assert.Equal(1, emitCount); + Assert.True(sourceOperator.WasStoppedExplicitly); + } + + #endregion + + #region Integration with FlatMap Operator + + [Fact] + public void SourceOperator_WithSkipStrategy_HandlesFlatMapErrors() + { + // Arrange + var processedItems = new List(); + var sourceOperator = new SynchronousTestSourceOperator(); + var executionOptions = new StreamExecutionOptions + { + ErrorHandlingStrategy = ErrorHandlingStrategy.Skip + }; + + var stream = StreamBuilder + .CreateNewStream("SourceFlatMapTest") + .WithErrorHandling(executionOptions) + .Stream(sourceOperator) + .FlatMap(x => + { + if (x == 2) throw new InvalidOperationException("FlatMap error"); + return new[] { x, x * 10 }; + }) + .Sink(x => processedItems.Add(x)) + .Build(); + + stream.Start(); + + // Act + sourceOperator.EmitItem(1); + sourceOperator.EmitItem(2); // Should be skipped + sourceOperator.EmitItem(3); + + stream.Stop(); + + // Assert + Assert.Equal(new[] { 1, 10, 3, 30 }, processedItems); + } + + #endregion + } +} diff --git a/tools/KafkaSinkDiagnostic.cs b/tools/KafkaSinkDiagnostic.cs new file mode 100644 index 0000000..1cd4d08 --- /dev/null +++ b/tools/KafkaSinkDiagnostic.cs @@ -0,0 +1,145 @@ +using Cortex.Streams.ErrorHandling; +using Cortex.Streams.Kafka; +using System; +using System.Threading; + +namespace Cortex.DiagnosticTools +{ + /// + /// Diagnostic tool to demonstrate and fix Kafka message production issues. + /// + class KafkaSinkDiagnostic + { + static void Main(string[] args) + { + Console.WriteLine("===================================="); + Console.WriteLine("Kafka Sink Operator Diagnostic Tool"); + Console.WriteLine("===================================="); + Console.WriteLine(); + + var bootstrapServers = "localhost:29092"; + var topic = "cortex.events_tests"; + + Console.WriteLine($"Bootstrap Servers: {bootstrapServers}"); + Console.WriteLine($"Topic: {topic}"); + Console.WriteLine(); + + // Demonstrate the WRONG way (without calling Start()) + Console.WriteLine("=== Test 1: WITHOUT calling Start() - MESSAGES WILL NOT BE PRODUCED ==="); + TestWithoutStart(bootstrapServers, topic); + Console.WriteLine(); + + Thread.Sleep(1000); + + // Demonstrate the CORRECT way (with calling Start()) + Console.WriteLine("=== Test 2: WITH calling Start() - MESSAGES WILL BE PRODUCED ==="); + TestWithStart(bootstrapServers, topic); + Console.WriteLine(); + + Thread.Sleep(1000); + + // Demonstrate with error handling + Console.WriteLine("=== Test 3: WITH Start() and Error Handling ==="); + TestWithErrorHandling(bootstrapServers, topic); + Console.WriteLine(); + + Console.WriteLine("===================================="); + Console.WriteLine("Diagnostic Complete!"); + Console.WriteLine("===================================="); + Console.WriteLine(); + Console.WriteLine("Check your Kafka topic to see the messages."); + Console.WriteLine("Press any key to exit..."); + Console.ReadKey(); + } + + static void TestWithoutStart(string bootstrapServers, string topic) + { + Console.WriteLine("[CREATING OPERATOR]"); + var sinkOperator = new KafkaSinkOperator( + bootstrapServers: bootstrapServers, + topic: topic); + + Console.WriteLine("[PROCESSING MESSAGE - WITHOUT Start()]"); + sinkOperator.Process("This message WILL NOT be produced because Start() was not called!"); + + Console.WriteLine("[DISPOSING]"); + sinkOperator.Dispose(); + + Console.WriteLine("[RESULT] ? Message was NOT produced (operator not running)"); + } + + static void TestWithStart(string bootstrapServers, string topic) + { + Console.WriteLine("[CREATING OPERATOR]"); + var sinkOperator = new KafkaSinkOperator( + bootstrapServers: bootstrapServers, + topic: topic); + + Console.WriteLine("[CALLING Start() - THIS IS CRITICAL!]"); + sinkOperator.Start(); + + Console.WriteLine("[PROCESSING MESSAGE - WITH Start()]"); + var message = $"SUCCESS: Message sent at {DateTime.UtcNow:O}"; + sinkOperator.Process(message); + Console.WriteLine($" Content: {message}"); + + Console.WriteLine("[WAITING 2 seconds for async production]"); + Thread.Sleep(2000); + + Console.WriteLine("[STOPPING - Flushes pending messages]"); + sinkOperator.Stop(); + + Console.WriteLine("[DISPOSING]"); + sinkOperator.Dispose(); + + Console.WriteLine("[RESULT] ? Message WAS produced!"); + } + + static void TestWithErrorHandling(string bootstrapServers, string topic) + { + Console.WriteLine("[CREATING OPERATOR]"); + var sinkOperator = new KafkaSinkOperator( + bootstrapServers: bootstrapServers, + topic: topic); + + Console.WriteLine("[CONFIGURING ERROR HANDLING]"); + var executionOptions = new StreamExecutionOptions + { + ErrorHandlingStrategy = ErrorHandlingStrategy.Retry, + MaxRetries = 3, + RetryDelay = TimeSpan.FromSeconds(1), + OnError = ctx => + { + Console.WriteLine($" [ERROR HANDLER] Operator: {ctx.OperatorName}"); + Console.WriteLine($" [ERROR HANDLER] Exception: {ctx.Exception?.Message}"); + Console.WriteLine($" [ERROR HANDLER] Attempt: {ctx.Attempt}"); + return ErrorHandlingDecision.Retry; + } + }; + sinkOperator.SetErrorHandling(executionOptions); + + Console.WriteLine("[STARTING OPERATOR]"); + sinkOperator.Start(); + + Console.WriteLine("[PRODUCING 5 MESSAGES]"); + for (int i = 1; i <= 5; i++) + { + var message = $"Message #{i} at {DateTime.UtcNow:O}"; + Console.WriteLine($" Producing: {message}"); + sinkOperator.Process(message); + Thread.Sleep(100); + } + + Console.WriteLine("[WAITING 3 seconds for async production]"); + Thread.Sleep(3000); + + Console.WriteLine("[STOPPING]"); + sinkOperator.Stop(); + + Console.WriteLine("[DISPOSING]"); + sinkOperator.Dispose(); + + Console.WriteLine("[RESULT] ? All 5 messages WAS produced with error handling!"); + } + } +}