Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@ public SingleNodeConnectionPool(Uri uri, IDateTimeProvider dateTimeProvider = nu
/// <inheritdoc />
public IReadOnlyCollection<Node> Nodes { get; }

/// <inheritdoc />
public ProductCheckStatus ProductCheckStatus { get; set; } = ProductCheckStatus.NotChecked;

/// <inheritdoc />
public bool SniffedOnStartup
{
get => true;
set { }
}

/// <inheritdoc />
public ProductCheckStatus ProductCheckStatus { get; set; }

/// <inheritdoc />
public bool SupportsPinging => false;

Expand Down
52 changes: 25 additions & 27 deletions src/Elasticsearch.Net/ConnectionPool/StaticConnectionPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public StaticConnectionPool(IEnumerable<Uri> uris, bool randomize = true, IDateT
: this(uris.Select(uri => new Node(uri)), randomize, dateTimeProvider) { }

public StaticConnectionPool(IEnumerable<Node> nodes, bool randomize = true, IDateTimeProvider dateTimeProvider = null)
: this(nodes, randomize, randomizeSeed: null, dateTimeProvider) { }
: this(nodes, randomize, null, dateTimeProvider) { }

protected StaticConnectionPool(IEnumerable<Node> nodes, bool randomize, int? randomizeSeed = null, IDateTimeProvider dateTimeProvider = null)
{
Expand All @@ -39,30 +39,6 @@ protected StaticConnectionPool(IEnumerable<Node> nodes, Func<Node, float> nodeSc
Initialize(nodes, dateTimeProvider);
}

private void Initialize(IEnumerable<Node> nodes, IDateTimeProvider dateTimeProvider)
{
var nodesProvided = nodes?.ToList() ?? throw new ArgumentNullException(nameof(nodes));
nodesProvided.ThrowIfEmpty(nameof(nodes));
DateTimeProvider = dateTimeProvider ?? Net.DateTimeProvider.Default;

string scheme = null;
foreach (var node in nodesProvided)
{
if (scheme == null)
{
scheme = node.Uri.Scheme;
UsingSsl = scheme == "https";
}
else if (scheme != node.Uri.Scheme)
throw new ArgumentException("Trying to instantiate a connection pool with mixed URI Schemes");
}

InternalNodes = SortNodes(nodesProvided)
.DistinctBy(n => n.Uri)
.ToList();
LastUpdate = DateTimeProvider.Now();
}

/// <inheritdoc />
public DateTime LastUpdate { get; protected set; }

Expand All @@ -73,10 +49,10 @@ private void Initialize(IEnumerable<Node> nodes, IDateTimeProvider dateTimeProvi
public virtual IReadOnlyCollection<Node> Nodes => InternalNodes;

/// <inheritdoc />
public bool SniffedOnStartup { get; set; }
public ProductCheckStatus ProductCheckStatus { get; set; } = ProductCheckStatus.NotChecked;

/// <inheritdoc />
public ProductCheckStatus ProductCheckStatus { get; set; }
public bool SniffedOnStartup { get; set; }

/// <inheritdoc />
public virtual bool SupportsPinging => true;
Expand Down Expand Up @@ -133,6 +109,28 @@ public virtual void Reseed(IEnumerable<Node> nodes) { } //ignored

void IDisposable.Dispose() => DisposeManagedResources();

private void Initialize(IEnumerable<Node> nodes, IDateTimeProvider dateTimeProvider)
{
var nodesProvided = nodes?.ToList() ?? throw new ArgumentNullException(nameof(nodes));
nodesProvided.ThrowIfEmpty(nameof(nodes));
DateTimeProvider = dateTimeProvider ?? Net.DateTimeProvider.Default;

string scheme = null;
foreach (var node in nodesProvided)
if (scheme == null)
{
scheme = node.Uri.Scheme;
UsingSsl = scheme == "https";
}
else if (scheme != node.Uri.Scheme)
throw new ArgumentException("Trying to instantiate a connection pool with mixed URI Schemes");

InternalNodes = SortNodes(nodesProvided)
.DistinctBy(n => n.Uri)
.ToList();
LastUpdate = DateTimeProvider.Now();
}

protected virtual Node RetryInternalNodes(int globalCursor, Action<AuditEvent, Node> audit = null)
{
audit?.Invoke(AuditEvent.AllNodesDead, null);
Expand Down
14 changes: 10 additions & 4 deletions src/Elasticsearch.Net/Transport/Pipeline/RequestPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -682,13 +682,19 @@ private bool ApplyProductCheckRules(RootResponse response)
return true;
}

if (response.HttpStatusCode.HasValue && (response.HttpStatusCode.Value == 401 || response.HttpStatusCode.Value == 403))
// We should always have a status code!
var statusCode = response.HttpStatusCode ?? 0;

// The call to the root path requires monitor permissions. If the current use lacks those, we cannot perform product validation.
if (statusCode is 401 or 403)
{
// The call to the root path requires monitor permissions. If the current use lacks those, we cannot perform product validation.
_connectionPool.ProductCheckStatus = ProductCheckStatus.UndeterminedProduct;
return true;
}

// Any response besides a 200, 401 or 403 is considered a failure and the check is considered incomplete remaining in the NotChecked state.
// This means that the check will run on subsequent requests until we have a valid response to evaluate.
// By returning false, the failure to perform the product check will be included in the audit log.
if (!response.Success) return false;

// Start by assuming the product is valid
Expand All @@ -715,6 +721,8 @@ private bool ApplyProductCheckRules(RootResponse response)
_connectionPool.ProductCheckStatus = ProductCheckStatus.InvalidProduct;
}

return true;

// Elasticsearch should be version 6.0.0 or greater
// Note: For best compatibility, the client should not be used with versions prior to 7.0.0, but we do not enforce that here
static bool VersionTooLow(Version version)
Expand All @@ -741,8 +749,6 @@ static bool Version714InvalidHeader(Version version, string productName)
{
return version >= Version714 && !ExpectedProductName.Equals(productName, StringComparison.Ordinal);
}

return true;
}

private bool PingDisabled(Node node) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System;
using System.Threading.Tasks;
using Elastic.Elasticsearch.Xunit.XunitPlumbing;
using Elasticsearch.Net.VirtualizedCluster;
Expand Down Expand Up @@ -50,7 +51,7 @@ public async Task ProductCheckPerformedOnSecondCallWhenFirstCheckFails()
/** Here's an example with a single node cluster which fails for some reason during the first product check attempt. */
var audit = new Auditor(() => VirtualClusterWith
.Nodes(1, productCheckAlwaysSucceeds: false)
.ProductCheck(r => r.Fails(TimesHelper.Once))
.ProductCheck(r => r.Fails(TimesHelper.Once, 429))
.ProductCheck(r => r.SucceedAlways())
.ClientCalls(r => r.SucceedAlways())
.StaticConnectionPool()
Expand All @@ -60,12 +61,12 @@ public async Task ProductCheckPerformedOnSecondCallWhenFirstCheckFails()
audit = await audit.TraceCalls(skipProductCheck: false,
new ClientCall() {
{ ProductCheckOnStartup },
{ ProductCheckFailure, 9200 }, // <1> as this is the first call, the product check is executed, but fails
{ ProductCheckFailure, 9200 }, // <1> as this is the first call, the product check is executed, but times out
{ HealthyResponse, 9200 } // <2> the actual request is still sent and succeeds
},
new ClientCall() {
{ ProductCheckOnStartup },
{ ProductCheckSuccess, 9200 }, // <3> as the previous product check failed, it runs again on the second call
{ ProductCheckSuccess, 9200 }, // <3> as the previous product check failed, it runs again on the second call and this time it succeeds
{ HealthyResponse, 9200 }
},
new ClientCall() {
Expand All @@ -77,10 +78,10 @@ public async Task ProductCheckPerformedOnSecondCallWhenFirstCheckFails()
[U]
public async Task ProductCheckAttemptsAllNodes()
{
/** Here's an example with a three node cluster which fails for some reason during the first and second product check attempts. */
/** Here's an example with a three node cluster which fails (due to too many requests) during the first and second product check attempts. */
var audit = new Auditor(() => VirtualClusterWith
.Nodes(3, productCheckAlwaysSucceeds: false)
.ProductCheck(r => r.FailAlways())
.ProductCheck(r => r.FailAlways(429))
.ProductCheck(r => r.OnPort(9202).SucceedAlways())
.ClientCalls(r => r.SucceedAlways())
.StaticConnectionPool()
Expand Down