Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions examples/Elastic.Channels.Example/Drain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information

using System.Threading.Channels;
using Elastic.Channels.Diagnostics;

namespace Elastic.Channels.Example;

Expand Down Expand Up @@ -38,7 +39,7 @@ public static class Drain
return (written, read);
}

public static async Task<(int, NoopBufferedChannel)> ElasticChannel(
public static async Task<(int, DiagnosticsBufferedChannel)> ElasticChannel(
int totalEvents, int maxInFlight, int bufferSize, int concurrency, int expectedSentBuffers)
{
var written = 0;
Expand All @@ -48,9 +49,11 @@ public static class Drain
WaitHandle = new CountdownEvent(expectedSentBuffers),
MaxInFlightMessages = maxInFlight,
MaxConsumerBufferSize = bufferSize,
ConcurrentConsumers = concurrency
ConcurrentConsumers = concurrency,
MaxConsumerBufferLifetime = TimeSpan.FromSeconds(20)

};
var channel = new NoopBufferedChannel(bufferOptions);
var channel = new DiagnosticsBufferedChannel(bufferOptions, observeConcurrency: false);

for (var i = 0; i < totalEvents; i++)
{
Expand Down
6 changes: 5 additions & 1 deletion examples/Elastic.Channels.Example/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

var totalEvents = args.Length > 0 && int.TryParse(args[0].Replace("_", ""), out var t) ? t : 70_000_000;
var concurrency = args.Length > 1 && int.TryParse(args[1], out var c) ? c : 5;
var maxInFlight = Math.Max(1_000_000, (totalEvents / concurrency) / 10);
var maxInFlight = Math.Max(10_000_000, (totalEvents / concurrency) / 10);
var bufferSize = Math.Min(10_000, maxInFlight / 10);

Console.WriteLine($"Total Events: {totalEvents:N0} events");
Expand All @@ -29,6 +29,7 @@

Console.WriteLine("--- Elastic.Channel write/read to completion---");
var expectedSentBuffers = Math.Max(1, totalEvents / bufferSize);
Console.WriteLine($"Max concurrency: {concurrency:N0}");
Console.WriteLine($"Max outbound buffer: {bufferSize:N0}");
Console.WriteLine($"Expected outbound buffers: {expectedSentBuffers:N0}");
sw.Reset();
Expand All @@ -37,6 +38,9 @@
sw.Stop();
messagePerSec = totalEvents / sw.Elapsed.TotalSeconds;

Console.WriteLine();
// channel is a DiagnosticBufferedChannel that pretty prints a lot of useful information
Console.WriteLine(channel);
Console.WriteLine();

Console.WriteLine($"Written buffers: {channel.SentBuffersCount:N0}");
Expand Down
39 changes: 33 additions & 6 deletions src/Elastic.Channels/BufferedChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public abstract class BufferedChannelBase<TEvent, TResponse> : BufferedChannelBa
{
protected BufferedChannelBase(ChannelOptionsBase<TEvent, TResponse> options) : base(options) { }
}

public abstract class BufferedChannelBase<TChannelOptions, TEvent, TResponse>
: ChannelWriter<TEvent>, IBufferedChannel<TEvent>
where TChannelOptions : ChannelOptionsBase<TEvent, TResponse>
Expand All @@ -47,18 +48,22 @@ protected BufferedChannelBase(TChannelOptions options)
Options = options;
var maxConsumers = Math.Max(1, BufferOptions.ConcurrentConsumers);
_throttleTasks = new SemaphoreSlim(maxConsumers, maxConsumers);
InChannel = Channel.CreateBounded<TEvent>(new BoundedChannelOptions(BufferOptions.MaxInFlightMessages)
var maxIn = Math.Max(1, BufferOptions.MaxInFlightMessages);
InChannel = Channel.CreateBounded<TEvent>(new BoundedChannelOptions(maxIn)
{
SingleReader = false,
SingleWriter = false,
// Stephen Toub comment: https://github.com/dotnet/runtime/issues/26338#issuecomment-393720727
// AFAICT this is fine since we run in a dedicated long running task.
AllowSynchronousContinuations = true,
// wait does not block it simply signals that Writer.TryWrite should return false and be retried
// DropWrite will make `TryWrite` always return true, which is not what we want.
FullMode = BoundedChannelFullMode.Wait
});
// The minimum out buffer the max of (1 or MaxConsumerBufferSize) as long as it does not exceed MaxInFlightMessages
var maxOut = Math.Min(BufferOptions.MaxInFlightMessages, Math.Max(1, BufferOptions.MaxConsumerBufferSize));
OutChannel = Channel.CreateBounded<IOutboundBuffer<TEvent>>(
new BoundedChannelOptions(BufferOptions.MaxInFlightMessages / BufferOptions.MaxConsumerBufferSize)
new BoundedChannelOptions(maxOut)
{
SingleReader = false,
SingleWriter = true,
Expand All @@ -72,7 +77,7 @@ protected BufferedChannelBase(TChannelOptions options)

var waitHandle = BufferOptions.WaitHandle;
_inThread = Task.Factory.StartNew(async () =>
await ConsumeInboundEvents(BufferOptions.MaxConsumerBufferSize, BufferOptions.MaxConsumerBufferLifetime)
await ConsumeInboundEvents(maxOut, BufferOptions.MaxConsumerBufferLifetime)
.ConfigureAwait(false)
, TaskCreationOptions.LongRunning
);
Expand Down Expand Up @@ -126,7 +131,7 @@ private async Task ConsumeOutboundEvents(CountdownEvent? countdown)
var taskList = new List<Task>(maxConsumers);

while (await OutChannel.Reader.WaitToReadAsync().ConfigureAwait(false))
// ReSharper disable once RemoveRedundantBraces
// ReSharper disable once RemoveRedundantBraces
{
while (OutChannel.Reader.TryRead(out var buffer))
{
Expand Down Expand Up @@ -189,7 +194,8 @@ private async Task ConsumeInboundEvents(int maxQueuedMessages, TimeSpan maxInter
{
while (inboundBuffer.Count < maxQueuedMessages && InChannel.Reader.TryRead(out var item))
{
if (inboundBuffer.DurationSinceFirstWrite > maxInterval) break;
if (inboundBuffer.DurationSinceFirstWrite > maxInterval)
break;

inboundBuffer.Add(item);
}
Expand All @@ -198,14 +204,35 @@ private async Task ConsumeInboundEvents(int maxQueuedMessages, TimeSpan maxInter

var outboundBuffer = new OutboundBuffer<TEvent>(inboundBuffer);
inboundBuffer.Reset();
if (OutChannel.Writer.TryWrite(outboundBuffer))

if (await PublishAsync(outboundBuffer).ConfigureAwait(false))
continue;

foreach (var e in inboundBuffer.Buffer)
Options.PublishRejectionCallback?.Invoke(e);
}
}

private ValueTask<bool> PublishAsync(IOutboundBuffer<TEvent> buffer)
{
async Task<bool> AsyncSlowPath(IOutboundBuffer<TEvent> b)
{
var maxRetries = Options.BufferOptions.MaxRetries;
for (var i = 0; i <= maxRetries; i++)
while (await OutChannel.Writer.WaitToWriteAsync().ConfigureAwait(false))
{
Options.OutboundChannelRetryCallback?.Invoke(i);
if (OutChannel.Writer.TryWrite(b))
return true;
}
return false;
}

return OutChannel.Writer.TryWrite(buffer)
? new ValueTask<bool>(true)
: new ValueTask<bool>(AsyncSlowPath(buffer));
}

public virtual void Dispose()
{
try
Expand Down
6 changes: 6 additions & 0 deletions src/Elastic.Channels/ChannelOptionsBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ public abstract class ChannelOptionsBase<TEvent, TResponse>

/// <summary> A generic hook to be notified of any bulk request being initiated by <see cref="InboundBuffer{TEvent}"/> </summary>
public Action<TResponse, IWriteTrackingBuffer>? ResponseCallback { get; set; }

/// <summary>
/// Called everytime a publish to the outbound channel failed to write and will be retried.
/// Pushes to the outbound channel follow the same exponential backoff as <see cref="Elastic.Channels.BufferOptions.BackoffPeriod"/>
/// </summary>
public Action<int>? OutboundChannelRetryCallback { get; set; }
}

}
56 changes: 56 additions & 0 deletions src/Elastic.Channels/Diagnostics/ChannelListener.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System;
using System.Threading;

namespace Elastic.Channels.Diagnostics;

public class ChannelListener<TEvent, TResponse>
{
private int _bufferFlushCallback;

public Exception? ObservedException { get; private set; }

public virtual bool PublishSuccess => ObservedException == null && _bufferFlushCallback > 0 && _maxRetriesExceeded == 0 && _items > 0;

private int _responses;
private int _rejections;
private int _retries;
private int _items;
private int _maxRetriesExceeded;
private int _outboundWriteRetries;

// ReSharper disable once MemberCanBeProtected.Global
public ChannelListener<TEvent, TResponse> Register(ChannelOptionsBase<TEvent, TResponse> options)
{
options.BufferOptions.BufferFlushCallback = () => Interlocked.Increment(ref _bufferFlushCallback);
options.ResponseCallback = (_, _) => Interlocked.Increment(ref _responses);
options.PublishRejectionCallback = _ => Interlocked.Increment(ref _rejections);
options.RetryCallBack = _ => Interlocked.Increment(ref _retries);
options.BulkAttemptCallback = (retries, count) =>
{
if (retries == 0) Interlocked.Add(ref _items, count);
};
options.MaxRetriesExceededCallback = _ => Interlocked.Increment(ref _maxRetriesExceeded);
options.OutboundChannelRetryCallback = _=> Interlocked.Increment(ref _outboundWriteRetries);

if (options.ExceptionCallback == null) options.ExceptionCallback = e => ObservedException ??= e;
else options.ExceptionCallback += e => ObservedException ??= e;
return this;
}

protected virtual string AdditionalData => string.Empty;

public override string ToString() => $@"{(!PublishSuccess ? "Failed" : "Successful")} publish over channel.
Consumed on outbound: {_items:N0}
Flushes: {_bufferFlushCallback:N0}
Responses: {_responses:N0}
Outbound Buffer TryWrite Retries: {_retries:N0}
Inbound Buffer TryWrite failures: {_rejections:N0}
Send() Retries: {_retries:N0}
Send() Exhausts: {_maxRetriesExceeded:N0}{AdditionalData}
Exception: {ObservedException}
";
}
30 changes: 30 additions & 0 deletions src/Elastic.Channels/Diagnostics/DiagnosticsChannel.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using static Elastic.Channels.Diagnostics.DiagnosticsBufferedChannel;

namespace Elastic.Channels.Diagnostics;

/// <summary>
/// A NOOP implementation of <see cref="BufferedChannelBase{TEvent,TResponse}"/> that:
/// <para> -tracks the number of times <see cref="Send"/> is invoked under <see cref="SentBuffersCount"/> </para>
/// <para> -observes the maximum concurrent calls to <see cref="Send"/> under <see cref="ObservedConcurrency"/> </para>
/// </summary>
public class DiagnosticsBufferedChannel : NoopBufferedChannel
{
public DiagnosticsBufferedChannel(BufferOptions options, bool observeConcurrency = false)
: base(options, observeConcurrency) =>
Listener = new ChannelListener<NoopEvent, NoopResponse>().Register(Options);

public ChannelListener<NoopEvent, NoopResponse> Listener { get; }

public override string ToString() => $@"{Listener}
Send Invocations: {SentBuffersCount:N0}
Observed Concurrency: {ObservedConcurrency:N0}
";
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
using System.Threading;
using System.Threading.Tasks;

namespace Elastic.Channels;
namespace Elastic.Channels.Diagnostics;

/// <summary>
/// A NOOP implementation of <see cref="BufferedChannelBase{TEvent,TResponse}"/> that:
Expand Down
12 changes: 6 additions & 6 deletions src/Elastic.Channels/InboundBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ internal class InboundBuffer<TEvent> : IWriteTrackingBuffer, IDisposable

public int Count => Buffer.Count;
public TimeSpan? DurationSinceFirstWrite => DateTimeOffset.UtcNow - TimeOfFirstWrite;
public bool NoThresholdsHit => Count == 0 || (Count < _maxBufferSize && DurationSinceFirstWrite <= _forceFlushAfter);
public bool NoThresholdsHit => Count == 0
|| (Count < _maxBufferSize && DurationSinceFirstWrite <= _forceFlushAfter);

public InboundBuffer(int maxBufferSize, TimeSpan forceFlushAfter)
{
Expand Down Expand Up @@ -71,7 +72,7 @@ private TimeSpan Wait
/// <summary>
/// Call <see cref="ChannelReader{T}.WaitToReadAsync"/> with a timeout to force a flush to happen every
/// <see cref="_forceFlushAfter"/>. This tries to avoid allocation too many <see cref="CancellationTokenSource"/>'s
/// needlessly and reuses them if possible. If we know the buffer is empty we can wait indefinitely as well.
/// needlessly and reuses them if possible.
/// </summary>
public async Task<bool> WaitToReadAsync(ChannelReader<TEvent> reader)
{
Expand All @@ -83,11 +84,10 @@ public async Task<bool> WaitToReadAsync(ChannelReader<TEvent> reader)

try
{
//if we have nothing in the buffer wait indefinitely for messages
var w = Count == 0 ? TimeSpan.FromMilliseconds(-1) : Wait;
var w = Count == 0 ? _forceFlushAfter : Wait;
_breaker.CancelAfter(w);
var _ = await reader.WaitToReadAsync(_breaker.Token).ConfigureAwait(false);
_breaker.CancelAfter(_forceFlushAfter);
_breaker.CancelAfter(-1);
return true;
}
catch (Exception) when (_breaker.IsCancellationRequested)
Expand All @@ -98,7 +98,7 @@ public async Task<bool> WaitToReadAsync(ChannelReader<TEvent> reader)
}
catch (Exception)
{
_breaker.CancelAfter(_forceFlushAfter);
_breaker.CancelAfter(-1);
return true;
}
}
Expand Down
45 changes: 45 additions & 0 deletions src/Elastic.Ingest.Elasticsearch/Diagnostics/ChannelListener.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System.Linq;
using System.Threading;
using Elastic.Channels;
using Elastic.Channels.Diagnostics;
using Elastic.Ingest.Elasticsearch.Serialization;

namespace Elastic.Ingest.Elasticsearch.Diagnostics;

// ReSharper disable once UnusedType.Global
public class ElasticsearchChannelListener<TEvent> : ChannelListener<TEvent, BulkResponse>
{
private int _rejectedItems;
private string? _firstItemError;
private int _serverRejections;

public override bool PublishSuccess => base.PublishSuccess && string.IsNullOrEmpty(_firstItemError);

// ReSharper disable once UnusedMember.Global
public ElasticsearchChannelListener<TEvent> Register(ResponseItemsChannelOptionsBase<TEvent, BulkResponse, BulkResponseItem> options)
{
base.Register(options);

options.ServerRejectionCallback = r =>
{
Interlocked.Add(ref _rejectedItems, r.Count);
if (r.Count > 0)
{
var error = r.Select(e => e.Item2).FirstOrDefault(i=>i.Error != null);
if (error != null)
_firstItemError ??= error.Error?.ToString();
}
Interlocked.Increment(ref _serverRejections);
};
return this;
}

protected override string AdditionalData => $@"Server Rejected Calls: {_serverRejections:N0}
Server Rejected Items: {_rejectedItems:N0}
First Error: {_firstItemError}
";
}
1 change: 1 addition & 0 deletions tests/Elastic.Channels.Tests/BehaviorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Elastic.Channels.Diagnostics;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;
Expand Down