Skip to content

Commit 0d346e8

Browse files
committed
When the passed cancellation token is shared for a longer time then a single request, e.g bulkall/scrollall/reindex it will leak Register() handles and memory will keep on growing
1 parent 865362f commit 0d346e8

File tree

1 file changed

+46
-34
lines changed

1 file changed

+46
-34
lines changed

src/Elasticsearch.Net/Connection/HttpConnection.cs

Lines changed: 46 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ public virtual ElasticsearchResponse<TReturn> Request<TReturn>(RequestData reque
180180
}
181181

182182

183-
private static void RegisterApmTaskTimeout(IAsyncResult result, WebRequest request, RequestData requestData) =>
183+
private static RegisteredWaitHandle RegisterApmTaskTimeout(IAsyncResult result, WebRequest request, RequestData requestData) =>
184184
ThreadPool.RegisterWaitForSingleObject(result.AsyncWaitHandle, TimeoutCallback, request, requestData.RequestTimeout, true);
185185

186186
private static void TimeoutCallback(object state, bool timedOut)
@@ -194,42 +194,15 @@ public virtual async Task<ElasticsearchResponse<TReturn>> RequestAsync<TReturn>(
194194
var builder = new ResponseBuilder<TReturn>(requestData, cancellationToken);
195195
try
196196
{
197-
var request = this.CreateHttpWebRequest(requestData);
198-
cancellationToken.Register(()=>request.Abort());
199197
var data = requestData.PostData;
200-
201-
if (data != null)
198+
var request = this.CreateHttpWebRequest(requestData);
199+
using (cancellationToken.Register(() => request.Abort()))
202200
{
203-
var apmGetRequestStreamTask = Task.Factory.FromAsync(request.BeginGetRequestStream, request.EndGetRequestStream, null);
204-
RegisterApmTaskTimeout(apmGetRequestStreamTask, request, requestData);
205-
206-
using (var stream = await apmGetRequestStreamTask.ConfigureAwait(false))
207-
{
208-
if (requestData.HttpCompression)
209-
using (var zipStream = new GZipStream(stream, CompressionMode.Compress))
210-
await data.WriteAsync(zipStream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false);
211-
else
212-
await data.WriteAsync(stream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false);
213-
}
201+
if (data != null)
202+
await PostRequestAsync<TReturn>(requestData, cancellationToken, request, data);
203+
requestData.MadeItToResponse = true;
214204
}
215-
requestData.MadeItToResponse = true;
216-
217-
//http://msdn.microsoft.com/en-us/library/system.net.httpwebresponse.getresponsestream.aspx
218-
//Either the stream or the response object needs to be closed but not both although it won't
219-
//throw any errors if both are closed atleast one of them has to be Closed.
220-
//Since we expose the stream we let closing the stream determining when to close the connection
221-
222-
var apmGetResponseTask = Task.Factory.FromAsync(request.BeginGetResponse, request.EndGetResponse, null);
223-
RegisterApmTaskTimeout(apmGetResponseTask, request, requestData);
224-
225-
var response = (HttpWebResponse)(await apmGetResponseTask.ConfigureAwait(false));
226-
builder.StatusCode = (int)response.StatusCode;
227-
builder.Stream = response.GetResponseStream();
228-
if (response.SupportsHeaders && response.Headers.HasKeys() && response.Headers.AllKeys.Contains("Warning"))
229-
builder.DeprecationWarnings = response.Headers.GetValues("Warning");
230-
// https://github.com/elastic/elasticsearch-net/issues/2311
231-
// if stream is null call dispose on response instead.
232-
if (builder.Stream == null || builder.Stream == Stream.Null) response.Dispose();
205+
await GetResponseAsync(requestData, request, builder);
233206
}
234207
catch (WebException e)
235208
{
@@ -239,6 +212,45 @@ public virtual async Task<ElasticsearchResponse<TReturn>> RequestAsync<TReturn>(
239212
return await builder.ToResponseAsync().ConfigureAwait(false);
240213
}
241214

215+
private static async Task GetResponseAsync<TReturn>(RequestData requestData, HttpWebRequest request, ResponseBuilder<TReturn> builder)
216+
where TReturn : class
217+
{
218+
//http://msdn.microsoft.com/en-us/library/system.net.httpwebresponse.getresponsestream.aspx
219+
//Either the stream or the response object needs to be closed but not both although it won't
220+
//throw any errors if both are closed atleast one of them has to be Closed.
221+
//Since we expose the stream we let closing the stream determining when to close the connection
222+
223+
var apmGetResponseTask = Task.Factory.FromAsync(request.BeginGetResponse, request.EndGetResponse, null);
224+
var getResponseCancellationHandle = RegisterApmTaskTimeout(apmGetResponseTask, request, requestData);
225+
226+
var response = (HttpWebResponse) (await apmGetResponseTask.ConfigureAwait(false));
227+
builder.StatusCode = (int) response.StatusCode;
228+
builder.Stream = response.GetResponseStream();
229+
if (response.SupportsHeaders && response.Headers.HasKeys() && response.Headers.AllKeys.Contains("Warning"))
230+
builder.DeprecationWarnings = response.Headers.GetValues("Warning");
231+
// https://github.com/elastic/elasticsearch-net/issues/2311
232+
// if stream is null call dispose on response instead.
233+
if (builder.Stream == null || builder.Stream == Stream.Null) response.Dispose();
234+
getResponseCancellationHandle.Unregister(((IAsyncResult) apmGetResponseTask).AsyncWaitHandle);
235+
}
236+
237+
private static async Task PostRequestAsync<TReturn>(RequestData requestData, CancellationToken cancellationToken, HttpWebRequest request,
238+
PostData<object> data) where TReturn : class
239+
{
240+
var apmGetRequestStreamTask = Task.Factory.FromAsync(request.BeginGetRequestStream, request.EndGetRequestStream, null);
241+
var getRequestStreamCancellationHandle = RegisterApmTaskTimeout(apmGetRequestStreamTask, request, requestData);
242+
243+
using (var stream = await apmGetRequestStreamTask.ConfigureAwait(false))
244+
{
245+
if (requestData.HttpCompression)
246+
using (var zipStream = new GZipStream(stream, CompressionMode.Compress))
247+
await data.WriteAsync(zipStream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false);
248+
else
249+
await data.WriteAsync(stream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false);
250+
}
251+
getRequestStreamCancellationHandle.Unregister(((IAsyncResult) apmGetRequestStreamTask).AsyncWaitHandle);
252+
}
253+
242254
private void HandleException<TReturn>(ResponseBuilder<TReturn> builder, WebException exception)
243255
where TReturn : class
244256
{

0 commit comments

Comments
 (0)