Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public enum ProductCheckStatus
NotChecked,
ValidProduct,
InvalidProduct,
UndeterminedProduct
UndeterminedProduct,
TransientFailure
}
}
63 changes: 45 additions & 18 deletions src/Elasticsearch.Net/Transport/Pipeline/RequestPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,13 @@ public class RequestPipeline : IRequestPipeline
private const string ExpectedTagLine = "You Know, for Search";
private const string NoNodesAttemptedMessage = "No nodes were attempted, this can happen when a node predicate does not match any nodes";

public const string ProductCheckTransientErrorWarning =
"The client is unable to verify that the server is Elasticsearch due to an unsuccessful product check call. "
+ "Some functionality may not be compatible if the server is running an unsupported product.";

public const string UndeterminedProductWarning =
"The client is unable to verify that the server is Elasticsearch due security privileges on the server side.";
"The client is unable to verify that the server is Elasticsearch due to security privileges on the server side. "
+ "Some functionality may not be compatible if the server is running an unsupported product.";

private static readonly Version MinVersion = new(6, 0, 0);
private static readonly Version Version7 = new(7, 0, 0);
Expand Down Expand Up @@ -255,7 +260,7 @@ public ElasticsearchClientException CreateClientException<TResponse>(
public void FirstPoolUsage(SemaphoreSlim semaphore)
{
// If sniffing has completed and the product check has run, we are done!
if (!FirstPoolUsageNeedsSniffing && _connectionPool.ProductCheckStatus != ProductCheckStatus.NotChecked)
if (!FirstPoolUsageNeedsSniffing && !RequiresProductCheck(_connectionPool.ProductCheckStatus))
return;

if (!semaphore.Wait(_settings.RequestTimeout.Add(_settings.RequestTimeout))) // Double the timeout to allow for product check delays
Expand All @@ -270,7 +275,7 @@ public void FirstPoolUsage(SemaphoreSlim semaphore)

try
{
if (_connectionPool.ProductCheckStatus == ProductCheckStatus.NotChecked)
if (RequiresProductCheck(_connectionPool.ProductCheckStatus))
using (Audit(ProductCheckOnStartup))
{
var nodes = _connectionPool.Nodes.ToArray(); // Isolated copy of nodes for the product check
Expand All @@ -282,8 +287,9 @@ public void FirstPoolUsage(SemaphoreSlim semaphore)
}
else
// We determine the product from the first node which successfully responds.
// If a node fails, we retry other available nodes until the request timeout is reached.
for (var i = 0;
i < nodes.Length && _connectionPool.ProductCheckStatus == ProductCheckStatus.NotChecked && !IsTakingTooLong;
i < nodes.Length && RequiresProductCheck(_connectionPool.ProductCheckStatus) && !IsTakingTooLong;
i++)
ProductCheck(nodes[i]);

Expand Down Expand Up @@ -311,7 +317,7 @@ public void FirstPoolUsage(SemaphoreSlim semaphore)
public async Task FirstPoolUsageAsync(SemaphoreSlim semaphore, CancellationToken cancellationToken)
{
// If sniffing has completed and the product check has run, we are done!
if (!FirstPoolUsageNeedsSniffing && _connectionPool.ProductCheckStatus != ProductCheckStatus.NotChecked)
if (!FirstPoolUsageNeedsSniffing && !RequiresProductCheck(_connectionPool.ProductCheckStatus))
return;

// TODO cancellationToken could throw here and will bubble out as OperationCancelledException
Expand All @@ -330,7 +336,7 @@ public async Task FirstPoolUsageAsync(SemaphoreSlim semaphore, CancellationToken

try
{
if (_connectionPool.ProductCheckStatus == ProductCheckStatus.NotChecked)
if (RequiresProductCheck(_connectionPool.ProductCheckStatus))
using (Audit(ProductCheckOnStartup))
{
var nodes = _connectionPool.Nodes.ToArray(); // Isolated copy of nodes for the product check
Expand All @@ -342,8 +348,9 @@ public async Task FirstPoolUsageAsync(SemaphoreSlim semaphore, CancellationToken
}
else
// We determine the product from the first node which successfully responds.
// If a node fails, we retry other available nodes until the request timeout is reached.
for (var i = 0;
i < nodes.Length && _connectionPool.ProductCheckStatus == ProductCheckStatus.NotChecked && !IsTakingTooLong;
i < nodes.Length && RequiresProductCheck(_connectionPool.ProductCheckStatus) && !IsTakingTooLong;
i++)
await ProductCheckAsync(nodes[i], cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -595,21 +602,37 @@ public void ThrowNoNodesAttempted(RequestData requestData, List<PipelineExceptio
throw new UnexpectedElasticsearchClientException(clientException, seenExceptions) { Request = requestData, AuditTrail = AuditTrail };
}

private static bool RequiresProductCheck(ProductCheckStatus status) =>
status is ProductCheckStatus.NotChecked or ProductCheckStatus.TransientFailure;

private TResponse PostCallElasticsearch<TResponse>(RequestData requestData, TResponse response,
Diagnostic<RequestData, IApiCallDetails> diagnostic, Auditable audit
)
where TResponse : class, IElasticsearchResponse, new()
{
// Add additional warning to debug information if the product could not be determined and may not be Elasticsearch
if (_connectionPool.ProductCheckStatus == ProductCheckStatus.UndeterminedProduct && response.ApiCall is ApiCallDetails callDetails)
{
Debug.WriteLine(UndeterminedProductWarning);
callDetails.BuildDebugInformationPrefix = sb =>
if (response.ApiCall is ApiCallDetails callDetails)
switch (_connectionPool.ProductCheckStatus)
{
sb.AppendLine("# Warnings:");
sb.AppendLine($"- {UndeterminedProductWarning}");
};
}
// Add additional warning to debug information if the product could not be determined and may not be Elasticsearch
case ProductCheckStatus.UndeterminedProduct:
Debug.WriteLine(UndeterminedProductWarning);
callDetails.BuildDebugInformationPrefix = sb =>
{
sb.AppendLine("# Warnings:");
sb.AppendLine($"- {UndeterminedProductWarning}");
};
break;

// Add additional warning to debug information if the product could not be determined due to a transient error.
case ProductCheckStatus.TransientFailure:
Debug.WriteLine(ProductCheckTransientErrorWarning);
callDetails.BuildDebugInformationPrefix = sb =>
{
sb.AppendLine("# Warnings:");
sb.AppendLine($"- {ProductCheckTransientErrorWarning}");
};
break;
}

diagnostic.EndState = response.ApiCall;
response.ApiCall.AuditTrail = AuditTrail;
Expand Down Expand Up @@ -692,10 +715,14 @@ private bool ApplyProductCheckRules(RootResponse response)
return true;
}

// Any response besides a 200, 401 or 403 is considered a failure and the check is considered incomplete remaining in the NotChecked state.
// Any response besides a 200, 401 or 403 is considered a failure and the check is considered incomplete and marked as a likely transient failure.
// This means that the check will run on subsequent requests until we have a valid response to evaluate.
// By returning false, the failure to perform the product check will be included in the audit log.
if (!response.Success) return false;
if (!response.Success)
{
_connectionPool.ProductCheckStatus = ProductCheckStatus.TransientFailure;
return false;
}

// Start by assuming the product is valid
_connectionPool.ProductCheckStatus = ProductCheckStatus.ValidProduct;
Expand Down