Skip to content
72 changes: 36 additions & 36 deletions src/Benchmarking/project.lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/CodeGeneration/Nest.Litterateur/project.lock.json
Original file line number Diff line number Diff line change
Expand Up @@ -3089,7 +3089,7 @@
"Microsoft.CSharp/4.0.1-beta-23409": {
"type": "package",
"serviceable": true,
"sha512": "GGOOJF47WzXZoka0JHCToQxzSguIy1UeXZywUjA1NPqvKAWVwbSbZ2VxyeIL3jyTV1BHEbBX8FPL6vweUON2aw==",
"sha512": "I1jsSsyK89VfNebrnx2eiBD5YT6zp+DcX2v6AxZ/IosS38QYmA9YKVmssMd5yhRkXwr1f8MfgZTxF1Cli90JEQ==",
"files": [
"lib/dotnet/de/Microsoft.CSharp.xml",
"lib/dotnet/es/Microsoft.CSharp.xml",
Expand Down
102 changes: 102 additions & 0 deletions src/Elasticsearch.Net/ConnectionPool/StickyConnectionPool.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace Elasticsearch.Net
{
/// <summary>
/// A connection pool that always hands out its usable nodes in seed order, so every view starts
/// on the first node that is alive (or whose dead timeout has elapsed) instead of rotating
/// between nodes. The pool does not support reseeding.
/// </summary>
public class StickyConnectionPool : IConnectionPool
{
	/// <summary>Clock used to stamp <see cref="LastUpdate"/> and to evaluate a node's <c>DeadUntil</c>.</summary>
	protected IDateTimeProvider DateTimeProvider { get; }

	/// <summary>The de-duplicated (by URI) list of nodes, in the order they were supplied.</summary>
	protected List<Node> InternalNodes { get; set; }

	/// <summary><c>true</c> when any of the supplied node URIs uses the <c>https</c> scheme.</summary>
	public bool UsingSsl { get; }

	public bool SniffedOnStartup { get; set; }

	/// <summary>Read-only view over <see cref="InternalNodes"/>.</summary>
	public IReadOnlyCollection<Node> Nodes => this.InternalNodes;

	/// <summary>One less than the number of known nodes.</summary>
	public int MaxRetries => this.InternalNodes.Count - 1;

	/// <summary>Always <c>false</c>; <see cref="Reseed"/> is a no-op.</summary>
	public bool SupportsReseeding => false;

	public bool SupportsPinging => true;

	/// <summary>Set once, at construction time, from <see cref="DateTimeProvider"/>.</summary>
	public DateTime LastUpdate { get; protected set; }

	/// <summary>Creates a pool from plain URIs by wrapping each one in a <see cref="Node"/>.</summary>
	public StickyConnectionPool(IEnumerable<Uri> uris, IDateTimeProvider dateTimeProvider = null)
		: this(uris.Select(uri => new Node(uri)), dateTimeProvider)
	{ }

	/// <summary>Creates a pool from the given nodes.</summary>
	/// <param name="nodes">Seed nodes; validated with <c>ThrowIfEmpty</c> and de-duplicated by URI.</param>
	/// <param name="dateTimeProvider">Optional clock; falls back to <c>DateTimeProvider.Default</c>.</param>
	/// <exception cref="ArgumentException">The nodes use more than one URI scheme.</exception>
	public StickyConnectionPool(IEnumerable<Node> nodes, IDateTimeProvider dateTimeProvider = null)
	{
		nodes.ThrowIfEmpty(nameof(nodes));

		this.DateTimeProvider = dateTimeProvider ?? Elasticsearch.Net.DateTimeProvider.Default;

		var nn = nodes.ToList();
		var uris = nn.Select(n => n.Uri).ToList();
		if (uris.Select(u => u.Scheme).Distinct().Count() > 1)
			throw new ArgumentException("Trying to instantiate a connection pool with mixed URI Schemes");

		this.UsingSsl = uris.Any(uri => uri.Scheme == "https");

		this.InternalNodes = nn
			.DistinctBy(n => n.Uri)
			.ToList();

		this.LastUpdate = this.DateTimeProvider.Now();
	}

	/// <summary>
	/// Cursor used only while every node is dead, to pick which dead node to retry next.
	/// It is -1 whenever the last view found at least one usable node.
	/// </summary>
	protected int GlobalCursor = -1;

	/// <summary>
	/// Lazily yields the nodes a single call may try, in seed order.
	/// </summary>
	/// <param name="audit">Optional callback that receives the audit events raised while building the view.</param>
	/// <returns>
	/// Every node that is alive or whose <c>DeadUntil</c> has passed; when there is none, exactly one
	/// dead node, selected by <see cref="GlobalCursor"/> and flagged as resurrected.
	/// </returns>
	public IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = null)
	{
		var now = this.DateTimeProvider.Now();
		var candidates = this.InternalNodes
			.Where(n => n.IsAlive || n.DeadUntil <= now)
			.ToList();

		if (candidates.Count == 0)
		{
			var globalCursor = Interlocked.Increment(ref GlobalCursor);
			//could not find a suitable node, retry a single dead node chosen off globalCursor
			audit?.Invoke(AuditEvent.AllNodesDead, null);

			// Interlocked.Increment wraps to int.MinValue on overflow and % keeps the sign of the
			// dividend, so clear the sign bit to keep the index inside the list.
			var index = (globalCursor & int.MaxValue) % this.InternalNodes.Count;
			var revived = this.InternalNodes[index];
			revived.IsResurrected = true;
			audit?.Invoke(AuditEvent.Resurrection, revived);
			yield return revived;
			yield break;
		}

		// A cursor other than the default means an earlier view found every node dead; we now
		// have a usable node again, so the next all-dead view should restart from the first node.
		if (GlobalCursor != -1)
		{
			Interlocked.Exchange(ref GlobalCursor, -1);
		}

		foreach (var node in candidates)
		{
			// a candidate that is not alive is only in the list because its DeadUntil has
			// passed, so report it and flag it as resurrected
			if (!node.IsAlive)
			{
				audit?.Invoke(AuditEvent.Resurrection, node);
				node.IsResurrected = true;
			}
			yield return node;
		}
	}

	/// <summary>No-op: this pool does not support reseeding.</summary>
	public void Reseed(IEnumerable<Node> nodes) { }

	void IDisposable.Dispose() => this.DisposeManagedResources();

	/// <summary>Extension point for subclasses; the base implementation holds nothing to dispose.</summary>
	protected virtual void DisposeManagedResources() { }
}
}
2 changes: 1 addition & 1 deletion src/Elasticsearch.Net/project.lock.json
Original file line number Diff line number Diff line change
Expand Up @@ -2025,7 +2025,7 @@
"System.Dynamic.Runtime/4.0.11-beta-23516": {
"type": "package",
"serviceable": true,
"sha512": "ypkxS0e+yUw7F6JEwuB22u0qqruMeZFOmtcImh2efDHpTAuhF2FOqCDJ7f4qLf9yomVvB4kjkZ6xGunbIQryxQ==",
"sha512": "C2GXB20I5vMcO4wemZr5pEjwwEb6H6zVkwF12JSUhripKBIKgI0YKpfp9glyDSL903cYgIXAztMQDajwCR0PmA==",
"files": [
"lib/DNXCore50/System.Dynamic.Runtime.dll",
"lib/MonoAndroid10/_._",
Expand Down
6 changes: 3 additions & 3 deletions src/Nest/project.lock.json
Original file line number Diff line number Diff line change
Expand Up @@ -2037,7 +2037,7 @@
"runtime.any.System.Linq.Expressions/4.0.11-beta-23516": {
"type": "package",
"serviceable": true,
"sha512": "4sPxQCjllMJ1uZNlwz/EataPyHSH+AqSDlOIPPqcy/88R2B+abfhPPC78rd7gvHp8KmMX4qbJF6lcCeDIQpmVg==",
"sha512": "P5nzo1Ye0GxB4BYdWian6Y427eTrhn1JS3jLWZq5bMWVn8hS/OIfyylASN0A/qqeLn4rGA0fOzmJSYqFSKvxgQ==",
"files": [
"lib/DNXCore50/System.Linq.Expressions.dll",
"lib/MonoAndroid10/_._",
Expand Down Expand Up @@ -2345,7 +2345,7 @@
"System.Dynamic.Runtime/4.0.11-beta-23516": {
"type": "package",
"serviceable": true,
"sha512": "ypkxS0e+yUw7F6JEwuB22u0qqruMeZFOmtcImh2efDHpTAuhF2FOqCDJ7f4qLf9yomVvB4kjkZ6xGunbIQryxQ==",
"sha512": "C2GXB20I5vMcO4wemZr5pEjwwEb6H6zVkwF12JSUhripKBIKgI0YKpfp9glyDSL903cYgIXAztMQDajwCR0PmA==",
"files": [
"lib/DNXCore50/System.Dynamic.Runtime.dll",
"lib/MonoAndroid10/_._",
Expand Down Expand Up @@ -2599,7 +2599,7 @@
"System.Linq.Expressions/4.0.11-beta-23516": {
"type": "package",
"serviceable": true,
"sha512": "YEl5oyF5fifLbHHP099cvb/6f2r2h1QVHzoaoINPHOZtpNec+RfqvzETXcYDIdHT7l+bBAYsBuVUkBgfQEoYfQ==",
"sha512": "FtKytB13HabzrSvrAgBgOOnG2uxJO4s7zvP5Sk0NS3bwbJUyb5AP1p4897UWnLiB6C95jI4nIkZps51sa9In8g==",
"files": [
"lib/MonoAndroid10/_._",
"lib/MonoTouch10/_._",
Expand Down
237 changes: 237 additions & 0 deletions src/Tests/ClientConcepts/ConnectionPooling/Sticky/SkipDeadNodes.doc.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
using System;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading.Tasks;
using Elasticsearch.Net;
using FluentAssertions;
using Tests.Framework;
using static Tests.Framework.TimesHelper;
using static Elasticsearch.Net.AuditEvent;

namespace Tests.ClientConcepts.ConnectionPooling.Sticky
{
public class SkipDeadNodes
{
/** Sticky - Skipping Dead Nodes
* When selecting nodes the connection pool will try to skip all the nodes that are marked dead.
*/

// Number of seed nodes the unit tests in this class create, on consecutive ports starting at 9200.
protected int NumberOfNodes = 3;

[U] public void EachViewDoesNotSkip()
{
	var seeds = Enumerable.Range(9200, NumberOfNodes)
		.Select(port => new Node(new Uri("http://localhost:" + port)))
		.ToList();
	var pool = new StickyConnectionPool(seeds);

	// With every node alive, each new view is expected to start on the first seed (9200).
	for (var round = 0; round < 20; round++)
	{
		for (var view = 0; view < 3; view++)
		{
			var first = pool.CreateView().First();
			first.Uri.Port.Should().Be(9200);
		}
	}
}

[U] public void EachViewSeesNextButSkipsTheDeadNode()
{
	var seeds = Enumerable.Range(9200, NumberOfNodes)
		.Select(port => new Node(new Uri("http://localhost:" + port)))
		.ToList();
	var primary = seeds.First();
	primary.MarkDead(DateTime.Now.AddDays(1));
	var pool = new StickyConnectionPool(seeds);

	// While the first seed is marked dead, every view is expected to start on 9201.
	for (var round = 0; round < 20; round++)
	{
		for (var view = 0; view < 2; view++)
		{
			var first = pool.CreateView().First();
			first.Uri.Port.Should().Be(9201);
		}
	}

	/** After we mark the first node alive again we expect it to be hit again */
	primary.MarkAlive();
	for (var round = 0; round < 20; round++)
	{
		for (var view = 0; view < 3; view++)
		{
			var first = pool.CreateView().First();
			first.Uri.Port.Should().Be(9200);
		}
	}
}

[U] public void ViewSeesResurrectedNodes()
{
	var clock = new TestableDateTimeProvider();
	var seeds = Enumerable.Range(9200, NumberOfNodes)
		.Select(port => new Node(new Uri("http://localhost:" + port)))
		.ToList();
	seeds.First().MarkDead(clock.Now().AddDays(1));
	var pool = new StickyConnectionPool(seeds, dateTimeProvider: clock);

	// While the first seed is still within its dead period, views are expected to start on 9201.
	for (var round = 0; round < 20; round++)
	{
		for (var view = 0; view < 2; view++)
		{
			var first = pool.CreateView().First();
			first.Uri.Port.Should().Be(9201);
		}
	}

	/** If we move our clock forward 2 days, the node that was marked dead until tomorrow should be resurrected */
	clock.ChangeTime(d => d.AddDays(2));
	Node resurrected = null;
	for (var view = 0; view < 3; view++)
	{
		resurrected = pool.CreateView().First();
		resurrected.Uri.Port.Should().Be(9200);
	}
	resurrected.IsResurrected.Should().BeTrue();
}

[U, SuppressMessage("AsyncUsage", "AsyncFixer001:Unnecessary async/await usage", Justification = "Its a test")]
public async Task FallsOverDeadNodes()
{
/** A cluster with 4 nodes where client calls on 9200 and 9201 always fail and all other client calls succeed.
* Pinging is disabled. */
var audit = new Auditor(() => Framework.Cluster
.Nodes(4)
.ClientCalls(p => p.Succeeds(Always))
.ClientCalls(p => p.OnPort(9200).FailAlways())
.ClientCalls(p => p.OnPort(9201).FailAlways())
.StickyConnectionPool()
.Settings(p => p.DisablePing())
);

await audit.TraceCalls(
/** The first call gets a bad response from 9200 and then from 9201 before 9202 returns a healthy response,
* leaving two nodes marked as dead */
new ClientCall {
{ BadResponse, 9200},
{ BadResponse, 9201},
{ HealthyResponse, 9202},
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(2) }
},
/** The 2nd call is expected to go straight to 9202, skipping the two nodes
* that are marked as dead */
new ClientCall {
{ HealthyResponse, 9202},
/** We assert that the connection pool still has two nodes that are marked as dead */
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(2) }
},
/** The 3rd call is expected to behave the same as the 2nd */
new ClientCall {
{ HealthyResponse, 9202},
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(2) }
}
);
}

[U(Skip = "Not sure how to trace this chain"), SuppressMessage("AsyncUsage", "AsyncFixer001:Unnecessary async/await usage", Justification = "Its a test")]
public async Task FallsOverDeadNodesWithRecoverResetToPrimary()
{
/** A cluster with 3 nodes where 9200 fails twice and then succeeds once, 9201 succeeds once and then
* fails once, and 9202 always fails. Pinging is disabled. */
// NOTE(review): this test is skipped. The cluster is declared with .Nodes(3), yet several assertions
// below expect 4 (or 3 after a recovery) dead nodes - confirm the intended node count and expected
// dead-node counts before enabling it.
var audit = new Auditor(() => Framework.Cluster
.Nodes(3)
.ClientCalls(p => p.OnPort(9200).Fails(Twice))
.ClientCalls(p => p.OnPort(9200).Succeeds(Once))
.ClientCalls(p => p.OnPort(9201).Succeeds(Once))
.ClientCalls(p => p.OnPort(9201).Fails(Once))
.ClientCalls(p => p.OnPort(9202).FailAlways())
.StickyConnectionPool()
.Settings(p => p.DisablePing())
);

await audit.TraceCalls(
/** The first call goes to 9200 which fails, so we wrap to 9201 */
new ClientCall {
{ BadResponse, 9200},
{ HealthyResponse, 9201},
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(1) }
},
/** The 2nd call goes to 9201 which is healthy */
new ClientCall {
{ HealthyResponse, 9201},
/** We assert that the connection pool has one node that is marked as dead */
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(1) }
},
/** The 3rd call gets a bad response from 9201 and then from 9202, after which the maximum
* number of retries is reached */
new ClientCall {
{ BadResponse, 9201},
{ BadResponse, 9202},
{ MaxRetriesReached },
// NOTE(review): expects 4 dead nodes although only 3 nodes are configured - verify.
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
},
/** Try to resurrect first node 9200, which fails */
new ClientCall {
{ AllNodesDead },
{ Resurrection, 9200},
{ BadResponse, 9200},
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
},
/** Try to resurrect second node 9201, which succeeds */
new ClientCall {
{ AllNodesDead },
{ Resurrection, 9201},
{ HealthyResponse, 9201},
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(3) }
},
/** The next call goes to 9201, which returns a bad response, leaving all nodes dead */
new ClientCall {
{ BadResponse, 9201},
{ MaxRetriesReached },
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
},
/** Try to resurrect 9200 again, which succeeds */
new ClientCall {
{ AllNodesDead },
{ Resurrection, 9200},
{ HealthyResponse, 9200},
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(3) }
}
);
}

[U, SuppressMessage("AsyncUsage", "AsyncFixer001:Unnecessary async/await usage", Justification = "Its a test")]
public async Task PicksADifferentNodeEachTimeAnodeIsDown()
{
/** A cluster with 4 nodes where every client call fails. Pinging is disabled. */
var audit = new Auditor(() => Framework.Cluster
.Nodes(4)
.ClientCalls(p => p.Fails(Always))
.StickyConnectionPool()
.Settings(p => p.DisablePing())
);

await audit.TraceCalls(
/** All the calls fail */
new ClientCall {
{ BadResponse, 9200},
{ BadResponse, 9201},
{ BadResponse, 9202},
{ BadResponse, 9203},
{ MaxRetriesReached },
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
},
/** After all our registered nodes are marked dead we want to sample a single dead node
* each time to quickly see if the cluster is back up. We do not want to retry all 4
* nodes
*/
new ClientCall {
{ AllNodesDead },
{ Resurrection, 9200},
{ BadResponse, 9200},
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
},
/** Each following call samples the next dead node in turn: 9201, 9202 and then 9203 */
new ClientCall {
{ AllNodesDead },
{ Resurrection, 9201},
{ BadResponse, 9201},
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
},
new ClientCall {
{ AllNodesDead },
{ Resurrection, 9202},
{ BadResponse, 9202},
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
},
new ClientCall {
{ AllNodesDead },
{ Resurrection, 9203},
{ BadResponse, 9203},
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
}
);
}
}
}
Loading