Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
* Fixed `NullPointerException` in `ResponseMessage` deserialization for GraphSON.
* Enabled the Gremlin.Net driver to repair its connection pool after the server was temporarily unavailable.
* Added the ability to supply a `HandshakeInterceptor` to a `Cluster` which will provide access to the initial HTTP request that establishes the websocket.
* Fixed a possible leakage of connections in the Gremlin.NET driver that could happen if `Dispose()` was called while the pool was creating connections.

==== Bugs

Expand Down
4 changes: 2 additions & 2 deletions gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ public Connection(Uri uri, string username, string password, GraphSONReader grap
_webSocketConnection = new WebSocketConnection(webSocketConfiguration);
}

public async Task ConnectAsync()
public async Task ConnectAsync(CancellationToken cancellationToken)
{
await _webSocketConnection.ConnectAsync(_uri).ConfigureAwait(false);
await _webSocketConnection.ConnectAsync(_uri, cancellationToken).ConfigureAwait(false);
BeginReceiving();
}

Expand Down
28 changes: 23 additions & 5 deletions gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ internal class ConnectionPool : IDisposable
private int _poolState;
private const int PoolIdle = 0;
private const int PoolPopulationInProgress = 1;
private readonly CancellationTokenSource _cts = new CancellationTokenSource();

public ConnectionPool(IConnectionFactory connectionFactory, ConnectionPoolSettings settings)
{
Expand Down Expand Up @@ -132,21 +133,34 @@ private async Task FillPoolAsync()
}
catch (Exception)
{
// Dispose created connections if the connection establishment failed
// Dispose all connections that were already created
foreach (var creationTask in connectionCreationTasks)
{
if (!creationTask.IsFaulted)
if (creationTask.IsCompleted)
creationTask.Result?.Dispose();
}

throw;
}

if (_disposed)
{
await CloseAndRemoveAllConnectionsAsync().ConfigureAwait(false);
}
}

private async Task<IConnection> CreateNewConnectionAsync()
{
var newConnection = _connectionFactory.CreateConnection();
await newConnection.ConnectAsync().ConfigureAwait(false);
try
{
await newConnection.ConnectAsync(_cts.Token).ConfigureAwait(false);
}
catch (Exception)
{
// Dispose created connection if the connection establishment failed
newConnection.Dispose();
throw;
}
return newConnection;
}

Expand Down Expand Up @@ -216,7 +230,7 @@ private async Task ReplaceClosedConnectionsAsync()
{
var poolWasPopulated = await EnsurePoolIsHealthyAsync().ConfigureAwait(false);
// Another connection could have been removed already, check if another population is necessary
if (poolWasPopulated)
if (poolWasPopulated && !_disposed)
await ReplaceClosedConnectionsAsync().ConfigureAwait(false);
}

Expand Down Expand Up @@ -260,7 +274,11 @@ protected virtual void Dispose(bool disposing)
if (!_disposed)
{
if (disposing)
{
_cts.Cancel();
CloseAndRemoveAllConnectionsAsync().WaitUnwrap();
_cts.Dispose();
}
_disposed = true;
}
}
Expand Down
3 changes: 2 additions & 1 deletion gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Gremlin.Net.Driver.Messages;

namespace Gremlin.Net.Driver
{
internal interface IConnection : IDisposable
{
Task ConnectAsync();
Task ConnectAsync(CancellationToken cancellationToken);
Task<ResultSet<T>> SubmitAsync<T>(RequestMessage requestMessage);
int NrRequestsInFlight { get; }
bool IsOpen { get; }
Expand Down
5 changes: 3 additions & 2 deletions gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Gremlin.Net.Driver.Messages;

Expand All @@ -39,9 +40,9 @@ public ProxyConnection(IConnection proxiedConnection, Action<IConnection> releas
_releaseAction = releaseAction;
}

public async Task ConnectAsync()
public async Task ConnectAsync(CancellationToken cancellationToken)
{
await ProxiedConnection.ConnectAsync().ConfigureAwait(false);
await ProxiedConnection.ConnectAsync(cancellationToken).ConfigureAwait(false);
}

public async Task<ResultSet<T>> SubmitAsync<T>(RequestMessage requestMessage)
Expand Down
4 changes: 2 additions & 2 deletions gremlin-dotnet/src/Gremlin.Net/Driver/WebSocketConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ public WebSocketConnection(Action<ClientWebSocketOptions> webSocketConfiguration
webSocketConfiguration?.Invoke(_client.Options);
}

public async Task ConnectAsync(Uri uri)
public async Task ConnectAsync(Uri uri, CancellationToken cancellationToken)
{
await _client.ConnectAsync(uri, CancellationToken.None).ConfigureAwait(false);
await _client.ConnectAsync(uri, cancellationToken).ConfigureAwait(false);
}

public async Task CloseAsync()
Expand Down
128 changes: 120 additions & 8 deletions gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/ConnectionPoolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Gremlin.Net.Driver;
using Gremlin.Net.Driver.Exceptions;
Expand All @@ -46,7 +47,7 @@ public void ShouldEstablishConfiguredNrConnections(int poolSize)

Assert.Equal(poolSize, pool.NrConnections);
mockedConnectionFactory.Verify(m => m.CreateConnection(), Times.Exactly(poolSize));
mockedConnection.Verify(m => m.ConnectAsync(), Times.Exactly(poolSize));
mockedConnection.Verify(m => m.ConnectAsync(It.IsAny<CancellationToken>()), Times.Exactly(poolSize));
}

[Fact]
Expand Down Expand Up @@ -235,34 +236,145 @@ public void ShouldThrowAfterWaitingTooLongForUnavailableServer()
Assert.Throws<ServerUnavailableException>(() => pool.GetAvailableConnection());
}

private static IConnection OpenConnection
[Fact]
public void ShouldNotLeakConnectionsIfDisposeIsCalledWhilePoolIsPopulating()
{
var fakeConnectionFactory = new Mock<IConnectionFactory>();
fakeConnectionFactory.Setup(m => m.CreateConnection()).Returns(ClosedConnection);
var pool = CreateConnectionPool(fakeConnectionFactory.Object, 1);
var mockedConnectionToBeDisposed = new Mock<IConnection>();
var poolWasDisposedSignal = new SemaphoreSlim(0, 1);
mockedConnectionToBeDisposed.Setup(f => f.ConnectAsync(It.IsAny<CancellationToken>()))
.Returns((CancellationToken _) => poolWasDisposedSignal.WaitAsync(CancellationToken.None));
// We don't use the `CancellationToken` here as the connection should also be disposed if it did not
// react on the cancellation. This can happen if the task is cancelled just before `ConnectAsync` returns.
fakeConnectionFactory.Setup(m => m.CreateConnection()).Returns(mockedConnectionToBeDisposed.Object);
try
{
pool.GetAvailableConnection();
}
catch (ServerUnavailableException)
{
// expected as the pool only contains a closed connection at this point
}

pool.Dispose();
poolWasDisposedSignal.Release();

Assert.Equal(0, pool.NrConnections);
mockedConnectionToBeDisposed.Verify(m => m.ConnectAsync(It.IsAny<CancellationToken>()), Times.Once);
mockedConnectionToBeDisposed.Verify(m => m.Dispose(), Times.Once);
}

[Fact]
public void DisposeShouldCancelConnectionEstablishment()
{
var fakeConnectionFactory = new Mock<IConnectionFactory>();
fakeConnectionFactory.Setup(m => m.CreateConnection()).Returns(ClosedConnection);
var pool = CreateConnectionPool(fakeConnectionFactory.Object, 1, 0);
var mockedConnectionToBeDisposed = new Mock<IConnection>();
mockedConnectionToBeDisposed.Setup(f => f.ConnectAsync(It.IsAny<CancellationToken>()))
.Returns((CancellationToken ct) => Task.Delay(-1, ct));
fakeConnectionFactory.Setup(m => m.CreateConnection()).Returns(mockedConnectionToBeDisposed.Object);
try
{
pool.GetAvailableConnection();
}
catch (ServerUnavailableException)
{
// expected as the pool only contains a closed connection at this point
}

pool.Dispose();

Assert.Equal(0, pool.NrConnections);
mockedConnectionToBeDisposed.Verify(m => m.ConnectAsync(It.IsAny<CancellationToken>()));
mockedConnectionToBeDisposed.Verify(m => m.Dispose(), Times.Once);
}

[Fact]
public async Task ConnectionsEstablishedInParallelShouldAllBeDisposedIfOneThrowsDuringCreation()
{
// This test unfortunately needs a lot of knowledge about the inner working of the ConnectionPool to
// adequately test that connections established in parallel will all be disposed if one throws an
// exception.

// First create a pool with only closed connections that we can then let the pool replace:
var fakeConnectionFactory = new Mock<IConnectionFactory>();
fakeConnectionFactory.SetupSequence(m => m.CreateConnection())
.Returns(ClosedConnection) // We need to do it like this as we use a dictionary of dead connections in
.Returns(ClosedConnection) // ConnectionPool and the three connections need to be different objects
.Returns(ClosedConnection);// for this to work.
var pool = CreateConnectionPool(fakeConnectionFactory.Object, 3, 0);
var startEstablishingProblematicConnections = new SemaphoreSlim(0, 1);
// Let the pool get one connection that is so slow to open that the pool will afterwards try to create two
// more connections in parallel.
var fakedSlowToEstablishConnection = new Mock<IConnection>();
fakedSlowToEstablishConnection.Setup(m => m.ConnectAsync(It.IsAny<CancellationToken>()))
.Returns(startEstablishingProblematicConnections.WaitAsync);
fakeConnectionFactory.Setup(m => m.CreateConnection())
.Returns(fakedSlowToEstablishConnection.Object);
// Trigger replacement of closed connections
try
{
pool.GetAvailableConnection();
}
catch (ServerUnavailableException)
{
// expected as the pool only contain closed connections at this point
}

var fakedOpenConnection = FakedOpenConnection;
var fakedCannotConnectConnection = FakedCannotConnectConnection;
fakeConnectionFactory.SetupSequence(m => m.CreateConnection())
.Returns(fakedOpenConnection.Object)
.Returns(fakedCannotConnectConnection.Object);
// Let the slow to establish connection finish so the pool can try to establish the other two connections
startEstablishingProblematicConnections.Release();
await Task.Delay(TimeSpan.FromMilliseconds(200));

// Verify that the pool tried to establish both connections and then also disposed both, even though one throw an exception
fakedOpenConnection.Verify(m => m.ConnectAsync(It.IsAny<CancellationToken>()), Times.Once());
fakedOpenConnection.Verify(m => m.Dispose(), Times.Once);
fakedCannotConnectConnection.Verify(m => m.ConnectAsync(It.IsAny<CancellationToken>()), Times.Once);
fakedCannotConnectConnection.Verify(m => m.Dispose(), Times.Once);
}

private static IConnection OpenConnection => FakedOpenConnection.Object;

private static Mock<IConnection> FakedOpenConnection
{
get
{
var fakedConnection = new Mock<IConnection>();
fakedConnection.Setup(f => f.IsOpen).Returns(true);
return fakedConnection.Object;
return fakedConnection;
}
}

private static IConnection ClosedConnection => FakedClosedConnection.Object;

private static IConnection ClosedConnection
private static Mock<IConnection> FakedClosedConnection
{
get
{
var fakedConnection = new Mock<IConnection>();
fakedConnection.Setup(f => f.IsOpen).Returns(false);
return fakedConnection.Object;
return fakedConnection;
}
}

private static IConnection CannotConnectConnection => FakedCannotConnectConnection.Object;

private static IConnection CannotConnectConnection
private static Mock<IConnection> FakedCannotConnectConnection
{
get
{
var fakedConnection = new Mock<IConnection>();
fakedConnection.Setup(f => f.IsOpen).Returns(false);
fakedConnection.Setup(f => f.ConnectAsync()).Throws(new Exception("Cannot connect to server."));
return fakedConnection.Object;
fakedConnection.Setup(f => f.ConnectAsync(It.IsAny<CancellationToken>()))
.Throws(new Exception("Cannot connect to server."));
return fakedConnection;
}
}

Expand Down