Skip to content

Commit c86015a

Browse files
authored
Make sure we dispose CancellationTokenRegistration (#2776)
* When the passed cancellation token is shared for a longer time then a single request, e.g bulkall/scrollall/reindex it will leak Register() handles and memory will keep on growing * PostRequestAsync does not need to be generic * include GetResponse() in using and inlined it so we can more aggresively cancel on the waithandles in the case of exceptions too
1 parent 865362f commit c86015a

File tree

1 file changed

+57
-44
lines changed

1 file changed

+57
-44
lines changed

src/Elasticsearch.Net/Connection/HttpConnection.cs

Lines changed: 57 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ public virtual ElasticsearchResponse<TReturn> Request<TReturn>(RequestData reque
180180
}
181181

182182

183-
private static void RegisterApmTaskTimeout(IAsyncResult result, WebRequest request, RequestData requestData) =>
183+
private static RegisteredWaitHandle RegisterApmTaskTimeout(IAsyncResult result, WebRequest request, RequestData requestData) =>
184184
ThreadPool.RegisterWaitForSingleObject(result.AsyncWaitHandle, TimeoutCallback, request, requestData.RequestTimeout, true);
185185

186186
private static void TimeoutCallback(object state, bool timedOut)
@@ -189,69 +189,82 @@ private static void TimeoutCallback(object state, bool timedOut)
189189
(state as WebRequest)?.Abort();
190190
}
191191

192-
public virtual async Task<ElasticsearchResponse<TReturn>> RequestAsync<TReturn>(RequestData requestData, CancellationToken cancellationToken) where TReturn : class
192+
public virtual async Task<ElasticsearchResponse<TReturn>> RequestAsync<TReturn>(RequestData requestData,
193+
CancellationToken cancellationToken) where TReturn : class
193194
{
194195
var builder = new ResponseBuilder<TReturn>(requestData, cancellationToken);
196+
WaitHandle apmWaitHandle = null;
197+
RegisteredWaitHandle apmTaskTimeout = null;
195198
try
196199
{
197-
var request = this.CreateHttpWebRequest(requestData);
198-
cancellationToken.Register(()=>request.Abort());
199200
var data = requestData.PostData;
200-
201-
if (data != null)
201+
var request = this.CreateHttpWebRequest(requestData);
202+
using (cancellationToken.Register(() => request.Abort()))
202203
{
203-
var apmGetRequestStreamTask = Task.Factory.FromAsync(request.BeginGetRequestStream, request.EndGetRequestStream, null);
204-
RegisterApmTaskTimeout(apmGetRequestStreamTask, request, requestData);
205-
206-
using (var stream = await apmGetRequestStreamTask.ConfigureAwait(false))
207-
{
208-
if (requestData.HttpCompression)
209-
using (var zipStream = new GZipStream(stream, CompressionMode.Compress))
210-
await data.WriteAsync(zipStream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false);
211-
else
212-
await data.WriteAsync(stream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false);
213-
}
204+
if (data != null)
205+
await PostRequestAsync(requestData, cancellationToken, request, data);
206+
requestData.MadeItToResponse = true;
207+
//http://msdn.microsoft.com/en-us/library/system.net.httpwebresponse.getresponsestream.aspx
208+
//Either the stream or the response object needs to be closed but not both although it won't
209+
//throw any errors if both are closed atleast one of them has to be Closed.
210+
//Since we expose the stream we let closing the stream determining when to close the connection
211+
212+
var apmGetResponseTask = Task.Factory.FromAsync(request.BeginGetResponse, request.EndGetResponse, null);
213+
apmWaitHandle = ((IAsyncResult) apmGetResponseTask).AsyncWaitHandle;
214+
apmTaskTimeout = RegisterApmTaskTimeout(apmGetResponseTask, request, requestData);
215+
216+
var response = (HttpWebResponse) (await apmGetResponseTask.ConfigureAwait(false));
217+
builder.StatusCode = (int) response.StatusCode;
218+
builder.Stream = response.GetResponseStream();
219+
if (response.SupportsHeaders && response.Headers.HasKeys() && response.Headers.AllKeys.Contains("Warning"))
220+
builder.DeprecationWarnings = response.Headers.GetValues("Warning");
221+
// https://github.com/elastic/elasticsearch-net/issues/2311
222+
// if stream is null call dispose on response instead.
223+
if (builder.Stream == null || builder.Stream == Stream.Null) response.Dispose();
224+
if (apmWaitHandle != null) apmTaskTimeout?.Unregister(apmWaitHandle);
214225
}
215-
requestData.MadeItToResponse = true;
216-
217-
//http://msdn.microsoft.com/en-us/library/system.net.httpwebresponse.getresponsestream.aspx
218-
//Either the stream or the response object needs to be closed but not both although it won't
219-
//throw any errors if both are closed atleast one of them has to be Closed.
220-
//Since we expose the stream we let closing the stream determining when to close the connection
221-
222-
var apmGetResponseTask = Task.Factory.FromAsync(request.BeginGetResponse, request.EndGetResponse, null);
223-
RegisterApmTaskTimeout(apmGetResponseTask, request, requestData);
224-
225-
var response = (HttpWebResponse)(await apmGetResponseTask.ConfigureAwait(false));
226-
builder.StatusCode = (int)response.StatusCode;
227-
builder.Stream = response.GetResponseStream();
228-
if (response.SupportsHeaders && response.Headers.HasKeys() && response.Headers.AllKeys.Contains("Warning"))
229-
builder.DeprecationWarnings = response.Headers.GetValues("Warning");
230-
// https://github.com/elastic/elasticsearch-net/issues/2311
231-
// if stream is null call dispose on response instead.
232-
if (builder.Stream == null || builder.Stream == Stream.Null) response.Dispose();
233226
}
234227
catch (WebException e)
235228
{
229+
if (apmWaitHandle != null) apmTaskTimeout?.Unregister(apmWaitHandle);
236230
HandleException(builder, e);
237231
}
238-
232+
catch
233+
{
234+
if (apmWaitHandle != null) apmTaskTimeout?.Unregister(apmWaitHandle);
235+
throw;
236+
}
239237
return await builder.ToResponseAsync().ConfigureAwait(false);
240238
}
241239

240+
private static async Task PostRequestAsync(RequestData requestData, CancellationToken cancellationToken, HttpWebRequest request,
241+
PostData<object> data)
242+
{
243+
var apmGetRequestStreamTask = Task.Factory.FromAsync(request.BeginGetRequestStream, request.EndGetRequestStream, null);
244+
var getRequestStreamCancellationHandle = RegisterApmTaskTimeout(apmGetRequestStreamTask, request, requestData);
245+
246+
using (var stream = await apmGetRequestStreamTask.ConfigureAwait(false))
247+
{
248+
if (requestData.HttpCompression)
249+
using (var zipStream = new GZipStream(stream, CompressionMode.Compress))
250+
await data.WriteAsync(zipStream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false);
251+
else
252+
await data.WriteAsync(stream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false);
253+
}
254+
getRequestStreamCancellationHandle.Unregister(((IAsyncResult) apmGetRequestStreamTask).AsyncWaitHandle);
255+
}
256+
242257
private void HandleException<TReturn>(ResponseBuilder<TReturn> builder, WebException exception)
243258
where TReturn : class
244259
{
245260
builder.Exception = exception;
246261
var response = exception.Response as HttpWebResponse;
247-
if (response != null)
248-
{
249-
builder.StatusCode = (int)response.StatusCode;
250-
builder.Stream = response.GetResponseStream();
251-
// https://github.com/elastic/elasticsearch-net/issues/2311
252-
// if stream is null call dispose on response instead.
253-
if (builder.Stream == null || builder.Stream == Stream.Null) response.Dispose();
254-
}
262+
if (response == null) return;
263+
builder.StatusCode = (int)response.StatusCode;
264+
builder.Stream = response.GetResponseStream();
265+
// https://github.com/elastic/elasticsearch-net/issues/2311
266+
// if stream is null call dispose on response instead.
267+
if (builder.Stream == null || builder.Stream == Stream.Null) response.Dispose();
255268
}
256269

257270
void IDisposable.Dispose() => this.DisposeManagedResources();

0 commit comments

Comments
 (0)