1 change: 0 additions & 1 deletion src/Elastic.Channels/BufferOptions.cs
@@ -3,7 +3,6 @@
// See the LICENSE file in the project root for more information

using System;
using System.Collections.Generic;
using System.Threading;

namespace Elastic.Channels
15 changes: 11 additions & 4 deletions src/Elastic.Channels/BufferedChannelBase.cs
@@ -126,6 +126,7 @@ private async Task ConsumeOutboundEvents(CountdownEvent? countdown)
var taskList = new List<Task>(maxConsumers);

while (await OutChannel.Reader.WaitToReadAsync().ConfigureAwait(false))
// ReSharper disable once RemoveRedundantBraces
{
while (OutChannel.Reader.TryRead(out var buffer))
{
@@ -151,11 +152,11 @@ private async Task ExportBuffer(CountdownEvent? countdown, IReadOnlyCollection<T
for (var i = 0; i <= maxRetries && items.Count > 0; i++)
{
Options.BulkAttemptCallback?.Invoke(i, items.Count);
TResponse response = null!;
TResponse? response;
try
{
response = await Send(items).ConfigureAwait(false);
Options.ResponseCallback(response, buffer);
Options.ResponseCallback?.Invoke(response, buffer);
}
catch (Exception e)
{
@@ -211,11 +212,17 @@ public virtual void Dispose()
{
_inThread.Dispose();
}
catch { }
catch
{
// ignored
}
try
{
_outThread.Dispose();
}
catch { }
catch
{
// ignored
}
}
}
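The hunk above tightens the export loop: the response variable is declared as nullable and the response callback is now invoked null-conditionally. Below is a minimal, self-contained sketch of that bounded-retry pattern; the type and member names (`ExportSketch`, `send`, the simplified callback signatures) are illustrative placeholders, not the library's actual API.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Sketch only: simplified callbacks, no buffer tracking or backoff handling.
public class ExportSketch<TEvent, TResponse>
{
    public Action<int, int>? BulkAttemptCallback { get; set; }   // (attempt, item count)
    public Action<TResponse>? ResponseCallback { get; set; }

    public async Task ExportAsync(
        IReadOnlyCollection<TEvent> items,
        Func<IReadOnlyCollection<TEvent>, Task<TResponse>> send,
        int maxRetries)
    {
        for (var i = 0; i <= maxRetries && items.Count > 0; i++)
        {
            BulkAttemptCallback?.Invoke(i, items.Count);
            try
            {
                var response = await send(items).ConfigureAwait(false);
                // Null-conditional invocation: callers that never set the callback pay nothing.
                ResponseCallback?.Invoke(response);
                return;
            }
            catch
            {
                // The real implementation surfaces the exception through a callback and
                // decides which remaining items are retryable; the sketch simply retries.
            }
        }
    }
}
```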
8 changes: 4 additions & 4 deletions src/Elastic.Channels/ChannelOptionsBase.cs
@@ -21,7 +21,7 @@ public abstract class ChannelOptionsBase<TEvent, TResponse>
public Func<Stream, CancellationToken, TEvent, Task> WriteEvent { get; set; } = null!;

/// <summary>
/// If <see cref="BufferOptions.MaxInFlightMessages"/> is reached, <see cref="TEvent"/>'s will fail to be published to the channel. You can be notified of dropped
/// If <see cref="Elastic.Channels.BufferOptions.MaxInFlightMessages"/> is reached, <see cref="TEvent"/>'s will fail to be published to the channel. You can be notified of dropped
/// events with this callback
/// </summary>
public Action<TEvent>? PublishRejectionCallback { get; set; }
@@ -30,14 +30,14 @@ public abstract class ChannelOptionsBase<TEvent, TResponse>

public Action<int, int>? BulkAttemptCallback { get; set; }

/// <summary> Subscribe to be notified of events that are retryable but did not store correctly withing the boundaries of <see cref="MaxRetries"/></summary>
/// <summary> Subscribe to be notified of events that are retryable but did not store correctly withing the boundaries of <see cref="Elastic.Channels.BufferOptions.MaxRetries"/></summary>
public Action<IReadOnlyCollection<TEvent>>? MaxRetriesExceededCallback { get; set; }

/// <summary> Subscribe to be notified of events that are retryable but did not store correctly within the number of configured <see cref="MaxRetries"/></summary>
/// <summary> Subscribe to be notified of events that are retryable but did not store correctly within the number of configured <see cref="Elastic.Channels.BufferOptions.MaxRetries"/></summary>
public Action<IReadOnlyCollection<TEvent>>? RetryCallBack { get; set; }

/// <summary> A generic hook to be notified of any bulk request being initiated by <see cref="InboundBuffer{TEvent}"/> </summary>
public Action<TResponse, IWriteTrackingBuffer> ResponseCallback { get; set; } = (r, b) => { };
public Action<TResponse, IWriteTrackingBuffer>? ResponseCallback { get; set; }
}

}
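With `ResponseCallback` now nullable instead of defaulting to a no-op delegate, consumers opt in explicitly and the channel guards the call. A self-contained wiring sketch with simplified stand-in types (not the real options surface):

```csharp
using System;

// Stand-in for ChannelOptionsBase<TEvent, TResponse> with only the callbacks shown above;
// names and shapes are simplified for illustration.
class OptionsSketch<TEvent, TResponse>
{
    public Action<TEvent>? PublishRejectionCallback { get; set; }
    public Action<TResponse>? ResponseCallback { get; set; }
}

class Demo
{
    static void Main()
    {
        var options = new OptionsSketch<string, int>
        {
            // Opt in explicitly; leaving these null is now the cheap default.
            PublishRejectionCallback = e => Console.WriteLine($"dropped before publish: {e}"),
            ResponseCallback = status => Console.WriteLine($"bulk completed: {status}")
        };

        // Mirrors Options.ResponseCallback?.Invoke(response, buffer) in the diff above.
        options.ResponseCallback?.Invoke(200);
    }
}
```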
1 change: 0 additions & 1 deletion src/Elastic.Channels/OutboundBuffer.cs
@@ -4,7 +4,6 @@

using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;

namespace Elastic.Channels;

4 changes: 2 additions & 2 deletions src/Elastic.Channels/ResponseItemsBufferedChannelBase.cs
@@ -16,7 +16,7 @@ public abstract class ResponseItemsChannelOptionsBase<TEvent, TResponse, TBulkRe
}

/// <summary>
/// A specialized implementation of <see cref="BufferedChannelBase{TChannelOptions,TBuffer,TEvent,TResponse}"/>
/// A specialized implementation of <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}"/>
/// <para>This base class exist to help with cases where writing data in bulk to a receiver is capable of reporting back
/// individual write failures.
/// </para>
@@ -44,7 +44,7 @@ protected ResponseItemsBufferedChannelBase(TChannelOptions options) : base(optio

/// <summary>
/// A predicate indicating an event was fully rejected and should be reported to
/// <see cref="ResponseItemsChannelOptionsBase{TEvent,TBuffer,TResponse,TBulkResponseItem}.ServerRejectionCallback"/>
/// <see cref="ResponseItemsChannelOptionsBase{TEvent,TResponse,TBulkResponseItem}.ServerRejectionCallback"/>
/// </summary>
protected abstract bool RejectEvent((TEvent, TBulkResponseItem) @event);

5 changes: 2 additions & 3 deletions src/Elastic.Ingest.Apm/ApmChannel.cs
@@ -2,7 +2,6 @@
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
@@ -52,7 +51,7 @@ public ApmChannel(ApmChannelOptions options) : base(options) { }
protected override Task<EventIntakeResponse> Send(HttpTransport transport, IReadOnlyCollection<IIntakeObject> page) =>
transport.RequestAsync<EventIntakeResponse>(HttpMethod.POST, "/intake/v2/events",
PostData.StreamHandler(page,
(b, stream) =>
(_, _) =>
{
/* NOT USED */
},
@@ -87,7 +86,7 @@ private async Task WriteBufferToStreamAsync(IReadOnlyCollection<IIntakeObject> b
{
var type = @event switch
{
Transaction t => "transaction",
Transaction _ => "transaction",
_ => "unknown"
};
var dictionary = new Dictionary<string, object>() { { type, @event } };
3 changes: 1 addition & 2 deletions src/Elastic.Ingest.Apm/ApmChannelOptions.cs
@@ -1,8 +1,7 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using System;
using Elastic.Channels;

using Elastic.Ingest.Apm.Model;
using Elastic.Ingest.Transport;
using Elastic.Transport;
2 changes: 0 additions & 2 deletions src/Elastic.Ingest.Apm/Helpers/Time.cs
@@ -27,10 +27,8 @@ public static class Epoch
internal static long ToTimestamp(DateTime dateTimeToConvert)
{
if (dateTimeToConvert.Kind != DateTimeKind.Utc)
{
throw new ArgumentException($"{nameof(dateTimeToConvert)}'s Kind should be UTC but instead its Kind is {dateTimeToConvert.Kind}" +
$". {nameof(dateTimeToConvert)}'s value: {dateTimeToConvert}", nameof(dateTimeToConvert));
}

return RoundTimeValue((dateTimeToConvert - UnixEpochDateTime).TotalMilliseconds * 1000);
}
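For context, here is a self-contained sketch of the conversion `ToTimestamp` performs: microseconds since the Unix epoch, rejecting non-UTC input. `RoundTimeValue` is not shown in this hunk, so plain rounding stands in for it here.

```csharp
using System;

static class EpochSketch
{
    static readonly DateTime UnixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);

    public static long ToTimestamp(DateTime value)
    {
        if (value.Kind != DateTimeKind.Utc)
            throw new ArgumentException("Kind must be UTC", nameof(value));
        // Milliseconds * 1000 = microseconds; Math.Round stands in for RoundTimeValue.
        return (long)Math.Round((value - UnixEpoch).TotalMilliseconds * 1000);
    }
}

// Usage: convert local times first, e.g. EpochSketch.ToTimestamp(DateTime.Now.ToUniversalTime());
```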
6 changes: 0 additions & 6 deletions src/Elastic.Ingest.Apm/Model/Transaction.cs
@@ -42,12 +42,6 @@ public Transaction(string type, string id, string traceId, SpanCount spanCount,
[JsonPropertyName("id")]
public string Id { get; set; }

/// <summary>
/// A mark captures the timing of a significant event during the lifetime of a transaction.
/// Marks are organized into groups and can be set by the user or the agent.
/// </summary>
//public Marks? Marks { get; set; }

/// <summary>
/// Generic designation of a transaction in the scope of a single service (eg: 'GET
/// /users/:id')
@@ -1,7 +1,7 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using System.Collections.Generic;

using Elastic.Ingest.Elasticsearch.Serialization;

namespace Elastic.Ingest.Elasticsearch.DataStreams
@@ -4,7 +4,7 @@
using System;
using System.Linq;

namespace Elastic.Ingest.Elasticsearch
namespace Elastic.Ingest.Elasticsearch.DataStreams
{
public record DataStreamName
{
@@ -17,8 +17,8 @@ public record DataStreamName
/// <summary> User-configurable arbitrary grouping</summary>
public string Namespace { get; init; }

private static char[] BadCharacters = { '\\', '/', '*', '?', '"', '<', '>', '|', ' ', ',', '#' };
private static string BadCharactersError = string.Join(", ", BadCharacters.Select(c => $"'{c}'").ToArray());
private static readonly char[] BadCharacters = { '\\', '/', '*', '?', '"', '<', '>', '|', ' ', ',', '#' };
private static readonly string BadCharactersError = string.Join(", ", BadCharacters.Select(c => $"'{c}'").ToArray());

public DataStreamName(string type, string dataSet = "generic", string @namespace = "default")
{
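A brief usage sketch for the record above. The `type-dataset-namespace` composition follows the standard Elasticsearch data stream naming scheme, and the assumption here is that the constructor (cut off in this hunk) validates input against the `BadCharacters` set:

```csharp
// Defaults from the constructor signature: dataSet = "generic", namespace = "default".
var dsName = new DataStreamName("logs", "myapp", "production");
// Conventionally targets the "logs-myapp-production" data stream.

// Presumably rejected by the constructor's validation (see BadCharacters):
// new DataStreamName("logs", "my app", "production");  // contains a space
```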
4 changes: 1 addition & 3 deletions src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs
@@ -42,7 +42,7 @@ protected override bool RejectEvent((TEvent, BulkResponseItem) @event) =>
protected override Task<BulkResponse> Send(HttpTransport transport, IReadOnlyCollection<TEvent> page) =>
transport.RequestAsync<BulkResponse>(HttpMethod.POST, "/_bulk",
PostData.StreamHandler(page,
(b, stream) =>
(_, _) =>
{
/* NOT USED */
},
@@ -68,10 +68,8 @@ await JsonSerializer.SerializeAsync(stream, indexHeader, indexHeader.GetType(),
if (Options.WriteEvent != null)
await Options.WriteEvent(stream, ctx, @event).ConfigureAwait(false);
else
{
await JsonSerializer.SerializeAsync(stream, @event, typeof(TEvent), ElasticsearchChannelStatics.SerializerOptions, ctx)
.ConfigureAwait(false);
}

if (indexHeader is UpdateOperation)
await stream.WriteAsync(ElasticsearchChannelStatics.DocUpdateHeaderEnd, ctx).ConfigureAwait(false);
@@ -27,7 +27,7 @@ public IndexChannelOptions(HttpTransport transport) : base(transport) { }
/// <summary>
/// Provide a per document <c>DateTimeOffset</c> to be used as the date passed as parameter 0 to <see cref="IndexFormat"/>
/// </summary>
public Func<TEvent, DateTimeOffset?> TimestampLookup { get; set; } = null!;
public Func<TEvent, DateTimeOffset?>? TimestampLookup { get; set; }

/// <summary>
/// If the document provides an Id this allows you to set a per document `_id`.
@@ -36,6 +36,6 @@ public IndexChannelOptions(HttpTransport transport) : base(transport) { }
/// <para>Read more about bulk operations here:</para>
/// <para>https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-api-request-body</para>
/// </summary>
public Func<TEvent, string> BulkOperationIdLookup { get; set; } = null!;
public Func<TEvent, string>? BulkOperationIdLookup { get; set; }
}
}
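Both lookups change from `null!` placeholders to genuinely optional delegates, so call sites are expected to guard them. A hypothetical consuming sketch (the real channel internals are not part of this diff):

```csharp
using System;

class IndexLookupSketch<TEvent>
{
    public Func<TEvent, DateTimeOffset?>? TimestampLookup { get; set; }
    public Func<TEvent, string>? BulkOperationIdLookup { get; set; }

    public (DateTimeOffset Timestamp, string? Id) Describe(TEvent e) =>
        // Fall back to "now" and a server-generated _id when the lookups are not provided.
        (TimestampLookup?.Invoke(e) ?? DateTimeOffset.UtcNow, BulkOperationIdLookup?.Invoke(e));
}
```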
@@ -3,10 +3,8 @@
// See the LICENSE file in the project root for more information
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Text.Json;
using System.Text.Json.Serialization;
using Elastic.Transport.Products.Elasticsearch;

namespace Elastic.Ingest.Elasticsearch.Serialization
{
@@ -49,13 +49,14 @@ public override BulkResponseItem Read(ref Utf8JsonReader reader, Type typeToConv
}
var r = status == 200
? OkayBulkResponseItem
: new BulkResponseItem { Action = action!, Status = status, Error = error };
: new BulkResponseItem { Action = action, Status = status, Error = error };

return r;
}

public override void Write(Utf8JsonWriter writer, BulkResponseItem value, JsonSerializerOptions options)
{
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
if (value is null)
{
writer.WriteNullValue();
4 changes: 1 addition & 3 deletions src/Elastic.Ingest.Transport/TransportChannelBase.cs
@@ -2,17 +2,15 @@
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Elastic.Channels;
using Elastic.Transport;

namespace Elastic.Ingest.Transport
{
public abstract class TransportChannelBase<TChannelOptions, TEvent, TResponse, TBulkResponseItem> :
ResponseItemsBufferedChannelBase<TChannelOptions, TEvent, TResponse, TBulkResponseItem>, IDisposable
ResponseItemsBufferedChannelBase<TChannelOptions, TEvent, TResponse, TBulkResponseItem>
where TChannelOptions : TransportChannelOptionsBase<TEvent, TResponse, TBulkResponseItem>
where TResponse : TransportResponse, new()

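Dropping the explicit `IDisposable` from the class declaration is safe because the interface is already satisfied through the inherited `Dispose()` shown earlier in this PR. A minimal illustration with placeholder type names:

```csharp
using System;

class BufferedChannelSketch : IDisposable
{
    public virtual void Dispose() { /* tear down consumer tasks, wait handles, etc. */ }
}

// Re-listing IDisposable here would be redundant: the base class already implements it.
class TransportChannelSketch : BufferedChannelSketch
{
    public override void Dispose() => base.Dispose();
}
```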
15 changes: 9 additions & 6 deletions tests/Elastic.Channels.Tests/BehaviorTests.cs
@@ -35,7 +35,8 @@ [Fact] public async Task RespectsPagination()
var written = 0;
for (var i = 0; i < totalEvents; i++)
{
if (await channel.WaitToWriteAsync(new NoopEvent()))
var e = new NoopEvent();
if (await channel.WaitToWriteAsync(e))
written++;
}
channelOptions.BufferOptions.WaitHandle.Wait(TimeSpan.FromSeconds(5));
@@ -44,7 +45,7 @@ [Fact] public async Task RespectsPagination()
}

/// <summary>
/// If we are feeding data slowly e.g smaller than <see cref="BufferOptions{TEvent}.MaxConsumerBufferSize"/>
/// If we are feeding data slowly e.g smaller than <see cref="BufferOptions.MaxConsumerBufferSize"/>
/// we don't want this data equally distributed over multiple calls to export the data.
/// Instead we want the smaller buffer to go out over a single export to the external system
/// </summary>
@@ -66,7 +67,8 @@ [Fact] public async Task MessagesAreSequentiallyDistributedOverWorkers()
var written = 0;
for (var i = 0; i < 100; i++)
{
if (await channel.WaitToWriteAsync(new NoopEvent()))
var e = new NoopEvent();
if (await channel.WaitToWriteAsync(e))
written++;
}
channelOptions.BufferOptions.WaitHandle.Wait(TimeSpan.FromSeconds(1));
@@ -94,7 +96,8 @@ [Fact] public async Task ConcurrencyIsApplied()
var written = 0;
for (var i = 0; i < totalEvents; i++)
{
if (await channel.WaitToWriteAsync(new NoopEvent()))
var e = new NoopEvent();
if (await channel.WaitToWriteAsync(e))
written++;
}
channelOptions.BufferOptions.WaitHandle.Wait(TimeSpan.FromSeconds(5));
@@ -114,7 +117,7 @@ public class NoopIngestChannel : BufferedChannelBase<NoopChannelOptions, NoopEve
{
public NoopIngestChannel(NoopChannelOptions options) : base(options) { }

private long _seenPages = 0;
private long _seenPages;
public long SeenPages => _seenPages;

protected override Task<NoopResponse> Send(IReadOnlyCollection<NoopEvent> page)
@@ -128,7 +131,7 @@ public class DelayedNoopIngestChannel : NoopIngestChannel
{
public DelayedNoopIngestChannel(NoopChannelOptions options) : base(options) { }

private int _currentMax = 0;
private int _currentMax;
public int MaxConcurrency { get; set; }

protected override async Task<NoopResponse> Send(IReadOnlyCollection<NoopEvent> page)
@@ -7,7 +7,6 @@
using System.Threading.Tasks;
using Elastic.Channels;
using Elastic.Clients.Elasticsearch.IndexManagement;
using Elastic.Elasticsearch.Managed;
using Elastic.Ingest.Elasticsearch.DataStreams;
using Elastic.Transport;
using FluentAssertions;
@@ -6,12 +6,8 @@
using System.Threading;
using System.Threading.Tasks;
using Elastic.Channels;
using Elastic.Clients.Elasticsearch;
using Elastic.Clients.Elasticsearch.IndexManagement;
using Elastic.Elasticsearch.Managed;
using Elastic.Ingest.Elasticsearch.DataStreams;
using Elastic.Ingest.Elasticsearch.Indices;
using Elastic.Transport;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;
@@ -19,19 +19,22 @@ public class IngestionCluster : XunitClusterBase
public IngestionCluster() : base(new XunitClusterConfiguration("8.3.1") { StartingPortNumber = 9202 }) { }

public ElasticsearchClient CreateClient(ITestOutputHelper output) =>
this.GetOrAddClient(c =>
this.GetOrAddClient(_ =>
{
var hostName = (System.Diagnostics.Process.GetProcessesByName("mitmproxy").Any()
? "ipv4.fiddler"
: "localhost");
var nodes = NodesUris(hostName);
var connectionPool = new StaticNodePool(nodes);
var settings = new ElasticsearchClientSettings(connectionPool)
.Proxy(new Uri("http://ipv4.fiddler:8080"), (string)null, (string)null)
.Proxy(new Uri("http://ipv4.fiddler:8080"), null!, null!)
.OnRequestCompleted(d =>
{
try { output.WriteLine(d.DebugInformation);}
catch { }
catch
{
// ignored
}
})
.EnableDebugMode();
return new ElasticsearchClient(settings);