Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace Cortex.Streams.Mediator.Behaviors
public class StreamEmittingCommandBehavior<TCommand, TResult> : ICommandPipelineBehavior<TCommand, TResult>
where TCommand : ICommand<TResult>
{
private readonly IStream<CommandExecutionEvent<TCommand, TResult>, CommandExecutionEvent<TCommand, TResult>> _stream;
private readonly IStream<CommandExecutionEvent<TCommand, TResult>> _stream;
private readonly bool _emitBeforeExecution;
private readonly bool _emitAfterExecution;

Expand All @@ -25,7 +25,7 @@ public class StreamEmittingCommandBehavior<TCommand, TResult> : ICommandPipeline
/// <param name="emitBeforeExecution">If true, emit an event before command execution.</param>
/// <param name="emitAfterExecution">If true, emit an event after command execution.</param>
public StreamEmittingCommandBehavior(
IStream<CommandExecutionEvent<TCommand, TResult>, CommandExecutionEvent<TCommand, TResult>> stream,
IStream<CommandExecutionEvent<TCommand, TResult>> stream,
bool emitBeforeExecution = false,
bool emitAfterExecution = true)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace Cortex.Streams.Mediator.Behaviors
public class StreamEmittingNotificationBehavior<TNotification> : INotificationPipelineBehavior<TNotification>
where TNotification : INotification
{
private readonly IStream<NotificationEvent<TNotification>, NotificationEvent<TNotification>> _stream;
private readonly IStream<NotificationEvent<TNotification>> _stream;
private readonly bool _emitBeforeHandling;
private readonly bool _emitAfterHandling;

Expand All @@ -29,7 +29,7 @@ public class StreamEmittingNotificationBehavior<TNotification> : INotificationPi
/// <param name="emitBeforeHandling">If true, emit an event before notification handling.</param>
/// <param name="emitAfterHandling">If true, emit an event after notification handling.</param>
public StreamEmittingNotificationBehavior(
IStream<NotificationEvent<TNotification>, NotificationEvent<TNotification>> stream,
IStream<NotificationEvent<TNotification>> stream,
bool emitBeforeHandling = false,
bool emitAfterHandling = true)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public static class ServiceCollectionExtensions
/// <returns>The service collection for chaining.</returns>
public static IServiceCollection AddStreamEmittingNotificationHandler<TNotification>(
this IServiceCollection services,
Func<IServiceProvider, IStream<TNotification, TNotification>> streamFactory,
Func<IServiceProvider, IStream<TNotification>> streamFactory,
Action<TNotification, Exception> errorHandler = null)
where TNotification : INotification
{
Expand All @@ -47,7 +47,7 @@ public static IServiceCollection AddStreamEmittingNotificationHandler<TNotificat
/// <returns>The service collection for chaining.</returns>
public static IServiceCollection AddTransformingStreamNotificationHandler<TNotification, TStreamInput>(
this IServiceCollection services,
Func<IServiceProvider, IStream<TStreamInput, TStreamInput>> streamFactory,
Func<IServiceProvider, IStream<TStreamInput>> streamFactory,
Func<TNotification, TStreamInput> transformer,
Action<TNotification, Exception> errorHandler = null)
where TNotification : INotification
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ namespace Cortex.Streams.Mediator.Handlers
public abstract class StreamBackedStreamQueryHandler<TQuery, TResult> : IStreamQueryHandler<TQuery, TResult>
where TQuery : IStreamQuery<TResult>
{
private readonly IStream<TResult, TResult> _stream;
private readonly IStream<TResult> _stream;
private readonly int _channelCapacity;

/// <summary>
/// Initializes a new instance of the <see cref="StreamBackedStreamQueryHandler{TQuery, TResult}"/> class.
/// </summary>
/// <param name="stream">The Cortex Stream to read data from.</param>
/// <param name="channelCapacity">The capacity of the internal channel buffer. Default is 100.</param>
protected StreamBackedStreamQueryHandler(IStream<TResult, TResult> stream, int channelCapacity = 100)
protected StreamBackedStreamQueryHandler(IStream<TResult> stream, int channelCapacity = 100)
{
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
_channelCapacity = channelCapacity;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace Cortex.Streams.Mediator.Handlers
public class StreamEmittingNotificationHandler<TNotification> : INotificationHandler<TNotification>
where TNotification : INotification
{
private readonly IStream<TNotification, TNotification> _stream;
private readonly IStream<TNotification> _stream;
private readonly Action<TNotification, Exception> _errorHandler;

/// <summary>
Expand All @@ -22,7 +22,7 @@ public class StreamEmittingNotificationHandler<TNotification> : INotificationHan
/// <param name="stream">The stream to emit notifications to.</param>
/// <param name="errorHandler">Optional handler for errors during emission.</param>
public StreamEmittingNotificationHandler(
IStream<TNotification, TNotification> stream,
IStream<TNotification> stream,
Action<TNotification, Exception> errorHandler = null)
{
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
Expand Down Expand Up @@ -62,7 +62,7 @@ public async Task Handle(TNotification notification, CancellationToken cancellat
public class TransformingStreamNotificationHandler<TNotification, TStreamInput> : INotificationHandler<TNotification>
where TNotification : INotification
{
private readonly IStream<TStreamInput, TStreamInput> _stream;
private readonly IStream<TStreamInput> _stream;
private readonly Func<TNotification, TStreamInput> _transformer;
private readonly Action<TNotification, Exception> _errorHandler;

Expand All @@ -73,7 +73,7 @@ public class TransformingStreamNotificationHandler<TNotification, TStreamInput>
/// <param name="transformer">A function to transform notifications into stream input.</param>
/// <param name="errorHandler">Optional handler for errors during emission.</param>
public TransformingStreamNotificationHandler(
IStream<TStreamInput, TStreamInput> stream,
IStream<TStreamInput> stream,
Func<TNotification, TStreamInput> transformer,
Action<TNotification, Exception> errorHandler = null)
{
Expand Down
15 changes: 15 additions & 0 deletions src/Cortex.Streams/Abstractions/IBranchInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace Cortex.Streams.Abstractions
{
/// <summary>
/// Provides information about a branch in the stream processing pipeline.
/// This is a non-generic interface that allows accessing branch metadata
/// without exposing internal type parameters.
/// </summary>
public interface IBranchInfo
{
/// <summary>
/// Gets the name of the branch.
/// </summary>
string BranchName { get; }
}
}
2 changes: 1 addition & 1 deletion src/Cortex.Streams/Abstractions/IFanOutBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,6 @@ public interface IFanOutBuilder<TIn, TCurrent>
/// </summary>
/// <returns>The built stream instance ready to be started.</returns>
/// <exception cref="InvalidOperationException">Thrown when no sinks have been configured.</exception>
IStream<TIn, TCurrent> Build();
IStream<TIn> Build();
}
}
4 changes: 2 additions & 2 deletions src/Cortex.Streams/Abstractions/ISinkBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
/// Provides a method to build the stream after adding a sink.
/// </summary>
/// <typeparam name="TIn">The type of the initial input to the stream.</typeparam>
/// <typeparam name="TCurrent">The current type of data in the stream.</typeparam>
/// <typeparam name="TCurrent">The current type of data in the stream (internal use only).</typeparam>
public interface ISinkBuilder<TIn, TCurrent>
{
/// <summary>
/// Builds the stream and returns a stream instance that can be started and stopped.
/// </summary>
/// <returns>A stream instance.</returns>
IStream<TIn, TCurrent> Build();
IStream<TIn> Build();
}
}
6 changes: 3 additions & 3 deletions src/Cortex.Streams/Abstractions/IStream.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
using Cortex.States;
using Cortex.Streams.Operators;
using Cortex.Streams.Abstractions;
using Cortex.Streams.Performance;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Cortex.Streams
{
public interface IStream<TIn, TCurrent>
public interface IStream<TIn>
{
/// <summary>
/// Start the stream processing.
Expand Down Expand Up @@ -66,7 +66,7 @@ public interface IStream<TIn, TCurrent>

StreamStatuses GetStatus();

IReadOnlyDictionary<string, BranchOperator<TCurrent>> GetBranches();
IReadOnlyDictionary<string, IBranchInfo> GetBranches();

TStateStore GetStateStoreByName<TStateStore>(string name) where TStateStore : IDataStore;
IEnumerable<TStateStore> GetStateStoresByType<TStateStore>() where TStateStore : IDataStore;
Expand Down
2 changes: 1 addition & 1 deletion src/Cortex.Streams/Abstractions/IStreamBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public interface IStreamBuilder<TIn, TCurrent>
/// Builds the stream
/// </summary>
/// <returns></returns>
IStream<TIn, TCurrent> Build();
IStream<TIn> Build();


/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion src/Cortex.Streams/FanOutBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public IFanOutBuilder<TIn, TCurrent> ToWithTransform<TOutput>(string name, Func<
}

/// <inheritdoc />
public IStream<TIn, TCurrent> Build()
public IStream<TIn> Build()
{
if (_branchOperators.Count == 0)
{
Expand Down
5 changes: 3 additions & 2 deletions src/Cortex.Streams/Operators/BranchOperator.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Cortex.Streams.ErrorHandling;
using Cortex.Streams.Abstractions;
using Cortex.Streams.ErrorHandling;
using Cortex.Telemetry;
using System;
using System.Collections.Generic;
Expand All @@ -10,7 +11,7 @@ namespace Cortex.Streams.Operators
/// Represents a branch in a fan-out pattern that processes data independently.
/// Forwards telemetry and error handling configuration to the branch's operator chain.
/// </summary>
public class BranchOperator<T> : IOperator, IHasNextOperators, ITelemetryEnabled, IErrorHandlingEnabled
public class BranchOperator<T> : IOperator, IHasNextOperators, ITelemetryEnabled, IErrorHandlingEnabled, IBranchInfo
{
private readonly string _branchName;
private readonly IOperator _branchOperator;
Expand Down
34 changes: 29 additions & 5 deletions src/Cortex.Streams/Operators/SinkOperatorAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ public class SinkOperatorAdapter<TInput> : IOperator, IHasNextOperators, ITeleme
{
private readonly ISinkOperator<TInput> _sinkOperator;

// Cached operator name to avoid string allocation on hot path
private static readonly string OperatorName = $"SinkOperatorAdapter<{typeof(TInput).Name}>";

// Telemetry fields
private ITelemetryProvider _telemetryProvider;
private ICounter _processedCounter;
Expand All @@ -22,6 +25,9 @@ public class SinkOperatorAdapter<TInput> : IOperator, IHasNextOperators, ITeleme
private Action _incrementProcessedCounter;
private Action<double> _recordProcessingTime;

// Error handling fields
private StreamExecutionOptions _executionOptions = StreamExecutionOptions.Default;

public SinkOperatorAdapter(ISinkOperator<TInput> sinkOperator)
{
_sinkOperator = sinkOperator ?? throw new ArgumentNullException(nameof(sinkOperator));
Expand Down Expand Up @@ -51,9 +57,12 @@ public void SetTelemetryProvider(ITelemetryProvider telemetryProvider)

/// <summary>
/// Forwards error handling configuration to the wrapped sink operator if it implements IErrorHandlingEnabled.
/// Also stores the options for use in this adapter's error handling.
/// </summary>
public void SetErrorHandling(StreamExecutionOptions options)
{
_executionOptions = options ?? StreamExecutionOptions.Default;

// Forward error handling to the wrapped sink operator if it supports it
if (_sinkOperator is IErrorHandlingEnabled errorHandlingEnabled)
{
Expand All @@ -71,8 +80,19 @@ public void Process(object input)
{
try
{
_sinkOperator.Process((TInput)input);
span.SetAttribute("status", "success");
var executed = ErrorHandlingHelper.TryExecute<TInput>(
_executionOptions,
OperatorName,
input,
item => _sinkOperator.Process(item));

span.SetAttribute("status", executed ? "success" : "skipped");
}
catch (StreamStoppedException ex)
{
span.SetAttribute("status", "stopped");
span.SetAttribute("exception", ex.ToString());
throw;
}
catch (Exception ex)
{
Expand All @@ -83,14 +103,18 @@ public void Process(object input)
finally
{
stopwatch.Stop();
_recordProcessingTime(stopwatch.Elapsed.TotalMilliseconds);
_incrementProcessedCounter();
_recordProcessingTime?.Invoke(stopwatch.Elapsed.TotalMilliseconds);
_incrementProcessedCounter?.Invoke();
}
}
}
else
{
_sinkOperator.Process((TInput)input);
ErrorHandlingHelper.TryExecute<TInput>(
_executionOptions,
OperatorName,
input,
item => _sinkOperator.Process(item));
}
}

Expand Down
51 changes: 47 additions & 4 deletions src/Cortex.Streams/Operators/SourceOperatorAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,18 @@

namespace Cortex.Streams.Operators
{
public class SourceOperatorAdapter<TOutput> : IOperator, IHasNextOperators, ITelemetryEnabled
/// <summary>
/// Adapter that wraps an ISourceOperator to work within the operator chain.
/// Handles telemetry, error handling, and lifecycle management for source operators.
/// </summary>
public class SourceOperatorAdapter<TOutput> : IOperator, IHasNextOperators, ITelemetryEnabled, IErrorHandlingEnabled
{
private readonly ISourceOperator<TOutput> _sourceOperator;
private IOperator _nextOperator;

// Cached operator name to avoid string allocation on hot path
private static readonly string OperatorName = $"SourceOperatorAdapter<{typeof(TOutput).Name}>";

// Telemetry fields
private ITelemetryProvider _telemetryProvider;
private ICounter _emittedCounter;
Expand All @@ -19,11 +26,28 @@ public class SourceOperatorAdapter<TOutput> : IOperator, IHasNextOperators, ITel
private Action _incrementEmittedCounter;
private Action<double> _recordEmissionTime;

// Error handling fields
private StreamExecutionOptions _executionOptions = StreamExecutionOptions.Default;

public SourceOperatorAdapter(ISourceOperator<TOutput> sourceOperator)
{
_sourceOperator = sourceOperator;
}

/// <summary>
/// Sets the error handling options for this operator and propagates to next operators.
/// </summary>
public void SetErrorHandling(StreamExecutionOptions options)
{
_executionOptions = options ?? StreamExecutionOptions.Default;

// Propagate to the next operator if it supports error handling
if (_nextOperator is IErrorHandlingEnabled nextWithErrorHandling)
{
nextWithErrorHandling.SetErrorHandling(_executionOptions);
}
}

public void SetTelemetryProvider(ITelemetryProvider telemetryProvider)
{
_telemetryProvider = telemetryProvider;
Expand Down Expand Up @@ -67,6 +91,12 @@ public void SetNext(IOperator nextOperator)
nextTelemetryEnabled.SetTelemetryProvider(_telemetryProvider);
}

// Propagate error handling to the next operator
if (_nextOperator is IErrorHandlingEnabled nextWithErrorHandling && _executionOptions != null)
{
nextWithErrorHandling.SetErrorHandling(_executionOptions);
}

// Start the source operator
Start();
}
Expand All @@ -84,8 +114,15 @@ private void Start()
try
{
_incrementEmittedCounter?.Invoke();
_nextOperator?.Process(output);
span.SetAttribute("status", "success");

// Use error handling helper to properly handle errors according to stream configuration
var executed = ErrorHandlingHelper.TryExecute<TOutput>(
_executionOptions,
OperatorName,
output,
item => _nextOperator?.Process(item));

span.SetAttribute("status", executed ? "success" : "skipped");
}
catch (StreamStoppedException ex)
{
Expand All @@ -99,6 +136,7 @@ private void Start()
{
span.SetAttribute("status", "error");
span.SetAttribute("exception", ex.ToString());
// Re-throw to let the source operator handle it (e.g., logging, stopping)
throw;
}
finally
Expand All @@ -112,7 +150,12 @@ private void Start()
{
try
{
_nextOperator?.Process(output);
// Use error handling helper to properly handle errors according to stream configuration
ErrorHandlingHelper.TryExecute<TOutput>(
_executionOptions,
OperatorName,
output,
item => _nextOperator?.Process(item));
}
catch (StreamStoppedException)
{
Expand Down
2 changes: 1 addition & 1 deletion src/Cortex.Streams/SinkBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public SinkBuilder(
/// Builds the stream and returns a stream instance.
/// </summary>
/// <returns>A stream instance.</returns>
public IStream<TIn, TCurrent> Build()
public IStream<TIn> Build()
{
return new Stream<TIn, TCurrent>(_name, _firstOperator, _branchOperators, _telemetryProvider, _executionOptions, _performanceOptions);
}
Expand Down
Loading