Skip to content

Limit the number of TCP connections on .NET core #2473

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 9, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build/scripts/Testing.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,4 @@ module Tests =
setProcessEnvironVar "NEST_INTEGRATION_CLUSTER" clusterFilter
setProcessEnvironVar "NEST_INTEGRATION_VERSION" esVersion
setProcessEnvironVar "NEST_TEST_FILTER" testFilter
testDesktopClr "all"
testDesktopClr "all"
88 changes: 62 additions & 26 deletions src/Elasticsearch.Net/Configuration/ConnectionConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class ConnectionConfiguration : ConnectionConfiguration<ConnectionConfigu
public static readonly TimeSpan DefaultTimeout = TimeSpan.FromMinutes(1);
public static readonly TimeSpan DefaultPingTimeout = TimeSpan.FromSeconds(2);
public static readonly TimeSpan DefaultPingTimeoutOnSSL = TimeSpan.FromSeconds(5);
public static readonly int DefaultConnectionLimit = 80;

/// <summary>
/// ConnectionConfiguration allows you to control how ElasticLowLevelClient behaves and where/how it connects
Expand Down Expand Up @@ -53,10 +54,10 @@ public ConnectionConfiguration(IConnectionPool connectionPool, Func<ConnectionCo

// ReSharper disable once MemberCanBePrivate.Global
// eventhough we use don't use this we very much would like to expose this constructor

public ConnectionConfiguration(IConnectionPool connectionPool, IConnection connection, Func<ConnectionConfiguration, IElasticsearchSerializer> serializerFactory)
: base(connectionPool, connection, serializerFactory)
{ }

}

[Browsable(false)]
Expand Down Expand Up @@ -112,6 +113,9 @@ public abstract class ConnectionConfiguration<T> : IConnectionConfigurationValue
private int? _maxRetries;
int? IConnectionConfigurationValues.MaxRetries => _maxRetries;

private int _connectionLimit;
int IConnectionConfigurationValues.ConnectionLimit => _connectionLimit;

private bool _sniffOnStartup;
bool IConnectionConfigurationValues.SniffsOnStartup => _sniffOnStartup;

Expand Down Expand Up @@ -166,46 +170,75 @@ protected ConnectionConfiguration(IConnectionPool connectionPool, IConnection co
// ReSharper disable once VirtualMemberCallInContructor
this._serializer = serializerFactory?.Invoke((T)this) ?? this.DefaultSerializer((T)this);

this._connectionLimit = ConnectionConfiguration.DefaultConnectionLimit;
this._requestTimeout = ConnectionConfiguration.DefaultTimeout;
this._sniffOnConnectionFault = true;
this._sniffOnStartup = true;
this._sniffLifeSpan = TimeSpan.FromHours(1);
}

T Assign(Action<ConnectionConfiguration<T>> assigner) => Fluent.Assign((T)this, assigner);
private T Assign(Action<ConnectionConfiguration<T>> assigner) => Fluent.Assign((T)this, assigner);

/// <summary>
/// The default serializer used to serialize documents to and from JSON
/// </summary>
protected virtual IElasticsearchSerializer DefaultSerializer(T settings) => new ElasticsearchDefaultSerializer();

/// <summary>
/// Sets the keep-alive option on a TCP connection.
/// <para>For Desktop CLR, sets ServicePointManager.SetTcpKeepAlive</para>
/// </summary>
/// <param name="keepAliveTime">Specifies the timeout with no activity until the first keep-alive packet is sent.</param>
/// <param name="keepAliveInterval">Specifies the interval between when successive keep-alive packets are sent if no acknowledgement is received.</param>
public T EnableTcpKeepAlive(TimeSpan keepAliveTime, TimeSpan keepAliveInterval) =>
Assign(a => { this._keepAliveTime = keepAliveTime; this._keepAliveInterval = keepAliveInterval; });

/// <summary>
/// The maximum number of retries for a given request,
/// </summary>
public T MaximumRetries(int maxRetries) => Assign(a => a._maxRetries = maxRetries);

/// <summary>
/// Limits the number of concurrent connections that can be opened to an endpoint. Defaults to 80.
/// <para>For Desktop CLR, this setting applies to the DefaultConnectionLimit property on the ServicePointManager object when creating ServicePoint objects, affecting the default <see cref="IConnection"/> implementation.</para>
/// <para>For Core CLR, this setting applies to the MaxConnectionsPerServer property on the HttpClientHandler instances used by the HttpClient inside the default <see cref="IConnection"/> implementation</para>
/// </summary>
/// <param name="connectionLimit">The connection limit</param>
public T ConnectionLimit(int connectionLimit)
{
if (connectionLimit <= 0) throw new ArgumentException("must be greater than 0", nameof(connectionLimit));
return Assign(a => a._connectionLimit = connectionLimit);
}

/// <summary>
/// On connection pools that support reseeding setting this to true (default) will resniff the cluster when a call fails
/// Enables resniffing of the cluster when a call fails, if the connection pool supports reseeding. Defaults to true
/// </summary>
public T SniffOnConnectionFault(bool sniffsOnConnectionFault = true) => Assign(a => a._sniffOnConnectionFault = sniffsOnConnectionFault);

/// <summary>
/// Enables sniffing on first usage of a connection pool if that pool supports reseeding, defaults to true
/// Enables sniffing on first usage of a connection pool if that pool supports reseeding. Defaults to true
/// </summary>
public T SniffOnStartup(bool sniffsOnStartup = true) => Assign(a => a._sniffOnStartup = sniffsOnStartup);

/// <summary>
/// Set the duration after which a cluster state is considered stale and a sniff should be performed again.
/// An IConnectionPool has to signal it supports reseeding otherwise sniffing will never happen.
/// An <see cref="IConnectionPool"/> has to signal it supports reseeding, otherwise sniffing will never happen.
/// Defaults to 1 hour.
/// Set to null to disable completely. Sniffing will only ever happen on ConnectionPools that return true for SupportsReseeding
/// </summary>
/// <param name="sniffLifeSpan">The duration a clusterstate is considered fresh, set to null to disable periodic sniffing</param>
public T SniffLifeSpan(TimeSpan? sniffLifeSpan) => Assign(a => a._sniffLifeSpan = sniffLifeSpan);

/// <summary>
/// Enable gzip compressed requests and responses, do note that you need to configure elasticsearch to set this
/// Enables gzip compressed requests and responses.
/// <para>IMPORTANT: You need to configure http compression on Elasticsearch to be able to use this</para>
/// <para>http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/modules-http.html"</para>
/// </summary>
public T EnableHttpCompression(bool enabled = true) => Assign(a => a._enableHttpCompression = enabled);

/// <summary>
/// Disables the automatic detection of a proxy
/// </summary>
public T DisableAutomaticProxyDetection(bool disable = true) => Assign(a => a._disableAutomaticProxyDetection = disable);

/// <summary>
Expand All @@ -223,18 +256,18 @@ public T EnableTcpKeepAlive(TimeSpan keepAliveTime, TimeSpan keepAliveInterval)
public T DisablePing(bool disable = true) => Assign(a => a._disablePings = disable);

/// <summary>
/// This NameValueCollection will be appended to every url NEST calls, great if you need to pass i.e an API key.
/// A collection of query string parameters that will be sent with every request. Useful in situations where you always need to pass a parameter e.g. an API key.
/// </summary>
public T GlobalQueryStringParameters(NameValueCollection queryStringParameters) => Assign(a => a._queryString.Add(queryStringParameters));

/// <summary>
/// a NameValueCollection that will be send as headers for each request
/// A collection of headers that will be sent with every request. Useful in situations where you always need to pass a header e.g. a custom auth header
/// </summary>
public T GlobalHeaders(NameValueCollection headers) => Assign(a => a._headers.Add(headers));

/// <summary>
/// Sets the default timeout in milliseconds for each request to Elasticsearch.
/// NOTE: You can set this to a high value here, and specify the timeout on Elasticsearch's side.
/// NOTE: You can set this to a high value here, and specify a timeout on Elasticsearch's side.
/// </summary>
/// <param name="timeout">time out in milliseconds</param>
public T RequestTimeout(TimeSpan timeout) => Assign(a => a._requestTimeout = timeout);
Expand Down Expand Up @@ -262,14 +295,14 @@ public T EnableTcpKeepAlive(TimeSpan keepAliveTime, TimeSpan keepAliveInterval)

/// <summary>
/// Limits the total runtime including retries separately from <see cref="RequestTimeout"/>
/// <pre>
/// When not specified defaults to <see cref="RequestTimeout"/> which itself defaults to 60seconds
/// </pre>
/// <para>
/// When not specified defaults to <see cref="RequestTimeout"/>, which itself defaults to 60 seconds
/// </para>
/// </summary>
public T MaxRetryTimeout(TimeSpan maxRetryTimeout) => Assign(a => a._maxRetryTimeout = maxRetryTimeout);

/// <summary>
/// If your connection has to go through proxy use this method to specify the proxy url
/// If your connection has to go through proxy, use this method to specify the proxy url
/// </summary>
public T Proxy(Uri proxyAdress, string username, string password)
{
Expand All @@ -281,8 +314,8 @@ public T Proxy(Uri proxyAdress, string username, string password)
}

/// <summary>
/// Forces all requests to have ?pretty=true, causing elasticsearch to return formatted json.
/// Also forces the client to send out formatted json. Defaults to false
/// Forces all requests to have ?pretty=true, causing Elasticsearch to return formatted json.
/// Also forces the client to send out formatted json. Defaults to <c>false</c>
/// </summary>
public T PrettyJson(bool b = true) => Assign(a =>
{
Expand All @@ -293,33 +326,36 @@ public T PrettyJson(bool b = true) => Assign(a =>
});

/// <summary>
/// Make sure the reponse bytes are always available on the ElasticsearchResponse object
/// <para>Note: that depending on the registered serializer this may cause the respond to be read in memory first</para>
/// Ensures the response bytes are always available on the <see cref="ElasticsearchResponse{T}"/>
/// <para>IMPORTANT: Depending on the registered serializer,
/// this may cause the respose to be buffered in memory first, potentially affecting performance.</para>
/// </summary>
public T DisableDirectStreaming(bool b = true) => Assign(a => a._disableDirectStreaming = b);

/// <summary>
/// Global callback for every response that NEST receives, useful for custom logging.
/// Calling this multiple times will register multiple listeners.
/// Registers an <see cref="Action{IApiCallDetails}"/> that is called when a response is received from Elasticsearch.
/// This can be useful for implementing custom logging.
/// Multiple callbacks can be registered by calling this multiple times
/// </summary>
public T OnRequestCompleted(Action<IApiCallDetails> handler) =>
Assign(a => a._completedRequestHandler += handler ?? DefaultCompletedRequestHandler);

/// <summary>
/// Registers an <see cref="Action{RequestData}"/> that is called when <see cref="RequestData"/> is created.
/// Multiple callbacks can be registered by calling this multiple times
/// </summary>
public T OnRequestDataCreated(Action<RequestData> handler) =>
Assign(a => a._onRequestDataCreated += handler ?? DefaultRequestDataCreated);

/// <summary>
/// Basic access authentication credentials to specify with all requests.
/// Basic Authentication credentials to specify with all requests.
/// </summary>
public T BasicAuthentication(string userName, string password)
{
this._basicAuthCredentials = new BasicAuthenticationCredentials
public T BasicAuthentication(string userName, string password) =>
Assign(a => a._basicAuthCredentials = new BasicAuthenticationCredentials
{
Username = userName,
Password = password
};
return (T)this;
}
});

/// <summary>
/// Allows for requests to be pipelined. http://en.wikipedia.org/wiki/HTTP_pipelining
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ public interface IConnectionConfigurationValues : IDisposable
/// </summary>
int? MaxRetries { get; }

/// <summary>
/// Limits the number of concurrent connections that can be opened to an endpoint. Defaults to 80 (see <see cref="ConnectionConfiguration.DefaultConnectionLimit"/>).
/// <para>For Desktop CLR, this setting applies to the DefaultConnectionLimit property on the ServicePointManager object when creating ServicePoint objects, affecting the default <see cref="IConnection"/> implementation.</para>
/// <para>For Core CLR, this setting applies to the MaxConnectionsPerServer property on the HttpClientHandler instances used by the HttpClient inside the default <see cref="IConnection"/> implementation</para>
/// </summary>
int ConnectionLimit { get; }

/// <summary>
/// This signals that we do not want to send initial pings to unknown/previously dead nodes
/// and just send the call straightaway
Expand All @@ -68,7 +75,15 @@ public interface IConnectionConfigurationValues : IDisposable
/// When set will force all connections through this proxy
/// </summary>
string ProxyAddress { get; }

/// <summary>
/// The username for the proxy, when configured
/// </summary>
string ProxyUsername { get; }

/// <summary>
/// The password for the proxy, when configured
/// </summary>
string ProxyPassword { get; }

/// <summary>
Expand Down Expand Up @@ -138,6 +153,10 @@ public interface IConnectionConfigurationValues : IDisposable
/// </summary>
BasicAuthenticationCredentials BasicAuthenticationCredentials { get; }

/// <summary>
/// An action to run when the <see cref="RequestData"/> for a request has been
/// created.
/// </summary>
Action<RequestData> OnRequestDataCreated { get; }

/// <summary>
Expand Down
4 changes: 3 additions & 1 deletion src/Elasticsearch.Net/Connection/HttpConnection-CoreFx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ protected virtual HttpClientHandler CreateHttpClientHandler(RequestData requestD
{
var handler = new HttpClientHandler
{
AutomaticDecompression = requestData.HttpCompression ? GZip | Deflate : None
AutomaticDecompression = requestData.HttpCompression ? GZip | Deflate : None,
// same limit as desktop clr
MaxConnectionsPerServer = requestData.ConnectionSettings.ConnectionLimit
};

if (!requestData.ProxyAddress.IsNullOrEmpty())
Expand Down
2 changes: 1 addition & 1 deletion src/Elasticsearch.Net/Connection/HttpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ protected virtual void AlterServicePoint(ServicePoint requestServicePoint, Reque
{
requestServicePoint.UseNagleAlgorithm = false;
requestServicePoint.Expect100Continue = false;
requestServicePoint.ConnectionLimit = 80;
requestServicePoint.ConnectionLimit = requestData.ConnectionSettings.ConnectionLimit;
//looking at http://referencesource.microsoft.com/#System/net/System/Net/ServicePoint.cs
//this method only sets internal values and wont actually cause timers and such to be reset
//So it should be idempotent if called with the same parameters
Expand Down
2 changes: 1 addition & 1 deletion src/Tests/Reproduce/ConnectionReuseAndBalancing.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ [I] public async Task IndexAndSearchABunch()
this.AssertHttpStats(nodeStats);
for (var i = 0; i < 10; i++)
{
Parallel.For(0, 1000, async (c) => await client.SearchAsync<Project>(s => s));
Parallel.For(0, 1000, c => client.Search<Project>(s => s));

nodeStats = await client.NodesStatsAsync(statsRequest);
this.AssertHttpStats(nodeStats);
Expand Down