diff --git a/src/Elastic.Channels/BufferOptions.cs b/src/Elastic.Channels/BufferOptions.cs index d2b9aa4..dc495fa 100644 --- a/src/Elastic.Channels/BufferOptions.cs +++ b/src/Elastic.Channels/BufferOptions.cs @@ -3,7 +3,6 @@ // See the LICENSE file in the project root for more information using System; -using System.Collections.Generic; using System.Threading; namespace Elastic.Channels diff --git a/src/Elastic.Channels/BufferedChannelBase.cs b/src/Elastic.Channels/BufferedChannelBase.cs index efa1419..32efea1 100644 --- a/src/Elastic.Channels/BufferedChannelBase.cs +++ b/src/Elastic.Channels/BufferedChannelBase.cs @@ -126,6 +126,7 @@ private async Task ConsumeOutboundEvents(CountdownEvent? countdown) var taskList = new List(maxConsumers); while (await OutChannel.Reader.WaitToReadAsync().ConfigureAwait(false)) + // ReSharper disable once RemoveRedundantBraces { while (OutChannel.Reader.TryRead(out var buffer)) { @@ -151,11 +152,11 @@ private async Task ExportBuffer(CountdownEvent? countdown, IReadOnlyCollection 0; i++) { Options.BulkAttemptCallback?.Invoke(i, items.Count); - TResponse response = null!; + TResponse? response; try { response = await Send(items).ConfigureAwait(false); - Options.ResponseCallback(response, buffer); + Options.ResponseCallback?.Invoke(response, buffer); } catch (Exception e) { @@ -211,11 +212,17 @@ public virtual void Dispose() { _inThread.Dispose(); } - catch { } + catch + { + // ignored + } try { _outThread.Dispose(); } - catch { } + catch + { + // ignored + } } } diff --git a/src/Elastic.Channels/ChannelOptionsBase.cs b/src/Elastic.Channels/ChannelOptionsBase.cs index 4c7dc30..29f9c5d 100644 --- a/src/Elastic.Channels/ChannelOptionsBase.cs +++ b/src/Elastic.Channels/ChannelOptionsBase.cs @@ -21,7 +21,7 @@ public abstract class ChannelOptionsBase public Func WriteEvent { get; set; } = null!; /// - /// If is reached, 's will fail to be published to the channel. You can be notified of dropped + /// If is reached, 's will fail to be published to the channel. You can be notified of dropped /// events with this callback /// public Action? PublishRejectionCallback { get; set; } @@ -30,14 +30,14 @@ public abstract class ChannelOptionsBase public Action? BulkAttemptCallback { get; set; } - /// Subscribe to be notified of events that are retryable but did not store correctly withing the boundaries of + /// Subscribe to be notified of events that are retryable but did not store correctly withing the boundaries of public Action>? MaxRetriesExceededCallback { get; set; } - /// Subscribe to be notified of events that are retryable but did not store correctly within the number of configured + /// Subscribe to be notified of events that are retryable but did not store correctly within the number of configured public Action>? RetryCallBack { get; set; } /// A generic hook to be notified of any bulk request being initiated by - public Action ResponseCallback { get; set; } = (r, b) => { }; + public Action? ResponseCallback { get; set; } } } diff --git a/src/Elastic.Channels/OutboundBuffer.cs b/src/Elastic.Channels/OutboundBuffer.cs index 707b9ff..57f3a00 100644 --- a/src/Elastic.Channels/OutboundBuffer.cs +++ b/src/Elastic.Channels/OutboundBuffer.cs @@ -4,7 +4,6 @@ using System; using System.Collections.Generic; -using System.Collections.ObjectModel; namespace Elastic.Channels; diff --git a/src/Elastic.Channels/ResponseItemsBufferedChannelBase.cs b/src/Elastic.Channels/ResponseItemsBufferedChannelBase.cs index 2a7e371..c7729e9 100644 --- a/src/Elastic.Channels/ResponseItemsBufferedChannelBase.cs +++ b/src/Elastic.Channels/ResponseItemsBufferedChannelBase.cs @@ -16,7 +16,7 @@ public abstract class ResponseItemsChannelOptionsBase -/// A specialized implementation of +/// A specialized implementation of /// This base class exist to help with cases where writing data in bulk to a receiver is capable of reporting back /// individual write failures. /// @@ -44,7 +44,7 @@ protected ResponseItemsBufferedChannelBase(TChannelOptions options) : base(optio /// /// A predicate indicating an event was fully rejected and should be reported to - /// + /// /// protected abstract bool RejectEvent((TEvent, TBulkResponseItem) @event); diff --git a/src/Elastic.Ingest.Apm/ApmChannel.cs b/src/Elastic.Ingest.Apm/ApmChannel.cs index 61fe779..18a5cfe 100644 --- a/src/Elastic.Ingest.Apm/ApmChannel.cs +++ b/src/Elastic.Ingest.Apm/ApmChannel.cs @@ -2,7 +2,6 @@ // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information -using System; using System.Collections.Generic; using System.Diagnostics; using System.IO; @@ -52,7 +51,7 @@ public ApmChannel(ApmChannelOptions options) : base(options) { } protected override Task Send(HttpTransport transport, IReadOnlyCollection page) => transport.RequestAsync(HttpMethod.POST, "/intake/v2/events", PostData.StreamHandler(page, - (b, stream) => + (_, _) => { /* NOT USED */ }, @@ -87,7 +86,7 @@ private async Task WriteBufferToStreamAsync(IReadOnlyCollection b { var type = @event switch { - Transaction t => "transaction", + Transaction _ => "transaction", _ => "unknown" }; var dictionary = new Dictionary() { { type, @event } }; diff --git a/src/Elastic.Ingest.Apm/ApmChannelOptions.cs b/src/Elastic.Ingest.Apm/ApmChannelOptions.cs index 6bbd1ca..101427e 100644 --- a/src/Elastic.Ingest.Apm/ApmChannelOptions.cs +++ b/src/Elastic.Ingest.Apm/ApmChannelOptions.cs @@ -1,8 +1,7 @@ // Licensed to Elasticsearch B.V under one or more agreements. // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information -using System; -using Elastic.Channels; + using Elastic.Ingest.Apm.Model; using Elastic.Ingest.Transport; using Elastic.Transport; diff --git a/src/Elastic.Ingest.Apm/Helpers/Time.cs b/src/Elastic.Ingest.Apm/Helpers/Time.cs index c94ee2b..bbf51ea 100644 --- a/src/Elastic.Ingest.Apm/Helpers/Time.cs +++ b/src/Elastic.Ingest.Apm/Helpers/Time.cs @@ -27,10 +27,8 @@ public static class Epoch internal static long ToTimestamp(DateTime dateTimeToConvert) { if (dateTimeToConvert.Kind != DateTimeKind.Utc) - { throw new ArgumentException($"{nameof(dateTimeToConvert)}'s Kind should be UTC but instead its Kind is {dateTimeToConvert.Kind}" + $". {nameof(dateTimeToConvert)}'s value: {dateTimeToConvert}", nameof(dateTimeToConvert)); - } return RoundTimeValue((dateTimeToConvert - UnixEpochDateTime).TotalMilliseconds * 1000); } diff --git a/src/Elastic.Ingest.Apm/Model/Transaction.cs b/src/Elastic.Ingest.Apm/Model/Transaction.cs index 0ab3f75..9348705 100644 --- a/src/Elastic.Ingest.Apm/Model/Transaction.cs +++ b/src/Elastic.Ingest.Apm/Model/Transaction.cs @@ -42,12 +42,6 @@ public Transaction(string type, string id, string traceId, SpanCount spanCount, [JsonPropertyName("id")] public string Id { get; set; } - /// - /// A mark captures the timing of a significant event during the lifetime of a transaction. - /// Marks are organized into groups and can be set by the user or the agent. - /// - //public Marks? Marks { get; set; } - /// /// Generic designation of a transaction in the scope of a single service (eg: 'GET /// /users/:id') diff --git a/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs b/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs index b30a1fa..d1efcae 100644 --- a/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs +++ b/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs @@ -1,7 +1,7 @@ // Licensed to Elasticsearch B.V under one or more agreements. // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information -using System.Collections.Generic; + using Elastic.Ingest.Elasticsearch.Serialization; namespace Elastic.Ingest.Elasticsearch.DataStreams diff --git a/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamName.cs b/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamName.cs index 6818b57..1b7171c 100644 --- a/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamName.cs +++ b/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamName.cs @@ -4,7 +4,7 @@ using System; using System.Linq; -namespace Elastic.Ingest.Elasticsearch +namespace Elastic.Ingest.Elasticsearch.DataStreams { public record DataStreamName { @@ -17,8 +17,8 @@ public record DataStreamName /// User-configurable arbitrary grouping public string Namespace { get; init; } - private static char[] BadCharacters = { '\\', '/', '*', '?', '"', '<', '>', '|', ' ', ',', '#' }; - private static string BadCharactersError = string.Join(", ", BadCharacters.Select(c => $"'{c}'").ToArray()); + private static readonly char[] BadCharacters = { '\\', '/', '*', '?', '"', '<', '>', '|', ' ', ',', '#' }; + private static readonly string BadCharactersError = string.Join(", ", BadCharacters.Select(c => $"'{c}'").ToArray()); public DataStreamName(string type, string dataSet = "generic", string @namespace = "default") { diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs index feddf13..c462dfc 100644 --- a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs @@ -42,7 +42,7 @@ protected override bool RejectEvent((TEvent, BulkResponseItem) @event) => protected override Task Send(HttpTransport transport, IReadOnlyCollection page) => transport.RequestAsync(HttpMethod.POST, "/_bulk", PostData.StreamHandler(page, - (b, stream) => + (_, _) => { /* NOT USED */ }, @@ -68,10 +68,8 @@ await JsonSerializer.SerializeAsync(stream, indexHeader, indexHeader.GetType(), if (Options.WriteEvent != null) await Options.WriteEvent(stream, ctx, @event).ConfigureAwait(false); else - { await JsonSerializer.SerializeAsync(stream, @event, typeof(TEvent), ElasticsearchChannelStatics.SerializerOptions, ctx) .ConfigureAwait(false); - } if (indexHeader is UpdateOperation) await stream.WriteAsync(ElasticsearchChannelStatics.DocUpdateHeaderEnd, ctx).ConfigureAwait(false); diff --git a/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannelOptions.cs b/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannelOptions.cs index 946e25a..7cca8ff 100644 --- a/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannelOptions.cs +++ b/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannelOptions.cs @@ -27,7 +27,7 @@ public IndexChannelOptions(HttpTransport transport) : base(transport) { } /// /// Provide a per document DateTimeOffset to be used as the date passed as parameter 0 to /// - public Func TimestampLookup { get; set; } = null!; + public Func? TimestampLookup { get; set; } /// /// If the document provides an Id this allows you to set a per document `_id`. @@ -36,6 +36,6 @@ public IndexChannelOptions(HttpTransport transport) : base(transport) { } /// Read more about bulk operations here: /// https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-api-request-body /// - public Func BulkOperationIdLookup { get; set; } = null!; + public Func? BulkOperationIdLookup { get; set; } } } diff --git a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkOperationHeader.cs b/src/Elastic.Ingest.Elasticsearch/Serialization/BulkOperationHeader.cs index c2ae4ac..9f6f1db 100644 --- a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkOperationHeader.cs +++ b/src/Elastic.Ingest.Elasticsearch/Serialization/BulkOperationHeader.cs @@ -3,10 +3,8 @@ // See the LICENSE file in the project root for more information using System; using System.Collections.Generic; -using System.Collections.ObjectModel; using System.Text.Json; using System.Text.Json.Serialization; -using Elastic.Transport.Products.Elasticsearch; namespace Elastic.Ingest.Elasticsearch.Serialization { diff --git a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkResponseItem.cs b/src/Elastic.Ingest.Elasticsearch/Serialization/BulkResponseItem.cs index cf88e3a..a6f927a 100644 --- a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkResponseItem.cs +++ b/src/Elastic.Ingest.Elasticsearch/Serialization/BulkResponseItem.cs @@ -49,13 +49,14 @@ public override BulkResponseItem Read(ref Utf8JsonReader reader, Type typeToConv } var r = status == 200 ? OkayBulkResponseItem - : new BulkResponseItem { Action = action!, Status = status, Error = error }; + : new BulkResponseItem { Action = action, Status = status, Error = error }; return r; } public override void Write(Utf8JsonWriter writer, BulkResponseItem value, JsonSerializerOptions options) { + // ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract if (value is null) { writer.WriteNullValue(); diff --git a/src/Elastic.Ingest.Transport/TransportChannelBase.cs b/src/Elastic.Ingest.Transport/TransportChannelBase.cs index 345e67b..b14b705 100644 --- a/src/Elastic.Ingest.Transport/TransportChannelBase.cs +++ b/src/Elastic.Ingest.Transport/TransportChannelBase.cs @@ -2,9 +2,7 @@ // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information -using System; using System.Collections.Generic; -using System.Threading; using System.Threading.Tasks; using Elastic.Channels; using Elastic.Transport; @@ -12,7 +10,7 @@ namespace Elastic.Ingest.Transport { public abstract class TransportChannelBase : - ResponseItemsBufferedChannelBase, IDisposable + ResponseItemsBufferedChannelBase where TChannelOptions : TransportChannelOptionsBase where TResponse : TransportResponse, new() diff --git a/tests/Elastic.Channels.Tests/BehaviorTests.cs b/tests/Elastic.Channels.Tests/BehaviorTests.cs index 9cc408e..d3f8ac3 100644 --- a/tests/Elastic.Channels.Tests/BehaviorTests.cs +++ b/tests/Elastic.Channels.Tests/BehaviorTests.cs @@ -35,7 +35,8 @@ [Fact] public async Task RespectsPagination() var written = 0; for (var i = 0; i < totalEvents; i++) { - if (await channel.WaitToWriteAsync(new NoopEvent())) + var e = new NoopEvent(); + if (await channel.WaitToWriteAsync(e)) written++; } channelOptions.BufferOptions.WaitHandle.Wait(TimeSpan.FromSeconds(5)); @@ -44,7 +45,7 @@ [Fact] public async Task RespectsPagination() } /// - /// If we are feeding data slowly e.g smaller than + /// If we are feeding data slowly e.g smaller than /// we don't want this data equally distributed over multiple calls to export the data. /// Instead we want the smaller buffer to go out over a single export to the external system /// @@ -66,7 +67,8 @@ [Fact] public async Task MessagesAreSequentiallyDistributedOverWorkers() var written = 0; for (var i = 0; i < 100; i++) { - if (await channel.WaitToWriteAsync(new NoopEvent())) + var e = new NoopEvent(); + if (await channel.WaitToWriteAsync(e)) written++; } channelOptions.BufferOptions.WaitHandle.Wait(TimeSpan.FromSeconds(1)); @@ -94,7 +96,8 @@ [Fact] public async Task ConcurrencyIsApplied() var written = 0; for (var i = 0; i < totalEvents; i++) { - if (await channel.WaitToWriteAsync(new NoopEvent())) + var e = new NoopEvent(); + if (await channel.WaitToWriteAsync(e)) written++; } channelOptions.BufferOptions.WaitHandle.Wait(TimeSpan.FromSeconds(5)); @@ -114,7 +117,7 @@ public class NoopIngestChannel : BufferedChannelBase _seenPages; protected override Task Send(IReadOnlyCollection page) @@ -128,7 +131,7 @@ public class DelayedNoopIngestChannel : NoopIngestChannel { public DelayedNoopIngestChannel(NoopChannelOptions options) : base(options) { } - private int _currentMax = 0; + private int _currentMax; public int MaxConcurrency { get; set; } protected override async Task Send(IReadOnlyCollection page) diff --git a/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/DataStreamIngestionTests.cs b/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/DataStreamIngestionTests.cs index d72750f..f8e5e59 100644 --- a/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/DataStreamIngestionTests.cs +++ b/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/DataStreamIngestionTests.cs @@ -7,7 +7,6 @@ using System.Threading.Tasks; using Elastic.Channels; using Elastic.Clients.Elasticsearch.IndexManagement; -using Elastic.Elasticsearch.Managed; using Elastic.Ingest.Elasticsearch.DataStreams; using Elastic.Transport; using FluentAssertions; diff --git a/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/IndexIngestionTests.cs b/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/IndexIngestionTests.cs index 81bf247..236b8a6 100644 --- a/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/IndexIngestionTests.cs +++ b/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/IndexIngestionTests.cs @@ -6,12 +6,8 @@ using System.Threading; using System.Threading.Tasks; using Elastic.Channels; -using Elastic.Clients.Elasticsearch; using Elastic.Clients.Elasticsearch.IndexManagement; -using Elastic.Elasticsearch.Managed; -using Elastic.Ingest.Elasticsearch.DataStreams; using Elastic.Ingest.Elasticsearch.Indices; -using Elastic.Transport; using FluentAssertions; using Xunit; using Xunit.Abstractions; diff --git a/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/IngestionCluster.cs b/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/IngestionCluster.cs index bbcdc57..b75ffa5 100644 --- a/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/IngestionCluster.cs +++ b/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/IngestionCluster.cs @@ -19,7 +19,7 @@ public class IngestionCluster : XunitClusterBase public IngestionCluster() : base(new XunitClusterConfiguration("8.3.1") { StartingPortNumber = 9202 }) { } public ElasticsearchClient CreateClient(ITestOutputHelper output) => - this.GetOrAddClient(c => + this.GetOrAddClient(_ => { var hostName = (System.Diagnostics.Process.GetProcessesByName("mitmproxy").Any() ? "ipv4.fiddler" @@ -27,11 +27,14 @@ public ElasticsearchClient CreateClient(ITestOutputHelper output) => var nodes = NodesUris(hostName); var connectionPool = new StaticNodePool(nodes); var settings = new ElasticsearchClientSettings(connectionPool) - .Proxy(new Uri("http://ipv4.fiddler:8080"), (string)null, (string)null) + .Proxy(new Uri("http://ipv4.fiddler:8080"), null!, null!) .OnRequestCompleted(d => { try { output.WriteLine(d.DebugInformation);} - catch { } + catch + { + // ignored + } }) .EnableDebugMode(); return new ElasticsearchClient(settings); diff --git a/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/IntegrationTestBase.cs b/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/IntegrationTestBase.cs index dbf32f1..9ccb446 100644 --- a/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/IntegrationTestBase.cs +++ b/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/IntegrationTestBase.cs @@ -1,25 +1,16 @@ // Licensed to Elasticsearch B.V under one or more agreements. // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information -using System; -using System.Linq; using Elastic.Clients.Elasticsearch; -using Elastic.Elasticsearch.Xunit; using Elastic.Elasticsearch.Xunit.XunitPlumbing; -using Elastic.Transport; using Xunit.Abstractions; namespace Elastic.Ingest.Elasticsearch.IntegrationTests; public abstract class IntegrationTestBase : IClusterFixture { - private readonly ITestOutputHelper _output; - protected ElasticsearchClient Client { get; } - protected IntegrationTestBase(IngestionCluster cluster, ITestOutputHelper output) - { - _output = output; + protected IntegrationTestBase(IngestionCluster cluster, ITestOutputHelper output) => Client = cluster.CreateClient(output); - } } diff --git a/tests/Elastic.Ingest.Elasticsearch.Tests/Setup.cs b/tests/Elastic.Ingest.Elasticsearch.Tests/Setup.cs index 7ea83aa..6e2afd6 100644 --- a/tests/Elastic.Ingest.Elasticsearch.Tests/Setup.cs +++ b/tests/Elastic.Ingest.Elasticsearch.Tests/Setup.cs @@ -53,16 +53,16 @@ public TestSession(HttpTransport transport) MaxConsumerBufferLifetime = TimeSpan.FromSeconds(10), WaitHandle = WaitHandle, MaxRetries = 3, - BackoffPeriod = times => TimeSpan.FromMilliseconds(1), + BackoffPeriod = _ => TimeSpan.FromMilliseconds(1), }; ChannelOptions = new IndexChannelOptions(transport) { BufferOptions = BufferOptions, - ServerRejectionCallback = (list) => Interlocked.Increment(ref _rejections), - BulkAttemptCallback = (c, a) => Interlocked.Increment(ref _requests), - ResponseCallback = (r, b) => Interlocked.Increment(ref _responses), - MaxRetriesExceededCallback = (list) => Interlocked.Increment(ref _maxRetriesExceeded), - RetryCallBack = (list) => Interlocked.Increment(ref _retries), + ServerRejectionCallback = (_) => Interlocked.Increment(ref _rejections), + BulkAttemptCallback = (_, _) => Interlocked.Increment(ref _requests), + ResponseCallback = (_, _) => Interlocked.Increment(ref _responses), + MaxRetriesExceededCallback = (_) => Interlocked.Increment(ref _maxRetriesExceeded), + RetryCallBack = (_) => Interlocked.Increment(ref _retries), ExceptionCallback= (e) => LastException = e }; Channel = new IndexChannel(ChannelOptions); @@ -103,7 +103,7 @@ public static TestSession CreateTestSession(HttpTransport