diff --git a/src/Connections/Elasticsearch.Net.Connection.HttpClient/Elasticsearch.Net.Connection.HttpClient.csproj b/src/Connections/Elasticsearch.Net.Connection.HttpClient/Elasticsearch.Net.Connection.HttpClient.csproj
index 7dfbe58a14a..ac009575f9f 100644
--- a/src/Connections/Elasticsearch.Net.Connection.HttpClient/Elasticsearch.Net.Connection.HttpClient.csproj
+++ b/src/Connections/Elasticsearch.Net.Connection.HttpClient/Elasticsearch.Net.Connection.HttpClient.csproj
@@ -74,4 +74,4 @@
-->
-
\ No newline at end of file
+
diff --git a/src/Elasticsearch.Net/Connection/HttpConnection.cs b/src/Elasticsearch.Net/Connection/HttpConnection.cs
index 1d6fbe935db..07dd84c062f 100644
--- a/src/Elasticsearch.Net/Connection/HttpConnection.cs
+++ b/src/Elasticsearch.Net/Connection/HttpConnection.cs
@@ -1,365 +1,378 @@
-using System;
-using System.Collections.Generic;
-using System.IO;
-using System.IO.Compression;
-using System.Linq;
-using System.Net;
-using System.Runtime.InteropServices;
-using System.Security.Cryptography;
-using System.Text;
-using System.Threading;
-using System.Threading.Tasks;
-using Elasticsearch.Net.Connection.Configuration;
-using Elasticsearch.Net.Providers;
-using PurifyNet;
-
-namespace Elasticsearch.Net.Connection
-{
- public class HttpConnection : IConnection
- {
- const int BUFFER_SIZE = 1024;
-
- protected IConnectionConfigurationValues ConnectionSettings { get; set; }
- private readonly Semaphore _resourceLock;
- private readonly bool _enableTrace;
-
- static HttpConnection()
- {
- ServicePointManager.UseNagleAlgorithm = false;
- ServicePointManager.Expect100Continue = false;
- ServicePointManager.DefaultConnectionLimit = 10000;
- }
-
- public HttpConnection(IConnectionConfigurationValues settings)
- {
- if (settings == null)
- throw new ArgumentNullException("settings");
-
- this.ConnectionSettings = settings;
- if (settings.MaximumAsyncConnections > 0)
- {
- var semaphore = Math.Max(1, settings.MaximumAsyncConnections);
- this._resourceLock = new Semaphore(semaphore, semaphore);
- }
- this._enableTrace = settings.TraceEnabled;
- }
-
- public virtual ElasticsearchResponse GetSync(Uri uri, IRequestConfiguration requestSpecificConfig = null)
- {
- return this.HeaderOnlyRequest(uri, "GET", requestSpecificConfig);
- }
- public virtual ElasticsearchResponse HeadSync(Uri uri, IRequestConfiguration requestSpecificConfig = null)
- {
- return this.HeaderOnlyRequest(uri, "HEAD", requestSpecificConfig);
- }
-
- public virtual ElasticsearchResponse PostSync(Uri uri, byte[] data, IRequestConfiguration requestSpecificConfig = null)
- {
- return this.BodyRequest(uri, data, "POST", requestSpecificConfig);
- }
- public virtual ElasticsearchResponse PutSync(Uri uri, byte[] data, IRequestConfiguration requestSpecificConfig = null)
- {
- return this.BodyRequest(uri, data, "PUT", requestSpecificConfig);
- }
- public virtual ElasticsearchResponse DeleteSync(Uri uri, IRequestConfiguration requestSpecificConfig = null)
- {
- return this.HeaderOnlyRequest(uri, "DELETE", requestSpecificConfig);
- }
- public virtual ElasticsearchResponse DeleteSync(Uri uri, byte[] data, IRequestConfiguration requestSpecificConfig = null)
- {
- return this.BodyRequest(uri, data, "DELETE", requestSpecificConfig);
- }
-
-
- private ElasticsearchResponse HeaderOnlyRequest(Uri uri, string method, IRequestConfiguration requestSpecificConfig)
- {
- var r = this.CreateHttpWebRequest(uri, method, null, requestSpecificConfig);
- return this.DoSynchronousRequest(r, requestSpecificConfig: requestSpecificConfig);
- }
-
- private ElasticsearchResponse BodyRequest(Uri uri, byte[] data, string method, IRequestConfiguration requestSpecificConfig)
- {
- var r = this.CreateHttpWebRequest(uri, method, data, requestSpecificConfig);
- return this.DoSynchronousRequest(r, data, requestSpecificConfig);
- }
-
- public virtual Task> Get(Uri uri, IRequestConfiguration requestSpecificConfig = null)
- {
- var r = this.CreateHttpWebRequest(uri, "GET", null, requestSpecificConfig);
- return this.DoAsyncRequest(r, requestSpecificConfig: requestSpecificConfig);
- }
- public virtual Task> Head(Uri uri, IRequestConfiguration requestSpecificConfig = null)
- {
- var r = this.CreateHttpWebRequest(uri, "HEAD", null, requestSpecificConfig);
- return this.DoAsyncRequest(r, requestSpecificConfig: requestSpecificConfig);
- }
- public virtual Task> Post(Uri uri, byte[] data, IRequestConfiguration requestSpecificConfig = null)
- {
- var r = this.CreateHttpWebRequest(uri, "POST", data, requestSpecificConfig);
- return this.DoAsyncRequest(r, data, requestSpecificConfig: requestSpecificConfig);
- }
-
- public virtual Task> Put(Uri uri, byte[] data, IRequestConfiguration requestSpecificConfig = null)
- {
- var r = this.CreateHttpWebRequest(uri, "PUT", data, requestSpecificConfig);
- return this.DoAsyncRequest(r, data, requestSpecificConfig: requestSpecificConfig);
- }
-
- public virtual Task> Delete(Uri uri, byte[] data, IRequestConfiguration requestSpecificConfig = null)
- {
- var r = this.CreateHttpWebRequest(uri, "DELETE", data, requestSpecificConfig);
- return this.DoAsyncRequest(r, data, requestSpecificConfig: requestSpecificConfig);
- }
- public virtual Task> Delete(Uri uri, IRequestConfiguration requestSpecificConfig = null)
- {
- var r = this.CreateHttpWebRequest(uri, "DELETE", null, requestSpecificConfig);
- return this.DoAsyncRequest(r, requestSpecificConfig: requestSpecificConfig);
- }
-
- private static void ThreadTimeoutCallback(object state, bool timedOut)
- {
- if (timedOut)
- {
- HttpWebRequest request = state as HttpWebRequest;
- if (request != null)
- {
- request.Abort();
- }
- }
- }
-
-
- protected virtual HttpWebRequest CreateHttpWebRequest(Uri uri, string method, byte[] data, IRequestConfiguration requestSpecificConfig)
- {
- var myReq = this.CreateWebRequest(uri, method, data, requestSpecificConfig);
- this.SetBasicAuthorizationIfNeeded(myReq);
- this.SetProxyIfNeeded(myReq);
- return myReq;
- }
-
- private void SetProxyIfNeeded(HttpWebRequest myReq)
- {
- if (!string.IsNullOrEmpty(this.ConnectionSettings.ProxyAddress))
- {
- var proxy = new WebProxy();
- var uri = new Uri(this.ConnectionSettings.ProxyAddress);
- var credentials = new NetworkCredential(this.ConnectionSettings.ProxyUsername, this.ConnectionSettings.ProxyPassword);
- proxy.Address = uri;
- proxy.Credentials = credentials;
- myReq.Proxy = proxy;
- }
- if(!this.ConnectionSettings.AutomaticProxyDetection)
- {
- myReq.Proxy = null;
- }
- }
-
- private void SetBasicAuthorizationIfNeeded(HttpWebRequest myReq)
- {
- //TODO figure out a way to cache this;
-
- //if (this._ConnectionSettings.UriSpecifiedBasicAuth)
- //{
- myReq.Headers["Authorization"] =
- "Basic " + Convert.ToBase64String(Encoding.UTF8.GetBytes(myReq.RequestUri.UserInfo));
- //}
- }
-
- protected virtual HttpWebRequest CreateWebRequest(Uri uri, string method, byte[] data, IRequestConfiguration requestSpecificConfig)
- {
- //TODO append global querystring
- //var url = this._CreateUriString(path);
-
- var myReq = (HttpWebRequest)WebRequest.Create(uri);
- myReq.Accept = "application/json";
- myReq.ContentType = "application/json";
- if (this.ConnectionSettings.EnableCompressedResponses)
- {
- myReq.AutomaticDecompression = DecompressionMethods.GZip | DecompressionMethods.Deflate;
- myReq.Headers.Add("Accept-Encoding", "gzip,deflate");
- }
- if (requestSpecificConfig != null && !string.IsNullOrWhiteSpace(requestSpecificConfig.ContentType))
- {
- myReq.Accept = requestSpecificConfig.ContentType;
- myReq.ContentType = requestSpecificConfig.ContentType;
- }
- var timeout = GetRequestTimeout(requestSpecificConfig);
- myReq.Timeout = timeout;
- myReq.ReadWriteTimeout = timeout;
- myReq.Method = method;
-
- //WebRequest won't send Content-Length: 0 for empty bodies
- //which goes against RFC's and might break i.e IIS when used as a proxy.
- //see: https://github.com/elasticsearch/elasticsearch-net/issues/562
- var m = method.ToLowerInvariant();
- if (m != "head" && m != "get" && (data == null || data.Length == 0))
- myReq.ContentLength = 0;
-
- return myReq;
- }
-
- protected virtual ElasticsearchResponse DoSynchronousRequest(HttpWebRequest request, byte[] data = null, IRequestConfiguration requestSpecificConfig = null)
- {
- var path = request.RequestUri.ToString();
- var method = request.Method;
-
- if (data != null)
- {
- using (var r = request.GetRequestStream())
- {
- r.Write(data, 0, data.Length);
- }
- }
- try
- {
- //http://msdn.microsoft.com/en-us/library/system.net.httpwebresponse.getresponsestream.aspx
- //Either the stream or the response object needs to be closed but not both although it won't
- //throw any errors if both are closed atleast one of them has to be Closed.
- //Since we expose the stream we let closing the stream determining when to close the connection
- var response = (HttpWebResponse)request.GetResponse();
- var responseStream = response.GetResponseStream();
- return WebToElasticsearchResponse(data, responseStream, response, method, path);
- }
- catch (WebException webException)
- {
- return HandleWebException(data, webException, method, path);
- }
- }
-
- private ElasticsearchResponse HandleWebException(byte[] data, WebException webException, string method, string path)
- {
- ElasticsearchResponse cs = null;
- var httpEx = webException.Response as HttpWebResponse;
- if (httpEx != null)
- {
- cs = WebToElasticsearchResponse(data, httpEx.GetResponseStream(), httpEx, method, path);
- return cs;
- }
- cs = ElasticsearchResponse.CreateError(this.ConnectionSettings, webException, method, path, data);
- return cs;
- }
-
- private ElasticsearchResponse WebToElasticsearchResponse(byte[] data, Stream responseStream, HttpWebResponse response, string method, string path)
- {
- ElasticsearchResponse cs = ElasticsearchResponse.Create(this.ConnectionSettings, (int)response.StatusCode, method, path, data);
- cs.Response = responseStream;
- return cs;
- }
-
- protected virtual Task> DoAsyncRequest(HttpWebRequest request, byte[] data = null, IRequestConfiguration requestSpecificConfig = null)
- {
- var tcs = new TaskCompletionSource>();
- if (this.ConnectionSettings.MaximumAsyncConnections <= 0
- || this._resourceLock == null)
- return this.CreateIterateTask(request, data, requestSpecificConfig, tcs);
-
- var timeout = GetRequestTimeout(requestSpecificConfig);
- var path = request.RequestUri.ToString();
- var method = request.Method;
- if (!this._resourceLock.WaitOne(timeout))
- {
- var m = "Could not start the operation before the timeout of " + timeout +
- "ms completed while waiting for the semaphore";
- var cs = ElasticsearchResponse.CreateError(this.ConnectionSettings, new TimeoutException(m), method, path, data);
- tcs.SetResult(cs);
- return tcs.Task;
- }
- try
- {
- return this.CreateIterateTask(request, data, requestSpecificConfig, tcs);
- }
- finally
- {
- this._resourceLock.Release();
- }
- }
-
- private Task> CreateIterateTask(HttpWebRequest request, byte[] data, IRequestConfiguration requestSpecificConfig, TaskCompletionSource> tcs)
- {
- this.Iterate(request, data, this._AsyncSteps(request, tcs, data, requestSpecificConfig), tcs);
- return tcs.Task;
- }
-
- private IEnumerable _AsyncSteps(HttpWebRequest request, TaskCompletionSource> tcs, byte[] data, IRequestConfiguration requestSpecificConfig)
- {
- var timeout = GetRequestTimeout(requestSpecificConfig);
-
- if (data != null)
- {
- var getRequestStream = Task.Factory.FromAsync(request.BeginGetRequestStream, request.EndGetRequestStream, null);
- ThreadPool.RegisterWaitForSingleObject((getRequestStream as IAsyncResult).AsyncWaitHandle, ThreadTimeoutCallback, request, timeout, true);
- yield return getRequestStream;
-
- var requestStream = getRequestStream.Result;
- try
- {
- var writeToRequestStream = Task.Factory.FromAsync(requestStream.BeginWrite, requestStream.EndWrite, data, 0, data.Length, null);
- yield return writeToRequestStream;
- }
- finally
- {
- requestStream.Close();
- }
- }
-
- // Get the response
- var getResponse = Task.Factory.FromAsync(request.BeginGetResponse, request.EndGetResponse, null);
- ThreadPool.RegisterWaitForSingleObject((getResponse as IAsyncResult).AsyncWaitHandle, ThreadTimeoutCallback, request, timeout, true);
- yield return getResponse;
-
- var path = request.RequestUri.ToString();
- var method = request.Method;
-
- //http://msdn.microsoft.com/en-us/library/system.net.httpwebresponse.getresponsestream.aspx
- //Either the stream or the response object needs to be closed but not both (although it won't)
- //throw any errors if both are closed atleast one of them has to be Closed.
- //Since we expose the stream we let closing the stream determining when to close the connection
- var response = (HttpWebResponse)getResponse.Result;
- var responseStream = response.GetResponseStream();
- var cs = ElasticsearchResponse.Create(this.ConnectionSettings, (int)response.StatusCode, method, path, data);
- cs.Response = responseStream;
- tcs.TrySetResult(cs);
- }
-
- private void Iterate(HttpWebRequest request, byte[] data, IEnumerable asyncIterator, TaskCompletionSource> tcs)
- {
- var enumerator = asyncIterator.GetEnumerator();
- Action recursiveBody = null;
- recursiveBody = completedTask =>
- {
- if (completedTask != null && completedTask.IsFaulted)
- {
- //none of the individual steps in _AsyncSteps run in parallel for 1 request
- //as this would be impossible we can assume Aggregate Exception.InnerException
- var exception = completedTask.Exception.InnerException;
-
- //cleanly exit from exceptions in stages if the exception is a webexception
- if (exception is WebException)
- {
- var path = request.RequestUri.ToString();
- var method = request.Method;
- var response = this.HandleWebException(data, exception as WebException, method, path);
- tcs.SetResult(response);
- }
- else
- tcs.TrySetException(exception);
- enumerator.Dispose();
- }
- else if (enumerator.MoveNext())
- {
- enumerator.Current.ContinueWith(recursiveBody, TaskContinuationOptions.ExecuteSynchronously);
- }
- else enumerator.Dispose();
- };
- recursiveBody(null);
- }
-
- private int GetRequestTimeout(IRequestConfiguration requestConfiguration)
- {
- if (requestConfiguration != null && requestConfiguration.ConnectTimeout.HasValue)
- return requestConfiguration.RequestTimeout.Value;
-
- return this.ConnectionSettings.Timeout;
- }
- }
-}
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.IO.Compression;
+using System.Linq;
+using System.Net;
+using System.Runtime.InteropServices;
+using System.Security.Cryptography;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Elasticsearch.Net.Connection.Configuration;
+using Elasticsearch.Net.Providers;
+using PurifyNet;
+
+namespace Elasticsearch.Net.Connection
+{
+ public class HttpConnection : IConnection
+ {
+ const int BUFFER_SIZE = 1024;
+
+ protected IConnectionConfigurationValues ConnectionSettings { get; set; }
+ private readonly Semaphore _resourceLock;
+ private readonly bool _enableTrace;
+
+ static HttpConnection()
+ {
+ ServicePointManager.UseNagleAlgorithm = false;
+ ServicePointManager.Expect100Continue = false;
+ ServicePointManager.DefaultConnectionLimit = 10000;
+ //ServicePointManager.SetTcpKeepAlive(true, 2000, 2000);
+
+ //WebException's GetResponse is limitted to 65kb by default.
+ //Elasticsearch can be alot more chatty then that when dumping exceptions
+ //On error responses, so lets up the ante.
+
+ //Not available under mono
+ if (Type.GetType ("Mono.Runtime") == null)
+ HttpWebRequest.DefaultMaximumErrorResponseLength = -1;
+ }
+
+ public HttpConnection(IConnectionConfigurationValues settings)
+ {
+ if (settings == null)
+ throw new ArgumentNullException("settings");
+
+ this.ConnectionSettings = settings;
+ if (settings.MaximumAsyncConnections > 0)
+ {
+ var semaphore = Math.Max(1, settings.MaximumAsyncConnections);
+ this._resourceLock = new Semaphore(semaphore, semaphore);
+ }
+ this._enableTrace = settings.TraceEnabled;
+ }
+
+ public virtual ElasticsearchResponse GetSync(Uri uri, IRequestConfiguration requestSpecificConfig = null)
+ {
+ return this.HeaderOnlyRequest(uri, "GET", requestSpecificConfig);
+ }
+ public virtual ElasticsearchResponse HeadSync(Uri uri, IRequestConfiguration requestSpecificConfig = null)
+ {
+ return this.HeaderOnlyRequest(uri, "HEAD", requestSpecificConfig);
+ }
+
+ public virtual ElasticsearchResponse PostSync(Uri uri, byte[] data, IRequestConfiguration requestSpecificConfig = null)
+ {
+ return this.BodyRequest(uri, data, "POST", requestSpecificConfig);
+ }
+ public virtual ElasticsearchResponse PutSync(Uri uri, byte[] data, IRequestConfiguration requestSpecificConfig = null)
+ {
+ return this.BodyRequest(uri, data, "PUT", requestSpecificConfig);
+ }
+ public virtual ElasticsearchResponse DeleteSync(Uri uri, IRequestConfiguration requestSpecificConfig = null)
+ {
+ return this.HeaderOnlyRequest(uri, "DELETE", requestSpecificConfig);
+ }
+ public virtual ElasticsearchResponse DeleteSync(Uri uri, byte[] data, IRequestConfiguration requestSpecificConfig = null)
+ {
+ return this.BodyRequest(uri, data, "DELETE", requestSpecificConfig);
+ }
+
+
+ private ElasticsearchResponse HeaderOnlyRequest(Uri uri, string method, IRequestConfiguration requestSpecificConfig)
+ {
+ var r = this.CreateHttpWebRequest(uri, method, null, requestSpecificConfig);
+ return this.DoSynchronousRequest(r, requestSpecificConfig: requestSpecificConfig);
+ }
+
+ private ElasticsearchResponse BodyRequest(Uri uri, byte[] data, string method, IRequestConfiguration requestSpecificConfig)
+ {
+ var r = this.CreateHttpWebRequest(uri, method, data, requestSpecificConfig);
+ return this.DoSynchronousRequest(r, data, requestSpecificConfig);
+ }
+
+ public virtual Task> Get(Uri uri, IRequestConfiguration requestSpecificConfig = null)
+ {
+ var r = this.CreateHttpWebRequest(uri, "GET", null, requestSpecificConfig);
+ return this.DoAsyncRequest(r, requestSpecificConfig: requestSpecificConfig);
+ }
+ public virtual Task> Head(Uri uri, IRequestConfiguration requestSpecificConfig = null)
+ {
+ var r = this.CreateHttpWebRequest(uri, "HEAD", null, requestSpecificConfig);
+ return this.DoAsyncRequest(r, requestSpecificConfig: requestSpecificConfig);
+ }
+ public virtual Task> Post(Uri uri, byte[] data, IRequestConfiguration requestSpecificConfig = null)
+ {
+ var r = this.CreateHttpWebRequest(uri, "POST", data, requestSpecificConfig);
+ return this.DoAsyncRequest(r, data, requestSpecificConfig: requestSpecificConfig);
+ }
+
+ public virtual Task> Put(Uri uri, byte[] data, IRequestConfiguration requestSpecificConfig = null)
+ {
+ var r = this.CreateHttpWebRequest(uri, "PUT", data, requestSpecificConfig);
+ return this.DoAsyncRequest(r, data, requestSpecificConfig: requestSpecificConfig);
+ }
+
+ public virtual Task> Delete(Uri uri, byte[] data, IRequestConfiguration requestSpecificConfig = null)
+ {
+ var r = this.CreateHttpWebRequest(uri, "DELETE", data, requestSpecificConfig);
+ return this.DoAsyncRequest(r, data, requestSpecificConfig: requestSpecificConfig);
+ }
+ public virtual Task> Delete(Uri uri, IRequestConfiguration requestSpecificConfig = null)
+ {
+ var r = this.CreateHttpWebRequest(uri, "DELETE", null, requestSpecificConfig);
+ return this.DoAsyncRequest(r, requestSpecificConfig: requestSpecificConfig);
+ }
+
+ private static void ThreadTimeoutCallback(object state, bool timedOut)
+ {
+ if (timedOut)
+ {
+ HttpWebRequest request = state as HttpWebRequest;
+ if (request != null)
+ {
+ request.Abort();
+ }
+ }
+ }
+
+
+ protected virtual HttpWebRequest CreateHttpWebRequest(Uri uri, string method, byte[] data, IRequestConfiguration requestSpecificConfig)
+ {
+ var myReq = this.CreateWebRequest(uri, method, data, requestSpecificConfig);
+ this.SetBasicAuthorizationIfNeeded(myReq);
+ this.SetProxyIfNeeded(myReq);
+ return myReq;
+ }
+
+ private void SetProxyIfNeeded(HttpWebRequest myReq)
+ {
+ if (!string.IsNullOrEmpty(this.ConnectionSettings.ProxyAddress))
+ {
+ var proxy = new WebProxy();
+ var uri = new Uri(this.ConnectionSettings.ProxyAddress);
+ var credentials = new NetworkCredential(this.ConnectionSettings.ProxyUsername, this.ConnectionSettings.ProxyPassword);
+ proxy.Address = uri;
+ proxy.Credentials = credentials;
+ myReq.Proxy = proxy;
+ }
+ if(!this.ConnectionSettings.AutomaticProxyDetection)
+ {
+ myReq.Proxy = null;
+ }
+ }
+
+ private void SetBasicAuthorizationIfNeeded(HttpWebRequest myReq)
+ {
+ //TODO figure out a way to cache this;
+
+ //if (this._ConnectionSettings.UriSpecifiedBasicAuth)
+ //{
+ myReq.Headers["Authorization"] =
+ "Basic " + Convert.ToBase64String(Encoding.UTF8.GetBytes(myReq.RequestUri.UserInfo));
+ //}
+ }
+
+ protected virtual HttpWebRequest CreateWebRequest(Uri uri, string method, byte[] data, IRequestConfiguration requestSpecificConfig)
+ {
+ //TODO append global querystring
+ //var url = this._CreateUriString(path);
+
+ var myReq = (HttpWebRequest)WebRequest.Create(uri);
+ myReq.Accept = "application/json";
+ myReq.ContentType = "application/json";
+ myReq.MaximumResponseHeadersLength = -1;
+ //myReq.AllowWriteStreamBuffering = false;
+ if (this.ConnectionSettings.EnableCompressedResponses)
+ {
+ myReq.AutomaticDecompression = DecompressionMethods.GZip | DecompressionMethods.Deflate;
+ myReq.Headers.Add("Accept-Encoding", "gzip,deflate");
+ }
+ if (requestSpecificConfig != null && !string.IsNullOrWhiteSpace(requestSpecificConfig.ContentType))
+ {
+ myReq.Accept = requestSpecificConfig.ContentType;
+ myReq.ContentType = requestSpecificConfig.ContentType;
+ }
+ var timeout = GetRequestTimeout(requestSpecificConfig);
+ myReq.Timeout = timeout;
+ myReq.ReadWriteTimeout = timeout;
+ myReq.Method = method;
+
+ //WebRequest won't send Content-Length: 0 for empty bodies
+ //which goes against RFC's and might break i.e IIS when used as a proxy.
+ //see: https://github.com/elasticsearch/elasticsearch-net/issues/562
+ var m = method.ToLowerInvariant();
+ if (m != "head" && m != "get" && (data == null || data.Length == 0))
+ myReq.ContentLength = 0;
+
+ return myReq;
+ }
+
+ protected virtual ElasticsearchResponse DoSynchronousRequest(HttpWebRequest request, byte[] data = null, IRequestConfiguration requestSpecificConfig = null)
+ {
+ var path = request.RequestUri.ToString();
+ var method = request.Method;
+
+ if (data != null)
+ {
+ using (var r = request.GetRequestStream())
+ {
+ r.Write(data, 0, data.Length);
+ }
+ }
+ try
+ {
+ //http://msdn.microsoft.com/en-us/library/system.net.httpwebresponse.getresponsestream.aspx
+ //Either the stream or the response object needs to be closed but not both although it won't
+ //throw any errors if both are closed atleast one of them has to be Closed.
+ //Since we expose the stream we let closing the stream determining when to close the connection
+ var response = (HttpWebResponse)request.GetResponse();
+ var responseStream = response.GetResponseStream();
+ return WebToElasticsearchResponse(data, responseStream, response, method, path);
+ }
+ catch (WebException webException)
+ {
+ return HandleWebException(data, webException, method, path);
+ }
+ }
+
+ private ElasticsearchResponse HandleWebException(byte[] data, WebException webException, string method, string path)
+ {
+ ElasticsearchResponse cs = null;
+ var httpEx = webException.Response as HttpWebResponse;
+ if (httpEx != null)
+ {
+ //StreamReader ms = new StreamReader(httpEx.GetResponseStream());
+ //var response = ms.ReadToEnd();
+ cs = WebToElasticsearchResponse(data, httpEx.GetResponseStream(), httpEx, method, path);
+ return cs;
+ }
+ cs = ElasticsearchResponse.CreateError(this.ConnectionSettings, webException, method, path, data);
+ return cs;
+ }
+
+ private ElasticsearchResponse WebToElasticsearchResponse(byte[] data, Stream responseStream, HttpWebResponse response, string method, string path)
+ {
+ ElasticsearchResponse cs = ElasticsearchResponse.Create(this.ConnectionSettings, (int)response.StatusCode, method, path, data);
+ cs.Response = responseStream;
+ return cs;
+ }
+
+ protected virtual Task> DoAsyncRequest(HttpWebRequest request, byte[] data = null, IRequestConfiguration requestSpecificConfig = null)
+ {
+ var tcs = new TaskCompletionSource>();
+ if (this.ConnectionSettings.MaximumAsyncConnections <= 0
+ || this._resourceLock == null)
+ return this.CreateIterateTask(request, data, requestSpecificConfig, tcs);
+
+ var timeout = GetRequestTimeout(requestSpecificConfig);
+ var path = request.RequestUri.ToString();
+ var method = request.Method;
+ if (!this._resourceLock.WaitOne(timeout))
+ {
+ var m = "Could not start the operation before the timeout of " + timeout +
+ "ms completed while waiting for the semaphore";
+ var cs = ElasticsearchResponse.CreateError(this.ConnectionSettings, new TimeoutException(m), method, path, data);
+ tcs.SetResult(cs);
+ return tcs.Task;
+ }
+ try
+ {
+ return this.CreateIterateTask(request, data, requestSpecificConfig, tcs);
+ }
+ finally
+ {
+ this._resourceLock.Release();
+ }
+ }
+
+ private Task> CreateIterateTask(HttpWebRequest request, byte[] data, IRequestConfiguration requestSpecificConfig, TaskCompletionSource> tcs)
+ {
+ this.Iterate(request, data, this._AsyncSteps(request, tcs, data, requestSpecificConfig), tcs);
+ return tcs.Task;
+ }
+
+ private IEnumerable _AsyncSteps(HttpWebRequest request, TaskCompletionSource> tcs, byte[] data, IRequestConfiguration requestSpecificConfig)
+ {
+ var timeout = GetRequestTimeout(requestSpecificConfig);
+
+ if (data != null)
+ {
+ var getRequestStream = Task.Factory.FromAsync(request.BeginGetRequestStream, request.EndGetRequestStream, null);
+ ThreadPool.RegisterWaitForSingleObject((getRequestStream as IAsyncResult).AsyncWaitHandle, ThreadTimeoutCallback, request, timeout, true);
+ yield return getRequestStream;
+
+ var requestStream = getRequestStream.Result;
+ try
+ {
+ var writeToRequestStream = Task.Factory.FromAsync(requestStream.BeginWrite, requestStream.EndWrite, data, 0, data.Length, null);
+ yield return writeToRequestStream;
+ }
+ finally
+ {
+ requestStream.Close();
+ }
+ }
+
+ // Get the response
+ var getResponse = Task.Factory.FromAsync(request.BeginGetResponse, request.EndGetResponse, null);
+ ThreadPool.RegisterWaitForSingleObject((getResponse as IAsyncResult).AsyncWaitHandle, ThreadTimeoutCallback, request, timeout, true);
+ yield return getResponse;
+
+ var path = request.RequestUri.ToString();
+ var method = request.Method;
+
+ //http://msdn.microsoft.com/en-us/library/system.net.httpwebresponse.getresponsestream.aspx
+ //Either the stream or the response object needs to be closed but not both (although it won't)
+ //throw any errors if both are closed atleast one of them has to be Closed.
+ //Since we expose the stream we let closing the stream determining when to close the connection
+ var response = (HttpWebResponse)getResponse.Result;
+ var responseStream = response.GetResponseStream();
+ var cs = ElasticsearchResponse.Create(this.ConnectionSettings, (int)response.StatusCode, method, path, data);
+ cs.Response = responseStream;
+ tcs.TrySetResult(cs);
+ }
+
+ private void Iterate(HttpWebRequest request, byte[] data, IEnumerable asyncIterator, TaskCompletionSource> tcs)
+ {
+ var enumerator = asyncIterator.GetEnumerator();
+ Action recursiveBody = null;
+ recursiveBody = completedTask =>
+ {
+ if (completedTask != null && completedTask.IsFaulted)
+ {
+ //none of the individual steps in _AsyncSteps run in parallel for 1 request
+ //as this would be impossible we can assume Aggregate Exception.InnerException
+ var exception = completedTask.Exception.InnerException;
+
+ //cleanly exit from exceptions in stages if the exception is a webexception
+ if (exception is WebException)
+ {
+ var path = request.RequestUri.ToString();
+ var method = request.Method;
+ var response = this.HandleWebException(data, exception as WebException, method, path);
+ tcs.SetResult(response);
+ }
+ else
+ tcs.TrySetException(exception);
+ enumerator.Dispose();
+ }
+ else if (enumerator.MoveNext())
+ {
+ enumerator.Current.ContinueWith(recursiveBody, TaskContinuationOptions.ExecuteSynchronously);
+ }
+ else enumerator.Dispose();
+ };
+ recursiveBody(null);
+ }
+
+ private int GetRequestTimeout(IRequestConfiguration requestConfiguration)
+ {
+ if (requestConfiguration != null && requestConfiguration.ConnectTimeout.HasValue)
+ return requestConfiguration.RequestTimeout.Value;
+
+ return this.ConnectionSettings.Timeout;
+ }
+ }
+}
diff --git a/src/Elasticsearch.Net/Connection/InMemoryConnection.cs b/src/Elasticsearch.Net/Connection/InMemoryConnection.cs
index f35f4f29c74..759d8ac3370 100644
--- a/src/Elasticsearch.Net/Connection/InMemoryConnection.cs
+++ b/src/Elasticsearch.Net/Connection/InMemoryConnection.cs
@@ -11,6 +11,8 @@ namespace Elasticsearch.Net.Connection
public class InMemoryConnection : HttpConnection
{
private byte[] _fixedResultBytes = Encoding.UTF8.GetBytes("{ \"USING NEST IN MEMORY CONNECTION\" : null }");
+ private int _statusCode;
+
public InMemoryConnection()
: base(new ConnectionConfiguration())
{
@@ -19,13 +21,14 @@ public InMemoryConnection()
public InMemoryConnection(IConnectionConfigurationValues settings)
: base(settings)
{
-
+ _statusCode = 200;
}
- public InMemoryConnection(IConnectionConfigurationValues settings, string fixedResult)
+ public InMemoryConnection(IConnectionConfigurationValues settings, string fixedResult, int statusCode = 200)
: this(settings)
{
_fixedResultBytes = Encoding.UTF8.GetBytes(fixedResult);
+ _statusCode = statusCode;
}
protected override ElasticsearchResponse DoSynchronousRequest(HttpWebRequest request, byte[] data = null, IRequestConfiguration requestSpecificConfig = null)
@@ -38,7 +41,7 @@ private ElasticsearchResponse ReturnConnectionStatus(HttpWebRequest requ
var method = request.Method;
var path = request.RequestUri.ToString();
- var cs = ElasticsearchResponse.Create(this.ConnectionSettings, 200, method, path, data);
+ var cs = ElasticsearchResponse.Create(this.ConnectionSettings, _statusCode, method, path, data);
cs.Response = new MemoryStream(_fixedResultBytes);
if (this.ConnectionSettings.ConnectionStatusHandler != null)
this.ConnectionSettings.ConnectionStatusHandler(cs);
diff --git a/src/Elasticsearch.Net/Connection/RequestState/ITransportRequestState.cs b/src/Elasticsearch.Net/Connection/RequestState/ITransportRequestState.cs
index efafec2319f..df949982be4 100644
--- a/src/Elasticsearch.Net/Connection/RequestState/ITransportRequestState.cs
+++ b/src/Elasticsearch.Net/Connection/RequestState/ITransportRequestState.cs
@@ -15,6 +15,7 @@ internal interface ITransportRequestState
int? Seed { get; set; }
Uri CurrentNode { get; set; }
List RequestMetrics { get; set; }
+ List SeenExceptions { get; }
Func ResponseCreationOverride { get; set; }
}
}
\ No newline at end of file
diff --git a/src/Elasticsearch.Net/Connection/RequestState/TransportRequestState.cs b/src/Elasticsearch.Net/Connection/RequestState/TransportRequestState.cs
index 09ef860c3bc..2c62c87c728 100644
--- a/src/Elasticsearch.Net/Connection/RequestState/TransportRequestState.cs
+++ b/src/Elasticsearch.Net/Connection/RequestState/TransportRequestState.cs
@@ -46,6 +46,7 @@ public IRequestConfiguration RequestConfiguration
public int Sniffs { get; set; }
public List SeenNodes { get; private set; }
+ public List SeenExceptions { get; private set; }
public List RequestMetrics { get; set; }
public Uri CurrentNode
@@ -76,6 +77,7 @@ public TransportRequestState(
{
this.StartedOn = DateTime.UtcNow;
this.SeenNodes = new List();
+ this.SeenExceptions = new List();
this.ClientSettings = settings;
this.RequestParameters = requestParameters;
this._traceEnabled = settings.TraceEnabled;
@@ -125,6 +127,12 @@ public Uri CreatePathOnCurrentNode(string path = null)
public void SetResult(ElasticsearchResponse result)
{
+ if (result == null)
+ {
+ if (!_traceEnabled) return;
+ this._stopwatch.Stop();
+ return;
+ }
result.NumberOfRetries = this.Retried;
if (this.ClientSettings.MetricsEnabled)
result.Metrics = new CallMetrics
diff --git a/src/Elasticsearch.Net/Connection/Transport.cs b/src/Elasticsearch.Net/Connection/Transport.cs
index ec4ee253396..449d748dd60 100644
--- a/src/Elasticsearch.Net/Connection/Transport.cs
+++ b/src/Elasticsearch.Net/Connection/Transport.cs
@@ -6,6 +6,7 @@
using System.Net;
using System.Security.Cryptography;
using System.Security.Cryptography.X509Certificates;
+using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Elasticsearch.Net.Connection.Configuration;
@@ -67,17 +68,28 @@ private bool Ping(ITransportRequestState requestState)
ConnectTimeout = pingTimeout,
RequestTimeout = pingTimeout
};
+ try
+ {
+ ElasticsearchResponse response;
+ using (var rq = requestState.InitiateRequest(RequestType.Ping))
+ {
+ response = this.Connection.HeadSync(requestState.CreatePathOnCurrentNode(""), requestOverrides);
+ rq.Finish(response.Success, response.HttpStatusCode);
+ }
+ if (!response.HttpStatusCode.HasValue || response.HttpStatusCode.Value == -1)
+ throw new Exception("ping returned no status code");
+ if (response.Response == null)
+ return response.Success;
+
- ElasticsearchResponse response;
- using (var rq = requestState.InitiateRequest(RequestType.Ping))
+ using (response.Response)
+ return response.Success;
+
+ }
+ catch (Exception e)
{
- response = this.Connection.HeadSync(requestState.CreatePathOnCurrentNode(""), requestOverrides);
- rq.Finish(response.Success, response.HttpStatusCode);
+ throw new PingException(requestState.CurrentNode, e);
}
- if (response.Response == null)
- return response.Success;
- using (response.Response)
- return response.Success;
}
private Task PingAsync(ITransportRequestState requestState)
@@ -89,15 +101,34 @@ private Task PingAsync(ITransportRequestState requestState)
RequestTimeout = pingTimeout
};
var rq = requestState.InitiateRequest(RequestType.Ping);
- return this.Connection.Head(requestState.CreatePathOnCurrentNode(""), requestOverrides)
- .ContinueWith(t =>
- {
- rq.Finish(t.Result.Success, t.Result.HttpStatusCode);
- rq.Dispose();
- var response = t.Result;
- using (response.Response)
- return response.Success;
- });
+ try
+ {
+ return this.Connection.Head(requestState.CreatePathOnCurrentNode(""), requestOverrides)
+ .ContinueWith(t =>
+ {
+ if (t.IsFaulted)
+ {
+ rq.Finish(false, null);
+ rq.Dispose();
+ throw new PingException(requestState.CurrentNode, t.Exception);
+ }
+ rq.Finish(t.Result.Success, t.Result.HttpStatusCode);
+ rq.Dispose();
+ var response = t.Result;
+ if (!response.HttpStatusCode.HasValue || response.HttpStatusCode.Value == -1)
+ throw new PingException(requestState.CurrentNode, t.Exception);
+
+ using (response.Response)
+ return response.Success;
+ });
+ }
+ catch (Exception e)
+ {
+ var tcs = new TaskCompletionSource();
+ var pingException = new PingException(requestState.CurrentNode, e);
+ tcs.SetException(pingException);
+ return tcs.Task;
+ }
}
private IList Sniff(ITransportRequestState ownerState = null)
@@ -111,30 +142,37 @@ private IList Sniff(ITransportRequestState ownerState = null)
};
var requestParameters = new RequestParameters { RequestConfiguration = requestOverrides };
-
- var path = "_nodes/_all/clear?timeout=" + pingTimeout;
- ElasticsearchResponse response;
- using (var requestState = new TransportRequestState(this.Settings, requestParameters, "GET", path))
+ try
{
- response = this.DoRequest(requestState);
- //inform the owing request state of the requests the sniffs did.
- if (requestState.RequestMetrics != null && ownerState != null)
+ var path = "_nodes/_all/clear?timeout=" + pingTimeout;
+ ElasticsearchResponse response;
+ using (var requestState = new TransportRequestState(this.Settings, requestParameters, "GET", path))
{
- foreach (var r in requestState.RequestMetrics.Where(p => p.RequestType == RequestType.ElasticsearchCall))
- r.RequestType = RequestType.Sniff;
+ response = this.DoRequest(requestState);
+ //inform the owing request state of the requests the sniffs did.
+ if (requestState.RequestMetrics != null && ownerState != null)
+ {
+ foreach (var r in requestState.RequestMetrics.Where(p => p.RequestType == RequestType.ElasticsearchCall))
+ r.RequestType = RequestType.Sniff;
- if (ownerState.RequestMetrics == null) ownerState.RequestMetrics = new List();
- ownerState.RequestMetrics.AddRange(requestState.RequestMetrics);
- }
- if (response.Response == null) return null;
- using (response.Response)
- {
- return Sniffer.FromStream(response, response.Response, this.Serializer);
+ if (ownerState.RequestMetrics == null) ownerState.RequestMetrics = new List();
+ ownerState.RequestMetrics.AddRange(requestState.RequestMetrics);
+ }
+ if (response.Response == null) return null;
+
+ using (response.Response)
+ {
+ return Sniffer.FromStream(response, response.Response, this.Serializer);
+ }
}
}
+ catch (MaxRetryException e)
+ {
+ throw new MaxRetryException(new SniffException(e));
+ }
}
private void SniffClusterState(ITransportRequestState requestState = null)
@@ -312,16 +350,21 @@ private ElasticsearchResponse DoRequest(TransportRequestState requestSt
return typedResponse;
}
}
+ catch (MaxRetryException)
+ {
+ throw;
+ }
catch (ElasticsearchServerException)
{
throw;
}
catch (Exception e)
{
+ requestState.SeenExceptions.Add(e);
if (maxRetries == 0 && retried == 0)
throw;
seenError = true;
- return RetryRequest(requestState, e);
+ return RetryRequest(requestState);
}
finally
{
@@ -332,16 +375,15 @@ private ElasticsearchResponse DoRequest(TransportRequestState requestSt
return RetryRequest(requestState);
}
- private ElasticsearchResponse RetryRequest(TransportRequestState requestState, Exception e = null)
+ private ElasticsearchResponse RetryRequest(TransportRequestState requestState)
{
var maxRetries = this.GetMaximumRetries(requestState.RequestConfiguration);
- var exceptionMessage = CreateMaxRetryExceptionMessage(requestState, e);
this._connectionPool.MarkDead(requestState.CurrentNode, this.ConfigurationValues.DeadTimeout, this.ConfigurationValues.MaxDeadTimeout);
SniffOnConnectionFailure(requestState);
- if (requestState.Retried >= maxRetries) throw new MaxRetryException(exceptionMessage, e);
+ ThrowMaxRetryExceptionWhenNeeded(requestState, maxRetries);
return this.DoRequest(requestState);
}
@@ -380,16 +422,19 @@ public Task> DoRequestAsync(string method, string pa
{
var tcs = new TaskCompletionSource>();
if (t.Exception != null)
+ {
tcs.SetException(t.Exception.Flatten());
+ requestState.SetResult(null);
+ }
else
{
tcs.SetResult(t.Result);
+ requestState.SetResult(t.Result);
}
- requestState.SetResult(t.Result);
-
return tcs.Task;
- }).Unwrap();
+ }).Unwrap()
+ ;
}
}
@@ -403,14 +448,15 @@ private Task> DoRequestAsync(TransportRequestState
{
+ if (t.IsFaulted)
+ {
+ requestState.SeenExceptions.Add(t.Exception.InnerException);
+ return this.RetryRequestAsync(requestState);
+ }
if (t.IsCompleted)
{
- if (!t.Result)
- return this.RetryRequestAsync(requestState, t.Exception);
return this.FinishOrRetryRequestAsync(requestState);
}
- if (t.IsFaulted)
- return this.RetryRequestAsync(requestState, t.Exception);
return null;
}).Unwrap();
}
@@ -432,7 +478,8 @@ private Task> FinishOrRetryRequestAsync(TransportReq
{
rq.Dispose();
if (maxRetries == 0 && retried == 0) throw t.Exception;
- return this.RetryRequestAsync(requestState, t.Exception);
+ requestState.SeenExceptions.Add(t.Exception);
+ return this.RetryRequestAsync(requestState);
}
if (t.Result.SuccessOrKnownError
@@ -462,21 +509,32 @@ private Task> FinishOrRetryRequestAsync(TransportReq
}).Unwrap();
}
- private Task> RetryRequestAsync(TransportRequestState requestState, Exception e = null)
+ private Task> RetryRequestAsync(TransportRequestState requestState)
{
var maxRetries = this.GetMaximumRetries(requestState.RequestConfiguration);
- var exceptionMessage = CreateMaxRetryExceptionMessage(requestState, e);
this._connectionPool.MarkDead(requestState.CurrentNode, this.ConfigurationValues.DeadTimeout, this.ConfigurationValues.MaxDeadTimeout);
this.SniffOnConnectionFailure(requestState);
- if (requestState.Retried >= maxRetries)
- throw new MaxRetryException(exceptionMessage, e);
+ ThrowMaxRetryExceptionWhenNeeded(requestState, maxRetries);
return this.DoRequestAsync(requestState);
}
+ private static void ThrowMaxRetryExceptionWhenNeeded(TransportRequestState requestState, int maxRetries)
+ {
+ if (requestState.Retried < maxRetries) return;
+ var innerExceptions = requestState.SeenExceptions.Where(e => e != null).ToList();
+ var innerException = !innerExceptions.HasAny()
+ ? null
+ : (innerExceptions.Count() == 1)
+ ? innerExceptions.First()
+ : new AggregateException(requestState.SeenExceptions);
+ var exceptionMessage = CreateMaxRetryExceptionMessage(requestState, innerException);
+ throw new MaxRetryException(exceptionMessage, innerException);
+ }
+
private static string CreateMaxRetryExceptionMessage(TransportRequestState requestState, Exception e)
{
string innerException = null;
@@ -490,10 +548,10 @@ private static string CreateMaxRetryExceptionMessage(TransportRequestState
var innerExceptions = aggregate.InnerExceptions
.Select(ae => MaxRetryInnerMessage.F(ae.GetType().Name, ae.Message, ae.StackTrace))
.ToList();
- innerException = string.Join("\r\n", innerExceptions);
+ innerException = "\r\n" + string.Join("\r\n", innerExceptions);
}
else
- innerException = MaxRetryInnerMessage.F(e.GetType().Name, e.Message, e.StackTrace);
+ innerException = "\r\n" + MaxRetryInnerMessage.F(e.GetType().Name, e.Message, e.StackTrace);
}
var exceptionMessage = MaxRetryExceptionMessage
.F(requestState.Method, requestState.Path, requestState.Retried, innerException);
@@ -505,18 +563,29 @@ private Task> CallIntoConnectionAsync(Transport
var uri = requestState.CreatePathOnCurrentNode();
var postData = requestState.PostData;
var requestConfiguration = requestState.RequestConfiguration;
- switch (requestState.Method.ToLowerInvariant())
+ var method = requestState.Method.ToLowerInvariant();
+ try
{
- case "head": return this.Connection.Head(uri, requestConfiguration);
- case "get": return this.Connection.Get(uri, requestConfiguration);
- case "post": return this.Connection.Post(uri, postData, requestConfiguration);
- case "put": return this.Connection.Put(uri, postData, requestConfiguration);
- case "delete":
- return postData == null || postData.Length == 0
- ? this.Connection.Delete(uri, requestConfiguration)
- : this.Connection.Delete(uri, postData, requestConfiguration);
+ switch (method)
+ {
+ case "head": return this.Connection.Head(uri, requestConfiguration);
+ case "get": return this.Connection.Get(uri, requestConfiguration);
+ case "post": return this.Connection.Post(uri, postData, requestConfiguration);
+ case "put": return this.Connection.Put(uri, postData, requestConfiguration);
+ case "delete":
+ return postData == null || postData.Length == 0
+ ? this.Connection.Delete(uri, requestConfiguration)
+ : this.Connection.Delete(uri, postData, requestConfiguration);
+ default:
+ throw new Exception("Unknown HTTP method " + requestState.Method);
+ }
+ }
+ catch (Exception e)
+ {
+ var tcs = new TaskCompletionSource>();
+ tcs.SetException(e);
+ return tcs.Task;
}
- throw new Exception("Unknown HTTP method " + requestState.Method);
}
private Task Iterate(IEnumerable asyncIterator, MemoryStream memoryStream)
@@ -555,6 +624,10 @@ private ElasticsearchServerError ThrowOrGetErrorFromStreamResponse(
if (IsValidResponse(requestState, streamResponse))
return null;
+ if (((streamResponse.HttpStatusCode.HasValue && streamResponse.HttpStatusCode.Value <= 0)
+ || !streamResponse.HttpStatusCode.HasValue) && streamResponse.OriginalException != null)
+ throw streamResponse.OriginalException;
+
ElasticsearchServerError error = null;
var type = typeof(T);
@@ -577,8 +650,12 @@ private ElasticsearchServerError ThrowOrGetErrorFromStreamResponse(
var e = this.Serializer.Deserialize(ms);
error = ElasticsearchServerError.Create(e);
}
- catch { }
+ catch (Exception e)
+ {
+ var raw = ms.ToArray().Utf8String();
+ }
ms.Position = 0;
+ streamResponse.Response.Close();
streamResponse.Response = ms;
}
else
@@ -593,9 +670,12 @@ private ElasticsearchServerError ThrowOrGetErrorFromStreamResponse(
private static bool IsValidResponse(ITransportRequestState requestState, IElasticsearchResponse streamResponse)
{
- return (streamResponse.Success || requestState.RequestConfiguration != null) &&
- (streamResponse.Success || requestState.RequestConfiguration == null ||
- requestState.RequestConfiguration.AllowedStatusCodes.Any(i => i == streamResponse.HttpStatusCode));
+ return streamResponse.Success ||
+ (!streamResponse.Success
+ && requestState.RequestConfiguration != null
+ && requestState.RequestConfiguration.AllowedStatusCodes.HasAny(i => i == streamResponse.HttpStatusCode)
+ );
+
}
private bool TypeOfResponseCopiesDirectly()
diff --git a/src/Elasticsearch.Net/ConnectionPool/StaticConnectionPool.cs b/src/Elasticsearch.Net/ConnectionPool/StaticConnectionPool.cs
index a962877152b..3bb04ac2889 100644
--- a/src/Elasticsearch.Net/ConnectionPool/StaticConnectionPool.cs
+++ b/src/Elasticsearch.Net/ConnectionPool/StaticConnectionPool.cs
@@ -14,6 +14,7 @@ public class StaticConnectionPool : IConnectionPool
protected IDictionary UriLookup;
protected IList NodeUris;
protected int Current = -1;
+ private Random _random;
public int MaxRetries { get { return NodeUris.Count - 1; } }
@@ -24,6 +25,7 @@ public StaticConnectionPool(
bool randomizeOnStartup = true,
IDateTimeProvider dateTimeProvider = null)
{
+ _random = new Random(1337);
_dateTimeProvider = dateTimeProvider ?? new DateTimeProvider();
var rnd = new Random();
uris.ThrowIfEmpty("uris");
diff --git a/src/Elasticsearch.Net/Elasticsearch.Net.csproj b/src/Elasticsearch.Net/Elasticsearch.Net.csproj
index 55f2d78f6fe..5ba1c33fda7 100644
--- a/src/Elasticsearch.Net/Elasticsearch.Net.csproj
+++ b/src/Elasticsearch.Net/Elasticsearch.Net.csproj
@@ -128,4 +128,4 @@
-->
-
\ No newline at end of file
+
diff --git a/src/Elasticsearch.Net/Exceptions/MaxRetryException.cs b/src/Elasticsearch.Net/Exceptions/MaxRetryException.cs
index 8f76b7d3f0d..e37dee4cda1 100644
--- a/src/Elasticsearch.Net/Exceptions/MaxRetryException.cs
+++ b/src/Elasticsearch.Net/Exceptions/MaxRetryException.cs
@@ -12,15 +12,35 @@ namespace Elasticsearch.Net.Exceptions
public class MaxRetryException : Exception
{
- public MaxRetryException(string message) : base(message)
+ public MaxRetryException(string message) : base(message) { }
+
+ public MaxRetryException(string message, Exception innerException) : base(message, innerException) { }
+ public MaxRetryException(Exception innerException) : base(innerException.Message, innerException) { }
+ }
+
+ ///
+ /// Thrown when a sniff operation itself caused a maxrety exception
+ ///
+ public class SniffException : Exception
+ {
+
+ public SniffException(MaxRetryException innerException)
+ : base("Sniffing known nodes in the cluster caused a maxretry exception of its own", innerException)
{
+
}
+ }
+
+ ///
+ /// Thrown when a ping operation itself caused a maxrety exception
+ ///
+ public class PingException : Exception
+ {
- public MaxRetryException(string message, Exception innerException) : base(message, innerException)
+ public PingException(Uri baseURi, Exception innerException)
+ : base("Pinging {0} caused an exception".F(baseURi.ToString()), innerException)
{
}
}
-
-
}
diff --git a/src/Elasticsearch.userprefs b/src/Elasticsearch.userprefs
new file mode 100644
index 00000000000..e398ba610fe
--- /dev/null
+++ b/src/Elasticsearch.userprefs
@@ -0,0 +1,23 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/Nest/ElasticClient.cs b/src/Nest/ElasticClient.cs
index 87b567d9732..9b1ab34077d 100644
--- a/src/Nest/ElasticClient.cs
+++ b/src/Nest/ElasticClient.cs
@@ -5,6 +5,7 @@
using System.Threading.Tasks;
using Elasticsearch.Net;
using Elasticsearch.Net.Connection;
+using Elasticsearch.Net.Exceptions;
namespace Nest
{
@@ -134,7 +135,21 @@ D descriptor
{
var pathInfo = descriptor.ToPathInfo(this._connectionSettings);
return dispatch(pathInfo, descriptor)
- .ContinueWith(r => ResultsSelector(r.Result, descriptor));
+ .ContinueWith(r =>
+ {
+ if (r.IsFaulted && r.Exception != null)
+ {
+ var mr = r.Exception.InnerException as MaxRetryException;
+ if (mr != null)
+ throw mr;
+
+ var ae = r.Exception.Flatten();
+ if (ae.InnerException != null)
+ throw ae.InnerException;
+ throw ae;
+ }
+ return ResultsSelector(r.Result, descriptor);
+ });
}
diff --git a/src/Nest/Nest.csproj b/src/Nest/Nest.csproj
index dd16d4d946e..a93bf7ee31a 100644
--- a/src/Nest/Nest.csproj
+++ b/src/Nest/Nest.csproj
@@ -885,7 +885,7 @@
-
+
@@ -896,4 +896,4 @@
-->
-
\ No newline at end of file
+
diff --git a/src/Tests/Elasticsearch.Net.Integration.Yaml/scroll/11_clear.yaml.cs b/src/Tests/Elasticsearch.Net.Integration.Yaml/scroll/11_clear.yaml.cs
index cc53dc011dd..850ccccc2dc 100644
--- a/src/Tests/Elasticsearch.Net.Integration.Yaml/scroll/11_clear.yaml.cs
+++ b/src/Tests/Elasticsearch.Net.Integration.Yaml/scroll/11_clear.yaml.cs
@@ -1,59 +1,59 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using NUnit.Framework;
-
-
-namespace Elasticsearch.Net.Integration.Yaml.Scroll2
-{
- public partial class Scroll2YamlTests
- {
-
-
- [NCrunch.Framework.ExclusivelyUses("ElasticsearchYamlTests")]
- public class ClearScroll1Tests : YamlTestsBase
- {
- [Test]
- public void ClearScroll1Test()
- {
-
- //do indices.create
- this.Do(()=> _client.IndicesCreate("test_scroll", null));
-
- //do index
- _body = new {
- foo= "bar"
- };
- this.Do(()=> _client.Index("test_scroll", "test", "42", _body));
-
- //do indices.refresh
- this.Do(()=> _client.IndicesRefreshForAll());
-
- //do search
- _body = new {
- query= new {
- match_all= new {}
- }
- };
- this.Do(()=> _client.Search("test_scroll", _body, nv=>nv
- .AddQueryString("search_type", @"scan")
- .AddQueryString("scroll", @"1m")
- ));
-
- //set scroll_id1 = _response._scroll_id;
- var scroll_id1 = _response._scroll_id;
-
- //do clear_scroll
- this.Do(()=> _client.ClearScroll((string)scroll_id1, null));
-
- //do scroll
- this.Do(()=> _client.ScrollGet((string)scroll_id1), shouldCatch: @"missing");
-
- //do clear_scroll
- this.Do(()=> _client.ClearScroll((string)scroll_id1, null), shouldCatch: @"missing");
-
- }
- }
- }
-}
-
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using NUnit.Framework;
+
+
+namespace Elasticsearch.Net.Integration.Yaml.Scroll2
+{
+ public partial class Scroll2YamlTests
+ {
+
+
+ [NCrunch.Framework.ExclusivelyUses("ElasticsearchYamlTests")]
+ public class ClearScroll1Tests : YamlTestsBase
+ {
+ [Test]
+ public void ClearScroll1Test()
+ {
+
+ //do indices.create
+ this.Do(()=> _client.IndicesCreate("test_scroll", null));
+
+ //do index
+ _body = new {
+ foo= "bar"
+ };
+ this.Do(()=> _client.Index("test_scroll", "test", "42", _body));
+
+ //do indices.refresh
+ this.Do(()=> _client.IndicesRefreshForAll());
+
+ //do search
+ _body = new {
+ query= new {
+ match_all= new {}
+ }
+ };
+ this.Do(()=> _client.Search("test_scroll", _body, nv=>nv
+ .AddQueryString("search_type", @"scan")
+ .AddQueryString("scroll", @"1m")
+ ));
+
+ //set scroll_id1 = _response._scroll_id;
+ var scroll_id1 = _response._scroll_id;
+
+ //do clear_scroll
+ this.Do(()=> _client.ClearScroll((string)scroll_id1, new {}));
+
+ //do scroll
+ this.Do(()=> _client.ScrollGet((string)scroll_id1), shouldCatch: @"missing");
+
+ //do clear_scroll
+ this.Do(()=> _client.ClearScroll((string)scroll_id1, new {}), shouldCatch: @"missing");
+
+ }
+ }
+ }
+}
+
diff --git a/src/Tests/Elasticsearch.Net.Tests.Unit/Connection/RetryTests.cs b/src/Tests/Elasticsearch.Net.Tests.Unit/Connection/RetryTests.cs
index 0c30aa386ca..f0036c3e328 100644
--- a/src/Tests/Elasticsearch.Net.Tests.Unit/Connection/RetryTests.cs
+++ b/src/Tests/Elasticsearch.Net.Tests.Unit/Connection/RetryTests.cs
@@ -22,7 +22,7 @@ public class RetryTests
.MaximumRetries(_retries);
[Test]
- public void ThrowsOutOfNodesException_AndRetriesTheSpecifiedTimes()
+ public void ThrowsMaxRetryException_AndRetriesTheSpecifiedTimes()
{
using (var fake = new AutoFake(callsDoNothing: true))
{
@@ -41,9 +41,30 @@ public void ThrowsOutOfNodesException_AndRetriesTheSpecifiedTimes()
}
}
+
+ [Test]
+ public void ThrowsMaxRetryException_AndRetriesTheSpecifiedTimes_HardIConnectionException_Async()
+ {
+ using (var fake = new AutoFake(callsDoNothing: true))
+ {
+ fake.Provide(_connectionConfig);
+ FakeCalls.ProvideDefaultTransport(fake);
+ var getCall = FakeCalls.GetCall(fake);
+
+ //return a started task that throws
+ getCall.Throws();
+
+ var client = fake.Resolve();
+
+ client.Settings.MaxRetries.Should().Be(_retries);
+
+ Assert.Throws(async () => await client.InfoAsync());
+ getCall.MustHaveHappened(Repeated.Exactly.Times(_retries + 1));
+ }
+ }
[Test]
- public void ThrowsOutOfNodesException_AndRetriesTheSpecifiedTimes_Async()
+ public void ThrowsMaxRetryException_AndRetriesTheSpecifiedTimes_Async()
{
using (var fake = new AutoFake(callsDoNothing: true))
{
@@ -179,6 +200,6 @@ public void ShouldRetryOn503_Async()
getCall.MustHaveHappened(Repeated.Exactly.Times(_retries + 1));
}
}
-
+
}
}
diff --git a/src/Tests/Elasticsearch.Net.Tests.Unit/Connection/SniffingConnectionPoolTests.cs b/src/Tests/Elasticsearch.Net.Tests.Unit/Connection/SniffingConnectionPoolTests.cs
index 53815d64ad4..7d5d610ffc7 100644
--- a/src/Tests/Elasticsearch.Net.Tests.Unit/Connection/SniffingConnectionPoolTests.cs
+++ b/src/Tests/Elasticsearch.Net.Tests.Unit/Connection/SniffingConnectionPoolTests.cs
@@ -335,5 +335,103 @@ public void HostsReturnedBySniffAreVisited()
seenNodes[6].Port.Should().Be(9201);
}
}
+
+ [Test]
+ public void ShouldRetryOnSniffConnectionException_Async()
+ {
+ using (var fake = new AutoFake(callsDoNothing: true))
+ {
+ var uris = new[]
+ {
+ new Uri("http://localhost:9200"),
+ new Uri("http://localhost:9201"),
+ new Uri("http://localhost:9202")
+ };
+ var connectionPool = new SniffingConnectionPool(uris, randomizeOnStartup: false);
+ var config = new ConnectionConfiguration(connectionPool)
+ .SniffOnConnectionFault();
+
+ fake.Provide(config);
+
+ var pingAsyncCall = FakeCalls.PingAtConnectionLevelAsync(fake);
+ pingAsyncCall.Returns(FakeResponse.OkAsync(config));
+
+ //sniffing is always synchronous and in turn will issue synchronous pings
+ var pingCall = FakeCalls.PingAtConnectionLevel(fake);
+ pingCall.Returns(FakeResponse.Ok(config));
+
+ var sniffCall = FakeCalls.Sniff(fake);
+ var seenPorts = new List();
+ sniffCall.ReturnsLazily((Uri u, IRequestConfiguration c) =>
+ {
+ seenPorts.Add(u.Port);
+ throw new Exception("Something bad happened");
+ });
+
+ var getCall = FakeCalls.GetCall(fake);
+ getCall.Returns(FakeResponse.BadAsync(config));
+
+ FakeCalls.ProvideDefaultTransport(fake);
+
+ var client = fake.Resolve();
+
+ var e = Assert.Throws(async () => await client.NodesHotThreadsAsync("nodex"));
+
+ //all nodes must be tried to sniff for more information
+ sniffCall.MustHaveHappened(Repeated.Exactly.Times(uris.Count()));
+ //make sure we only saw one call to hot threads (the one that failed initially)
+ getCall.MustHaveHappened(Repeated.Exactly.Once);
+
+ //make sure the sniffs actually happened on all the individual nodes
+ seenPorts.ShouldAllBeEquivalentTo(uris.Select(u=>u.Port));
+ e.InnerException.Message.Should().Contain("Sniffing known nodes");
+ }
+ }
+
+ [Test]
+ public void ShouldRetryOnSniffConnectionException()
+ {
+ using (var fake = new AutoFake(callsDoNothing: true))
+ {
+ var uris = new[]
+ {
+ new Uri("http://localhost:9200"),
+ new Uri("http://localhost:9201")
+ };
+ var connectionPool = new SniffingConnectionPool(uris, randomizeOnStartup: false);
+ var config = new ConnectionConfiguration(connectionPool)
+ .SniffOnConnectionFault();
+
+ fake.Provide(config);
+ FakeCalls.ProvideDefaultTransport(fake);
+
+ var pingCall = FakeCalls.PingAtConnectionLevel(fake);
+ pingCall.Returns(FakeResponse.Ok(config));
+
+ var sniffCall = FakeCalls.Sniff(fake);
+ var seenPorts = new List();
+ sniffCall.ReturnsLazily((Uri u, IRequestConfiguration c) =>
+ {
+ seenPorts.Add(u.Port);
+ throw new Exception("Something bad happened");
+ });
+
+ var getCall = FakeCalls.GetSyncCall(fake);
+ getCall.Returns(FakeResponse.Bad(config));
+
+ var client = fake.Resolve();
+
+ var e = Assert.Throws(() => client.Info());
+ sniffCall.MustHaveHappened(Repeated.Exactly.Times(uris.Count()));
+ getCall.MustHaveHappened(Repeated.Exactly.Once);
+
+ //make sure that if a ping throws an exception it wont
+ //keep retrying to ping the same node but failover to the next
+ seenPorts.ShouldAllBeEquivalentTo(uris.Select(u=>u.Port));
+
+ var sniffException = e.InnerException as SniffException;
+ sniffException.Should().NotBeNull();
+ }
+ }
}
}
\ No newline at end of file
diff --git a/src/Tests/Elasticsearch.Net.Tests.Unit/Connection/StaticConnectionPoolRetryTests.cs b/src/Tests/Elasticsearch.Net.Tests.Unit/Connection/StaticConnectionPoolRetryTests.cs
index 6468e48ee8d..849516ba7c8 100644
--- a/src/Tests/Elasticsearch.Net.Tests.Unit/Connection/StaticConnectionPoolRetryTests.cs
+++ b/src/Tests/Elasticsearch.Net.Tests.Unit/Connection/StaticConnectionPoolRetryTests.cs
@@ -2,6 +2,8 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
+using System.Runtime.InteropServices;
+using System.Threading.Tasks;
using Autofac;
using Autofac.Extras.FakeItEasy;
using Elasticsearch.Net.Connection;
@@ -372,5 +374,79 @@ public void IfAllButOneConnectionDiesSubsequentRequestsMustUseTheOneAliveConnect
markLastAlive.MustHaveHappened(Repeated.Exactly.Times(4));
}
}
+
+ [Test]
+ public void ShouldRetryOnPingConnectionException_Async()
+ {
+ using (var fake = new AutoFake(callsDoNothing: true))
+ {
+ var connectionPool = new StaticConnectionPool(_uris, randomizeOnStartup: false);
+ var config = new ConnectionConfiguration(connectionPool);
+
+ fake.Provide(config);
+ FakeCalls.ProvideDefaultTransport(fake);
+
+ var pingCall = FakeCalls.PingAtConnectionLevelAsync(fake);
+ var seenPorts = new List();
+ pingCall.ReturnsLazily((Uri u, IRequestConfiguration c) =>
+ {
+ seenPorts.Add(u.Port);
+ throw new Exception("Something bad happened");
+ });
+
+ var getCall = FakeCalls.GetCall(fake);
+ getCall.Returns(FakeResponse.OkAsync(config));
+
+ var client = fake.Resolve();
+
+ var e = Assert.Throws(async () => await client.InfoAsync());
+ pingCall.MustHaveHappened(Repeated.Exactly.Times(_retries + 1));
+ getCall.MustNotHaveHappened();
+
+ //make sure that if a ping throws an exception it wont
+ //keep retrying to ping the same node but failover to the next
+ seenPorts.ShouldAllBeEquivalentTo(_uris.Select(u=>u.Port));
+ var ae = e.InnerException as AggregateException;
+ ae = ae.Flatten();
+ ae.InnerException.Message.Should().Contain("Pinging");
+ }
+ }
+
+ [Test]
+ public void ShouldRetryOnPingConnectionException()
+ {
+ using (var fake = new AutoFake(callsDoNothing: true))
+ {
+ var connectionPool = new StaticConnectionPool(_uris, randomizeOnStartup: false);
+ var config = new ConnectionConfiguration(connectionPool);
+
+ fake.Provide(config);
+ FakeCalls.ProvideDefaultTransport(fake);
+
+ var pingCall = FakeCalls.PingAtConnectionLevel(fake);
+ var seenPorts = new List();
+ pingCall.ReturnsLazily((Uri u, IRequestConfiguration c) =>
+ {
+ seenPorts.Add(u.Port);
+ throw new Exception("Something bad happened");
+ });
+
+ var getCall = FakeCalls.GetSyncCall(fake);
+ getCall.Returns(FakeResponse.Ok(config));
+
+ var client = fake.Resolve();
+
+ var e = Assert.Throws(() => client.Info());
+ pingCall.MustHaveHappened(Repeated.Exactly.Times(_retries + 1));
+ getCall.MustNotHaveHappened();
+
+ //make sure that if a ping throws an exception it wont
+ //keep retrying to ping the same node but failover to the next
+ seenPorts.ShouldAllBeEquivalentTo(_uris.Select(u=>u.Port));
+ var aggregateException = e.InnerException as AggregateException;
+ aggregateException.Should().NotBeNull();
+ aggregateException.InnerExceptions.Should().Contain(ex => ex.GetType().Name == "PingException");
+ }
+ }
}
}
diff --git a/src/Tests/Elasticsearch.Net.Tests.Unit/Stubs/FakeCalls.cs b/src/Tests/Elasticsearch.Net.Tests.Unit/Stubs/FakeCalls.cs
index 71a60c6df92..56e41135618 100644
--- a/src/Tests/Elasticsearch.Net.Tests.Unit/Stubs/FakeCalls.cs
+++ b/src/Tests/Elasticsearch.Net.Tests.Unit/Stubs/FakeCalls.cs
@@ -74,23 +74,6 @@ private static Expression> IsRoot()
{
return u=>u.AbsolutePath == "/" || u.AbsolutePath == "";
}
- public static IReturnValueConfiguration>> SniffAsync(
- AutoFake fake,
- IConnectionConfigurationValues configurationValues = null,
- IList nodes = null
- )
- {
- var sniffCall = A.CallTo(() => fake.Resolve().Get(
- A.That.Matches(IsSniffUrl()),
- A._));
- if (nodes == null) return sniffCall;
- var stream = SniffResponse(nodes);
- var response = FakeResponse.Ok(configurationValues, "GET", "/_nodes/_all/clear", stream);
- sniffCall.Returns(Task.FromResult(response));
- return sniffCall;
- }
-
-
public static ITransport ProvideDefaultTransport(AutoFake fake, IDateTimeProvider dateTimeProvider = null)
{
diff --git a/src/Tests/Nest.Tests.Integration/ConnectionTests.cs b/src/Tests/Nest.Tests.Integration/ConnectionTests.cs
index 2d3d5734ff5..e76b11bebe3 100644
--- a/src/Tests/Nest.Tests.Integration/ConnectionTests.cs
+++ b/src/Tests/Nest.Tests.Integration/ConnectionTests.cs
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
+using System.Net;
using Elasticsearch.Net;
using NUnit.Framework;
@@ -24,7 +25,7 @@ public void TestConnectSuccess()
Assert.True(rootNodeInfo.ConnectionStatus.Success);
}
[Test]
- public void construct_client_with_null_or_empy_settings()
+ public void Construct_Client_With_NullOrEmpty_Settings()
{
Assert.Throws(() =>
{
@@ -36,7 +37,7 @@ public void construct_client_with_null_or_empy_settings()
});
}
[Test]
- public void construct_client_with_invalid_hostname()
+ public void Construct_Client_With_Invalid_Hostname()
{
Assert.Throws(() =>
{
@@ -45,13 +46,13 @@ public void construct_client_with_invalid_hostname()
}
[Test]
- public void connect_to_unknown_hostname()
+ public void Connect_To_Unknown_Hostname()
{
IRootInfoResponse result = null;
//this test will fail if fiddler is enabled since the proxy
//will report a statuscode of 502 instead of -1
- Assert.DoesNotThrow(() =>
+ Assert.Throws(() =>
{
var settings = new ConnectionSettings(new Uri("http://youdontownthis.domain.do.you"), "index");
var client = new ElasticClient(settings);
diff --git a/src/Tests/Nest.Tests.Integration/ElasticsearchConfiguration.cs b/src/Tests/Nest.Tests.Integration/ElasticsearchConfiguration.cs
index fde5553f2e7..71fb9bb9fd0 100644
--- a/src/Tests/Nest.Tests.Integration/ElasticsearchConfiguration.cs
+++ b/src/Tests/Nest.Tests.Integration/ElasticsearchConfiguration.cs
@@ -29,7 +29,7 @@ public static Version CurrentVersion
public static Uri CreateBaseUri(int? port = null)
{
var host = Host;
- if (port == null && Process.GetProcessesByName("fiddler").HasAny())
+ if (port != 9500 && Process.GetProcessesByName("fiddler").HasAny())
host = "ipv4.fiddler";
var uri = new UriBuilder("http", host, port.GetValueOrDefault(9200)).Uri;
diff --git a/src/Tests/Nest.Tests.Integration/Exceptions/ElasticsearchExceptionTests.cs b/src/Tests/Nest.Tests.Integration/Exceptions/ElasticsearchExceptionTests.cs
new file mode 100644
index 00000000000..85ba60e69e6
--- /dev/null
+++ b/src/Tests/Nest.Tests.Integration/Exceptions/ElasticsearchExceptionTests.cs
@@ -0,0 +1,216 @@
+using System;
+using System.Net;
+using System.Runtime.Remoting.Channels;
+using Elasticsearch.Net;
+using Elasticsearch.Net.ConnectionPool;
+using Elasticsearch.Net.Exceptions;
+using FluentAssertions;
+using Nest.Tests.MockData.Domain;
+using NUnit.Framework;
+
+namespace Nest.Tests.Integration.Exceptions
+{
+ [TestFixture]
+ public class ElasticsearchExceptionTests
+ {
+ [Test]
+ public void ConnectionException_WithClientThatDoesNotThrow_StillThrows()
+ {
+ var uri = ElasticsearchConfiguration.CreateBaseUri(9492);
+ var client = new ElasticClient(new ConnectionSettings(uri).SetTimeout(500));
+ var e = Assert.Throws(() => client.RootNodeInfo());
+ }
+
+ [Test]
+ public void ConnectionException_WithThrowingClient()
+ {
+ var uri = ElasticsearchConfiguration.CreateBaseUri(9494);
+ var client = new ElasticClient(new ConnectionSettings(uri)
+ .SetTimeout(500)
+ .ThrowOnElasticsearchServerExceptions());
+ var e = Assert.Throws(() => client.RootNodeInfo());
+ }
+
+ [Test]
+ public void ConnectionException_WithClientThatDoesNotThrow_StillThrows_Async()
+ {
+ var uri = ElasticsearchConfiguration.CreateBaseUri(9492);
+ var client = new ElasticClient(new ConnectionSettings(uri).SetTimeout(500));
+ Assert.Throws(async () => await client.RootNodeInfoAsync());
+ }
+
+ [Test]
+ public void ConnectionException_WithThrowingClient_Async()
+ {
+ var uri = ElasticsearchConfiguration.CreateBaseUri(9494);
+ var client = new ElasticClient(new ConnectionSettings(uri)
+ .SetTimeout(500)
+ .ThrowOnElasticsearchServerExceptions());
+ Assert.Throws(async () => await client.RootNodeInfoAsync());
+ }
+
+ [Test]
+ public async void ServerError_Is_Set_ClientThat_DoesNotThow_AndDoesNotExposeRawResponse_Async()
+ {
+ var uri = ElasticsearchConfiguration.CreateBaseUri();
+ var client = new ElasticClient(new ConnectionSettings(uri).ExposeRawResponse(false));
+ Assert.DoesNotThrow(async () =>
+ {
+ var result = await client.SearchAsync(s => s.QueryRaw(@"{ ""badjson"": {} }"));
+ result.IsValid.Should().BeFalse();
+ result.ConnectionStatus.HttpStatusCode.Should().Be(400);
+ var e = result.ServerError;
+ e.Should().NotBeNull();
+ e.ExceptionType.Should().Contain("SearchPhaseExecutionException");
+ });
+ }
+
+ [Test]
+ public void ServerError_Is_Set_ClientThat_DoesNotThow_AndDoesNotExposeRawResponse()
+ {
+ var uri = ElasticsearchConfiguration.CreateBaseUri();
+ var client = new ElasticClient(new ConnectionSettings(uri).ExposeRawResponse(false));
+ Assert.DoesNotThrow(() =>
+ {
+ var result = client.Search(s => s.QueryRaw(@"{ ""badjson"": {} }"));
+ result.IsValid.Should().BeFalse();
+ result.ConnectionStatus.HttpStatusCode.Should().Be(400);
+ var e = result.ServerError;
+ e.Should().NotBeNull();
+ e.ExceptionType.Should().Contain("SearchPhaseExecutionException");
+ });
+ }
+
+ [Test]
+ public void ServerError_Is_Set_ClientThat_DoesNotThow()
+ {
+ var uri = ElasticsearchConfiguration.CreateBaseUri();
+ var client = new ElasticClient(new ConnectionSettings(uri).ExposeRawResponse(true));
+ Assert.DoesNotThrow(() =>
+ {
+ var result = client.Search(s => s.QueryRaw(@"{ ""badjson"": {} }"));
+ result.IsValid.Should().BeFalse();
+ result.ConnectionStatus.HttpStatusCode.Should().Be(400);
+ var e = result.ServerError;
+ e.Should().NotBeNull();
+ e.ExceptionType.Should().Contain("SearchPhaseExecutionException");
+ });
+ }
+
+ [Test]
+ public void WebException_WithThrowingClient_ThrowsMappedException()
+ {
+ var uri = ElasticsearchConfiguration.CreateBaseUri();
+ var client = new ElasticClient(new ConnectionSettings(uri).ThrowOnElasticsearchServerExceptions());
+ var e = Assert.Throws(() => client.Search(s => s.QueryRaw(@"{ ""badjson"" : {} }")));
+ e.ExceptionType.Should().Contain("SearchPhaseExecutionException");
+ }
+
+
+ [Test]
+ public void ConnectionPool_DoesNotThrowOnServerExceptions_ThrowsMaxRetryException_OnDeadNodes()
+ {
+ var uris = new []
+ {
+ ElasticsearchConfiguration.CreateBaseUri(9201),
+ ElasticsearchConfiguration.CreateBaseUri(9202),
+ ElasticsearchConfiguration.CreateBaseUri(9203),
+ };
+ var connectionPool = new StaticConnectionPool(uris);
+ var client = new ElasticClient(new ConnectionSettings(connectionPool)
+ .SetTimeout(1000)
+ );
+ var e = Assert.Throws(() =>
+ {
+ var result = client.Search(s => s.MatchAll());
+ result.IsValid.Should().BeFalse();
+ });
+ e.Should().NotBeNull();
+ }
+
+ [Test]
+ public async void ConnectionPool_DoesNotThrowOnServerExceptions_ThrowsMaxRetryException_OnDeadNodes_Async()
+ {
+ var uris = new []
+ {
+ ElasticsearchConfiguration.CreateBaseUri(9201),
+ ElasticsearchConfiguration.CreateBaseUri(9202),
+ ElasticsearchConfiguration.CreateBaseUri(9203),
+ };
+ var connectionPool = new StaticConnectionPool(uris);
+ var client = new ElasticClient(new ConnectionSettings(connectionPool)
+ .SetTimeout(1000)
+ );
+
+ try
+ {
+ var result = await client.SearchAsync(s => s.MatchAll());
+ result.IsValid.Should().BeFalse();
+ }
+ catch (MaxRetryException e)
+ {
+ Assert.Pass("MaxRetryException caught");
+ }
+ catch (Exception e)
+ {
+ Assert.Fail("Did not expect exception of type {0} to be caught", e.GetType().Name);
+ }
+ }
+
+ [Test]
+ public void ConnectionPool_ThrowOnServerExceptions_ThrowsElasticsearchServerException()
+ {
+ var uris = new []
+ {
+ ElasticsearchConfiguration.CreateBaseUri(9200),
+ ElasticsearchConfiguration.CreateBaseUri(9200),
+ ElasticsearchConfiguration.CreateBaseUri(9200),
+ };
+ var connectionPool = new StaticConnectionPool(uris);
+ var client = new ElasticClient(new ConnectionSettings(connectionPool)
+ .ThrowOnElasticsearchServerExceptions()
+ .SetTimeout(1000)
+ );
+ var e = Assert.Throws(() =>
+ {
+ var index = ElasticsearchConfiguration.NewUniqueIndexName();
+ var create = client.CreateIndex(index);
+ var close = client.CloseIndex(index);
+ var result = client.Search(s => s.Index(index));
+ });
+ e.Should().NotBeNull();
+ }
+
+ [Test]
+ public async void ConnectionPool_ThrowOnServerExceptions_ThrowsElasticsearchServerException_Async()
+ {
+ var uris = new []
+ {
+ ElasticsearchConfiguration.CreateBaseUri(9200),
+ ElasticsearchConfiguration.CreateBaseUri(9200),
+ ElasticsearchConfiguration.CreateBaseUri(9200),
+ };
+ var connectionPool = new StaticConnectionPool(uris);
+ var client = new ElasticClient(new ConnectionSettings(connectionPool)
+ .ThrowOnElasticsearchServerExceptions()
+ .SetTimeout(1000)
+ );
+ try
+ {
+
+ var index = ElasticsearchConfiguration.NewUniqueIndexName();
+ var create = await client.CreateIndexAsync(i=>i.Index(index));
+ var close = await client.CloseIndexAsync(i=>i.Index(index));
+ var result = await client.SearchAsync(s => s.Index(index));
+ }
+ catch (ElasticsearchServerException e)
+ {
+ Assert.Pass("ElasticearchServerException caught");
+ }
+ catch (Exception e)
+ {
+ Assert.Fail("Did not expect exception of type {0} to be caught", e.GetType().Name);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Tests/Nest.Tests.Integration/Exceptions/ThrowAlwaysConnection.cs b/src/Tests/Nest.Tests.Integration/Exceptions/ThrowAlwaysConnection.cs
new file mode 100644
index 00000000000..604b13f5947
--- /dev/null
+++ b/src/Tests/Nest.Tests.Integration/Exceptions/ThrowAlwaysConnection.cs
@@ -0,0 +1,72 @@
+using System;
+using System.IO;
+using System.Threading.Tasks;
+using Elasticsearch.Net;
+using Elasticsearch.Net.Connection;
+using Elasticsearch.Net.Connection.Configuration;
+
+namespace Nest.Tests.Integration.Exceptions
+{
+ public class ThrowAlwaysConnection : IConnection
+ {
+ public Task> Get(Uri uri, IRequestConfiguration requestConfiguration = null)
+ {
+ throw new NotImplementedException();
+ }
+
+ public ElasticsearchResponse GetSync(Uri uri, IRequestConfiguration requestConfiguration = null)
+ {
+ throw new NotImplementedException();
+ }
+
+ public Task> Head(Uri uri, IRequestConfiguration requestConfiguration = null)
+ {
+ throw new NotImplementedException();
+ }
+
+ public ElasticsearchResponse HeadSync(Uri uri, IRequestConfiguration requestConfiguration = null)
+ {
+ throw new NotImplementedException();
+ }
+
+ public Task> Post(Uri uri, byte[] data, IRequestConfiguration requestConfiguration = null)
+ {
+ throw new NotImplementedException();
+ }
+
+ public ElasticsearchResponse PostSync(Uri uri, byte[] data, IRequestConfiguration requestConfiguration = null)
+ {
+ throw new NotImplementedException();
+ }
+
+ public Task> Put(Uri uri, byte[] data, IRequestConfiguration requestConfiguration = null)
+ {
+ throw new NotImplementedException();
+ }
+
+ public ElasticsearchResponse PutSync(Uri uri, byte[] data, IRequestConfiguration requestConfiguration = null)
+ {
+ throw new NotImplementedException();
+ }
+
+ public Task> Delete(Uri uri, IRequestConfiguration requestConfiguration = null)
+ {
+ throw new NotImplementedException();
+ }
+
+ public ElasticsearchResponse DeleteSync(Uri uri, IRequestConfiguration requestConfiguration = null)
+ {
+ throw new NotImplementedException();
+ }
+
+ public Task> Delete(Uri uri, byte[] data, IRequestConfiguration requestConfiguration = null)
+ {
+ throw new NotImplementedException();
+ }
+
+ public ElasticsearchResponse DeleteSync(Uri uri, byte[] data, IRequestConfiguration requestConfiguration = null)
+ {
+ throw new NotImplementedException();
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Tests/Nest.Tests.Integration/Failover/FailoverPingTests.cs b/src/Tests/Nest.Tests.Integration/Failover/FailoverPingTests.cs
index 483355af88c..8d773ff2de8 100644
--- a/src/Tests/Nest.Tests.Integration/Failover/FailoverPingTests.cs
+++ b/src/Tests/Nest.Tests.Integration/Failover/FailoverPingTests.cs
@@ -19,9 +19,9 @@ public void FailoverShouldOnlyPingDeadNodes()
{
var seeds = new[]
{
- new Uri("http://localhost:9202"),
- new Uri("http://localhost:9201"),
- new Uri("http://localhost:9200"),
+ ElasticsearchConfiguration.CreateBaseUri(9202),
+ ElasticsearchConfiguration.CreateBaseUri(9201),
+ ElasticsearchConfiguration.CreateBaseUri(9200)
};
var sniffingConnectionPool = new SniffingConnectionPool(seeds, randomizeOnStartup: false);
var connectionSettings = new ConnectionSettings(sniffingConnectionPool);
@@ -79,9 +79,9 @@ public async void FailoverShouldOnlyPingDeadNodes_Async()
{
var seeds = new[]
{
- new Uri("http://localhost:9202"),
- new Uri("http://localhost:9201"),
- new Uri("http://localhost:9200"),
+ ElasticsearchConfiguration.CreateBaseUri(9202),
+ ElasticsearchConfiguration.CreateBaseUri(9201),
+ ElasticsearchConfiguration.CreateBaseUri(9200)
};
var sniffingConnectionPool = new SniffingConnectionPool(seeds, randomizeOnStartup: false);
var connectionSettings = new ConnectionSettings(sniffingConnectionPool);
diff --git a/src/Tests/Nest.Tests.Integration/Failover/FailoverSniffOnFaultTests.cs b/src/Tests/Nest.Tests.Integration/Failover/FailoverSniffOnFaultTests.cs
index 5effb9ac899..cb51f2e2823 100644
--- a/src/Tests/Nest.Tests.Integration/Failover/FailoverSniffOnFaultTests.cs
+++ b/src/Tests/Nest.Tests.Integration/Failover/FailoverSniffOnFaultTests.cs
@@ -1,6 +1,7 @@
using System;
using Elasticsearch.Net;
using Elasticsearch.Net.ConnectionPool;
+using Elasticsearch.Net.Exceptions;
using FluentAssertions;
using NUnit.Framework;
@@ -19,9 +20,9 @@ public void SniffOnFaultShouldGetCleanClusterState()
{
var seeds = new[]
{
- new Uri("http://localhost:9202"),
- new Uri("http://localhost:9201"),
- new Uri("http://localhost:9200"),
+ ElasticsearchConfiguration.CreateBaseUri(9202),
+ ElasticsearchConfiguration.CreateBaseUri(9201),
+ ElasticsearchConfiguration.CreateBaseUri(9200),
};
var sniffingConnectionPool = new SniffingConnectionPool(seeds, randomizeOnStartup: false);
var connectionSettings = new ConnectionSettings(sniffingConnectionPool)
@@ -49,9 +50,9 @@ public void SniffOnFaultShouldGetCleanClusterState()
{
rootNode = client.RootNodeInfo();
metrics = rootNode.ConnectionStatus.Metrics;
- metrics.Requests.Count.Should().Be(1);
metrics.Requests[0].Node.Port.Should().Be(9200);
metrics.Requests[0].RequestType.Should().Be(RequestType.ElasticsearchCall);
+ metrics.Requests.Count.Should().Be(1);
}
}
@@ -61,9 +62,9 @@ public async void SniffOnFaultShouldGetCleanClusterState_Async()
{
var seeds = new[]
{
- new Uri("http://localhost:9202"),
- new Uri("http://localhost:9201"),
- new Uri("http://localhost:9200"),
+ ElasticsearchConfiguration.CreateBaseUri(9202),
+ ElasticsearchConfiguration.CreateBaseUri(9201),
+ ElasticsearchConfiguration.CreateBaseUri(9200),
};
var sniffingConnectionPool = new SniffingConnectionPool(seeds, randomizeOnStartup: false);
var connectionSettings = new ConnectionSettings(sniffingConnectionPool)
diff --git a/src/Tests/Nest.Tests.Integration/Failover/FailoverSniffOnStartupTests.cs b/src/Tests/Nest.Tests.Integration/Failover/FailoverSniffOnStartupTests.cs
index 0f0d6a91194..0b81d092719 100644
--- a/src/Tests/Nest.Tests.Integration/Failover/FailoverSniffOnStartupTests.cs
+++ b/src/Tests/Nest.Tests.Integration/Failover/FailoverSniffOnStartupTests.cs
@@ -19,9 +19,9 @@ public void SniffOnStartShouldOnlyHit9200_WithoutPing()
{
var seeds = new[]
{
- new Uri("http://localhost:9202"),
- new Uri("http://localhost:9201"),
- new Uri("http://localhost:9200"),
+ ElasticsearchConfiguration.CreateBaseUri(9202),
+ ElasticsearchConfiguration.CreateBaseUri(9201),
+ ElasticsearchConfiguration.CreateBaseUri(9200)
};
var sniffingConnectionPool = new SniffingConnectionPool(seeds, randomizeOnStartup: false);
var connectionSettings = new ConnectionSettings(sniffingConnectionPool)
@@ -52,9 +52,9 @@ public async void SniffOnStartShouldOnlyHit9200_WithoutPing_Async()
{
var seeds = new[]
{
- new Uri("http://localhost:9202"),
- new Uri("http://localhost:9201"),
- new Uri("http://localhost:9200"),
+ ElasticsearchConfiguration.CreateBaseUri(9202),
+ ElasticsearchConfiguration.CreateBaseUri(9201),
+ ElasticsearchConfiguration.CreateBaseUri(9200)
};
var sniffingConnectionPool = new SniffingConnectionPool(seeds, randomizeOnStartup: false);
var connectionSettings = new ConnectionSettings(sniffingConnectionPool)
diff --git a/src/Tests/Nest.Tests.Integration/IntegrationSetup.cs b/src/Tests/Nest.Tests.Integration/IntegrationSetup.cs
index e4d16d36fe0..8fcc8e7dbd4 100644
--- a/src/Tests/Nest.Tests.Integration/IntegrationSetup.cs
+++ b/src/Tests/Nest.Tests.Integration/IntegrationSetup.cs
@@ -22,47 +22,56 @@ public static void Setup()
var people = NestTestData.People;
var boolTerms = NestTestData.BoolTerms;
- var createIndexResult = client.CreateIndex(ElasticsearchConfiguration.DefaultIndex, c => c
- .NumberOfReplicas(0)
- .NumberOfShards(1)
- .AddMapping(m => m
- .MapFromAttributes()
- .Properties(p => p
- .String(s => s.Name(ep => ep.Content).TermVector(TermVectorOption.WithPositionsOffsetsPayloads))))
- .AddMapping(m => m.MapFromAttributes())
- .AddMapping(m => m.Properties(pp=>pp
- .String(sm => sm.Name(p => p.Name1).Index(FieldIndexOption.NotAnalyzed))
- .String(sm => sm.Name(p => p.Name2).Index(FieldIndexOption.NotAnalyzed))
- ))
- );
+ try
+ {
+ var createIndexResult = client.CreateIndex(ElasticsearchConfiguration.DefaultIndex, c => c
+ .NumberOfReplicas(0)
+ .NumberOfShards(1)
+ .AddMapping(m => m
+ .MapFromAttributes()
+ .Properties(p => p
+ .String(s => s.Name(ep => ep.Content).TermVector(TermVectorOption.WithPositionsOffsetsPayloads))))
+ .AddMapping(m => m.MapFromAttributes())
+ .AddMapping(m => m.Properties(pp => pp
+ .String(sm => sm.Name(p => p.Name1).Index(FieldIndexOption.NotAnalyzed))
+ .String(sm => sm.Name(p => p.Name2).Index(FieldIndexOption.NotAnalyzed))
+ ))
+ );
- var createAntotherIndexResult = client.CreateIndex(ElasticsearchConfiguration.DefaultIndex + "_clone", c => c
- .NumberOfReplicas(0)
- .NumberOfShards(1)
- .AddMapping(m => m
- .MapFromAttributes()
- .Properties(p => p
- .String(s => s.Name(ep => ep.Content).TermVector(TermVectorOption.WithPositionsOffsetsPayloads))))
- .AddMapping(m => m.MapFromAttributes())
- .AddMapping(m => m.Properties(pp => pp
- .String(sm => sm.Name(p => p.Name1).Index(FieldIndexOption.NotAnalyzed))
- .String(sm => sm.Name(p => p.Name2).Index(FieldIndexOption.NotAnalyzed))
- ))
- );
+ var createAntotherIndexResult = client.CreateIndex(ElasticsearchConfiguration.DefaultIndex + "_clone", c => c
+ .NumberOfReplicas(0)
+ .NumberOfShards(1)
+ .AddMapping(m => m
+ .MapFromAttributes()
+ .Properties(p => p
+ .String(s => s.Name(ep => ep.Content).TermVector(TermVectorOption.WithPositionsOffsetsPayloads))))
+ .AddMapping(m => m.MapFromAttributes())
+ .AddMapping(m => m.Properties(pp => pp
+ .String(sm => sm.Name(p => p.Name1).Index(FieldIndexOption.NotAnalyzed))
+ .String(sm => sm.Name(p => p.Name2).Index(FieldIndexOption.NotAnalyzed))
+ ))
+ );
+
+ var bulkResponse = client.Bulk(b => b
+ .IndexMany(projects)
+ .IndexMany(people)
+ .IndexMany(boolTerms)
+ .Refresh()
+ );
+ }
+ catch (Exception e)
+ {
+
+ throw;
+ }
- var bulkResponse = client.Bulk(b=>b
- .IndexMany(projects)
- .IndexMany(people)
- .IndexMany(boolTerms)
- .Refresh()
- );
}
[TearDown]
public static void TearDown()
{
- var client = ElasticsearchConfiguration.Client.Value;
- client.DeleteIndex(di => di.Indices(ElasticsearchConfiguration.DefaultIndex, ElasticsearchConfiguration.DefaultIndex + "_*"));
+ var client = ElasticsearchConfiguration.Client.Value;
+ client.DeleteIndex(di => di.Indices(ElasticsearchConfiguration.DefaultIndex, ElasticsearchConfiguration.DefaultIndexPrefix + "*"));
}
}
}
diff --git a/src/Tests/Nest.Tests.Integration/Nest.Tests.Integration.csproj b/src/Tests/Nest.Tests.Integration/Nest.Tests.Integration.csproj
index f2589a14ce2..b9fde2c4918 100644
--- a/src/Tests/Nest.Tests.Integration/Nest.Tests.Integration.csproj
+++ b/src/Tests/Nest.Tests.Integration/Nest.Tests.Integration.csproj
@@ -115,6 +115,8 @@
+
+
diff --git a/src/Tests/Nest.Tests.Integration/Startup/StartupTests.cs b/src/Tests/Nest.Tests.Integration/Startup/StartupTests.cs
index 493be165884..c33e4817eb9 100644
--- a/src/Tests/Nest.Tests.Integration/Startup/StartupTests.cs
+++ b/src/Tests/Nest.Tests.Integration/Startup/StartupTests.cs
@@ -24,6 +24,7 @@ public class StartupTests : IntegrationTests
};
[Test]
+ [Ignore]
public void Calling_RootNodeInfo_TwiceInOneAssembly_IsFast()
{
using (var context = AppDomainContext.Create(_setupInfo))
@@ -43,6 +44,7 @@ public void Calling_RootNodeInfo_TwiceInOneAssembly_IsFast()
}
[Test]
+ [Ignore]
public void Calling_RootNodeInfo_OnceInTwoAsemblies_SlowTwice()
{
using (var context = AppDomainContext.Create(_setupInfo))
@@ -66,6 +68,7 @@ public void Calling_RootNodeInfo_OnceInTwoAsemblies_SlowTwice()
[Test]
+ [Ignore]
public void Calling_Search_TwiceInAppDomainIsFast()
{
using (var context = AppDomainContext.Create(_setupInfo))
@@ -85,6 +88,7 @@ public void Calling_Search_TwiceInAppDomainIsFast()
}
[Test]
+ [Ignore]
public void Calling_Complex_TypedSearch_IsAlsoFaster_AfterBeingCalledOnce()
{
using (var context = AppDomainContext.Create(_setupInfo))
diff --git a/src/Tests/Nest.Tests.Unit/BaseJsonTests.cs b/src/Tests/Nest.Tests.Unit/BaseJsonTests.cs
index 34241418da1..dc68caf6dbf 100644
--- a/src/Tests/Nest.Tests.Unit/BaseJsonTests.cs
+++ b/src/Tests/Nest.Tests.Unit/BaseJsonTests.cs
@@ -1,127 +1,133 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text.RegularExpressions;
-using Elasticsearch.Net;
-using Elasticsearch.Net.Connection;
-using NUnit.Framework;
-using Newtonsoft.Json;
-using System.Reflection;
-using System.IO;
-
-namespace Nest.Tests.Unit
-{
- public static class UnitTestDefaults
- {
- public static readonly Uri Uri = new Uri("http://localhost:9200");
- public static readonly string DefaultIndex = "nest_test_data";
- }
-
- public class BaseJsonTests
- {
- protected readonly IConnectionSettingsValues _settings;
- ///
- /// In memory client that never hits elasticsearch
- ///
- protected IElasticClient _client;
- protected readonly IConnection _connection;
-
- public BaseJsonTests()
- {
- this._settings = new ConnectionSettings(UnitTestDefaults.Uri, UnitTestDefaults.DefaultIndex)
- .ExposeRawResponse();
- this._connection = new InMemoryConnection(this._settings);
- this._client = new ElasticClient(this._settings, this._connection);
- }
-
- protected void deb(string s)
- {
- //I use NCrunch for continuous testing
- //with this i can print variables as i type...
- //Lazy programmers for the win!
- throw new Exception(s);
- }
- protected ElasticClient GetFixedReturnClient(MethodBase methodInfo, string fileName)
- {
- var settings = new ConnectionSettings(UnitTestDefaults.Uri, UnitTestDefaults.DefaultIndex)
- .ExposeRawResponse();
- var file = this.GetFileFromMethod(methodInfo, fileName);
- var jsonResponse = File.ReadAllText(file);
- var connection = new InMemoryConnection(this._settings, jsonResponse);
- var client = new ElasticClient(settings, connection);
- return client;
- }
- protected void JsonEquals(object o, MethodBase method, string fileName = null)
- {
- if (o is byte[])
- {
- string s = ((byte[])o).Utf8String();
- this.JsonEquals(s, method, fileName);
- return;
- }
- var json = TestElasticClient.Serialize(o);
- this.JsonEquals(json, method, fileName);
- }
-
- protected void JsonEquals(string json, MethodBase method, string fileName = null)
- {
- var file = this.GetFileFromMethod(method, fileName);
-
- var expected = File.ReadAllText(file);
- //Assert.AreEqual(expected, json);
- Assert.True(json.JsonEquals(expected), this.PrettyPrint(json));
- }
-
- protected void JsonNotEquals(object o, MethodBase method, string fileName = null)
- {
- var json = TestElasticClient.Serialize(o);
- this.JsonNotEquals(json, method, fileName);
- }
-
- protected string ReadMethodJson(MethodBase method, string fileName = null)
- {
- var file = this.GetFileFromMethod(method, fileName);
- var expected = File.ReadAllText(file);
- return expected;
- }
-
- protected void JsonNotEquals(string json, MethodBase method, string fileName = null)
- {
- var file = this.GetFileFromMethod(method, fileName);
- var expected = File.ReadAllText(file);
- Assert.False(json.JsonEquals(expected), this.PrettyPrint(json));
- }
-
- protected void BulkJsonEquals(string json, MethodBase method, string fileName = null)
- {
- var file = this.GetFileFromMethod(method, fileName);
- var expected = File.ReadAllText(file);
- var expectedJsonLines = Regex.Split(expected, @"\r?\n", RegexOptions.None).Where(s => !s.IsNullOrEmpty());
- var jsonLines = Regex.Split(json, @"\r?\n", RegexOptions.None).Where(s => !s.IsNullOrEmpty());
- Assert.IsNotEmpty(expectedJsonLines, json);
- Assert.IsNotEmpty(jsonLines, json);
- Assert.AreEqual(jsonLines.Count(), expectedJsonLines.Count(), json);
- foreach (var line in expectedJsonLines.Zip(jsonLines, (e, j) => new { Expected = e, Json = j }))
- {
- Assert.True(line.Json.JsonEquals(line.Expected), line.Json);
- }
- }
-
- private string PrettyPrint(string json)
- {
- dynamic parsedJson = JsonConvert.DeserializeObject(json);
- return JsonConvert.SerializeObject(parsedJson, Formatting.Indented);
- }
-
- protected string GetFileFromMethod(MethodBase method, string fileName)
- {
- var type = method.DeclaringType;
- var @namespace = method.DeclaringType.Namespace;
- var folderSep = Path.DirectorySeparatorChar.ToString();
- var folder = @namespace.Replace("Nest.Tests.Unit.", "").Replace(".", folderSep);
- var file = Path.Combine(folder, (fileName ?? method.Name) + ".json");
- file = Path.Combine(Environment.CurrentDirectory.Replace("bin" + folderSep + "Debug", "").Replace("bin" + folderSep + "Release", ""), file);
- return file;
- }
- }
-}
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text.RegularExpressions;
+using Elasticsearch.Net;
+using Elasticsearch.Net.Connection;
+using NUnit.Framework;
+using Newtonsoft.Json;
+using System.Reflection;
+using System.IO;
+
+namespace Nest.Tests.Unit
+{
+ public static class UnitTestDefaults
+ {
+ public static readonly Uri Uri = new Uri("http://localhost:9200");
+ public static readonly string DefaultIndex = "nest_test_data";
+ }
+
+ public class BaseJsonTests
+ {
+ protected readonly IConnectionSettingsValues _settings;
+ ///
+ /// In memory client that never hits elasticsearch
+ ///
+ protected IElasticClient _client;
+ protected readonly IConnection _connection;
+
+ public BaseJsonTests()
+ {
+ this._settings = new ConnectionSettings(UnitTestDefaults.Uri, UnitTestDefaults.DefaultIndex)
+ .ExposeRawResponse();
+ this._connection = new InMemoryConnection(this._settings);
+ this._client = new ElasticClient(this._settings, this._connection);
+ }
+
+ protected void deb(string s)
+ {
+ //I use NCrunch for continuous testing
+ //with this i can print variables as i type...
+ //Lazy programmers for the win!
+ throw new Exception(s);
+ }
+ protected ElasticClient GetFixedReturnClient(
+ MethodBase methodInfo,
+ string fileName = null,
+ int statusCode = 200,
+ Func alterSettings = null
+ )
+ {
+ Func alter = alterSettings ?? (s => s);
+ var settings = alter(new ConnectionSettings(UnitTestDefaults.Uri, UnitTestDefaults.DefaultIndex)
+ .ExposeRawResponse());
+ var file = this.GetFileFromMethod(methodInfo, fileName);
+ var jsonResponse = File.ReadAllText(file);
+ var connection = new InMemoryConnection(settings, jsonResponse, statusCode);
+ var client = new ElasticClient(settings, connection);
+ return client;
+ }
+ protected void JsonEquals(object o, MethodBase method, string fileName = null)
+ {
+ if (o is byte[])
+ {
+ string s = ((byte[])o).Utf8String();
+ this.JsonEquals(s, method, fileName);
+ return;
+ }
+ var json = TestElasticClient.Serialize(o);
+ this.JsonEquals(json, method, fileName);
+ }
+
+ protected void JsonEquals(string json, MethodBase method, string fileName = null)
+ {
+ var file = this.GetFileFromMethod(method, fileName);
+
+ var expected = File.ReadAllText(file);
+ //Assert.AreEqual(expected, json);
+ Assert.True(json.JsonEquals(expected), this.PrettyPrint(json));
+ }
+
+ protected void JsonNotEquals(object o, MethodBase method, string fileName = null)
+ {
+ var json = TestElasticClient.Serialize(o);
+ this.JsonNotEquals(json, method, fileName);
+ }
+
+ protected string ReadMethodJson(MethodBase method, string fileName = null)
+ {
+ var file = this.GetFileFromMethod(method, fileName);
+ var expected = File.ReadAllText(file);
+ return expected;
+ }
+
+ protected void JsonNotEquals(string json, MethodBase method, string fileName = null)
+ {
+ var file = this.GetFileFromMethod(method, fileName);
+ var expected = File.ReadAllText(file);
+ Assert.False(json.JsonEquals(expected), this.PrettyPrint(json));
+ }
+
+ protected void BulkJsonEquals(string json, MethodBase method, string fileName = null)
+ {
+ var file = this.GetFileFromMethod(method, fileName);
+ var expected = File.ReadAllText(file);
+ var expectedJsonLines = Regex.Split(expected, @"\r?\n", RegexOptions.None).Where(s => !s.IsNullOrEmpty());
+ var jsonLines = Regex.Split(json, @"\r?\n", RegexOptions.None).Where(s => !s.IsNullOrEmpty());
+ Assert.IsNotEmpty(expectedJsonLines, json);
+ Assert.IsNotEmpty(jsonLines, json);
+ Assert.AreEqual(jsonLines.Count(), expectedJsonLines.Count(), json);
+ foreach (var line in expectedJsonLines.Zip(jsonLines, (e, j) => new { Expected = e, Json = j }))
+ {
+ Assert.True(line.Json.JsonEquals(line.Expected), line.Json);
+ }
+ }
+
+ private string PrettyPrint(string json)
+ {
+ dynamic parsedJson = JsonConvert.DeserializeObject(json);
+ return JsonConvert.SerializeObject(parsedJson, Formatting.Indented);
+ }
+
+ protected string GetFileFromMethod(MethodBase method, string fileName)
+ {
+ var type = method.DeclaringType;
+ var @namespace = method.DeclaringType.Namespace;
+ var folderSep = Path.DirectorySeparatorChar.ToString();
+ var folder = @namespace.Replace("Nest.Tests.Unit.", "").Replace(".", folderSep);
+ var file = Path.Combine(folder, (fileName ?? method.Name).Replace(@"\", folderSep) + ".json");
+ file = Path.Combine(Environment.CurrentDirectory.Replace("bin" + folderSep + "Debug", "").Replace("bin" + folderSep + "Release", ""), file);
+ return file;
+ }
+ }
+}
diff --git a/src/Tests/Nest.Tests.Unit/BigBadUrlUnitTests.cs b/src/Tests/Nest.Tests.Unit/BigBadUrlUnitTests.cs
index f40a700dc75..ff0804d49b6 100644
--- a/src/Tests/Nest.Tests.Unit/BigBadUrlUnitTests.cs
+++ b/src/Tests/Nest.Tests.Unit/BigBadUrlUnitTests.cs
@@ -1,166 +1,170 @@
-using System;
-using System.Linq;
-using Elasticsearch.Net;
-using Elasticsearch.Net.Connection;
-using FluentAssertions;
-using Nest.Tests.MockData.Domain;
-using NUnit.Framework;
-
-namespace Nest.Tests.Unit.Cluster
-{
- [TestFixture]
- public class BigBadUrlUnitTests
- {
- private void Do(string method, string expectedUrl, Action call)
- {
- var settings = new ConnectionSettings(new Uri("http://localhost:9200/"), "mydefaultindex")
- .SetConnectionStatusHandler(c =>
- {
- new Uri(c.RequestUrl).PathAndQuery.Should().Be(expectedUrl);
- c.RequestMethod.Should().Be(method);
- })
- ;
- var client = new ElasticClient(settings, new InMemoryConnection(settings));
- call(client);
- }
-
- private class Doc
- {
- public string Id { get; set; }
- public string Name { get; set; }
- }
-
- private class OtherDoc
- {
- public string Name { get; set; }
- }
-
-
- [Test]
- public void TestAllTheUrls()
- {
- Do("POST", "/_aliases", c => c.Alias(a => a));
- Do("POST", "/_analyze", c => c.Analyze(a => a.Text("blah")));
- Do("POST", "/myindex/_analyze", c => c.Analyze(a => a.Index("myindex").Text("blah")));
- Do("POST", "/myindex/_bulk", c => c.Bulk(b => b.FixedPath("myindex").Index(ib => ib.Document(new Doc { Id = "1" }))));
- Do("POST", "/myindex/mytype/_bulk", c => c.Bulk(b => b.FixedPath("myindex", "mytype").Index(ib => ib.Document(new Doc { Id = "1" }))));
- Do("POST", "/myindex/_bulk", c => c.Bulk(b => b.FixedPath("myindex").Index(ib => ib.Document(new Doc { Id = "1" }))));
- Do("POST", "/_bulk", c => c.Bulk(b => b.Index(ib => ib.Document(new Doc { Id = "1" }))));
- Do("POST", "/_cache/clear", c => c.ClearCache());
- Do("POST", "/mydefaultindex/_cache/clear", c => c.ClearCache(cc => cc.Index()));
- Do("POST", "/mydefaultindex/_close", c => c.CloseIndex(ci => ci.Index()));
- Do("GET", "/_nodes", c => c.NodesInfo());
- Do("GET", "/_nodes/insert-marvel-character", c => c.NodesInfo(cn => cn.NodeId("insert-marvel-character")));
- Do("GET", "/_nodes/stats", c => c.NodesStats());
- Do("GET", "/_nodes/insert-marvel-character/stats/jvm", c => c
- .NodesStats(cn => cn.NodeId("insert-marvel-character").Metrics(NodesStatsMetric.Jvm)));
- Do("GET", "/_cluster/state", c => c.ClusterState());
- Do("GET", "/_cluster/state?local=true", c => c.ClusterState(cs => cs.Local()));
- Do("POST", "/_count", c => c.Count());
- Do("POST", "/_all/doc/_count", c => c.Count(cc => cc.AllIndices().Type()));
- Do("POST", "/mydefaultindex/doc/_count", c => c.Count(cc => cc.Index().Type()));
- Do("POST", "/mydefaultindex/_count", c => c.Count(cc => cc.Index()));
- Do("POST", "/mydefaultindex/doc/_count", c => c.Count());
- Do("POST", "/customindex/doc/_count", c => c.Count(cc => cc.Index("customindex")));
- Do("POST", "/new-index-name", c => c.CreateIndex("new-index-name"));
- Do("DELETE", "/mydefaultindex/doc/1", c => c.Delete(d => d.Id("1")));
- Do("DELETE", "/mydefaultindex/doc/1", c => c.Delete(1));
- Do("DELETE", "/customindex/doc/1", c => c.Delete(d => d.Index("customindex").Id(1)));
- Do("DELETE", "/customindex/doc/1", c => c.Delete(1, d => d.Index("customindex")));
- Do("DELETE", "/mydefaultindex/doc/_query", c => c.DeleteByQuery(q => q.MatchAll()));
- Do("DELETE", "/customindex/doc/_query", c => c.DeleteByQuery(q => q.Index("customindex").MatchAll()));
- Do("DELETE", "/_all/doc/_query", c => c.DeleteByQuery(q => q.AllIndices().MatchAll()));
- Do("DELETE", "/mydefaultindex/_query", c => c.DeleteByQuery(q => q.AllTypes().MatchAll()));
- Do("DELETE", "/custom-index/_query", c => c.DeleteByQuery(q => q.Index("custom-index").AllTypes().MatchAll()));
- Do("DELETE", "/mydefaultindex", c => c.DeleteIndex(i => i.Index()));
- Do("DELETE", "/a%2Cb", c => c.DeleteIndex(i => i.Indices("a", "b")));
- Do("POST", "/_bulk", c => c.DeleteMany(Enumerable.Range(0, 10).Select(i => new Doc { Id = i.ToString() })));
- Do("POST", "/customindex/customtype/_bulk", c => c.DeleteMany(Enumerable.Range(0, 10).Select(i => new Doc { Id = i.ToString() }), index: "customindex", type: "customtype"));
- Do("DELETE", "/mydefaultindex/doc/_mapping", c => c.DeleteMapping());
- Do("DELETE", "/_template/myTemplate", c => c.DeleteTemplate("myTemplate"));
- Do("DELETE", "/_all/_warmer/mywarmer", c => c.DeleteWarmer("mywarmer", w => w.AllIndices()));
- Do("DELETE", "/_all/_warmer/mywarmer", c => c.DeleteWarmer("mywarmer"));
- Do("DELETE", "/mycustomindex/_warmer/mywarmer", c => c.DeleteWarmer("mywarmer", w => w.Index("mycustomindex")));
- Do("POST", "/_all/_flush", c => c.Flush(f => f.AllIndices()));
- Do("POST", "/mycustomindex/_flush", c => c.Flush(f => f.Index("mycustomindex")));
- Do("GET", "/mydefaultindex/doc/1", c => c.Get(1));
- Do("GET", "/mycustomindex/mycustomtype/1", c => c.Get(1, index: "mycustomindex", type: "mycustomtype"));
- Do("GET", "/mycustomindex/mycustomtype/1", c => c.Get(g => g.Id(1).Index("mycustomindex").Type("mycustomtype")));
- Do("GET", "/mydefaultindex/_alias/*", c => c.GetAlias(a => a.Index()));
- Do("GET", "/_alias/prefix-*", c => c.GetAlias(a => a.Alias("prefix-*")));
- Do("GET", "/mydefaultindex/_aliases/*", c => c.GetAliases(a => a.Index()));
- Do("GET", "/_aliases/prefix-*", c => c.GetAliases(a => a.Alias("prefix-*")));
- Do("GET", "/mydefaultindex/_settings", c => c.GetIndexSettings(i => i.Index()));
- Do("GET", "/mydefaultindex/_mapping/doc", c => c.GetMapping());
- Do("GET", "/mycustomindex/_mapping/doc", c => c.GetMapping(m => m.Index("mycustomindex")));
- Do("GET", "/mycustomindex/_mapping/sometype", c => c.GetMapping