From 0b9ed2ae2ef3af1b6727820afb784beb0b27166e Mon Sep 17 00:00:00 2001 From: Florian Hockmann Date: Mon, 30 Jul 2018 21:45:26 +0200 Subject: [PATCH 1/4] Add ConnectionPool min and max sizes TINKERPOP-1774 The Gremlin.Net ConnectionPool now has min and max sizes. The pool will be initialized with the configured minimum number of connections on creation. It will also no longer create an unlimited number of connections. Instead, a TimeoutException will be thrown when the max limit is reached and no connection became available within a configurable time. --- CHANGELOG.asciidoc | 1 + docs/src/upgrade/release-3.4.x.asciidoc | 11 ++ .../src/Gremlin.Net/Driver/Connection.cs | 16 +-- .../src/Gremlin.Net/Driver/ConnectionPool.cs | 129 ++++++++++++------ .../Driver/ConnectionPoolSettings.cs | 55 ++++++++ .../src/Gremlin.Net/Driver/GremlinClient.cs | 7 +- .../Driver/GremlinClientExtensions.cs | 12 +- .../Driver/Remote/DriverRemoteConnection.cs | 4 +- .../Gremlin.Net/Driver/WebSocketConnection.cs | 16 +-- .../Driver/ConnectionPoolTests.cs | 68 +++++++-- .../RemoteConnectionFactory.cs | 4 +- 11 files changed, 239 insertions(+), 84 deletions(-) create mode 100644 gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPoolSettings.cs diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 9e9d4f31e76..acdfe79e366 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -25,6 +25,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima This release also includes changes from <>. +* Added min and max connection pool sizes for Gremlin.Net which are configurable through optional ConnectionPoolSettings. * Implemented `ShortestPathVertexProgram` and the `shortestPath()` step. * `AbstractGraphProvider` uses `g.io()` for loading test data. * Added the `io()` start step and `read()` and `write()` termination steps to the Gremlin language. diff --git a/docs/src/upgrade/release-3.4.x.asciidoc b/docs/src/upgrade/release-3.4.x.asciidoc index a9ef4ef0a22..33fe5dd5e7d 100644 --- a/docs/src/upgrade/release-3.4.x.asciidoc +++ b/docs/src/upgrade/release-3.4.x.asciidoc @@ -242,6 +242,17 @@ when dealing with that event. To make this easier, the event now raises with a ` link:https://issues.apache.org/jira/browse/TINKERPOP-1831[TINKERPOP-1831] +==== Gremlin.Net: Configurable Max and Min ConnectionPool Sizes + +Gremlin.Net's `ConnectionPool` now has a minimum and a maximum size. These sizes are configurable through added +`ConnectionPoolSettings`. The minimum size determines how many connections are initially created. The maximum size +is an upper limit of connections that can be created. When this limit is reached and another connection is needed, +then the connection pool waits for a connection to become available again. The time to be waited is limited by the +newly introduced option `ConnectionPoolSettings.WaitForConnectionTimeout`. A `TimeoutException` is thrown when +no connection becomes available until this timeout is reached. + +See: link:https://issues.apache.org/jira/browse/TINKERPOP-1774[TINKERPOP-1774] + ==== Deprecation Removal The following deprecated classes, methods or fields have been removed in this version: diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs index 279c708d345..2452e3e302c 100644 --- a/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs +++ b/gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs @@ -59,23 +59,23 @@ public async Task> SubmitAsync(RequestMessage requestM return await ReceiveAsync().ConfigureAwait(false); } - public async Task ConnectAsync() + public Task ConnectAsync() { - await _webSocketConnection.ConnectAsync(_uri).ConfigureAwait(false); + return _webSocketConnection.ConnectAsync(_uri); } - public async Task CloseAsync() + public Task CloseAsync() { - await _webSocketConnection.CloseAsync().ConfigureAwait(false); + return _webSocketConnection.CloseAsync(); } public bool IsOpen => _webSocketConnection.IsOpen; - private async Task SendAsync(RequestMessage message) + private Task SendAsync(RequestMessage message) { var graphsonMsg = _graphSONWriter.WriteObject(message); var serializedMsg = _messageSerializer.SerializeMessage(graphsonMsg); - await _webSocketConnection.SendMessageAsync(serializedMsg).ConfigureAwait(false); + return _webSocketConnection.SendMessageAsync(serializedMsg); } private async Task> ReceiveAsync() @@ -121,7 +121,7 @@ private async Task> ReceiveAsync() return result; } - private async Task AuthenticateAsync() + private Task AuthenticateAsync() { if (string.IsNullOrEmpty(_username) || string.IsNullOrEmpty(_password)) throw new InvalidOperationException( @@ -130,7 +130,7 @@ private async Task AuthenticateAsync() var message = RequestMessage.Build(Tokens.OpsAuthentication).Processor(Tokens.ProcessorTraversal) .AddArgument(Tokens.ArgsSasl, SaslArgument()).Create(); - await SendAsync(message).ConfigureAwait(false); + return SendAsync(message); } private string SaslArgument() diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs index d9e53f4391e..a65208a40a5 100644 --- a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs +++ b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs @@ -23,7 +23,8 @@ using System; using System.Collections.Concurrent; -using System.Linq; +using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using Gremlin.Net.Process; @@ -33,91 +34,134 @@ internal class ConnectionPool : IDisposable { private readonly ConnectionFactory _connectionFactory; private readonly ConcurrentBag _connections = new ConcurrentBag(); - private readonly object _connectionsLock = new object(); + private readonly AutoResetEvent _newConnectionAvailable = new AutoResetEvent(false); + private readonly int _minPoolSize; + private readonly int _maxPoolSize; + private readonly TimeSpan _waitForConnectionTimeout; + private int _nrConnections; - public ConnectionPool(ConnectionFactory connectionFactory) + public ConnectionPool(ConnectionFactory connectionFactory, ConnectionPoolSettings settings) { _connectionFactory = connectionFactory; + _minPoolSize = settings.MinSize; + _maxPoolSize = settings.MaxSize; + _waitForConnectionTimeout = settings.WaitForConnectionTimeout; + PopulatePoolAsync().WaitUnwrap(); } - public int NrConnections { get; private set; } + public int NrConnections => Interlocked.CompareExchange(ref _nrConnections, 0, 0); + + private async Task PopulatePoolAsync() + { + var connectionCreationTasks = new List>(_minPoolSize); + for (var i = 0; i < _minPoolSize; i++) + { + connectionCreationTasks.Add(CreateNewConnectionAsync()); + } + + var createdConnections = await Task.WhenAll(connectionCreationTasks).ConfigureAwait(false); + foreach (var c in createdConnections) + { + _connections.Add(c); + } + + Interlocked.CompareExchange(ref _nrConnections, _minPoolSize, 0); + } public async Task GetAvailableConnectionAsync() { - if (!TryGetConnectionFromPool(out var connection)) - connection = await CreateNewConnectionAsync().ConfigureAwait(false); + if (TryGetConnectionFromPool(out var connection)) + return ProxiedConnection(connection); + connection = await AddConnectionIfUnderMaximumAsync().ConfigureAwait(false) ?? WaitForConnection(); + return ProxiedConnection(connection); + } - return new ProxyConnection(connection, AddConnectionIfOpen); + private IConnection ProxiedConnection(Connection connection) + { + return new ProxyConnection(connection, ReturnConnectionIfOpen); } - private bool TryGetConnectionFromPool(out Connection connection) + private void ReturnConnectionIfOpen(Connection connection) + { + if (!connection.IsOpen) + { + ConsiderUnavailable(); + DefinitelyDestroyConnection(connection); + return; + } + + _connections.Add(connection); + _newConnectionAvailable.Set(); + } + + private async Task AddConnectionIfUnderMaximumAsync() { while (true) { - connection = null; - lock (_connectionsLock) - { - if (_connections.IsEmpty) return false; - _connections.TryTake(out connection); - } + var nrOpened = Interlocked.CompareExchange(ref _nrConnections, 0, 0); + if (nrOpened >= _maxPoolSize) return null; - if (connection.IsOpen) return true; - connection.Dispose(); + if (Interlocked.CompareExchange(ref _nrConnections, nrOpened + 1, nrOpened) == nrOpened) + break; } + + return await CreateNewConnectionAsync().ConfigureAwait(false); } private async Task CreateNewConnectionAsync() { - NrConnections++; var newConnection = _connectionFactory.CreateConnection(); await newConnection.ConnectAsync().ConfigureAwait(false); return newConnection; } - private void AddConnectionIfOpen(Connection connection) + private Connection WaitForConnection() { - if (!connection.IsOpen) + var start = DateTimeOffset.Now; + var remaining = _waitForConnectionTimeout; + do { - ConsiderUnavailable(); - connection.Dispose(); - return; - } - AddConnection(connection); + if (_newConnectionAvailable.WaitOne(remaining)) + { + if (TryGetConnectionFromPool(out var connection)) + return connection; + } + remaining = _waitForConnectionTimeout - (DateTimeOffset.Now - start); + } while (remaining > TimeSpan.Zero); + + ConsiderUnavailable(); + throw new TimeoutException("Timed out while waiting for an available connection."); } - private void AddConnection(Connection connection) + private bool TryGetConnectionFromPool(out Connection connection) { - lock (_connectionsLock) + while (true) { - _connections.Add(connection); + _connections.TryTake(out connection); + if (connection == null) return false; // _connections is empty + if (connection.IsOpen) return true; + DefinitelyDestroyConnection(connection); } } private void ConsiderUnavailable() { - CloseAndRemoveAllConnections(); - } - - private void CloseAndRemoveAllConnections() - { - lock (_connectionsLock) - { - TeardownAsync().WaitUnwrap(); - RemoveAllConnections(); - } + CloseAndRemoveAllConnectionsAsync().WaitUnwrap(); } - private void RemoveAllConnections() + private async Task CloseAndRemoveAllConnectionsAsync() { while (_connections.TryTake(out var connection)) { - connection.Dispose(); + await connection.CloseAsync().ConfigureAwait(false); + DefinitelyDestroyConnection(connection); } } - private async Task TeardownAsync() + private void DefinitelyDestroyConnection(Connection connection) { - await Task.WhenAll(_connections.Select(c => c.CloseAsync())).ConfigureAwait(false); + connection.Dispose(); + Interlocked.Decrement(ref _nrConnections); } #region IDisposable Support @@ -135,10 +179,11 @@ protected virtual void Dispose(bool disposing) if (!_disposed) { if (disposing) - CloseAndRemoveAllConnections(); + CloseAndRemoveAllConnectionsAsync().WaitUnwrap(); _disposed = true; } } + #endregion } } \ No newline at end of file diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPoolSettings.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPoolSettings.cs new file mode 100644 index 00000000000..b1561376d34 --- /dev/null +++ b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPoolSettings.cs @@ -0,0 +1,55 @@ +#region License + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#endregion + +using System; + +namespace Gremlin.Net.Driver +{ + /// + /// Holds settings for the . + /// + public class ConnectionPoolSettings + { + private const int DefaultMinPoolSize = 8; + private const int DefaultMaxPoolSize = 128; + private static readonly TimeSpan DefaultWaitForConnectionTimeout = TimeSpan.FromSeconds(3); + + /// + /// Gets or sets the minimum size of a connection pool. + /// + /// The default value is 8. + public int MinSize { get; set; } = DefaultMinPoolSize; + + /// + /// Gets or sets the maximum size of a connection pool. + /// + /// The default value is 128. + public int MaxSize { get; set; } = DefaultMaxPoolSize; + + /// + /// Gets or sets the timespan to wait for a new connection before timing out. + /// + /// The default value is 3 seconds. + public TimeSpan WaitForConnectionTimeout { get; set; } = DefaultWaitForConnectionTimeout; + } +} \ No newline at end of file diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/GremlinClient.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/GremlinClient.cs index 2b47cbc3c01..41f08d18191 100644 --- a/gremlin-dotnet/src/Gremlin.Net/Driver/GremlinClient.cs +++ b/gremlin-dotnet/src/Gremlin.Net/Driver/GremlinClient.cs @@ -53,13 +53,16 @@ public class GremlinClient : IGremlinClient /// A instance to read received GraphSON data. /// a instance to write GraphSON data. /// The GraphSON version mime type, defaults to latest supported by the server. + /// The for the connection pool. public GremlinClient(GremlinServer gremlinServer, GraphSONReader graphSONReader = null, - GraphSONWriter graphSONWriter = null, string mimeType = null) + GraphSONWriter graphSONWriter = null, string mimeType = null, + ConnectionPoolSettings connectionPoolSettings = null) { var reader = graphSONReader ?? new GraphSON3Reader(); var writer = graphSONWriter ?? new GraphSON3Writer(); var connectionFactory = new ConnectionFactory(gremlinServer, reader, writer, mimeType ?? DefaultMimeType); - _connectionPool = new ConnectionPool(connectionFactory); + _connectionPool = + new ConnectionPool(connectionFactory, connectionPoolSettings ?? new ConnectionPoolSettings()); } /// diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/GremlinClientExtensions.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/GremlinClientExtensions.cs index 4aad73ee6ff..1365b2f54ba 100644 --- a/gremlin-dotnet/src/Gremlin.Net/Driver/GremlinClientExtensions.cs +++ b/gremlin-dotnet/src/Gremlin.Net/Driver/GremlinClientExtensions.cs @@ -92,10 +92,10 @@ public static async Task SubmitWithSingleResultAsync(this IGremlinClient g /// Thrown when a response is received from Gremlin Server that indicates /// that an error occurred. /// - public static async Task SubmitAsync(this IGremlinClient gremlinClient, string requestScript, + public static Task SubmitAsync(this IGremlinClient gremlinClient, string requestScript, Dictionary bindings = null) { - await gremlinClient.SubmitAsync(requestScript, bindings).ConfigureAwait(false); + return gremlinClient.SubmitAsync(requestScript, bindings); } /// @@ -109,9 +109,9 @@ public static async Task SubmitAsync(this IGremlinClient gremlinClient, string r /// Thrown when a response is received from Gremlin Server that indicates /// that an error occurred. /// - public static async Task SubmitAsync(this IGremlinClient gremlinClient, RequestMessage requestMessage) + public static Task SubmitAsync(this IGremlinClient gremlinClient, RequestMessage requestMessage) { - await gremlinClient.SubmitAsync(requestMessage).ConfigureAwait(false); + return gremlinClient.SubmitAsync(requestMessage); } /// @@ -126,7 +126,7 @@ public static async Task SubmitAsync(this IGremlinClient gremlinClient, RequestM /// Thrown when a response is received from Gremlin Server that indicates /// that an error occurred. /// - public static async Task> SubmitAsync(this IGremlinClient gremlinClient, + public static Task> SubmitAsync(this IGremlinClient gremlinClient, string requestScript, Dictionary bindings = null) { @@ -134,7 +134,7 @@ public static async Task> SubmitAsync(this IGremlinCli if (bindings != null) msgBuilder.AddArgument(Tokens.ArgsBindings, bindings); var msg = msgBuilder.Create(); - return await gremlinClient.SubmitAsync(msg).ConfigureAwait(false); + return gremlinClient.SubmitAsync(msg); } } } \ No newline at end of file diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/Remote/DriverRemoteConnection.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/Remote/DriverRemoteConnection.cs index 8cbd43d0c80..2c3fc28fa50 100644 --- a/gremlin-dotnet/src/Gremlin.Net/Driver/Remote/DriverRemoteConnection.cs +++ b/gremlin-dotnet/src/Gremlin.Net/Driver/Remote/DriverRemoteConnection.cs @@ -71,7 +71,7 @@ public async Task> SubmitAsync(Bytecode bytecode) return new DriverRemoteTraversal(_client, requestId, resultSet); } - private async Task> SubmitBytecodeAsync(Guid requestid, Bytecode bytecode) + private Task> SubmitBytecodeAsync(Guid requestid, Bytecode bytecode) { var requestMsg = RequestMessage.Build(Tokens.OpsBytecode) @@ -80,7 +80,7 @@ private async Task> SubmitBytecodeAsync(Guid requestid, B .AddArgument(Tokens.ArgsGremlin, bytecode) .AddArgument(Tokens.ArgsAliases, new Dictionary {{"g", _traversalSource}}) .Create(); - return await _client.SubmitAsync(requestMsg).ConfigureAwait(false); + return _client.SubmitAsync(requestMsg); } /// diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/WebSocketConnection.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/WebSocketConnection.cs index 9672606ad4b..1196359fa9c 100644 --- a/gremlin-dotnet/src/Gremlin.Net/Driver/WebSocketConnection.cs +++ b/gremlin-dotnet/src/Gremlin.Net/Driver/WebSocketConnection.cs @@ -35,24 +35,20 @@ internal class WebSocketConnection : IDisposable private const WebSocketMessageType MessageType = WebSocketMessageType.Binary; private ClientWebSocket _client; - public async Task ConnectAsync(Uri uri) + public Task ConnectAsync(Uri uri) { _client = new ClientWebSocket(); - await _client.ConnectAsync(uri, CancellationToken.None).ConfigureAwait(false); + return _client.ConnectAsync(uri, CancellationToken.None); } - public async Task CloseAsync() + public Task CloseAsync() { - await - _client.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None) - .ConfigureAwait(false); + return _client.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); } - public async Task SendMessageAsync(byte[] message) + public Task SendMessageAsync(byte[] message) { - await - _client.SendAsync(new ArraySegment(message), MessageType, true, CancellationToken.None) - .ConfigureAwait(false); + return _client.SendAsync(new ArraySegment(message), MessageType, true, CancellationToken.None); } public async Task ReceiveMessageAsync() diff --git a/gremlin-dotnet/test/Gremlin.Net.IntegrationTest/Driver/ConnectionPoolTests.cs b/gremlin-dotnet/test/Gremlin.Net.IntegrationTest/Driver/ConnectionPoolTests.cs index 21a2627155e..a93a3837aae 100644 --- a/gremlin-dotnet/test/Gremlin.Net.IntegrationTest/Driver/ConnectionPoolTests.cs +++ b/gremlin-dotnet/test/Gremlin.Net.IntegrationTest/Driver/ConnectionPoolTests.cs @@ -49,8 +49,8 @@ private async Task ExecuteMultipleLongRunningRequestsInParallel(IGremlinClient g [Fact] public async Task ShouldReuseConnectionForSequentialRequests() { - var gremlinServer = new GremlinServer(TestHost, TestPort); - using (var gremlinClient = new GremlinClient(gremlinServer)) + const int minConnectionPoolSize = 1; + using (var gremlinClient = CreateGremlinClient(minConnectionPoolSize)) { await gremlinClient.SubmitAsync(""); await gremlinClient.SubmitAsync(""); @@ -60,31 +60,73 @@ public async Task ShouldReuseConnectionForSequentialRequests() } } - [Fact] - public void ShouldOnlyCreateConnectionWhenNecessary() + [Theory] + [InlineData(0)] + [InlineData(1)] + [InlineData(8)] + public void ShouldStartWithConfiguredNrMinConnections(int minConnectionPoolSize) { - var gremlinServer = new GremlinServer(TestHost, TestPort); - using (var gremlinClient = new GremlinClient(gremlinServer)) + using (var gremlinClient = CreateGremlinClient(minConnectionPoolSize)) { var nrConnections = gremlinClient.NrConnections; - Assert.Equal(0, nrConnections); + Assert.Equal(minConnectionPoolSize, nrConnections); } } [Fact] public async Task ShouldExecuteParallelRequestsOnDifferentConnections() { - var gremlinServer = new GremlinServer(TestHost, TestPort); - using (var gremlinClient = new GremlinClient(gremlinServer)) + const int nrParallelRequests = 5; + using (var gremlinClient = CreateGremlinClient(nrParallelRequests)) { - var sleepTime = 50; - var nrParallelRequests = 5; + const int sleepTime = 50; await ExecuteMultipleLongRunningRequestsInParallel(gremlinClient, nrParallelRequests, sleepTime); - var nrConnections = gremlinClient.NrConnections; - Assert.Equal(nrParallelRequests, nrConnections); + Assert.Equal(nrParallelRequests, gremlinClient.NrConnections); + } + } + + [Fact] + public async Task ShouldNotCreateMoreThanConfiguredNrMaxConnections() + { + const int maxConnectionPoolSize = 1; + using (var gremlinClient = CreateGremlinClient(maxConnectionPoolSize: maxConnectionPoolSize)) + { + const int sleepTime = 100; + + await ExecuteMultipleLongRunningRequestsInParallel(gremlinClient, maxConnectionPoolSize + 1, sleepTime); + + Assert.Equal(maxConnectionPoolSize, gremlinClient.NrConnections); + } + } + + [Fact] + public async Task ShouldThrowTimeoutExceptionWhenNoConnectionIsAvailable() + { + const int nrParallelRequests = 3; + const int waitForConnectionTimeoutInMs = 5; + using (var gremlinClient = CreateGremlinClient(maxConnectionPoolSize: nrParallelRequests - 1, + waitForConnectionTimeoutInMs: waitForConnectionTimeoutInMs)) + { + const int sleepTime = 100; + + await Assert.ThrowsAsync(() => + ExecuteMultipleLongRunningRequestsInParallel(gremlinClient, nrParallelRequests, sleepTime)); } } + + private static GremlinClient CreateGremlinClient(int minConnectionPoolSize = 0, int maxConnectionPoolSize = 8, + int waitForConnectionTimeoutInMs = 5000) + { + var gremlinServer = new GremlinServer(TestHost, TestPort); + return new GremlinClient(gremlinServer, + connectionPoolSettings: new ConnectionPoolSettings + { + MinSize = minConnectionPoolSize, + MaxSize = maxConnectionPoolSize, + WaitForConnectionTimeout = TimeSpan.FromMilliseconds(waitForConnectionTimeoutInMs) + }); + } } } \ No newline at end of file diff --git a/gremlin-dotnet/test/Gremlin.Net.IntegrationTest/Process/Traversal/DriverRemoteConnection/RemoteConnectionFactory.cs b/gremlin-dotnet/test/Gremlin.Net.IntegrationTest/Process/Traversal/DriverRemoteConnection/RemoteConnectionFactory.cs index 39b7feada23..088db59e8ca 100644 --- a/gremlin-dotnet/test/Gremlin.Net.IntegrationTest/Process/Traversal/DriverRemoteConnection/RemoteConnectionFactory.cs +++ b/gremlin-dotnet/test/Gremlin.Net.IntegrationTest/Process/Traversal/DriverRemoteConnection/RemoteConnectionFactory.cs @@ -44,7 +44,9 @@ public IRemoteConnection CreateRemoteConnection() public IRemoteConnection CreateRemoteConnection(string traversalSource) { - var c = new DriverRemoteConnectionImpl(new GremlinClient(new GremlinServer(TestHost, TestPort)), + var c = new DriverRemoteConnectionImpl( + new GremlinClient(new GremlinServer(TestHost, TestPort), + connectionPoolSettings: new ConnectionPoolSettings {MinSize = 1}), traversalSource); _connections.Add(c); return c; From c58735661d98889423d7076528b06eb649b73cc1 Mon Sep 17 00:00:00 2001 From: Florian Hockmann Date: Thu, 2 Aug 2018 18:45:49 +0200 Subject: [PATCH 2/4] Add Gremlin.Net driver settings docs TINKERPOP-1774 --- CHANGELOG.asciidoc | 2 +- docs/src/dev/provider/index.asciidoc | 4 +- .../reference/gremlin-applications.asciidoc | 1 + docs/src/reference/gremlin-variants.asciidoc | 45 ++++++++++++++++++- docs/src/upgrade/release-3.4.x.asciidoc | 4 +- .../Driver/ConnectionPoolSettings.cs | 2 +- .../src/Gremlin.Net/Driver/GremlinServer.cs | 3 +- 7 files changed, 53 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index acdfe79e366..ee54f14ccec 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -25,7 +25,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima This release also includes changes from <>. -* Added min and max connection pool sizes for Gremlin.Net which are configurable through optional ConnectionPoolSettings. +* Added min and max connection pool sizes for Gremlin.Net which are configurable through optional `ConnectionPoolSettings`. * Implemented `ShortestPathVertexProgram` and the `shortestPath()` step. * `AbstractGraphProvider` uses `g.io()` for loading test data. * Added the `io()` start step and `read()` and `write()` termination steps to the Gremlin language. diff --git a/docs/src/dev/provider/index.asciidoc b/docs/src/dev/provider/index.asciidoc index 49bc7c73e50..4434cf76e64 100644 --- a/docs/src/dev/provider/index.asciidoc +++ b/docs/src/dev/provider/index.asciidoc @@ -572,9 +572,9 @@ internal class MyTypeReader : IGraphSONDeserializer } } -var graphsonReader = new GraphSONReader( +var graphsonReader = new GraphSON3Reader( new Dictionary {{MyType.GraphsonType, new MyTypeReader()}}); -var graphsonWriter = new GraphSONWriter( +var graphsonWriter = new GraphSON3Writer( new Dictionary {{typeof(MyType), new MyClassWriter()}}); var gremlinClient = new GremlinClient(new GremlinServer("localhost", 8182), graphsonReader, graphsonWriter); diff --git a/docs/src/reference/gremlin-applications.asciidoc b/docs/src/reference/gremlin-applications.asciidoc index 234bf3f1b03..04f05028f92 100644 --- a/docs/src/reference/gremlin-applications.asciidoc +++ b/docs/src/reference/gremlin-applications.asciidoc @@ -1308,6 +1308,7 @@ Gremlin-Console |PLAIN SASL (username/password) |3.0.0-incubating |Pluggable SASL |3.0.0-incubating |GSSAPI SASL (Kerberos) |3.3.0 |Gremlin-Python |PLAIN SASL |3.2.2 +|Gremlin.Net |PLAIN SASL |3.2.7 |Gremlin-Javascript |PLAIN SASL |3.3.0 |========================================================= diff --git a/docs/src/reference/gremlin-variants.asciidoc b/docs/src/reference/gremlin-variants.asciidoc index 8699b300158..363f1176fa8 100644 --- a/docs/src/reference/gremlin-variants.asciidoc +++ b/docs/src/reference/gremlin-variants.asciidoc @@ -342,7 +342,8 @@ A traversal source can be spawned with `RemoteStrategy` from an empty `Graph`. [source,csharp] ---- var graph = new Graph(); -var g = graph.Traversal().WithRemote(new DriverRemoteConnection(new GremlinClient(new GremlinServer("localhost", 8182)))); +var remoteConnection = new DriverRemoteConnection(new GremlinClient(new GremlinServer("localhost", 8182))); +var g = graph.Traversal().WithRemote(remoteConnection); ---- When a traversal from the `GraphTraversalSource` is iterated, the traversal’s `Bytecode` is sent over the wire via the registered @@ -366,6 +367,48 @@ terminal/action methods off of `ITraversal`. * `ITraversal.ToSet()` * `ITraversal.Iterate()` +=== Configuration + +The following sections describe how the Gremlin.Net driver can be configured. + +==== Gremlin Server + +The connection properties for the Gremlin.Net driver can be passed to the `GremlinServer` instance as keyword arguments: + +[width="100%",cols="3,10,^2",options="header"] +|========================================================= +|Key |Description |Default +|hostname |The hostname that the driver will connect to. |localhost +|port |The port on which Gremlin Server can be reached. |8182 +|enableSsl |Determines if SSL should be enabled or not. If enabled on the server then it must be enabled on the client. |false +|username |The username to submit on requests that require authentication. |_none_ +|password |The password to submit on requests that require authentication. |_none_ +|========================================================= + +==== Connection Pool + +It is also possible to configure the `ConnectionPool` of the Gremlin.Net driver. These configuration options can be set as properties +on the `ConnectionPoolSettings` instance that can be passed to the `GremlinClient`: + +[width="100%",cols="3,10,^2",options="header"] +|========================================================= +|Key |Description |Default +|MinSize |The minimum size of the connection pool. This determines how many connections are initially created. |8 +|MaxSize |The maximum size of the connection pool. |128 +|WaitForConnectionTimeout |The timespan to wait for a connection when the maximum size is reached before timing out. A `TimeoutException` is thrown when no connection becomes available until this timeout is reached. |3 seconds. +|========================================================= + +==== GraphSON Serialization + +The Gremlin.Net driver uses by default GraphSON 3.0 but it is also possible to use GraphSON 2.0 which can be necessary +when the server does not support GraphSON 3.0 yet: + +[source,csharp] +---- +var client = new GremlinClient(new GremlinServer("localhost", 8182), new GraphSON2Reader(), + new GraphSON2Writer(), GremlinClient.GraphSON2MimeType); +---- + === Static Enums and Methods Gremlin has various tokens (e.g. `T`, `P`, `Order`, `Operator`, etc.) that are represented in Gremlin.Net as classes. diff --git a/docs/src/upgrade/release-3.4.x.asciidoc b/docs/src/upgrade/release-3.4.x.asciidoc index 33fe5dd5e7d..19611af3973 100644 --- a/docs/src/upgrade/release-3.4.x.asciidoc +++ b/docs/src/upgrade/release-3.4.x.asciidoc @@ -242,9 +242,9 @@ when dealing with that event. To make this easier, the event now raises with a ` link:https://issues.apache.org/jira/browse/TINKERPOP-1831[TINKERPOP-1831] -==== Gremlin.Net: Configurable Max and Min ConnectionPool Sizes +==== Gremlin.Net Connection Pool -Gremlin.Net's `ConnectionPool` now has a minimum and a maximum size. These sizes are configurable through added +Gremlin.Net's `ConnectionPool` now has a minimum and a maximum size. These sizes are configurable through the newly added `ConnectionPoolSettings`. The minimum size determines how many connections are initially created. The maximum size is an upper limit of connections that can be created. When this limit is reached and another connection is needed, then the connection pool waits for a connection to become available again. The time to be waited is limited by the diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPoolSettings.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPoolSettings.cs index b1561376d34..82b5edd40d9 100644 --- a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPoolSettings.cs +++ b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPoolSettings.cs @@ -47,7 +47,7 @@ public class ConnectionPoolSettings public int MaxSize { get; set; } = DefaultMaxPoolSize; /// - /// Gets or sets the timespan to wait for a new connection before timing out. + /// Gets or sets the timespan to wait for a connection to become available before timing out. /// /// The default value is 3 seconds. public TimeSpan WaitForConnectionTimeout { get; set; } = DefaultWaitForConnectionTimeout; diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/GremlinServer.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/GremlinServer.cs index 601bbae3b31..43ef159e420 100644 --- a/gremlin-dotnet/src/Gremlin.Net/Driver/GremlinServer.cs +++ b/gremlin-dotnet/src/Gremlin.Net/Driver/GremlinServer.cs @@ -38,7 +38,8 @@ public class GremlinServer /// Specifies whether SSL should be enabled. /// The username to submit on requests that require authentication. /// The password to submit on requests that require authentication. - public GremlinServer(string hostname, int port = 8182, bool enableSsl = false, string username = null, string password = null) + public GremlinServer(string hostname = "localhost", int port = 8182, bool enableSsl = false, + string username = null, string password = null) { Uri = CreateUri(hostname, port, enableSsl); Username = username; From d02628b09701886a7eb11930955b8cca9ddddc7c Mon Sep 17 00:00:00 2001 From: Florian Hockmann Date: Thu, 30 Aug 2018 16:24:51 +0200 Subject: [PATCH 3/4] Use own AsyncAutoResetEvent in ConnectionPool This replaces the synchronous AutoResetEvent with our own AsyncAutoResetEvent to avoid blocking threads that wait for an available connection. --- .../Gremlin.Net/Driver/AsyncAutoResetEvent.cs | 103 +++++++++++ .../src/Gremlin.Net/Driver/ConnectionPool.cs | 9 +- .../Driver/AsyncAutoResetEventTests.cs | 169 ++++++++++++++++++ pom.xml | 1 + 4 files changed, 278 insertions(+), 4 deletions(-) create mode 100644 gremlin-dotnet/src/Gremlin.Net/Driver/AsyncAutoResetEvent.cs create mode 100644 gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/AsyncAutoResetEventTests.cs diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/AsyncAutoResetEvent.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/AsyncAutoResetEvent.cs new file mode 100644 index 00000000000..52c07b0a846 --- /dev/null +++ b/gremlin-dotnet/src/Gremlin.Net/Driver/AsyncAutoResetEvent.cs @@ -0,0 +1,103 @@ +#region License + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#endregion + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +// The implementation is based on this blog post by Stephen Toub: +// https://blogs.msdn.microsoft.com/pfxteam/2012/02/11/building-async-coordination-primitives-part-2-asyncautoresetevent/ + +namespace Gremlin.Net.Driver +{ + /// + /// An async version of the AutoResetEvent. + /// + public class AsyncAutoResetEvent + { + private static readonly Task CompletedTask = Task.FromResult(true); + private readonly List> _waitingTasks = new List>(); + private bool _isSignaled; + + /// + /// Asynchronously waits for this event to be set or until a timeout occurs. + /// + /// A that that represents the number of milliseconds to wait. + /// true if the current instance received a signal before timing out; otherwise, false. + public async Task WaitOneAsync(TimeSpan timeout) + { + var tcs = new TaskCompletionSource(); + var waitTask = WaitForSignalAsync(tcs); + if (waitTask.IsCompleted) return true; + + await Task.WhenAny(waitTask, Task.Delay(timeout)).ConfigureAwait(false); + lock (_waitingTasks) + { + if (!waitTask.IsCompleted) + { + // The wait timed out, so we need to remove the waiting task. + _waitingTasks.Remove(tcs); + tcs.SetResult(false); + } + } + + return waitTask.Result; + } + + private Task WaitForSignalAsync(TaskCompletionSource tcs) + { + lock (_waitingTasks) + { + if (_isSignaled) + { + _isSignaled = false; + return CompletedTask; + } + _waitingTasks.Add(tcs); + } + return tcs.Task; + } + + /// + /// Sets the event. + /// + public void Set() + { + TaskCompletionSource toRelease = null; + lock (_waitingTasks) + { + if (_waitingTasks.Count == 0) + { + _isSignaled = true; + } + else + { + toRelease = _waitingTasks[0]; + _waitingTasks.RemoveAt(0); + } + } + + toRelease?.SetResult(true); + } + } +} \ No newline at end of file diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs index a65208a40a5..e76cf51cec8 100644 --- a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs +++ b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs @@ -34,7 +34,7 @@ internal class ConnectionPool : IDisposable { private readonly ConnectionFactory _connectionFactory; private readonly ConcurrentBag _connections = new ConcurrentBag(); - private readonly AutoResetEvent _newConnectionAvailable = new AutoResetEvent(false); + private readonly AsyncAutoResetEvent _newConnectionAvailable = new AsyncAutoResetEvent(); private readonly int _minPoolSize; private readonly int _maxPoolSize; private readonly TimeSpan _waitForConnectionTimeout; @@ -72,7 +72,8 @@ public async Task GetAvailableConnectionAsync() { if (TryGetConnectionFromPool(out var connection)) return ProxiedConnection(connection); - connection = await AddConnectionIfUnderMaximumAsync().ConfigureAwait(false) ?? WaitForConnection(); + connection = await AddConnectionIfUnderMaximumAsync().ConfigureAwait(false) ?? + await WaitForConnectionAsync().ConfigureAwait(false); return ProxiedConnection(connection); } @@ -115,13 +116,13 @@ private async Task CreateNewConnectionAsync() return newConnection; } - private Connection WaitForConnection() + private async Task WaitForConnectionAsync() { var start = DateTimeOffset.Now; var remaining = _waitForConnectionTimeout; do { - if (_newConnectionAvailable.WaitOne(remaining)) + if (await _newConnectionAvailable.WaitOneAsync(remaining).ConfigureAwait(false)) { if (TryGetConnectionFromPool(out var connection)) return connection; diff --git a/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/AsyncAutoResetEventTests.cs b/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/AsyncAutoResetEventTests.cs new file mode 100644 index 00000000000..26a5a58cd04 --- /dev/null +++ b/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/AsyncAutoResetEventTests.cs @@ -0,0 +1,169 @@ +#region License + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#endregion + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Gremlin.Net.Driver; +using Xunit; + +namespace Gremlin.Net.UnitTest.Driver +{ + public class AsyncAutoResetEventTests + { + private static readonly TimeSpan DefaultTimeout = TimeSpan.FromMilliseconds(100); + + [Fact] + public async Task WaitOneAsync_AfterSet_CompletesSynchronously() + { + var are = new AsyncAutoResetEvent(); + + are.Set(); + var task = are.WaitOneAsync(DefaultTimeout); + + Assert.True(task.IsCompleted); + Assert.True(await task); + } + + [Fact] + public async Task MultipleWaitOneAsync_AfterSet_OnlyFirstWaitIsSuccessful() + { + var are = new AsyncAutoResetEvent(); + + are.Set(); + var task1 = are.WaitOneAsync(DefaultTimeout); + var task2 = are.WaitOneAsync(DefaultTimeout); + + Assert.True(task1.IsCompleted); + Assert.True(await task1); + Assert.False(await task2); + } + + [Fact] + public async Task MultipleWaitOneAsync_AfterMultipleSet_OnlyFirstWaitIsSuccessful() + { + var are = new AsyncAutoResetEvent(); + + are.Set(); + are.Set(); + var task1 = are.WaitOneAsync(DefaultTimeout); + var task2 = are.WaitOneAsync(DefaultTimeout); + + Assert.True(task1.IsCompleted); + Assert.True(await task1); + Assert.False(await task2); + } + + [Fact] + public async Task WaitOneAsync_SetBeforeTimeout_WaitSuccessful() + { + var are = new AsyncAutoResetEvent(); + + var task = are.WaitOneAsync(DefaultTimeout); + are.Set(); + + Assert.True(await task); + } + + [Fact] + public async Task Set_AfterMultipleWaitOneAsync_OnlyFirstWaitIsSuccessful() + { + var are = new AsyncAutoResetEvent(); + + var task1 = are.WaitOneAsync(DefaultTimeout); + var task2 = are.WaitOneAsync(DefaultTimeout); + are.Set(); + + await AssertCompletesBeforeTimeoutAsync(task1, DefaultTimeout.Milliseconds + 50); + Assert.False(await task2); + } + + [Fact] + public async Task WaitOneAsync_NotSet_OnlyWaitUntilTimeout() + { + var are = new AsyncAutoResetEvent(); + + var task = are.WaitOneAsync(DefaultTimeout); + + await AssertCompletesBeforeTimeoutAsync(task, DefaultTimeout.Milliseconds + 50); + } + + [Fact] + public async Task WaitOneAsync_NotSet_WaitNotSuccessful() + { + var are = new AsyncAutoResetEvent(); + + var task = are.WaitOneAsync(DefaultTimeout); + + Assert.False(await task); + } + + [Fact] + public async Task WaitOneAsync_SetAfterPreviousWaitTimedOut_OnlySecondWaitSuccessful() + { + var are = new AsyncAutoResetEvent(); + + var task1 = are.WaitOneAsync(DefaultTimeout); + await Task.Delay(DefaultTimeout + TimeSpan.FromMilliseconds(50)); + var task2 = are.WaitOneAsync(DefaultTimeout); + are.Set(); + + Assert.False(await task1); + Assert.True(await task2); + } + + [Fact] + public async Task WaitOneAsync_SetAfterMultipleWaitsTimedOut_OnlyLastWaitSuccessful() + { + var are = new AsyncAutoResetEvent(); + + var timedOutTasks = new List>(); + for (var i = 0; i < 1000; i++) + { + timedOutTasks.Add(are.WaitOneAsync(DefaultTimeout)); + } + + await Task.Delay(DefaultTimeout + TimeSpan.FromMilliseconds(50)); + var task2 = are.WaitOneAsync(DefaultTimeout); + are.Set(); + + foreach (var t in timedOutTasks) + { + Assert.False(await t); + } + Assert.True(await task2); + } + + private static async Task AssertCompletesBeforeTimeoutAsync(Task task, int timeoutInMs) + { + var completedTask = await WaitForTaskOrTimeoutAsync(task, TimeSpan.FromMilliseconds(timeoutInMs)); + if (completedTask != task) + throw new Exception("Task did not complete."); + } + + private static Task WaitForTaskOrTimeoutAsync(Task task, TimeSpan timeout) + { + return Task.WhenAny(task, Task.Delay(timeout)); + } + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index dfb58247912..50e84c427ac 100644 --- a/pom.xml +++ b/pom.xml @@ -401,6 +401,7 @@ limitations under the License. **/node/node_modules/** **/node/node **/npm-debug.log + **/.idea/** From 1c7a3def7e483afc19ff6060e1b8206dc15b062d Mon Sep 17 00:00:00 2001 From: Florian Hockmann Date: Fri, 31 Aug 2018 18:17:31 +0200 Subject: [PATCH 4/4] Minor cleanup TINKERPOP-1774 --- .../Gremlin.Net/Driver/AsyncAutoResetEvent.cs | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/AsyncAutoResetEvent.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/AsyncAutoResetEvent.cs index 52c07b0a846..873f6a7ee3e 100644 --- a/gremlin-dotnet/src/Gremlin.Net/Driver/AsyncAutoResetEvent.cs +++ b/gremlin-dotnet/src/Gremlin.Net/Driver/AsyncAutoResetEvent.cs @@ -25,7 +25,7 @@ using System.Collections.Generic; using System.Threading.Tasks; -// The implementation is based on this blog post by Stephen Toub: +// The implementation of this class is inspired by this blog post from Stephen Toub: // https://blogs.msdn.microsoft.com/pfxteam/2012/02/11/building-async-coordination-primitives-part-2-asyncautoresetevent/ namespace Gremlin.Net.Driver @@ -42,25 +42,19 @@ public class AsyncAutoResetEvent /// /// Asynchronously waits for this event to be set or until a timeout occurs. /// - /// A that that represents the number of milliseconds to wait. + /// A that represents the number of milliseconds to wait. /// true if the current instance received a signal before timing out; otherwise, false. public async Task WaitOneAsync(TimeSpan timeout) { var tcs = new TaskCompletionSource(); var waitTask = WaitForSignalAsync(tcs); - if (waitTask.IsCompleted) return true; + if (waitTask.IsCompleted) return waitTask.Result; await Task.WhenAny(waitTask, Task.Delay(timeout)).ConfigureAwait(false); - lock (_waitingTasks) - { - if (!waitTask.IsCompleted) - { - // The wait timed out, so we need to remove the waiting task. - _waitingTasks.Remove(tcs); - tcs.SetResult(false); - } - } + if (waitTask.IsCompleted) return waitTask.Result; + StopWaiting(tcs); + return waitTask.Result; } @@ -77,6 +71,15 @@ private Task WaitForSignalAsync(TaskCompletionSource tcs) } return tcs.Task; } + + private void StopWaiting(TaskCompletionSource tcs) + { + lock (_waitingTasks) + { + _waitingTasks.Remove(tcs); + tcs.SetResult(false); + } + } /// /// Sets the event.