diff --git a/src/Elasticsearch.Net/Connection/HttpConnection.cs b/src/Elasticsearch.Net/Connection/HttpConnection.cs index a1e78a7a43f..0b08af9754d 100644 --- a/src/Elasticsearch.Net/Connection/HttpConnection.cs +++ b/src/Elasticsearch.Net/Connection/HttpConnection.cs @@ -180,7 +180,7 @@ public virtual ElasticsearchResponse Request(RequestData reque } - private static void RegisterApmTaskTimeout(IAsyncResult result, WebRequest request, RequestData requestData) => + private static RegisteredWaitHandle RegisterApmTaskTimeout(IAsyncResult result, WebRequest request, RequestData requestData) => ThreadPool.RegisterWaitForSingleObject(result.AsyncWaitHandle, TimeoutCallback, request, requestData.RequestTimeout, true); private static void TimeoutCallback(object state, bool timedOut) @@ -189,69 +189,82 @@ private static void TimeoutCallback(object state, bool timedOut) (state as WebRequest)?.Abort(); } - public virtual async Task> RequestAsync(RequestData requestData, CancellationToken cancellationToken) where TReturn : class + public virtual async Task> RequestAsync(RequestData requestData, + CancellationToken cancellationToken) where TReturn : class { var builder = new ResponseBuilder(requestData, cancellationToken); + WaitHandle apmWaitHandle = null; + RegisteredWaitHandle apmTaskTimeout = null; try { - var request = this.CreateHttpWebRequest(requestData); - cancellationToken.Register(()=>request.Abort()); var data = requestData.PostData; - - if (data != null) + var request = this.CreateHttpWebRequest(requestData); + using (cancellationToken.Register(() => request.Abort())) { - var apmGetRequestStreamTask = Task.Factory.FromAsync(request.BeginGetRequestStream, request.EndGetRequestStream, null); - RegisterApmTaskTimeout(apmGetRequestStreamTask, request, requestData); - - using (var stream = await apmGetRequestStreamTask.ConfigureAwait(false)) - { - if (requestData.HttpCompression) - using (var zipStream = new GZipStream(stream, CompressionMode.Compress)) - await data.WriteAsync(zipStream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false); - else - await data.WriteAsync(stream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false); - } + if (data != null) + await PostRequestAsync(requestData, cancellationToken, request, data); + requestData.MadeItToResponse = true; + //http://msdn.microsoft.com/en-us/library/system.net.httpwebresponse.getresponsestream.aspx + //Either the stream or the response object needs to be closed but not both although it won't + //throw any errors if both are closed atleast one of them has to be Closed. + //Since we expose the stream we let closing the stream determining when to close the connection + + var apmGetResponseTask = Task.Factory.FromAsync(request.BeginGetResponse, request.EndGetResponse, null); + apmWaitHandle = ((IAsyncResult) apmGetResponseTask).AsyncWaitHandle; + apmTaskTimeout = RegisterApmTaskTimeout(apmGetResponseTask, request, requestData); + + var response = (HttpWebResponse) (await apmGetResponseTask.ConfigureAwait(false)); + builder.StatusCode = (int) response.StatusCode; + builder.Stream = response.GetResponseStream(); + if (response.SupportsHeaders && response.Headers.HasKeys() && response.Headers.AllKeys.Contains("Warning")) + builder.DeprecationWarnings = response.Headers.GetValues("Warning"); + // https://github.com/elastic/elasticsearch-net/issues/2311 + // if stream is null call dispose on response instead. + if (builder.Stream == null || builder.Stream == Stream.Null) response.Dispose(); + if (apmWaitHandle != null) apmTaskTimeout?.Unregister(apmWaitHandle); } - requestData.MadeItToResponse = true; - - //http://msdn.microsoft.com/en-us/library/system.net.httpwebresponse.getresponsestream.aspx - //Either the stream or the response object needs to be closed but not both although it won't - //throw any errors if both are closed atleast one of them has to be Closed. - //Since we expose the stream we let closing the stream determining when to close the connection - - var apmGetResponseTask = Task.Factory.FromAsync(request.BeginGetResponse, request.EndGetResponse, null); - RegisterApmTaskTimeout(apmGetResponseTask, request, requestData); - - var response = (HttpWebResponse)(await apmGetResponseTask.ConfigureAwait(false)); - builder.StatusCode = (int)response.StatusCode; - builder.Stream = response.GetResponseStream(); - if (response.SupportsHeaders && response.Headers.HasKeys() && response.Headers.AllKeys.Contains("Warning")) - builder.DeprecationWarnings = response.Headers.GetValues("Warning"); - // https://github.com/elastic/elasticsearch-net/issues/2311 - // if stream is null call dispose on response instead. - if (builder.Stream == null || builder.Stream == Stream.Null) response.Dispose(); } catch (WebException e) { + if (apmWaitHandle != null) apmTaskTimeout?.Unregister(apmWaitHandle); HandleException(builder, e); } - + catch + { + if (apmWaitHandle != null) apmTaskTimeout?.Unregister(apmWaitHandle); + throw; + } return await builder.ToResponseAsync().ConfigureAwait(false); } + private static async Task PostRequestAsync(RequestData requestData, CancellationToken cancellationToken, HttpWebRequest request, + PostData data) + { + var apmGetRequestStreamTask = Task.Factory.FromAsync(request.BeginGetRequestStream, request.EndGetRequestStream, null); + var getRequestStreamCancellationHandle = RegisterApmTaskTimeout(apmGetRequestStreamTask, request, requestData); + + using (var stream = await apmGetRequestStreamTask.ConfigureAwait(false)) + { + if (requestData.HttpCompression) + using (var zipStream = new GZipStream(stream, CompressionMode.Compress)) + await data.WriteAsync(zipStream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false); + else + await data.WriteAsync(stream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false); + } + getRequestStreamCancellationHandle.Unregister(((IAsyncResult) apmGetRequestStreamTask).AsyncWaitHandle); + } + private void HandleException(ResponseBuilder builder, WebException exception) where TReturn : class { builder.Exception = exception; var response = exception.Response as HttpWebResponse; - if (response != null) - { - builder.StatusCode = (int)response.StatusCode; - builder.Stream = response.GetResponseStream(); - // https://github.com/elastic/elasticsearch-net/issues/2311 - // if stream is null call dispose on response instead. - if (builder.Stream == null || builder.Stream == Stream.Null) response.Dispose(); - } + if (response == null) return; + builder.StatusCode = (int)response.StatusCode; + builder.Stream = response.GetResponseStream(); + // https://github.com/elastic/elasticsearch-net/issues/2311 + // if stream is null call dispose on response instead. + if (builder.Stream == null || builder.Stream == Stream.Null) response.Dispose(); } void IDisposable.Dispose() => this.DisposeManagedResources();