Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/Elasticsearch.Net/Auditing/Auditable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ public Auditable(AuditEvent type, List<Audit> auditTrail, IDateTimeProvider date
_dateTimeProvider = dateTimeProvider;
var started = _dateTimeProvider.Now();

_audit = new Audit(type, started);
_audit.Node = node;
_audit = new Audit(type, started) { Node = node };
auditTrail.Add(_audit);
var diagnosticName = type.GetAuditDiagnosticEventName();
_activity = diagnosticName != null ? DiagnosticSource.Diagnose(diagnosticName, _audit) : null;
Expand All @@ -43,9 +42,11 @@ public string Path
set => _audit.Path = value;
}

public void Stop() => _audit.Ended = _dateTimeProvider.Now();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, do we want this to be called multiple times to override or not?

I think its fine as is.


public void Dispose()
{
_audit.Ended = _dateTimeProvider.Now();
_audit.Ended = _audit.Ended == default ? _dateTimeProvider.Now() : _audit.Ended;
_activity?.Dispose();
}
}
Expand Down
12 changes: 8 additions & 4 deletions src/Elasticsearch.Net/Transport/Pipeline/RequestPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ public TResponse CallElasticsearch<TResponse>(RequestData requestData)
var response = _connection.Request<TResponse>(requestData);
d.EndState = response.ApiCall;
response.ApiCall.AuditTrail = AuditTrail;
audit.Stop();
ThrowBadAuthPipelineExceptionWhenNeeded(response.ApiCall, response);
if (!response.ApiCall.Success) audit.Event = requestData.OnFailureAuditEvent;
return response;
Expand All @@ -186,6 +187,7 @@ public async Task<TResponse> CallElasticsearchAsync<TResponse>(RequestData reque
var response = await _connection.RequestAsync<TResponse>(requestData, cancellationToken).ConfigureAwait(false);
d.EndState = response.ApiCall;
response.ApiCall.AuditTrail = AuditTrail;
audit.Stop();
ThrowBadAuthPipelineExceptionWhenNeeded(response.ApiCall, response);
if (!response.ApiCall.Success) audit.Event = requestData.OnFailureAuditEvent;
return response;
Expand Down Expand Up @@ -380,6 +382,7 @@ public void Ping(Node node)
{
var response = _connection.Request<VoidResponse>(pingData);
d.EndState = response;
audit.Stop();
ThrowBadAuthPipelineExceptionWhenNeeded(response);
//ping should not silently accept bad but valid http responses
if (!response.Success)
Expand Down Expand Up @@ -408,6 +411,7 @@ public async Task PingAsync(Node node, CancellationToken cancellationToken)
{
var response = await _connection.RequestAsync<VoidResponse>(pingData, cancellationToken).ConfigureAwait(false);
d.EndState = response;
audit.Stop();
ThrowBadAuthPipelineExceptionWhenNeeded(response);
//ping should not silently accept bad but valid http responses
if (!response.Success)
Expand Down Expand Up @@ -438,7 +442,7 @@ public void Sniff()
audit.Path = requestData.PathAndQuery;
var response = _connection.Request<SniffResponse>(requestData);
d.EndState = response;

audit.Stop();
ThrowBadAuthPipelineExceptionWhenNeeded(response);
//sniff should not silently accept bad but valid http responses
if (!response.Success)
Expand Down Expand Up @@ -475,7 +479,7 @@ public async Task SniffAsync(CancellationToken cancellationToken)
audit.Path = requestData.PathAndQuery;
var response = await _connection.RequestAsync<SniffResponse>(requestData, cancellationToken).ConfigureAwait(false);
d.EndState = response;

audit.Stop();
ThrowBadAuthPipelineExceptionWhenNeeded(response);
//sniff should not silently accept bad but valid http responses
if (!response.Success)
Expand Down Expand Up @@ -564,8 +568,8 @@ private RequestData CreatePingRequestData(Node node)
EnableHttpPipelining = RequestConfiguration?.EnableHttpPipelining ?? _settings.HttpPipeliningEnabled,
ForceNode = RequestConfiguration?.ForceNode
};
IRequestParameters requestParameters = new RootNodeInfoRequestParameters();
requestParameters.RequestConfiguration = requestOverrides;

IRequestParameters requestParameters = new RootNodeInfoRequestParameters { RequestConfiguration = requestOverrides };

var data = new RequestData(HttpMethod.HEAD, string.Empty, null, _settings, requestParameters, _memoryStreamFactory) { Node = node };
return data;
Expand Down
73 changes: 73 additions & 0 deletions tests/Tests.Reproduce/GitHubIssue5363.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using Elastic.Elasticsearch.Xunit.XunitPlumbing;
using Elasticsearch.Net;
using Elasticsearch.Net.Diagnostics;
using Xunit;

namespace Tests.Reproduce
{
public class GitHubIssue5363
{
internal class TestDiagnosticListener : IObserver<DiagnosticListener>, IDisposable
{
private ConcurrentBag<IDisposable> Disposables { get; } = new();

public Action<IApiCallDetails> OnEnded { get; }

public TestDiagnosticListener(Action<IApiCallDetails> onEnded) => OnEnded = onEnded;

public void OnError(Exception error) { }
public void OnCompleted() { }

public void OnNext(DiagnosticListener value) =>
TrySubscribe(DiagnosticSources.RequestPipeline.SourceName,
() => new RequestPipelineDiagnosticObserver(null, v => OnEnded(v.Value)), value);

private void TrySubscribe(string sourceName, Func<IObserver<KeyValuePair<string, object>>> listener, DiagnosticListener value)
{
if (value.Name != sourceName)
return;
var d = value.Subscribe(listener());

Disposables.Add(d);
}

public void Dispose()
{
foreach (var d in Disposables)
{
d.Dispose();
}
}
}

[U]
public async Task DiagnosticListener_AuditTrailIsValid()
{
using var listener = new TestDiagnosticListener(data =>
{
var auditTrailEvent = data.AuditTrail[0];

Assert.True(auditTrailEvent.Ended != default);
});

using var foo = DiagnosticListener.AllListeners.Subscribe(listener);

var connectionPool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
var settings = new ConnectionConfiguration(connectionPool, new InMemoryConnection());

var client = new ElasticLowLevelClient(settings);
var person = new { Id = "1" };

await client.IndexAsync<BytesResponse>("test-index", PostData.Serializable(person));
}
}
}