From 47095618c66d7c3351d2aed42183f34d96d52544 Mon Sep 17 00:00:00 2001 From: Henke Date: Mon, 2 Sep 2013 16:49:32 +0200 Subject: [PATCH] Replaced async model with true async Rewrote DoAsyncRequest to use real async web methods instead of starting new tasks and allocating thread pool resources for waits and callbacks. --- .../Connection/AsyncRequestOperation.cs | 143 ++++++++++++++++++ src/Nest/Domain/Connection/Connection.cs | 130 +--------------- src/Nest/Nest.csproj | 1 + 3 files changed, 152 insertions(+), 122 deletions(-) create mode 100644 src/Nest/Domain/Connection/AsyncRequestOperation.cs diff --git a/src/Nest/Domain/Connection/AsyncRequestOperation.cs b/src/Nest/Domain/Connection/AsyncRequestOperation.cs new file mode 100644 index 00000000000..1def435be2e --- /dev/null +++ b/src/Nest/Domain/Connection/AsyncRequestOperation.cs @@ -0,0 +1,143 @@ +using System; +using System.IO; +using System.Net; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Nest.Domain.Connection; + +namespace Nest +{ + public class AsyncRequestOperation : TaskCompletionSource, IDisposable + { + private readonly HttpWebRequest m_request; + private readonly string m_requestData; + private readonly IConnectionSettings m_connectionSettings; + private ConnectionStatusTracer m_tracer; + private WebResponse m_response; + private Stream m_responseStream; + + public AsyncRequestOperation( HttpWebRequest request, string requestData, IConnectionSettings connectionSettings, ConnectionStatusTracer tracer ) + { + m_request = request; + m_requestData = requestData; + m_connectionSettings = connectionSettings; + m_tracer = tracer; + Start(); + } + + private void Start() + { + if ( this.m_requestData != null ) + WriteRequestDataAsync(); + else + GetResponseAsync(); + } + + private void WriteRequestDataAsync() + { + this.m_request.BeginGetRequestStream( this.Monitor( ar => + { + var r = this.m_request.EndGetRequestStream( ar ); + var buffer = Encoding.UTF8.GetBytes( this.m_requestData ); + r.BeginWrite( buffer, 0, buffer.Length, this.Monitor( writeIar => + { + r.EndWrite( writeIar ); + GetResponseAsync(); + } ), null ); + } ), null ); + } + + private void GetResponseAsync() + { + this.m_request.BeginGetResponse( this.Monitor( iarResponse => + { + m_response = m_request.EndGetResponse( iarResponse ); + m_responseStream = m_response.GetResponseStream(); + + var buffer = new byte[8192]; + var result = new MemoryStream( buffer.Length ); + ReadResponseStreamAsync( this.m_responseStream, buffer, result ); + + } ), null ); + } + + private void ReadResponseStreamAsync( Stream stream, byte[] buffer, MemoryStream result ) + { + stream.BeginRead( buffer, 0, buffer.Length, this.Monitor( iar => + { + var bytes = stream.EndRead( iar ); + if ( bytes == 0 ) + { + Done( result ); + return; + } + + result.Write( buffer, 0, bytes ); + ReadResponseStreamAsync( stream, buffer, result ); + + } ), null ); + } + + private void Done( ConnectionStatus connectionStatus ) + { + m_tracer.SetResult( connectionStatus ); + TrySetResult( connectionStatus ); + Dispose(); + } + + private void Done( Stream result ) + { + result.Position = 0; + var reader = new StreamReader( result ); + Done( new ConnectionStatus( reader.ReadToEnd() ) + { + Request = this.m_requestData, + RequestUrl = this.m_request.RequestUri.ToString(), + RequestMethod = this.m_request.Method + } ); + + } + + private AsyncCallback Monitor( AsyncCallback callback ) + { + return ar => + { + try + { + callback( ar ); + } + catch ( WebException webException ) + { + var connectionStatus = new ConnectionStatus( webException ) + { + Request = this.m_requestData, + RequestUrl = this.m_request.RequestUri.ToString(), + RequestMethod = this.m_request.Method + }; + m_connectionSettings.ConnectionStatusHandler( connectionStatus ); + Done( connectionStatus ); + } + catch ( Exception e ) + { + TrySetException( e ); + Dispose(); + } + }; + } + + public void Dispose() + { + Dispose( ref m_response ); + Dispose( ref m_responseStream ); + Dispose( ref m_tracer ); + } + + private static void Dispose( ref T disposable ) where T : class, IDisposable + { + var d = Interlocked.Exchange( ref disposable, null ); + if ( d != null ) + d.Dispose(); + } + } +} \ No newline at end of file diff --git a/src/Nest/Domain/Connection/Connection.cs b/src/Nest/Domain/Connection/Connection.cs index 61661b1873e..45998e1b70d 100644 --- a/src/Nest/Domain/Connection/Connection.cs +++ b/src/Nest/Domain/Connection/Connection.cs @@ -1,11 +1,8 @@ using System; -using System.Collections.Generic; -using System.Collections.Specialized; using System.IO; using System.Net; using System.Reflection; using System.Text; -using System.Threading; using System.Threading.Tasks; using Nest.Domain.Connection; @@ -16,7 +13,6 @@ public class Connection : IConnection const int BUFFER_SIZE = 1024; private IConnectionSettings _ConnectionSettings { get; set; } - private Semaphore _ResourceLock; private readonly bool _enableTrace; public Connection(IConnectionSettings settings) @@ -25,7 +21,6 @@ public Connection(IConnectionSettings settings) throw new ArgumentNullException("settings"); this._ConnectionSettings = settings; - this._ResourceLock = new Semaphore(settings.MaximumAsyncConnections, settings.MaximumAsyncConnections); this._enableTrace = settings.TraceEnabled; } @@ -205,126 +200,17 @@ protected virtual ConnectionStatus DoSynchronousRequest(HttpWebRequest request, return cs; } - } - + } } protected virtual Task DoAsyncRequest(HttpWebRequest request, string data = null) { - var tcs = new TaskCompletionSource(); - var timeout = this._ConnectionSettings.Timeout; - if (!this._ResourceLock.WaitOne(timeout)) - { - using (var tracer = new ConnectionStatusTracer(this._ConnectionSettings.TraceEnabled)) - { - var m = "Could not start the operation before the timeout of " + timeout + - "ms completed while waiting for the semaphore"; - var cs = new ConnectionStatus(new TimeoutException(m)); - tcs.SetResult(cs); - tracer.SetResult(cs); - return tcs.Task; - } - } - try - { - return Task.Factory.StartNew(() => - { - using (var tracer = new ConnectionStatusTracer(this._ConnectionSettings.TraceEnabled)) - { - this.Iterate(this._AsyncSteps(request, tcs, data), tcs); - var cs = tcs.Task.Result; - tracer.SetResult(cs); - _ConnectionSettings.ConnectionStatusHandler(cs); - return cs; - } - }, TaskCreationOptions.LongRunning); - } - finally - { - this._ResourceLock.Release(); - } - } - - private IEnumerable _AsyncSteps(HttpWebRequest request, TaskCompletionSource tcs, string data = null) - { - var timeout = this._ConnectionSettings.Timeout; - - var state = new ConnectionState { Connection = request }; - - if (data != null) - { - var getRequestStream = Task.Factory.FromAsync(request.BeginGetRequestStream, request.EndGetRequestStream, null); - ThreadPool.RegisterWaitForSingleObject((getRequestStream as IAsyncResult).AsyncWaitHandle, ThreadTimeoutCallback, request, timeout, true); - yield return getRequestStream; - - var requestStream = getRequestStream.Result; - try - { - byte[] buffer = Encoding.UTF8.GetBytes(data); - var writeToRequestStream = Task.Factory.FromAsync(requestStream.BeginWrite, requestStream.EndWrite, buffer, 0, buffer.Length, state); - yield return writeToRequestStream; - } - finally - { - requestStream.Close(); - } - } - - // Get the response - var getResponse = Task.Factory.FromAsync(request.BeginGetResponse, request.EndGetResponse, null); - ThreadPool.RegisterWaitForSingleObject((getResponse as IAsyncResult).AsyncWaitHandle, ThreadTimeoutCallback, request, timeout, true); - yield return getResponse; - - // Get the response stream - using (var response = (HttpWebResponse)getResponse.Result) - using (var responseStream = response.GetResponseStream()) - { - // Copy all data from the response stream - var output = new MemoryStream(); - var buffer = new byte[BUFFER_SIZE]; - while (responseStream != null) - { - var read = Task.Factory.FromAsync(responseStream.BeginRead, responseStream.EndRead, buffer, 0, BUFFER_SIZE, null); - yield return read; - if (read.Result == 0) break; - output.Write(buffer, 0, read.Result); - } - - // Decode the data and store the result - var result = Encoding.UTF8.GetString(output.ToArray()); - var cs = new ConnectionStatus(result) { Request = data, RequestUrl = request.RequestUri.ToString(), RequestMethod = request.Method }; - tcs.TrySetResult(cs); - } - yield break; - - } - - public void Iterate(IEnumerable asyncIterator, TaskCompletionSource tcs) - { - var enumerator = asyncIterator.GetEnumerator(); - Action recursiveBody = null; - recursiveBody = completedTask => - { - if (completedTask != null && completedTask.IsFaulted) - { - //none of the individual steps in _AsyncSteps run in parallel for 1 request - //as this would be impossible we can assume Aggregate Exception.InnerException - var exception = completedTask.Exception.InnerException; - - //cleanly exit from exceptions in stages if the exception is a webexception - if (exception is WebException) - tcs.SetResult(new ConnectionStatus(exception)); - else - tcs.TrySetException(exception); - enumerator.Dispose(); - } - else if (enumerator.MoveNext()) - { - enumerator.Current.ContinueWith(recursiveBody, TaskContinuationOptions.ExecuteSynchronously); - } - else enumerator.Dispose(); - }; - recursiveBody(null); + var operation = new AsyncRequestOperation( + request, + data, + _ConnectionSettings, + new ConnectionStatusTracer( this._ConnectionSettings.TraceEnabled ) ); + return operation.Task; } private Uri _CreateUriString(string path) @@ -386,4 +272,4 @@ public static void LeaveDotsAndSlashesEscaped(Uri uri) fieldInfo.SetValue(uriParser, uriSyntaxFlags); } } -} +} \ No newline at end of file diff --git a/src/Nest/Nest.csproj b/src/Nest/Nest.csproj index 3c91474b11c..0877d967f52 100644 --- a/src/Nest/Nest.csproj +++ b/src/Nest/Nest.csproj @@ -68,6 +68,7 @@ +