diff --git a/examples/Elastic.Channels.Example/Drain.cs b/examples/Elastic.Channels.Example/Drain.cs index 126125f..dea69b7 100644 --- a/examples/Elastic.Channels.Example/Drain.cs +++ b/examples/Elastic.Channels.Example/Drain.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information using System.Threading.Channels; +using Elastic.Channels.Diagnostics; namespace Elastic.Channels.Example; @@ -38,7 +39,7 @@ public static class Drain return (written, read); } - public static async Task<(int, NoopBufferedChannel)> ElasticChannel( + public static async Task<(int, DiagnosticsBufferedChannel)> ElasticChannel( int totalEvents, int maxInFlight, int bufferSize, int concurrency, int expectedSentBuffers) { var written = 0; @@ -48,9 +49,11 @@ public static class Drain WaitHandle = new CountdownEvent(expectedSentBuffers), MaxInFlightMessages = maxInFlight, MaxConsumerBufferSize = bufferSize, - ConcurrentConsumers = concurrency + ConcurrentConsumers = concurrency, + MaxConsumerBufferLifetime = TimeSpan.FromSeconds(20) + }; - var channel = new NoopBufferedChannel(bufferOptions); + var channel = new DiagnosticsBufferedChannel(bufferOptions, observeConcurrency: false); for (var i = 0; i < totalEvents; i++) { diff --git a/examples/Elastic.Channels.Example/Program.cs b/examples/Elastic.Channels.Example/Program.cs index a4af736..4200ae9 100644 --- a/examples/Elastic.Channels.Example/Program.cs +++ b/examples/Elastic.Channels.Example/Program.cs @@ -9,7 +9,7 @@ var totalEvents = args.Length > 0 && int.TryParse(args[0].Replace("_", ""), out var t) ? t : 70_000_000; var concurrency = args.Length > 1 && int.TryParse(args[1], out var c) ? 
c : 5; -var maxInFlight = Math.Max(1_000_000, (totalEvents / concurrency) / 10); +var maxInFlight = Math.Max(10_000_000, (totalEvents / concurrency) / 10); var bufferSize = Math.Min(10_000, maxInFlight / 10); Console.WriteLine($"Total Events: {totalEvents:N0} events"); @@ -29,6 +29,7 @@ Console.WriteLine("--- Elastic.Channel write/read to completion---"); var expectedSentBuffers = Math.Max(1, totalEvents / bufferSize); +Console.WriteLine($"Max concurrency: {concurrency:N0}"); Console.WriteLine($"Max outbound buffer: {bufferSize:N0}"); Console.WriteLine($"Expected outbound buffers: {expectedSentBuffers:N0}"); sw.Reset(); @@ -37,6 +38,9 @@ sw.Stop(); messagePerSec = totalEvents / sw.Elapsed.TotalSeconds; +Console.WriteLine(); +// channel is a DiagnosticsBufferedChannel that pretty prints a lot of useful information +Console.WriteLine(channel); Console.WriteLine(); Console.WriteLine($"Written buffers: {channel.SentBuffersCount:N0}"); diff --git a/src/Elastic.Channels/BufferedChannelBase.cs b/src/Elastic.Channels/BufferedChannelBase.cs index 6d41bc1..ec48871 100644 --- a/src/Elastic.Channels/BufferedChannelBase.cs +++ b/src/Elastic.Channels/BufferedChannelBase.cs @@ -33,6 +33,7 @@ public abstract class BufferedChannelBase : BufferedChannelBa { protected BufferedChannelBase(ChannelOptionsBase options) : base(options) { } } + public abstract class BufferedChannelBase : ChannelWriter, IBufferedChannel where TChannelOptions : ChannelOptionsBase @@ -47,9 +48,11 @@ protected BufferedChannelBase(TChannelOptions options) Options = options; var maxConsumers = Math.Max(1, BufferOptions.ConcurrentConsumers); _throttleTasks = new SemaphoreSlim(maxConsumers, maxConsumers); - InChannel = Channel.CreateBounded(new BoundedChannelOptions(BufferOptions.MaxInFlightMessages) + var maxIn = Math.Max(1, BufferOptions.MaxInFlightMessages); + InChannel = Channel.CreateBounded(new BoundedChannelOptions(maxIn) { SingleReader = false, + SingleWriter = false, // Stephen Toub comment: 
https://github.com/dotnet/runtime/issues/26338#issuecomment-393720727 // AFAICT this is fine since we run in a dedicated long running task. AllowSynchronousContinuations = true, @@ -57,8 +60,10 @@ protected BufferedChannelBase(TChannelOptions options) // DropWrite will make `TryWrite` always return true, which is not what we want. FullMode = BoundedChannelFullMode.Wait }); + // The outbound buffer size is max(1, MaxConsumerBufferSize), capped at maxIn (MaxInFlightMessages clamped to at least 1) so the capacity can never drop below 1 + var maxOut = Math.Min(maxIn, Math.Max(1, BufferOptions.MaxConsumerBufferSize)); OutChannel = Channel.CreateBounded>( - new BoundedChannelOptions(BufferOptions.MaxInFlightMessages / BufferOptions.MaxConsumerBufferSize) + new BoundedChannelOptions(maxOut) { SingleReader = false, SingleWriter = true, @@ -72,7 +77,7 @@ protected BufferedChannelBase(TChannelOptions options) var waitHandle = BufferOptions.WaitHandle; _inThread = Task.Factory.StartNew(async () => - await ConsumeInboundEvents(BufferOptions.MaxConsumerBufferSize, BufferOptions.MaxConsumerBufferLifetime) + await ConsumeInboundEvents(maxOut, BufferOptions.MaxConsumerBufferLifetime) .ConfigureAwait(false) , TaskCreationOptions.LongRunning ); @@ -126,7 +131,7 @@ private async Task ConsumeOutboundEvents(CountdownEvent? 
countdown) var taskList = new List(maxConsumers); while (await OutChannel.Reader.WaitToReadAsync().ConfigureAwait(false)) - // ReSharper disable once RemoveRedundantBraces + // ReSharper disable once RemoveRedundantBraces { while (OutChannel.Reader.TryRead(out var buffer)) { @@ -189,7 +194,8 @@ private async Task ConsumeInboundEvents(int maxQueuedMessages, TimeSpan maxInter { while (inboundBuffer.Count < maxQueuedMessages && InChannel.Reader.TryRead(out var item)) { - if (inboundBuffer.DurationSinceFirstWrite > maxInterval) break; + if (inboundBuffer.DurationSinceFirstWrite > maxInterval) + break; inboundBuffer.Add(item); } @@ -198,7 +204,8 @@ private async Task ConsumeInboundEvents(int maxQueuedMessages, TimeSpan maxInter var outboundBuffer = new OutboundBuffer(inboundBuffer); inboundBuffer.Reset(); - if (OutChannel.Writer.TryWrite(outboundBuffer)) + + if (await PublishAsync(outboundBuffer).ConfigureAwait(false)) continue; foreach (var e in inboundBuffer.Buffer) @@ -206,6 +213,26 @@ private async Task ConsumeInboundEvents(int maxQueuedMessages, TimeSpan maxInter } } + private ValueTask PublishAsync(IOutboundBuffer buffer) + { + async Task AsyncSlowPath(IOutboundBuffer b) + { + var maxRetries = Options.BufferOptions.MaxRetries; + for (var i = 0; i <= maxRetries; i++) + while (await OutChannel.Writer.WaitToWriteAsync().ConfigureAwait(false)) + { + Options.OutboundChannelRetryCallback?.Invoke(i); + if (OutChannel.Writer.TryWrite(b)) + return true; + } + return false; + } + + return OutChannel.Writer.TryWrite(buffer) + ? 
new ValueTask(true) + : new ValueTask(AsyncSlowPath(buffer)); + } + public virtual void Dispose() { try diff --git a/src/Elastic.Channels/ChannelOptionsBase.cs b/src/Elastic.Channels/ChannelOptionsBase.cs index 29f9c5d..0f859ca 100644 --- a/src/Elastic.Channels/ChannelOptionsBase.cs +++ b/src/Elastic.Channels/ChannelOptionsBase.cs @@ -38,6 +38,12 @@ public abstract class ChannelOptionsBase /// A generic hook to be notified of any bulk request being initiated by public Action? ResponseCallback { get; set; } + + /// + /// Called every time a publish to the outbound channel fails to write and will be retried. + /// Pushes to the outbound channel follow the same exponential backoff as + /// + public Action? OutboundChannelRetryCallback { get; set; } } } diff --git a/src/Elastic.Channels/Diagnostics/ChannelListener.cs b/src/Elastic.Channels/Diagnostics/ChannelListener.cs new file mode 100644 index 0000000..b4dd468 --- /dev/null +++ b/src/Elastic.Channels/Diagnostics/ChannelListener.cs @@ -0,0 +1,56 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Threading; + +namespace Elastic.Channels.Diagnostics; + +public class ChannelListener +{ + private int _bufferFlushCallback; + + public Exception? 
ObservedException { get; private set; } + + public virtual bool PublishSuccess => ObservedException == null && _bufferFlushCallback > 0 && _maxRetriesExceeded == 0 && _items > 0; + + private int _responses; + private int _rejections; + private int _retries; + private int _items; + private int _maxRetriesExceeded; + private int _outboundWriteRetries; + + // ReSharper disable once MemberCanBeProtected.Global + public ChannelListener Register(ChannelOptionsBase options) + { + options.BufferOptions.BufferFlushCallback = () => Interlocked.Increment(ref _bufferFlushCallback); + options.ResponseCallback = (_, _) => Interlocked.Increment(ref _responses); + options.PublishRejectionCallback = _ => Interlocked.Increment(ref _rejections); + options.RetryCallBack = _ => Interlocked.Increment(ref _retries); + options.BulkAttemptCallback = (retries, count) => + { + if (retries == 0) Interlocked.Add(ref _items, count); + }; + options.MaxRetriesExceededCallback = _ => Interlocked.Increment(ref _maxRetriesExceeded); + options.OutboundChannelRetryCallback = _=> Interlocked.Increment(ref _outboundWriteRetries); + + if (options.ExceptionCallback == null) options.ExceptionCallback = e => ObservedException ??= e; + else options.ExceptionCallback += e => ObservedException ??= e; + return this; + } + + protected virtual string AdditionalData => string.Empty; + + public override string ToString() => $@"{(!PublishSuccess ? "Failed" : "Successful")} publish over channel. 
+Consumed on outbound: {_items:N0} +Flushes: {_bufferFlushCallback:N0} +Responses: {_responses:N0} +Outbound Buffer TryWrite Retries: {_outboundWriteRetries:N0} +Inbound Buffer TryWrite failures: {_rejections:N0} +Send() Retries: {_retries:N0} +Send() Exhausts: {_maxRetriesExceeded:N0}{AdditionalData} +Exception: {ObservedException} +"; +} diff --git a/src/Elastic.Channels/Diagnostics/DiagnosticsChannel.cs b/src/Elastic.Channels/Diagnostics/DiagnosticsChannel.cs new file mode 100644 index 0000000..6d03dd3 --- /dev/null +++ b/src/Elastic.Channels/Diagnostics/DiagnosticsChannel.cs @@ -0,0 +1,30 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using static Elastic.Channels.Diagnostics.DiagnosticsBufferedChannel; + +namespace Elastic.Channels.Diagnostics; + +/// +/// A NOOP implementation of that: +/// -tracks the number of times is invoked under +/// -observes the maximum concurrent calls to under +/// +public class DiagnosticsBufferedChannel : NoopBufferedChannel +{ + public DiagnosticsBufferedChannel(BufferOptions options, bool observeConcurrency = false) + : base(options, observeConcurrency) => + Listener = new ChannelListener().Register(Options); + + public ChannelListener Listener { get; } + + public override string ToString() => $@"{Listener} +Send Invocations: {SentBuffersCount:N0} +Observed Concurrency: {ObservedConcurrency:N0} +"; +} diff --git a/src/Elastic.Channels/NoopBufferedChannel.cs b/src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs similarity index 97% rename from src/Elastic.Channels/NoopBufferedChannel.cs rename to src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs index 7c1101a..d7142d6 100644 --- a/src/Elastic.Channels/NoopBufferedChannel.cs +++ 
b/src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs @@ -7,7 +7,7 @@ using System.Threading; using System.Threading.Tasks; -namespace Elastic.Channels; +namespace Elastic.Channels.Diagnostics; /// /// A NOOP implementation of that: diff --git a/src/Elastic.Channels/InboundBuffer.cs b/src/Elastic.Channels/InboundBuffer.cs index 4a2a27c..f7c607e 100644 --- a/src/Elastic.Channels/InboundBuffer.cs +++ b/src/Elastic.Channels/InboundBuffer.cs @@ -28,7 +28,8 @@ internal class InboundBuffer : IWriteTrackingBuffer, IDisposable public int Count => Buffer.Count; public TimeSpan? DurationSinceFirstWrite => DateTimeOffset.UtcNow - TimeOfFirstWrite; - public bool NoThresholdsHit => Count == 0 || (Count < _maxBufferSize && DurationSinceFirstWrite <= _forceFlushAfter); + public bool NoThresholdsHit => Count == 0 + || (Count < _maxBufferSize && DurationSinceFirstWrite <= _forceFlushAfter); public InboundBuffer(int maxBufferSize, TimeSpan forceFlushAfter) { @@ -71,7 +72,7 @@ private TimeSpan Wait /// /// Call with a timeout to force a flush to happen every /// . This tries to avoid allocation too many 's - /// needlessly and reuses them if possible. If we know the buffer is empty we can wait indefinitely as well. + /// needlessly and reuses them if possible. /// public async Task WaitToReadAsync(ChannelReader reader) { @@ -83,11 +84,10 @@ public async Task WaitToReadAsync(ChannelReader reader) try { - //if we have nothing in the buffer wait indefinitely for messages - var w = Count == 0 ? TimeSpan.FromMilliseconds(-1) : Wait; + var w = Count == 0 ? 
_forceFlushAfter : Wait; _breaker.CancelAfter(w); var _ = await reader.WaitToReadAsync(_breaker.Token).ConfigureAwait(false); - _breaker.CancelAfter(_forceFlushAfter); + _breaker.CancelAfter(-1); return true; } catch (Exception) when (_breaker.IsCancellationRequested) @@ -98,7 +98,7 @@ public async Task WaitToReadAsync(ChannelReader reader) } catch (Exception) { - _breaker.CancelAfter(_forceFlushAfter); + _breaker.CancelAfter(-1); return true; } } diff --git a/src/Elastic.Ingest.Elasticsearch/Diagnostics/ChannelListener.cs b/src/Elastic.Ingest.Elasticsearch/Diagnostics/ChannelListener.cs new file mode 100644 index 0000000..541e517 --- /dev/null +++ b/src/Elastic.Ingest.Elasticsearch/Diagnostics/ChannelListener.cs @@ -0,0 +1,45 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System.Linq; +using System.Threading; +using Elastic.Channels; +using Elastic.Channels.Diagnostics; +using Elastic.Ingest.Elasticsearch.Serialization; + +namespace Elastic.Ingest.Elasticsearch.Diagnostics; + +// ReSharper disable once UnusedType.Global +public class ElasticsearchChannelListener : ChannelListener +{ + private int _rejectedItems; + private string? 
_firstItemError; + private int _serverRejections; + + public override bool PublishSuccess => base.PublishSuccess && string.IsNullOrEmpty(_firstItemError); + + // ReSharper disable once UnusedMember.Global + public ElasticsearchChannelListener Register(ResponseItemsChannelOptionsBase options) + { + base.Register(options); + + options.ServerRejectionCallback = r => + { + Interlocked.Add(ref _rejectedItems, r.Count); + if (r.Count > 0) + { + var error = r.Select(e => e.Item2).FirstOrDefault(i=>i.Error != null); + if (error != null) + _firstItemError ??= error.Error?.ToString(); + } + Interlocked.Increment(ref _serverRejections); + }; + return this; + } + + protected override string AdditionalData => $@"Server Rejected Calls: {_serverRejections:N0} +Server Rejected Items: {_rejectedItems:N0} +First Error: {_firstItemError} +"; +} diff --git a/tests/Elastic.Channels.Tests/BehaviorTests.cs b/tests/Elastic.Channels.Tests/BehaviorTests.cs index 54f91bd..a9e75a8 100644 --- a/tests/Elastic.Channels.Tests/BehaviorTests.cs +++ b/tests/Elastic.Channels.Tests/BehaviorTests.cs @@ -5,6 +5,7 @@ using System; using System.Threading; using System.Threading.Tasks; +using Elastic.Channels.Diagnostics; using FluentAssertions; using Xunit; using Xunit.Abstractions;