From 0d346e848cc2f980a4a439bd94fedced1216360e Mon Sep 17 00:00:00 2001 From: Mpdreamz Date: Tue, 6 Jun 2017 14:40:46 -0700 Subject: [PATCH 1/3] When the passed cancellation token is shared for a longer time then a single request, e.g bulkall/scrollall/reindex it will leak Register() handles and memory will keep on growing --- .../Connection/HttpConnection.cs | 80 +++++++++++-------- 1 file changed, 46 insertions(+), 34 deletions(-) diff --git a/src/Elasticsearch.Net/Connection/HttpConnection.cs b/src/Elasticsearch.Net/Connection/HttpConnection.cs index a1e78a7a43f..36fae1e5e85 100644 --- a/src/Elasticsearch.Net/Connection/HttpConnection.cs +++ b/src/Elasticsearch.Net/Connection/HttpConnection.cs @@ -180,7 +180,7 @@ public virtual ElasticsearchResponse Request(RequestData reque } - private static void RegisterApmTaskTimeout(IAsyncResult result, WebRequest request, RequestData requestData) => + private static RegisteredWaitHandle RegisterApmTaskTimeout(IAsyncResult result, WebRequest request, RequestData requestData) => ThreadPool.RegisterWaitForSingleObject(result.AsyncWaitHandle, TimeoutCallback, request, requestData.RequestTimeout, true); private static void TimeoutCallback(object state, bool timedOut) @@ -194,42 +194,15 @@ public virtual async Task> RequestAsync( var builder = new ResponseBuilder(requestData, cancellationToken); try { - var request = this.CreateHttpWebRequest(requestData); - cancellationToken.Register(()=>request.Abort()); var data = requestData.PostData; - - if (data != null) + var request = this.CreateHttpWebRequest(requestData); + using (cancellationToken.Register(() => request.Abort())) { - var apmGetRequestStreamTask = Task.Factory.FromAsync(request.BeginGetRequestStream, request.EndGetRequestStream, null); - RegisterApmTaskTimeout(apmGetRequestStreamTask, request, requestData); - - using (var stream = await apmGetRequestStreamTask.ConfigureAwait(false)) - { - if (requestData.HttpCompression) - using (var zipStream = new GZipStream(stream, CompressionMode.Compress)) - await data.WriteAsync(zipStream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false); - else - await data.WriteAsync(stream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false); - } + if (data != null) + await PostRequestAsync(requestData, cancellationToken, request, data); + requestData.MadeItToResponse = true; } - requestData.MadeItToResponse = true; - - //http://msdn.microsoft.com/en-us/library/system.net.httpwebresponse.getresponsestream.aspx - //Either the stream or the response object needs to be closed but not both although it won't - //throw any errors if both are closed atleast one of them has to be Closed. - //Since we expose the stream we let closing the stream determining when to close the connection - - var apmGetResponseTask = Task.Factory.FromAsync(request.BeginGetResponse, request.EndGetResponse, null); - RegisterApmTaskTimeout(apmGetResponseTask, request, requestData); - - var response = (HttpWebResponse)(await apmGetResponseTask.ConfigureAwait(false)); - builder.StatusCode = (int)response.StatusCode; - builder.Stream = response.GetResponseStream(); - if (response.SupportsHeaders && response.Headers.HasKeys() && response.Headers.AllKeys.Contains("Warning")) - builder.DeprecationWarnings = response.Headers.GetValues("Warning"); - // https://github.com/elastic/elasticsearch-net/issues/2311 - // if stream is null call dispose on response instead. - if (builder.Stream == null || builder.Stream == Stream.Null) response.Dispose(); + await GetResponseAsync(requestData, request, builder); } catch (WebException e) { @@ -239,6 +212,45 @@ public virtual async Task> RequestAsync( return await builder.ToResponseAsync().ConfigureAwait(false); } + private static async Task GetResponseAsync(RequestData requestData, HttpWebRequest request, ResponseBuilder builder) + where TReturn : class + { + //http://msdn.microsoft.com/en-us/library/system.net.httpwebresponse.getresponsestream.aspx + //Either the stream or the response object needs to be closed but not both although it won't + //throw any errors if both are closed atleast one of them has to be Closed. + //Since we expose the stream we let closing the stream determining when to close the connection + + var apmGetResponseTask = Task.Factory.FromAsync(request.BeginGetResponse, request.EndGetResponse, null); + var getResponseCancellationHandle = RegisterApmTaskTimeout(apmGetResponseTask, request, requestData); + + var response = (HttpWebResponse) (await apmGetResponseTask.ConfigureAwait(false)); + builder.StatusCode = (int) response.StatusCode; + builder.Stream = response.GetResponseStream(); + if (response.SupportsHeaders && response.Headers.HasKeys() && response.Headers.AllKeys.Contains("Warning")) + builder.DeprecationWarnings = response.Headers.GetValues("Warning"); + // https://github.com/elastic/elasticsearch-net/issues/2311 + // if stream is null call dispose on response instead. + if (builder.Stream == null || builder.Stream == Stream.Null) response.Dispose(); + getResponseCancellationHandle.Unregister(((IAsyncResult) apmGetResponseTask).AsyncWaitHandle); + } + + private static async Task PostRequestAsync(RequestData requestData, CancellationToken cancellationToken, HttpWebRequest request, + PostData data) where TReturn : class + { + var apmGetRequestStreamTask = Task.Factory.FromAsync(request.BeginGetRequestStream, request.EndGetRequestStream, null); + var getRequestStreamCancellationHandle = RegisterApmTaskTimeout(apmGetRequestStreamTask, request, requestData); + + using (var stream = await apmGetRequestStreamTask.ConfigureAwait(false)) + { + if (requestData.HttpCompression) + using (var zipStream = new GZipStream(stream, CompressionMode.Compress)) + await data.WriteAsync(zipStream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false); + else + await data.WriteAsync(stream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false); + } + getRequestStreamCancellationHandle.Unregister(((IAsyncResult) apmGetRequestStreamTask).AsyncWaitHandle); + } + private void HandleException(ResponseBuilder builder, WebException exception) where TReturn : class { From 8d80683769784a266a1baa9f093ea634f0c380cd Mon Sep 17 00:00:00 2001 From: Mpdreamz Date: Tue, 6 Jun 2017 14:54:46 -0700 Subject: [PATCH 2/3] PostRequestAsync does not need to be generic --- src/Elasticsearch.Net/Connection/HttpConnection.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Elasticsearch.Net/Connection/HttpConnection.cs b/src/Elasticsearch.Net/Connection/HttpConnection.cs index 36fae1e5e85..7c1d59bc53c 100644 --- a/src/Elasticsearch.Net/Connection/HttpConnection.cs +++ b/src/Elasticsearch.Net/Connection/HttpConnection.cs @@ -199,7 +199,7 @@ public virtual async Task> RequestAsync( using (cancellationToken.Register(() => request.Abort())) { if (data != null) - await PostRequestAsync(requestData, cancellationToken, request, data); + await PostRequestAsync(requestData, cancellationToken, request, data); requestData.MadeItToResponse = true; } await GetResponseAsync(requestData, request, builder); @@ -234,8 +234,8 @@ private static async Task GetResponseAsync(RequestData requestData, Htt getResponseCancellationHandle.Unregister(((IAsyncResult) apmGetResponseTask).AsyncWaitHandle); } - private static async Task PostRequestAsync(RequestData requestData, CancellationToken cancellationToken, HttpWebRequest request, - PostData data) where TReturn : class + private static async Task PostRequestAsync(RequestData requestData, CancellationToken cancellationToken, HttpWebRequest request, + PostData data) { var apmGetRequestStreamTask = Task.Factory.FromAsync(request.BeginGetRequestStream, request.EndGetRequestStream, null); var getRequestStreamCancellationHandle = RegisterApmTaskTimeout(apmGetRequestStreamTask, request, requestData); From fd2cc93aba355ec9a6f0f5bfae340edaa705b2df Mon Sep 17 00:00:00 2001 From: Mpdreamz Date: Tue, 20 Jun 2017 17:11:30 +0200 Subject: [PATCH 3/3] include GetResponse() in using and inlined it so we can more aggresively cancel on the waithandles in the case of exceptions too --- .../Connection/HttpConnection.cs | 67 ++++++++++--------- 1 file changed, 34 insertions(+), 33 deletions(-) diff --git a/src/Elasticsearch.Net/Connection/HttpConnection.cs b/src/Elasticsearch.Net/Connection/HttpConnection.cs index 7c1d59bc53c..0b08af9754d 100644 --- a/src/Elasticsearch.Net/Connection/HttpConnection.cs +++ b/src/Elasticsearch.Net/Connection/HttpConnection.cs @@ -189,9 +189,12 @@ private static void TimeoutCallback(object state, bool timedOut) (state as WebRequest)?.Abort(); } - public virtual async Task> RequestAsync(RequestData requestData, CancellationToken cancellationToken) where TReturn : class + public virtual async Task> RequestAsync(RequestData requestData, + CancellationToken cancellationToken) where TReturn : class { var builder = new ResponseBuilder(requestData, cancellationToken); + WaitHandle apmWaitHandle = null; + RegisteredWaitHandle apmTaskTimeout = null; try { var data = requestData.PostData; @@ -201,39 +204,39 @@ public virtual async Task> RequestAsync( if (data != null) await PostRequestAsync(requestData, cancellationToken, request, data); requestData.MadeItToResponse = true; + //http://msdn.microsoft.com/en-us/library/system.net.httpwebresponse.getresponsestream.aspx + //Either the stream or the response object needs to be closed but not both although it won't + //throw any errors if both are closed atleast one of them has to be Closed. + //Since we expose the stream we let closing the stream determining when to close the connection + + var apmGetResponseTask = Task.Factory.FromAsync(request.BeginGetResponse, request.EndGetResponse, null); + apmWaitHandle = ((IAsyncResult) apmGetResponseTask).AsyncWaitHandle; + apmTaskTimeout = RegisterApmTaskTimeout(apmGetResponseTask, request, requestData); + + var response = (HttpWebResponse) (await apmGetResponseTask.ConfigureAwait(false)); + builder.StatusCode = (int) response.StatusCode; + builder.Stream = response.GetResponseStream(); + if (response.SupportsHeaders && response.Headers.HasKeys() && response.Headers.AllKeys.Contains("Warning")) + builder.DeprecationWarnings = response.Headers.GetValues("Warning"); + // https://github.com/elastic/elasticsearch-net/issues/2311 + // if stream is null call dispose on response instead. + if (builder.Stream == null || builder.Stream == Stream.Null) response.Dispose(); + if (apmWaitHandle != null) apmTaskTimeout?.Unregister(apmWaitHandle); } - await GetResponseAsync(requestData, request, builder); } catch (WebException e) { + if (apmWaitHandle != null) apmTaskTimeout?.Unregister(apmWaitHandle); HandleException(builder, e); } - + catch + { + if (apmWaitHandle != null) apmTaskTimeout?.Unregister(apmWaitHandle); + throw; + } return await builder.ToResponseAsync().ConfigureAwait(false); } - private static async Task GetResponseAsync(RequestData requestData, HttpWebRequest request, ResponseBuilder builder) - where TReturn : class - { - //http://msdn.microsoft.com/en-us/library/system.net.httpwebresponse.getresponsestream.aspx - //Either the stream or the response object needs to be closed but not both although it won't - //throw any errors if both are closed atleast one of them has to be Closed. - //Since we expose the stream we let closing the stream determining when to close the connection - - var apmGetResponseTask = Task.Factory.FromAsync(request.BeginGetResponse, request.EndGetResponse, null); - var getResponseCancellationHandle = RegisterApmTaskTimeout(apmGetResponseTask, request, requestData); - - var response = (HttpWebResponse) (await apmGetResponseTask.ConfigureAwait(false)); - builder.StatusCode = (int) response.StatusCode; - builder.Stream = response.GetResponseStream(); - if (response.SupportsHeaders && response.Headers.HasKeys() && response.Headers.AllKeys.Contains("Warning")) - builder.DeprecationWarnings = response.Headers.GetValues("Warning"); - // https://github.com/elastic/elasticsearch-net/issues/2311 - // if stream is null call dispose on response instead. - if (builder.Stream == null || builder.Stream == Stream.Null) response.Dispose(); - getResponseCancellationHandle.Unregister(((IAsyncResult) apmGetResponseTask).AsyncWaitHandle); - } - private static async Task PostRequestAsync(RequestData requestData, CancellationToken cancellationToken, HttpWebRequest request, PostData data) { @@ -256,14 +259,12 @@ private void HandleException(ResponseBuilder builder, WebExcep { builder.Exception = exception; var response = exception.Response as HttpWebResponse; - if (response != null) - { - builder.StatusCode = (int)response.StatusCode; - builder.Stream = response.GetResponseStream(); - // https://github.com/elastic/elasticsearch-net/issues/2311 - // if stream is null call dispose on response instead. - if (builder.Stream == null || builder.Stream == Stream.Null) response.Dispose(); - } + if (response == null) return; + builder.StatusCode = (int)response.StatusCode; + builder.Stream = response.GetResponseStream(); + // https://github.com/elastic/elasticsearch-net/issues/2311 + // if stream is null call dispose on response instead. + if (builder.Stream == null || builder.Stream == Stream.Null) response.Dispose(); } void IDisposable.Dispose() => this.DisposeManagedResources();