Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 19 additions & 14 deletions src/Elastic.Channels/BufferedChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,12 @@ public interface IBufferedChannel<in TEvent> : IDisposable
/// Waits for availability on the inbound channel before attempting to write each item in <paramref name="events"/>.
/// </summary>
/// <returns>A bool indicating if all writes werwase successful</returns>
async Task<bool> WaitToWriteManyAsync(IEnumerable<TEvent> events, CancellationToken ctx = default)
{
var allWritten = true;
foreach (var e in events)
{
var written = await WaitToWriteAsync(e, ctx).ConfigureAwait(false);
if (!written) allWritten = written;
}
return allWritten;
}
Task<bool> WaitToWriteManyAsync(IEnumerable<TEvent> events, CancellationToken ctx = default);

/// <summary>
/// Tries to write many <paramref name="events"/> to the channel returning true if ALL messages were written succesfully
/// </summary>
bool TryWriteMany(IEnumerable<TEvent> events) =>
events.Select(e => TryWrite(e)).All(b => b);
bool TryWriteMany(IEnumerable<TEvent> events);
}

/// <summary>
Expand Down Expand Up @@ -122,8 +112,6 @@ await ConsumeInboundEvents(maxOut, BufferOptions.OutboundBufferMaxLifetime)
/// </summary>
protected abstract Task<TResponse> Export(IReadOnlyCollection<TEvent> buffer, CancellationToken ctx = default);



/// <summary>The channel options currently in use</summary>
public TChannelOptions Options { get; }

Expand Down Expand Up @@ -152,6 +140,23 @@ public override bool TryWrite(TEvent item)
return false;
}


/// <inheritdoc cref="IBufferedChannel{TEvent}.WaitToWriteManyAsync"/>
public async Task<bool> WaitToWriteManyAsync(IEnumerable<TEvent> events, CancellationToken ctx = default)
{
var allWritten = true;
foreach (var e in events)
{
var written = await WaitToWriteAsync(e, ctx).ConfigureAwait(false);
if (!written) allWritten = written;
}
return allWritten;
}

/// <inheritdoc cref="IBufferedChannel{TEvent}.TryWriteMany"/>
public bool TryWriteMany(IEnumerable<TEvent> events) =>
events.Select(e => TryWrite(e)).All(b => b);

/// <inheritdoc cref="ChannelWriter{T}.WaitToWriteAsync"/>
public virtual async Task<bool> WaitToWriteAsync(TEvent item, CancellationToken ctx = default)
{
Expand Down
2 changes: 1 addition & 1 deletion src/Elastic.Channels/Elastic.Channels.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<TargetFrameworks>netstandard2.0;netstandard2.1</TargetFrameworks>
<Description>Provides components to build a buffer-backed channel that flushes batches of data in a controlled (Max N || Max Duration) manner.</Description>
<PackageTags>elastic, channels, buffer</PackageTags>
<LangVersion>latest</LangVersion>
Expand Down
2 changes: 1 addition & 1 deletion src/Elastic.Ingest.Apm/Elastic.Ingest.Apm.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<TargetFrameworks>netstandard2.0;netstandard2.1</TargetFrameworks>
<Description>Offers an easy to use ChannelWriter implementation to push data concurrently to APM using Elastic.Transport</Description>
<PackageTags>elastic, channels, apm, ingest</PackageTags>
<LangVersion>latest</LangVersion>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<TargetFrameworks>netstandard2.0;netstandard2.1</TargetFrameworks>
<Description>Offers an easy to use ChannelWriter implementation to push data concurrently to Elasticsearch using Elastic.Transport</Description>
<PackageTags>elastic, channels, elasticsearch, ingest</PackageTags>
<LangVersion>latest</LangVersion>
Expand Down
17 changes: 9 additions & 8 deletions src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using Elastic.Ingest.Elasticsearch.Serialization;
using Elastic.Ingest.Transport;
using Elastic.Transport;
using static Elastic.Ingest.Elasticsearch.ElasticsearchChannelStatics;

namespace Elastic.Ingest.Elasticsearch
{
Expand Down Expand Up @@ -46,7 +47,7 @@ protected override bool Retry(BulkResponse response)

/// <inheritdoc cref="ResponseItemsBufferedChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}.RetryEvent"/>
protected override bool RetryEvent((TEvent, BulkResponseItem) @event) =>
ElasticsearchChannelStatics.RetryStatusCodes.Contains(@event.Item2.Status);
RetryStatusCodes.Contains(@event.Item2.Status);

/// <inheritdoc cref="ResponseItemsBufferedChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}.RejectEvent"/>
protected override bool RejectEvent((TEvent, BulkResponseItem) @event) =>
Expand All @@ -61,7 +62,7 @@ protected override Task<BulkResponse> Export(HttpTransport transport, IReadOnlyC
/* NOT USED */
},
async (b, stream, ctx) => { await WriteBufferToStreamAsync(b, stream, ctx).ConfigureAwait(false); })
, ElasticsearchChannelStatics.RequestParams, ctx);
, RequestParams, ctx);

/// <summary>
/// Asks implementations to create a <see cref="BulkOperationHeader"/> based on the <paramref name="event"/> being exported.
Expand All @@ -75,23 +76,23 @@ private async Task WriteBufferToStreamAsync(IReadOnlyCollection<TEvent> b, Strea
if (@event == null) continue;

var indexHeader = CreateBulkOperationHeader(@event);
await JsonSerializer.SerializeAsync(stream, indexHeader, indexHeader.GetType(), ElasticsearchChannelStatics.SerializerOptions, ctx)
await JsonSerializer.SerializeAsync(stream, indexHeader, indexHeader.GetType(), SerializerOptions, ctx)
.ConfigureAwait(false);
await stream.WriteAsync(ElasticsearchChannelStatics.LineFeed, 0, 1, ctx).ConfigureAwait(false);
await stream.WriteAsync(LineFeed, 0, 1, ctx).ConfigureAwait(false);

if (indexHeader is UpdateOperation)
await stream.WriteAsync(ElasticsearchChannelStatics.DocUpdateHeaderStart, ctx).ConfigureAwait(false);
await stream.WriteAsync(DocUpdateHeaderStart, 0, DocUpdateHeaderStart.Length, ctx).ConfigureAwait(false);

if (Options.WriteEvent != null)
await Options.WriteEvent(stream, ctx, @event).ConfigureAwait(false);
else
await JsonSerializer.SerializeAsync(stream, @event, typeof(TEvent), ElasticsearchChannelStatics.SerializerOptions, ctx)
await JsonSerializer.SerializeAsync(stream, @event, typeof(TEvent), SerializerOptions, ctx)
.ConfigureAwait(false);

if (indexHeader is UpdateOperation)
await stream.WriteAsync(ElasticsearchChannelStatics.DocUpdateHeaderEnd, ctx).ConfigureAwait(false);
await stream.WriteAsync(DocUpdateHeaderEnd, 0, DocUpdateHeaderEnd.Length, ctx).ConfigureAwait(false);

await stream.WriteAsync(ElasticsearchChannelStatics.LineFeed, 0, 1, ctx).ConfigureAwait(false);
await stream.WriteAsync(LineFeed, 0, 1, ctx).ConfigureAwait(false);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<TargetFrameworks>netstandard2.0;netstandard2.1</TargetFrameworks>
<Description>Provides components to build a buffer-backed channel for publishing events to distributed systems over HTTP through Elastic.Transport</Description>
<PackageTags>elastic, transport, ingest, search</PackageTags>
<LangVersion>latest</LangVersion>
Expand Down