From 251715f3eb2f66fd97449f95066848e1dabc8de4 Mon Sep 17 00:00:00 2001 From: Steve Gordon Date: Tue, 16 Mar 2021 15:12:26 +0000 Subject: [PATCH] Add explicit stop method to Auditable --- src/Elasticsearch.Net/Auditing/Auditable.cs | 7 +- .../Transport/Pipeline/RequestPipeline.cs | 12 ++- tests/Tests.Reproduce/GitHubIssue5363.cs | 73 +++++++++++++++++++ 3 files changed, 85 insertions(+), 7 deletions(-) create mode 100644 tests/Tests.Reproduce/GitHubIssue5363.cs diff --git a/src/Elasticsearch.Net/Auditing/Auditable.cs b/src/Elasticsearch.Net/Auditing/Auditable.cs index f9cabbf9230..2f1f107c6d9 100644 --- a/src/Elasticsearch.Net/Auditing/Auditable.cs +++ b/src/Elasticsearch.Net/Auditing/Auditable.cs @@ -21,8 +21,7 @@ public Auditable(AuditEvent type, List auditTrail, IDateTimeProvider date _dateTimeProvider = dateTimeProvider; var started = _dateTimeProvider.Now(); - _audit = new Audit(type, started); - _audit.Node = node; + _audit = new Audit(type, started) { Node = node }; auditTrail.Add(_audit); var diagnosticName = type.GetAuditDiagnosticEventName(); _activity = diagnosticName != null ? DiagnosticSource.Diagnose(diagnosticName, _audit) : null; @@ -43,9 +42,11 @@ public string Path set => _audit.Path = value; } + public void Stop() => _audit.Ended = _dateTimeProvider.Now(); + public void Dispose() { - _audit.Ended = _dateTimeProvider.Now(); + _audit.Ended = _audit.Ended == default ? _dateTimeProvider.Now() : _audit.Ended; _activity?.Dispose(); } } diff --git a/src/Elasticsearch.Net/Transport/Pipeline/RequestPipeline.cs b/src/Elasticsearch.Net/Transport/Pipeline/RequestPipeline.cs index 22461113b0f..0a421ae9f76 100644 --- a/src/Elasticsearch.Net/Transport/Pipeline/RequestPipeline.cs +++ b/src/Elasticsearch.Net/Transport/Pipeline/RequestPipeline.cs @@ -161,6 +161,7 @@ public TResponse CallElasticsearch(RequestData requestData) var response = _connection.Request(requestData); d.EndState = response.ApiCall; response.ApiCall.AuditTrail = AuditTrail; + audit.Stop(); ThrowBadAuthPipelineExceptionWhenNeeded(response.ApiCall, response); if (!response.ApiCall.Success) audit.Event = requestData.OnFailureAuditEvent; return response; @@ -186,6 +187,7 @@ public async Task CallElasticsearchAsync(RequestData reque var response = await _connection.RequestAsync(requestData, cancellationToken).ConfigureAwait(false); d.EndState = response.ApiCall; response.ApiCall.AuditTrail = AuditTrail; + audit.Stop(); ThrowBadAuthPipelineExceptionWhenNeeded(response.ApiCall, response); if (!response.ApiCall.Success) audit.Event = requestData.OnFailureAuditEvent; return response; @@ -380,6 +382,7 @@ public void Ping(Node node) { var response = _connection.Request(pingData); d.EndState = response; + audit.Stop(); ThrowBadAuthPipelineExceptionWhenNeeded(response); //ping should not silently accept bad but valid http responses if (!response.Success) @@ -408,6 +411,7 @@ public async Task PingAsync(Node node, CancellationToken cancellationToken) { var response = await _connection.RequestAsync(pingData, cancellationToken).ConfigureAwait(false); d.EndState = response; + audit.Stop(); ThrowBadAuthPipelineExceptionWhenNeeded(response); //ping should not silently accept bad but valid http responses if (!response.Success) @@ -438,7 +442,7 @@ public void Sniff() audit.Path = requestData.PathAndQuery; var response = _connection.Request(requestData); d.EndState = response; - + audit.Stop(); ThrowBadAuthPipelineExceptionWhenNeeded(response); //sniff should not silently accept bad but valid http responses if (!response.Success) @@ -475,7 +479,7 @@ public async Task SniffAsync(CancellationToken cancellationToken) audit.Path = requestData.PathAndQuery; var response = await _connection.RequestAsync(requestData, cancellationToken).ConfigureAwait(false); d.EndState = response; - + audit.Stop(); ThrowBadAuthPipelineExceptionWhenNeeded(response); //sniff should not silently accept bad but valid http responses if (!response.Success) @@ -564,8 +568,8 @@ private RequestData CreatePingRequestData(Node node) EnableHttpPipelining = RequestConfiguration?.EnableHttpPipelining ?? _settings.HttpPipeliningEnabled, ForceNode = RequestConfiguration?.ForceNode }; - IRequestParameters requestParameters = new RootNodeInfoRequestParameters(); - requestParameters.RequestConfiguration = requestOverrides; + + IRequestParameters requestParameters = new RootNodeInfoRequestParameters { RequestConfiguration = requestOverrides }; var data = new RequestData(HttpMethod.HEAD, string.Empty, null, _settings, requestParameters, _memoryStreamFactory) { Node = node }; return data; diff --git a/tests/Tests.Reproduce/GitHubIssue5363.cs b/tests/Tests.Reproduce/GitHubIssue5363.cs new file mode 100644 index 00000000000..8375c9e0d19 --- /dev/null +++ b/tests/Tests.Reproduce/GitHubIssue5363.cs @@ -0,0 +1,73 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading.Tasks; +using Elastic.Elasticsearch.Xunit.XunitPlumbing; +using Elasticsearch.Net; +using Elasticsearch.Net.Diagnostics; +using Xunit; + +namespace Tests.Reproduce +{ + public class GitHubIssue5363 + { + internal class TestDiagnosticListener : IObserver, IDisposable + { + private ConcurrentBag Disposables { get; } = new(); + + public Action OnEnded { get; } + + public TestDiagnosticListener(Action onEnded) => OnEnded = onEnded; + + public void OnError(Exception error) { } + public void OnCompleted() { } + + public void OnNext(DiagnosticListener value) => + TrySubscribe(DiagnosticSources.RequestPipeline.SourceName, + () => new RequestPipelineDiagnosticObserver(null, v => OnEnded(v.Value)), value); + + private void TrySubscribe(string sourceName, Func>> listener, DiagnosticListener value) + { + if (value.Name != sourceName) + return; + var d = value.Subscribe(listener()); + + Disposables.Add(d); + } + + public void Dispose() + { + foreach (var d in Disposables) + { + d.Dispose(); + } + } + } + + [U] + public async Task DiagnosticListener_AuditTrailIsValid() + { + using var listener = new TestDiagnosticListener(data => + { + var auditTrailEvent = data.AuditTrail[0]; + + Assert.True(auditTrailEvent.Ended != default); + }); + + using var foo = DiagnosticListener.AllListeners.Subscribe(listener); + + var connectionPool = new SingleNodeConnectionPool(new Uri("http://localhost:9200")); + var settings = new ConnectionConfiguration(connectionPool, new InMemoryConnection()); + + var client = new ElasticLowLevelClient(settings); + var person = new { Id = "1" }; + + await client.IndexAsync("test-index", PostData.Serializable(person)); + } + } +}