Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ jobs:
name: publish canary packages to feedz.io
if: github.event_name == 'push' && startswith(github.ref, 'refs/heads')

- run: ./build.sh generatereleasenotes -s true
- run: ./build.sh generatereleasenotes -s true --token ${{secrets.GITHUB_TOKEN}}
name: Generate release notes for tag
if: github.event_name == 'push' && startswith(github.ref, 'refs/tags')
- run: ./build.sh createreleaseongithub -s true --token ${{secrets.GITHUB_TOKEN}}
Expand Down
7 changes: 6 additions & 1 deletion src/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,17 @@
<SignAssembly>true</SignAssembly>
<AssemblyOriginatorKeyFile>..\..\build\keys\keypair.snk</AssemblyOriginatorKeyFile>

<AllowedOutputExtensionsInPackageBuildOutputFolder>$(AllowedOutputExtensionsInPackageBuildOutputFolder);.pdb</AllowedOutputExtensionsInPackageBuildOutputFolder>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<PackageIcon>nuget-icon.png</PackageIcon>

<WarningsAsErrors>True</WarningsAsErrors>
<PackageReadmeFile>README.md</PackageReadmeFile>

<!-- Include .pdb in package -->
<AllowedOutputExtensionsInPackageBuildOutputFolder>$(AllowedOutputExtensionsInPackageBuildOutputFolder);.pdb</AllowedOutputExtensionsInPackageBuildOutputFolder>
<!-- Generate documentation files for each package -->
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<DebugSymbols>true</DebugSymbols>
</PropertyGroup>

<ItemGroup>
Expand Down
3 changes: 3 additions & 0 deletions src/Elastic.Channels/BufferOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

namespace Elastic.Channels
{
/// <summary>
/// Controls how data should be buffered in <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}"/> implementations
/// </summary>
public class BufferOptions
{
/// <summary>
Expand Down
61 changes: 51 additions & 10 deletions src/Elastic.Channels/BufferedChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,35 @@

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Elastic.Channels.Buffers;

namespace Elastic.Channels;

/// <summary> Represents a buffered channel implementation</summary>
/// <typeparam name="TEvent">The type of data to be written</typeparam>
public interface IBufferedChannel<in TEvent> : IDisposable
{
/// <summary>
/// Tries to write <paramref name="item"/> to the inbound channel.
/// <para>Returns immediately if successful or unsuccessful</para>
/// </summary>
/// <returns>A bool indicating if the write was successful</returns>
bool TryWrite(TEvent item);

/// <summary>
/// Waits for availability on the inbound channel before attempting to write <paramref name="item"/>.
/// </summary>
/// <returns>A bool indicating if the write was successful</returns>
Task<bool> WaitToWriteAsync(TEvent item, CancellationToken ctx = default);

/// <summary>
/// Waits for availability on the inbound channel before attempting to write each item in <paramref name="events"/>.
/// </summary>
/// <returns>A bool indicating if all writes werwase successful</returns>
async Task<bool> WaitToWriteManyAsync(IEnumerable<TEvent> events, CancellationToken ctx = default)
{
var allWritten = true;
Expand All @@ -27,14 +43,22 @@ async Task<bool> WaitToWriteManyAsync(IEnumerable<TEvent> events, CancellationTo
}
return allWritten;
}
}

public abstract class BufferedChannelBase<TEvent, TResponse> : BufferedChannelBase<ChannelOptionsBase<TEvent, TResponse>, TEvent, TResponse>
where TResponse : class, new()
{
protected BufferedChannelBase(ChannelOptionsBase<TEvent, TResponse> options) : base(options) { }
/// <summary>
/// Tries to write many <paramref name="events"/> to the channel returning true if ALL messages were written succesfully
/// </summary>
public bool TryWriteMany(IEnumerable<TEvent> events) =>
events.Select(e => TryWrite(e)).All(b => b);
}

/// <summary>
/// The common base implementation of <see cref="IBufferedChannel{TEvent}"/> that all implementations inherit from.
/// <para>This sets up the <see cref="InChannel"/> and <see cref="OutChannel"/> and the implementation that coordinates moving
/// data from one to the other</para>
/// </summary>
/// <typeparam name="TChannelOptions">Concrete channel options implementation</typeparam>
/// <typeparam name="TEvent">The type of data we are looking to <see cref="Export"/></typeparam>
/// <typeparam name="TResponse">The type of responses we are expecting to get back from <see cref="Export"/></typeparam>
public abstract class BufferedChannelBase<TChannelOptions, TEvent, TResponse>
: ChannelWriter<TEvent>, IBufferedChannel<TEvent>
where TChannelOptions : ChannelOptionsBase<TEvent, TResponse>
Expand All @@ -45,6 +69,7 @@ public abstract class BufferedChannelBase<TChannelOptions, TEvent, TResponse>
private readonly SemaphoreSlim _throttleTasks;
private readonly CountdownEvent? _signal;

/// <inheritdoc cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}"/>
protected BufferedChannelBase(TChannelOptions options)
{
TokenSource = new CancellationTokenSource();
Expand Down Expand Up @@ -91,19 +116,31 @@ await ConsumeInboundEvents(maxOut, BufferOptions.OutboundBufferMaxLifetime)

}

/// <summary>
/// All subclasses of <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}"/> need to at a minimum
/// implement this method to export buffered collection of <see cref="OutChannel"/>
/// </summary>
protected abstract Task<TResponse> Export(IReadOnlyCollection<TEvent> buffer, CancellationToken ctx = default);



/// <summary>The channel options currently in use</summary>
public TChannelOptions Options { get; }

private CancellationTokenSource TokenSource { get; }
protected Channel<IOutboundBuffer<TEvent>> OutChannel { get; }
protected Channel<TEvent> InChannel { get; }
protected BufferOptions BufferOptions => Options.BufferOptions;
private Channel<IOutboundBuffer<TEvent>> OutChannel { get; }
private Channel<TEvent> InChannel { get; }
private BufferOptions BufferOptions => Options.BufferOptions;

internal InboundBuffer<TEvent> InboundBuffer { get; }

/// <inheritdoc cref="ChannelWriter{T}.WaitToWriteAsync"/>
public override ValueTask<bool> WaitToWriteAsync(CancellationToken ctx = default) => InChannel.Writer.WaitToWriteAsync(ctx);

/// <inheritdoc cref="ChannelWriter{T}.TryComplete"/>
public override bool TryComplete(Exception? error = null) => InChannel.Writer.TryComplete(error);

/// <inheritdoc cref="ChannelWriter{T}.TryWrite"/>
public override bool TryWrite(TEvent item)
{
if (InChannel.Writer.TryWrite(item))
Expand All @@ -115,6 +152,7 @@ public override bool TryWrite(TEvent item)
return false;
}

/// <inheritdoc cref="ChannelWriter{T}.WaitToWriteAsync"/>
public virtual async Task<bool> WaitToWriteAsync(TEvent item, CancellationToken ctx = default)
{
ctx = ctx == default ? TokenSource.Token : ctx;
Expand All @@ -128,10 +166,12 @@ public virtual async Task<bool> WaitToWriteAsync(TEvent item, CancellationToken
return false;
}

protected abstract Task<TResponse> Export(IReadOnlyCollection<TEvent> buffer, CancellationToken ctx = default);

private static readonly IReadOnlyCollection<TEvent> DefaultRetryBuffer = new TEvent[] { };

/// <summary>
/// Subclasses may override this to yield items from <typeparamref name="TResponse"/> that can be retried.
/// <para>The default implementation of this simply always returns an empty collection</para>
/// </summary>
protected virtual IReadOnlyCollection<TEvent> RetryBuffer(
TResponse response,
IReadOnlyCollection<TEvent> currentBuffer,
Expand Down Expand Up @@ -259,6 +299,7 @@ async Task<bool> AsyncSlowPath(IOutboundBuffer<TEvent> b)
: new ValueTask<bool>(AsyncSlowPath(buffer));
}

/// <inheritdoc cref="IDisposable.Dispose"/>
public virtual void Dispose()
{
InboundBuffer.Dispose();
Expand Down
15 changes: 0 additions & 15 deletions src/Elastic.Channels/BufferedChannelExtensions.cs

This file was deleted.

4 changes: 4 additions & 0 deletions src/Elastic.Channels/Buffers/IWriteTrackingBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ namespace Elastic.Channels.Buffers;
/// </summary>
public interface IWriteTrackingBuffer
{
/// <summary> The current size of the buffer </summary>
int Count { get; }
/// <summary>
/// The duration since the first write
/// </summary>
TimeSpan? DurationSinceFirstWrite { get; }
}
5 changes: 5 additions & 0 deletions src/Elastic.Channels/Buffers/OutboundBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,13 @@

namespace Elastic.Channels.Buffers;

/// <summary>
/// The buffer to be exported over <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.Export"/>
/// </summary>
/// <remarks>Due to change as we move this over to use ArrayPool</remarks>
public interface IOutboundBuffer<out TEvent> : IWriteTrackingBuffer
{
/// <inheritdoc cref="IOutboundBuffer{TEvent}"/>
public IReadOnlyCollection<TEvent> Items { get; }
}

Expand Down
6 changes: 5 additions & 1 deletion src/Elastic.Channels/ChannelOptionsBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ namespace Elastic.Channels
/// <typeparam name="TResponse"></typeparam>
public abstract class ChannelOptionsBase<TEvent, TResponse>
{
/// <inheritdoc cref="BufferOptions"/>
public BufferOptions BufferOptions { get; set; } = new ();

public Func<Stream, CancellationToken, TEvent, Task> WriteEvent { get; set; } = null!;
/// <summary>
/// Optionally provides a custom write implementation to a channel. Concrete channel implementations are not required to adhere to this config
/// </summary>
public Func<Stream, CancellationToken, TEvent, Task>? WriteEvent { get; set; } = null;

/// <summary> Called if the call to <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.Export"/> throws. </summary>
public Action<Exception>? ExportExceptionCallback { get; set; }
Expand Down
19 changes: 19 additions & 0 deletions src/Elastic.Channels/Diagnostics/ChannelListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,25 @@

namespace Elastic.Channels.Diagnostics;

/// <summary>
/// A very rudimentary diagnostics object tracking various important metrics to provide insights into the
/// machinery of <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}"/>.
/// <para>This will be soon be replaced by actual metrics</para>
/// </summary>
public class ChannelListener<TEvent, TResponse>
{
private readonly string? _name;
private int _exportedBuffers;

/// <summary>
/// Keeps track of the first observed exception to calls to <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.Export"/>
/// </summary>
public Exception? ObservedException { get; private set; }

/// <summary> Indicates if the overall publishing was successful</summary>
public virtual bool PublishSuccess => ObservedException == null && _exportedBuffers > 0 && _maxRetriesExceeded == 0 && _items > 0;

/// <inheritdoc cref="ChannelListener{TEvent,TResponse}"/>
public ChannelListener(string? name = null) => _name = name;

private long _responses;
Expand All @@ -31,6 +41,9 @@ public class ChannelListener<TEvent, TResponse>
private bool _outboundChannelExited;

// ReSharper disable once MemberCanBeProtected.Global
/// <summary>
/// Registers callbacks on <paramref name="options"/> to keep track metrics.
/// </summary>
public ChannelListener<TEvent, TResponse> Register(ChannelOptionsBase<TEvent, TResponse> options)
{
options.BufferOptions.ExportBufferCallback = () => Interlocked.Increment(ref _exportedBuffers);
Expand All @@ -54,8 +67,14 @@ public ChannelListener<TEvent, TResponse> Register(ChannelOptionsBase<TEvent, TR
return this;
}

/// <summary>
/// Allows subclasses to include more data in the <see cref="ToString"/> implementation before the exception is printed
/// </summary>
protected virtual string AdditionalData => string.Empty;

/// <summary>
/// Provides a debug message to give insights to the machinery of <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}"/>
/// </summary>
public override string ToString() => $@"{(!PublishSuccess ? "Failed" : "Successful")} publish over channel: {_name ?? nameof(ChannelListener<TEvent, TResponse>)}.
Exported Buffers: {_exportedBuffers:N0}
Exported Items: {_items:N0}
Expand Down
12 changes: 9 additions & 3 deletions src/Elastic.Channels/Diagnostics/DiagnosticsBufferedChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,29 @@
namespace Elastic.Channels.Diagnostics;

/// <summary>
/// A NOOP implementation of <see cref="BufferedChannelBase{TEvent,TResponse}"/> that:
/// <para> -tracks the number of times <see cref="Send"/> is invoked under <see cref="SentBuffersCount"/> </para>
/// <para> -observes the maximum concurrent calls to <see cref="Send"/> under <see cref="ObservedConcurrency"/> </para>
/// A NOOP implementation of <see cref="IBufferedChannel{TEvent}"/> that:
/// <para> -tracks the number of times <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.Export"/> is invoked under <see cref="NoopBufferedChannel.ExportedBuffers"/> </para>
/// <para> -observes the maximum concurrent calls to <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.Export"/> under <see cref="NoopBufferedChannel.ObservedConcurrency"/> </para>
/// </summary>
public class DiagnosticsBufferedChannel : NoopBufferedChannel
{
private readonly string? _name;

/// <inheritdoc cref="DiagnosticsBufferedChannel"/>
public DiagnosticsBufferedChannel(BufferOptions options, bool observeConcurrency = false, string? name = null)
: base(options, observeConcurrency)
{
_name = name;
Listener = new ChannelListener<NoopEvent, NoopResponse>(_name).Register(Options);
}

/// <inheritdoc cref="ChannelListener{TEvent,TResponse}"/>
// ReSharper disable once MemberCanBePrivate.Global
public ChannelListener<NoopEvent, NoopResponse> Listener { get; }

/// <summary>
/// Provides a debug message to give insights to the machinery of <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}"/>
/// </summary>
public override string ToString() => $@"------------------------------------------
{Listener}

Expand Down
24 changes: 17 additions & 7 deletions src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,47 @@
namespace Elastic.Channels.Diagnostics;

/// <summary>
/// A NOOP implementation of <see cref="BufferedChannelBase{TEvent,TResponse}"/> that:
/// A NOOP implementation of <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}"/> that:
/// <para> -tracks the number of times <see cref="Export"/> is invoked under <see cref="ExportedBuffers"/> </para>
/// <para> -observes the maximum concurrent calls to <see cref="Export"/> under <see cref="ObservedConcurrency"/> </para>
/// </summary>
public class NoopBufferedChannel
: BufferedChannelBase<NoopBufferedChannel.NoopChannelOptions, NoopBufferedChannel.NoopEvent, NoopBufferedChannel.NoopResponse>
{
/// <summary> Empty event for use with <see cref="NoopBufferedChannel"/> </summary>
public class NoopEvent { }

/// <summary> Empty response for use with <see cref="NoopBufferedChannel"/> </summary>
public class NoopResponse { }

/// <summary> Provides options how the <see cref="NoopBufferedChannel"/> should behave </summary>
public class NoopChannelOptions : ChannelOptionsBase<NoopEvent, NoopResponse>
{
public bool ObserverConcurrency { get; set; }
/// <summary> If set (defaults:false) will track the max observed concurrency to <see cref="NoopBufferedChannel.Export"/></summary>
public bool TrackConcurrency { get; set; }
}

/// <inheritdoc cref="NoopBufferedChannel"/>
public NoopBufferedChannel(BufferOptions options, bool observeConcurrency = false) : base(new NoopChannelOptions
{
BufferOptions = options,
ObserverConcurrency = observeConcurrency
BufferOptions = options, TrackConcurrency = observeConcurrency
}) { }

private long _exportedBuffers;
/// <summary> Returns the number of times <see cref="Export"/> was called</summary>
public long ExportedBuffers => _exportedBuffers;

private int _currentMax;
private long _exportedBuffers;

/// <summary> The maximum observed concurrency to calls to <see cref="Export"/>, requires <see cref="NoopChannelOptions.TrackConcurrency"/> to be set</summary>
public int ObservedConcurrency { get; private set; }

private int _currentMax;

/// <inheritdoc cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.Export"/>
protected override async Task<NoopResponse> Export(IReadOnlyCollection<NoopEvent> buffer, CancellationToken ctx = default)
{
Interlocked.Increment(ref _exportedBuffers);
if (!Options.ObserverConcurrency) return new NoopResponse();
if (!Options.TrackConcurrency) return new NoopResponse();

var max = Interlocked.Increment(ref _currentMax);
await Task.Delay(TimeSpan.FromMilliseconds(1), ctx).ConfigureAwait(false);
Expand Down
Loading