Skip to content

Commit c581816

Browse files
committed
use action unregister handler to clean up code and also unregister the begin/end getrequeststream apm task wait handler relates to #2776
1 parent d243eef commit c581816

File tree

1 file changed

+55
-54
lines changed

1 file changed

+55
-54
lines changed

src/Elasticsearch.Net/Connection/HttpConnection.cs

Lines changed: 55 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -43,19 +43,19 @@ protected virtual void SetClientCertificates(HttpWebRequest request, RequestData
4343
protected virtual void SetServerCertificateValidationCallBackIfNeeded(HttpWebRequest request, RequestData requestData)
4444
{
4545
var callback = requestData?.ConnectionSettings?.ServerCertificateValidationCallback;
46-
#if !__MonoCS__
46+
#if !__MonoCS__
4747
//Only assign if one is defined on connection settings and a subclass has not already set one
4848
if (callback != null && request.ServerCertificateValidationCallback == null)
4949
request.ServerCertificateValidationCallback = new RemoteCertificateValidationCallback(callback);
50-
#else
50+
#else
5151
if (callback != null)
5252
throw new Exception("Mono misses ServerCertificateValidationCallback on HttpWebRequest");
5353
#endif
5454
}
5555

5656
protected virtual HttpWebRequest CreateWebRequest(RequestData requestData)
5757
{
58-
var request = (HttpWebRequest)WebRequest.Create(requestData.Uri);
58+
var request = (HttpWebRequest) WebRequest.Create(requestData.Uri);
5959

6060
request.Accept = requestData.Accept;
6161
request.ContentType = requestData.ContentType;
@@ -76,7 +76,7 @@ protected virtual HttpWebRequest CreateWebRequest(RequestData requestData)
7676
if (requestData.Headers != null && requestData.Headers.HasKeys())
7777
request.Headers.Add(requestData.Headers);
7878

79-
var timeout = (int)requestData.RequestTimeout.TotalMilliseconds;
79+
var timeout = (int) requestData.RequestTimeout.TotalMilliseconds;
8080
request.Timeout = timeout;
8181
request.ReadWriteTimeout = timeout;
8282

@@ -101,7 +101,6 @@ protected virtual void AlterServicePoint(ServicePoint requestServicePoint, Reque
101101
//this method only sets internal values and wont actually cause timers and such to be reset
102102
//So it should be idempotent if called with the same parameters
103103
requestServicePoint.SetTcpKeepAlive(true, requestData.KeepAliveTime, requestData.KeepAliveInterval);
104-
105104
}
106105

107106
protected virtual void SetProxyIfNeeded(HttpWebRequest request, RequestData requestData)
@@ -161,8 +160,8 @@ public virtual ElasticsearchResponse<TReturn> Request<TReturn>(RequestData reque
161160
//Either the stream or the response object needs to be closed but not both although it won't
162161
//throw any errors if both are closed atleast one of them has to be Closed.
163162
//Since we expose the stream we let closing the stream determining when to close the connection
164-
var response = (HttpWebResponse)request.GetResponse();
165-
builder.StatusCode = (int)response.StatusCode;
163+
var response = (HttpWebResponse) request.GetResponse();
164+
builder.StatusCode = (int) response.StatusCode;
166165
builder.Stream = response.GetResponseStream();
167166

168167
if (response.SupportsHeaders && response.Headers.HasKeys() && response.Headers.AllKeys.Contains("Warning"))
@@ -180,8 +179,17 @@ public virtual ElasticsearchResponse<TReturn> Request<TReturn>(RequestData reque
180179
}
181180

182181

183-
private static RegisteredWaitHandle RegisterApmTaskTimeout(IAsyncResult result, WebRequest request, RequestData requestData) =>
184-
ThreadPool.RegisterWaitForSingleObject(result.AsyncWaitHandle, TimeoutCallback, request, requestData.RequestTimeout, true);
182+
/// <summary>
183+
/// Registers an APM async task cancellation on the threadpool
184+
/// </summary>
185+
/// <returns>An unregister action that can be used to remove the waithandle prematurely</returns>
186+
private static Action RegisterApmTaskTimeout(IAsyncResult result, WebRequest request, RequestData requestData)
187+
{
188+
var waitHandle = result.AsyncWaitHandle;
189+
var registeredWaitHandle =
190+
ThreadPool.RegisterWaitForSingleObject(waitHandle, TimeoutCallback, request, requestData.RequestTimeout, true);
191+
return () => registeredWaitHandle?.Unregister(waitHandle);
192+
}
185193

186194
private static void TimeoutCallback(object state, bool timedOut)
187195
{
@@ -193,74 +201,65 @@ public virtual async Task<ElasticsearchResponse<TReturn>> RequestAsync<TReturn>(
193201
CancellationToken cancellationToken) where TReturn : class
194202
{
195203
var builder = new ResponseBuilder<TReturn>(requestData, cancellationToken);
196-
WaitHandle apmWaitHandle = null;
197-
RegisteredWaitHandle apmTaskTimeout = null;
204+
Action unregisterWaitHandle = null;
198205
try
199206
{
200207
var data = requestData.PostData;
201208
var request = this.CreateHttpWebRequest(requestData);
202209
using (cancellationToken.Register(() => request.Abort()))
203210
{
204211
if (data != null)
205-
await PostRequestAsync(requestData, cancellationToken, request, data);
212+
{
213+
var apmGetRequestStreamTask = Task.Factory.FromAsync(request.BeginGetRequestStream, request.EndGetRequestStream, null);
214+
unregisterWaitHandle = RegisterApmTaskTimeout(apmGetRequestStreamTask, request, requestData);
215+
216+
using (var stream = await apmGetRequestStreamTask.ConfigureAwait(false))
217+
{
218+
if (requestData.HttpCompression)
219+
using (var zipStream = new GZipStream(stream, CompressionMode.Compress))
220+
await data.WriteAsync(zipStream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false);
221+
else
222+
await data.WriteAsync(stream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false);
223+
}
224+
unregisterWaitHandle?.Invoke();
225+
}
206226
requestData.MadeItToResponse = true;
207-
//http://msdn.microsoft.com/en-us/library/system.net.httpwebresponse.getresponsestream.aspx
208-
//Either the stream or the response object needs to be closed but not both although it won't
209-
//throw any errors if both are closed atleast one of them has to be Closed.
210-
//Since we expose the stream we let closing the stream determining when to close the connection
211-
212-
var apmGetResponseTask = Task.Factory.FromAsync(request.BeginGetResponse, request.EndGetResponse, null);
213-
apmWaitHandle = ((IAsyncResult) apmGetResponseTask).AsyncWaitHandle;
214-
apmTaskTimeout = RegisterApmTaskTimeout(apmGetResponseTask, request, requestData);
215-
216-
var response = (HttpWebResponse) (await apmGetResponseTask.ConfigureAwait(false));
217-
builder.StatusCode = (int) response.StatusCode;
218-
builder.Stream = response.GetResponseStream();
219-
if (response.SupportsHeaders && response.Headers.HasKeys() && response.Headers.AllKeys.Contains("Warning"))
220-
builder.DeprecationWarnings = response.Headers.GetValues("Warning");
221-
// https://github.com/elastic/elasticsearch-net/issues/2311
222-
// if stream is null call dispose on response instead.
223-
if (builder.Stream == null || builder.Stream == Stream.Null) response.Dispose();
224-
if (apmWaitHandle != null) apmTaskTimeout?.Unregister(apmWaitHandle);
227+
//http://msdn.microsoft.com/en-us/library/system.net.httpwebresponse.getresponsestream.aspx
228+
//Either the stream or the response object needs to be closed but not both although it won't
229+
//throw any errors if both are closed atleast one of them has to be Closed.
230+
//Since we expose the stream we let closing the stream determining when to close the connection
231+
232+
var apmGetResponseTask = Task.Factory.FromAsync(request.BeginGetResponse, request.EndGetResponse, null);
233+
unregisterWaitHandle = RegisterApmTaskTimeout(apmGetResponseTask, request, requestData);
234+
235+
var response = (HttpWebResponse) (await apmGetResponseTask.ConfigureAwait(false));
236+
builder.StatusCode = (int) response.StatusCode;
237+
builder.Stream = response.GetResponseStream();
238+
if (response.SupportsHeaders && response.Headers.HasKeys() && response.Headers.AllKeys.Contains("Warning"))
239+
builder.DeprecationWarnings = response.Headers.GetValues("Warning");
240+
// https://github.com/elastic/elasticsearch-net/issues/2311
241+
// if stream is null call dispose on response instead.
242+
if (builder.Stream == null || builder.Stream == Stream.Null) response.Dispose();
225243
}
226244
}
227245
catch (WebException e)
228246
{
229-
if (apmWaitHandle != null) apmTaskTimeout?.Unregister(apmWaitHandle);
230247
HandleException(builder, e);
231248
}
232-
catch
249+
finally
233250
{
234-
if (apmWaitHandle != null) apmTaskTimeout?.Unregister(apmWaitHandle);
235-
throw;
251+
unregisterWaitHandle?.Invoke();
236252
}
237253
return await builder.ToResponseAsync().ConfigureAwait(false);
238254
}
239255

240-
private static async Task PostRequestAsync(RequestData requestData, CancellationToken cancellationToken, HttpWebRequest request,
241-
PostData<object> data)
242-
{
243-
var apmGetRequestStreamTask = Task.Factory.FromAsync(request.BeginGetRequestStream, request.EndGetRequestStream, null);
244-
var getRequestStreamCancellationHandle = RegisterApmTaskTimeout(apmGetRequestStreamTask, request, requestData);
245-
246-
using (var stream = await apmGetRequestStreamTask.ConfigureAwait(false))
247-
{
248-
if (requestData.HttpCompression)
249-
using (var zipStream = new GZipStream(stream, CompressionMode.Compress))
250-
await data.WriteAsync(zipStream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false);
251-
else
252-
await data.WriteAsync(stream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false);
253-
}
254-
getRequestStreamCancellationHandle.Unregister(((IAsyncResult) apmGetRequestStreamTask).AsyncWaitHandle);
255-
}
256-
257-
private void HandleException<TReturn>(ResponseBuilder<TReturn> builder, WebException exception)
256+
private static void HandleException<TReturn>(ResponseBuilder<TReturn> builder, WebException exception)
258257
where TReturn : class
259258
{
260259
builder.Exception = exception;
261260
var response = exception.Response as HttpWebResponse;
262261
if (response == null) return;
263-
builder.StatusCode = (int)response.StatusCode;
262+
builder.StatusCode = (int) response.StatusCode;
264263
builder.Stream = response.GetResponseStream();
265264
// https://github.com/elastic/elasticsearch-net/issues/2311
266265
// if stream is null call dispose on response instead.
@@ -269,7 +268,9 @@ private void HandleException<TReturn>(ResponseBuilder<TReturn> builder, WebExcep
269268

270269
void IDisposable.Dispose() => this.DisposeManagedResources();
271270

272-
protected virtual void DisposeManagedResources() { }
271+
protected virtual void DisposeManagedResources()
272+
{
273+
}
273274
}
274275
}
275276
#endif

0 commit comments

Comments
 (0)