Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions src/Elasticsearch.Net.VirtualizedCluster/VirtualCluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ public class VirtualCluster
{
private readonly List<Node> _nodes;

public VirtualCluster(IEnumerable<Node> nodes, bool productCheckSucceeds = true)
public VirtualCluster(IEnumerable<Node> nodes) : this(nodes, true) { }

public VirtualCluster(IEnumerable<Node> nodes, bool productCheckSucceeds)
{
_nodes = nodes.ToList();

Expand All @@ -29,19 +31,20 @@ public VirtualCluster(IEnumerable<Node> nodes, bool productCheckSucceeds = true)
public IReadOnlyList<Node> Nodes => _nodes;

public List<IRule> PingingRules { get; } = new();
public List<ISniffRule> SniffingRules { get; } = new();
public List<IRule> ProductCheckRules { get; } = new();
public List<ISniffRule> SniffingRules { get; } = new();
internal string ElasticsearchVersion { get; private set; } = "7.0.0";

internal string PublishAddressOverride { get; private set; }

internal bool SniffShouldReturnFqnd { get; private set; }
internal string ElasticsearchVersion { get; private set; } = "7.0.0";

public VirtualCluster SniffShouldReturnFqdn()
{
SniffShouldReturnFqnd = true;
return this;
}

public VirtualCluster SniffElasticsearchVersionNumber(string version)
{
ElasticsearchVersion = version;
Expand Down Expand Up @@ -93,7 +96,7 @@ public VirtualCluster Sniff(Func<SniffRule, ISniffRule> selector)
SniffingRules.Add(selector(new SniffRule()));
return this;
}

public VirtualCluster ProductCheck(Func<ProductCheckRule, IRule> selector)
{
ProductCheckRules.Add(selector(new ProductCheckRule()));
Expand Down
17 changes: 12 additions & 5 deletions src/Elasticsearch.Net.VirtualizedCluster/VirtualClusterWith.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,19 @@ namespace Elasticsearch.Net.VirtualizedCluster
{
public static class VirtualClusterWith
{
public static VirtualCluster Nodes(int numberOfNodes, int startFrom = 9200, bool productCheckAlwaysSucceeds = true) =>
new (Enumerable.Range(startFrom, numberOfNodes).Select(n => new Node(new Uri($"http://localhost:{n}"))), productCheckAlwaysSucceeds);
public static VirtualCluster Nodes(int numberOfNodes, int startFrom = 9200) => Nodes(numberOfNodes, true, startFrom);

public static VirtualCluster MasterOnlyNodes(int numberOfNodes, int startFrom = 9200, bool productCheckSucceeds = true) =>
new (Enumerable.Range(startFrom, numberOfNodes).Select(n => new Node(new Uri($"http://localhost:{n}")) { HoldsData = false, MasterEligible = true }), productCheckSucceeds);
public static VirtualCluster Nodes(int numberOfNodes, bool productCheckSucceeds, int startFrom = 9200) =>
new(Enumerable.Range(startFrom, numberOfNodes).Select(n => new Node(new Uri($"http://localhost:{n}"))), productCheckSucceeds);

public static VirtualCluster Nodes(IEnumerable<Node> nodes, bool productCheckSucceeds = true) => new (nodes, productCheckSucceeds);
public static VirtualCluster MasterOnlyNodes(int numberOfNodes, int startFrom = 9200) => MasterOnlyNodes(numberOfNodes, true, startFrom);

public static VirtualCluster MasterOnlyNodes(int numberOfNodes, bool productCheckSucceeds, int startFrom = 9200) =>
new(Enumerable.Range(startFrom, numberOfNodes)
.Select(n => new Node(new Uri($"http://localhost:{n}")) { HoldsData = false, MasterEligible = true }), productCheckSucceeds);

public static VirtualCluster Nodes(IEnumerable<Node> nodes) => Nodes(nodes, true);

public static VirtualCluster Nodes(IEnumerable<Node> nodes, bool productCheckSucceeds) => new(nodes, productCheckSucceeds);
}
}
28 changes: 13 additions & 15 deletions src/Elasticsearch.Net/Connection/HttpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,22 @@ internal class WebProxy : IWebProxy
/// <summary> The default IConnection implementation. Uses <see cref="HttpClient" />.</summary>
public class HttpConnection : IConnection
{
private static DiagnosticSource DiagnosticSource { get; } = new DiagnosticListener(DiagnosticSources.HttpConnection.SourceName);

private static readonly string MissingConnectionLimitMethodError =
$"Your target platform does not support {nameof(ConnectionConfiguration.ConnectionLimit)}"
+ $" please set {nameof(ConnectionConfiguration.ConnectionLimit)} to -1 on your connection configuration/settings."
+ $" this will cause the {nameof(HttpClientHandler.MaxConnectionsPerServer)} not to be set on {nameof(HttpClientHandler)}";

private RequestDataHttpClientFactory HttpClientFactory { get; }

[Obsolete("HttpConnection now uses a HttpClientFactory implementation to manage HttpClient and HttpMessageHandler instances. "
+ "This property is no longer used and will be removed in the next major release")]
protected readonly ConcurrentDictionary<int, HttpClient> Clients = new ConcurrentDictionary<int, HttpClient>();

public HttpConnection() => HttpClientFactory = new RequestDataHttpClientFactory(r => CreateHttpClientHandler(r));

public int InUseHandlers => HttpClientFactory.InUseHandlers;
public int RemovedHandlers => HttpClientFactory.RemovedHandlers;
private static DiagnosticSource DiagnosticSource { get; } = new DiagnosticListener(DiagnosticSources.HttpConnection.SourceName);

public HttpConnection() => HttpClientFactory = new RequestDataHttpClientFactory(r => CreateHttpClientHandler(r));
private RequestDataHttpClientFactory HttpClientFactory { get; }

public virtual TResponse Request<TResponse>(RequestData requestData)
where TResponse : class, IElasticsearchResponse, new()
Expand All @@ -80,7 +79,7 @@ public virtual TResponse Request<TResponse>(RequestData requestData)
if (requestData.PostData != null)
SetContent(requestMessage, requestData);

using(requestMessage?.Content ?? (IDisposable)Stream.Null)
using (requestMessage?.Content ?? (IDisposable)Stream.Null)
using (var d = DiagnosticSource.Diagnose<RequestData, int?>(DiagnosticSources.HttpConnection.SendAndReceiveHeaders, requestData))
{
if (requestData.TcpStats)
Expand Down Expand Up @@ -113,10 +112,11 @@ public virtual TResponse Request<TResponse>(RequestData requestData)
{
ex = e;
}
using(receive)
using (receive)
using (responseStream ??= Stream.Null)
{
var response = ResponseBuilder.ToResponse<TResponse>(requestData, ex, statusCode, warnings, responseStream, mimeType, productNames?.FirstOrDefault());
var response = ResponseBuilder.ToResponse<TResponse>(requestData, ex, statusCode, warnings, responseStream,
productNames?.FirstOrDefault(), mimeType);

// set TCP and threadpool stats on the response here so that in the event the request fails after the point of
// gathering stats, they are still exposed on the call details. Ideally these would be set inside ResponseBuilder.ToResponse,
Expand Down Expand Up @@ -150,7 +150,7 @@ public virtual async Task<TResponse> RequestAsync<TResponse>(RequestData request
if (requestData.PostData != null)
await SetContentAsync(requestMessage, requestData, cancellationToken).ConfigureAwait(false);

using(requestMessage?.Content ?? (IDisposable)Stream.Null)
using (requestMessage?.Content ?? (IDisposable)Stream.Null)
using (var d = DiagnosticSource.Diagnose<RequestData, int?>(DiagnosticSources.HttpConnection.SendAndReceiveHeaders, requestData))
{
if (requestData.TcpStats)
Expand All @@ -159,7 +159,8 @@ public virtual async Task<TResponse> RequestAsync<TResponse>(RequestData request
if (requestData.ThreadPoolStats)
threadPoolStats = ThreadPoolStats.GetStats();

responseMessage = await client.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
responseMessage = await client.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead, cancellationToken)
.ConfigureAwait(false);
statusCode = (int)responseMessage.StatusCode;
d.EndState = statusCode;
}
Expand Down Expand Up @@ -187,7 +188,7 @@ public virtual async Task<TResponse> RequestAsync<TResponse>(RequestData request
using (responseStream ??= Stream.Null)
{
var response = await ResponseBuilder.ToResponseAsync<TResponse>
(requestData, ex, statusCode, warnings, responseStream, mimeType, productNames?.FirstOrDefault(), cancellationToken)
(requestData, ex, statusCode, warnings, responseStream, productNames?.FirstOrDefault(), mimeType, cancellationToken)
.ConfigureAwait(false);

// set TCP and thread pool stats on the response here so that in the event the request fails after the point of
Expand All @@ -209,7 +210,6 @@ protected virtual HttpMessageHandler CreateHttpClientHandler(RequestData request

// same limit as desktop clr
if (requestData.ConnectionSettings.ConnectionLimit > 0)
{
try
{
handler.MaxConnectionsPerServer = requestData.ConnectionSettings.ConnectionLimit;
Expand All @@ -222,7 +222,6 @@ protected virtual HttpMessageHandler CreateHttpClientHandler(RequestData request
{
throw new Exception(MissingConnectionLimitMethodError, e);
}
}

if (!requestData.ProxyAddress.IsNullOrEmpty())
{
Expand Down Expand Up @@ -291,7 +290,6 @@ protected virtual bool SetApiKeyAuthenticationIfNeeded(HttpRequestMessage reques

requestMessage.Headers.Authorization = new AuthenticationHeaderValue("ApiKey", apiKey);
return true;

}

// TODO - make private in 8.0 and only expose SetAuthenticationIfNeeded
Expand Down Expand Up @@ -401,7 +399,7 @@ private static async Task SetContentAsync(HttpRequestMessage message, RequestDat
await stream.DisposeAsync().ConfigureAwait(false);

#else
stream.Dispose();
stream.Dispose();
#endif
}
else
Expand Down
18 changes: 3 additions & 15 deletions src/Elasticsearch.Net/Connection/HttpWebRequestConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,12 @@ public virtual TResponse Request<TResponse>(RequestData requestData)
var data = requestData.PostData;

if (data != null)
{
using (var stream = request.GetRequestStream())
{
if (requestData.HttpCompression)
{
using (var zipStream = new GZipStream(stream, CompressionMode.Compress))
data.Write(zipStream, requestData.ConnectionSettings);
}
else
data.Write(stream, requestData.ConnectionSettings);
}
}
requestData.MadeItToResponse = true;

if (requestData.TcpStats)
Expand Down Expand Up @@ -94,8 +88,8 @@ public virtual TResponse Request<TResponse>(RequestData requestData)
}

responseStream ??= Stream.Null;
var response = ResponseBuilder.ToResponse<TResponse>(requestData, ex, statusCode, warnings, responseStream, mimeType,
productNames?.FirstOrDefault());
var response = ResponseBuilder.ToResponse<TResponse>(requestData, ex, statusCode, warnings, responseStream,
productNames?.FirstOrDefault(), mimeType);

// set TCP and threadpool stats on the response here so that in the event the request fails after the point of
// gathering stats, they are still exposed on the call details. Ideally these would be set inside ResponseBuilder.ToResponse,
Expand Down Expand Up @@ -134,15 +128,11 @@ CancellationToken cancellationToken
unregisterWaitHandle = RegisterApmTaskTimeout(apmGetRequestStreamTask, request, requestData);

using (var stream = await apmGetRequestStreamTask.ConfigureAwait(false))
{
if (requestData.HttpCompression)
{
using (var zipStream = new GZipStream(stream, CompressionMode.Compress))
await data.WriteAsync(zipStream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false);
}
else
await data.WriteAsync(stream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false);
}
unregisterWaitHandle?.Invoke();
}
requestData.MadeItToResponse = true;
Expand Down Expand Up @@ -183,7 +173,7 @@ CancellationToken cancellationToken
}
responseStream ??= Stream.Null;
var response = await ResponseBuilder.ToResponseAsync<TResponse>
(requestData, ex, statusCode, warnings, responseStream, mimeType, productNames?.FirstOrDefault(), cancellationToken)
(requestData, ex, statusCode, warnings, responseStream, productNames?.FirstOrDefault(), mimeType, cancellationToken)
.ConfigureAwait(false);

// set TCP and thread pool stats on the response here so that in the event the request fails after the point of
Expand Down Expand Up @@ -329,10 +319,8 @@ protected virtual void SetBasicAuthenticationIfNeeded(HttpWebRequest request, Re
if (!string.IsNullOrEmpty(requestData.Uri.UserInfo))
userInfo = Uri.UnescapeDataString(requestData.Uri.UserInfo);
else if (requestData.BasicAuthorizationCredentials != null)
{
userInfo =
$"{requestData.BasicAuthorizationCredentials.Username}:{requestData.BasicAuthorizationCredentials.Password.CreateString()}";
}

if (string.IsNullOrWhiteSpace(userInfo))
return;
Expand Down
Loading