Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 57 additions & 44 deletions src/Elasticsearch.Net/Connection/HttpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public virtual ElasticsearchResponse<TReturn> Request<TReturn>(RequestData reque
}


private static void RegisterApmTaskTimeout(IAsyncResult result, WebRequest request, RequestData requestData) =>
private static RegisteredWaitHandle RegisterApmTaskTimeout(IAsyncResult result, WebRequest request, RequestData requestData) =>
ThreadPool.RegisterWaitForSingleObject(result.AsyncWaitHandle, TimeoutCallback, request, requestData.RequestTimeout, true);

private static void TimeoutCallback(object state, bool timedOut)
Expand All @@ -189,69 +189,82 @@ private static void TimeoutCallback(object state, bool timedOut)
(state as WebRequest)?.Abort();
}

public virtual async Task<ElasticsearchResponse<TReturn>> RequestAsync<TReturn>(RequestData requestData, CancellationToken cancellationToken) where TReturn : class
public virtual async Task<ElasticsearchResponse<TReturn>> RequestAsync<TReturn>(RequestData requestData,
CancellationToken cancellationToken) where TReturn : class
{
var builder = new ResponseBuilder<TReturn>(requestData, cancellationToken);
WaitHandle apmWaitHandle = null;
RegisteredWaitHandle apmTaskTimeout = null;
try
{
var request = this.CreateHttpWebRequest(requestData);
cancellationToken.Register(()=>request.Abort());
var data = requestData.PostData;

if (data != null)
var request = this.CreateHttpWebRequest(requestData);
using (cancellationToken.Register(() => request.Abort()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this also need to include await GetResponseAsync(requestData, request, builder); in the using block?

Copy link
Member Author

@Mpdreamz Mpdreamz Jun 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right i think it does

{
var apmGetRequestStreamTask = Task.Factory.FromAsync(request.BeginGetRequestStream, request.EndGetRequestStream, null);
RegisterApmTaskTimeout(apmGetRequestStreamTask, request, requestData);

using (var stream = await apmGetRequestStreamTask.ConfigureAwait(false))
{
if (requestData.HttpCompression)
using (var zipStream = new GZipStream(stream, CompressionMode.Compress))
await data.WriteAsync(zipStream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false);
else
await data.WriteAsync(stream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false);
}
if (data != null)
await PostRequestAsync(requestData, cancellationToken, request, data);
requestData.MadeItToResponse = true;
//http://msdn.microsoft.com/en-us/library/system.net.httpwebresponse.getresponsestream.aspx
//Either the stream or the response object needs to be closed but not both although it won't
//throw any errors if both are closed atleast one of them has to be Closed.
//Since we expose the stream we let closing the stream determining when to close the connection

var apmGetResponseTask = Task.Factory.FromAsync(request.BeginGetResponse, request.EndGetResponse, null);
apmWaitHandle = ((IAsyncResult) apmGetResponseTask).AsyncWaitHandle;
apmTaskTimeout = RegisterApmTaskTimeout(apmGetResponseTask, request, requestData);

var response = (HttpWebResponse) (await apmGetResponseTask.ConfigureAwait(false));
builder.StatusCode = (int) response.StatusCode;
builder.Stream = response.GetResponseStream();
if (response.SupportsHeaders && response.Headers.HasKeys() && response.Headers.AllKeys.Contains("Warning"))
builder.DeprecationWarnings = response.Headers.GetValues("Warning");
// https://github.com/elastic/elasticsearch-net/issues/2311
// if stream is null call dispose on response instead.
if (builder.Stream == null || builder.Stream == Stream.Null) response.Dispose();
if (apmWaitHandle != null) apmTaskTimeout?.Unregister(apmWaitHandle);
}
requestData.MadeItToResponse = true;

//http://msdn.microsoft.com/en-us/library/system.net.httpwebresponse.getresponsestream.aspx
//Either the stream or the response object needs to be closed but not both although it won't
//throw any errors if both are closed atleast one of them has to be Closed.
//Since we expose the stream we let closing the stream determining when to close the connection

var apmGetResponseTask = Task.Factory.FromAsync(request.BeginGetResponse, request.EndGetResponse, null);
RegisterApmTaskTimeout(apmGetResponseTask, request, requestData);

var response = (HttpWebResponse)(await apmGetResponseTask.ConfigureAwait(false));
builder.StatusCode = (int)response.StatusCode;
builder.Stream = response.GetResponseStream();
if (response.SupportsHeaders && response.Headers.HasKeys() && response.Headers.AllKeys.Contains("Warning"))
builder.DeprecationWarnings = response.Headers.GetValues("Warning");
// https://github.com/elastic/elasticsearch-net/issues/2311
// if stream is null call dispose on response instead.
if (builder.Stream == null || builder.Stream == Stream.Null) response.Dispose();
}
catch (WebException e)
{
if (apmWaitHandle != null) apmTaskTimeout?.Unregister(apmWaitHandle);
HandleException(builder, e);
}

catch
{
if (apmWaitHandle != null) apmTaskTimeout?.Unregister(apmWaitHandle);
throw;
}
return await builder.ToResponseAsync().ConfigureAwait(false);
}

private static async Task PostRequestAsync(RequestData requestData, CancellationToken cancellationToken, HttpWebRequest request,
PostData<object> data)
{
var apmGetRequestStreamTask = Task.Factory.FromAsync(request.BeginGetRequestStream, request.EndGetRequestStream, null);
var getRequestStreamCancellationHandle = RegisterApmTaskTimeout(apmGetRequestStreamTask, request, requestData);

using (var stream = await apmGetRequestStreamTask.ConfigureAwait(false))
{
if (requestData.HttpCompression)
using (var zipStream = new GZipStream(stream, CompressionMode.Compress))
await data.WriteAsync(zipStream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false);
else
await data.WriteAsync(stream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false);
}
getRequestStreamCancellationHandle.Unregister(((IAsyncResult) apmGetRequestStreamTask).AsyncWaitHandle);
}

private void HandleException<TReturn>(ResponseBuilder<TReturn> builder, WebException exception)
where TReturn : class
{
builder.Exception = exception;
var response = exception.Response as HttpWebResponse;
if (response != null)
{
builder.StatusCode = (int)response.StatusCode;
builder.Stream = response.GetResponseStream();
// https://github.com/elastic/elasticsearch-net/issues/2311
// if stream is null call dispose on response instead.
if (builder.Stream == null || builder.Stream == Stream.Null) response.Dispose();
}
if (response == null) return;
builder.StatusCode = (int)response.StatusCode;
builder.Stream = response.GetResponseStream();
// https://github.com/elastic/elasticsearch-net/issues/2311
// if stream is null call dispose on response instead.
if (builder.Stream == null || builder.Stream == Stream.Null) response.Dispose();
}

void IDisposable.Dispose() => this.DisposeManagedResources();
Expand Down