diff --git a/Enyim.Caching/Memcached/AsyncSocketHelper.cs b/Enyim.Caching/Memcached/AsyncSocketHelper.cs index 6ef38235..eb7e1764 100644 --- a/Enyim.Caching/Memcached/AsyncSocketHelper.cs +++ b/Enyim.Caching/Memcached/AsyncSocketHelper.cs @@ -11,181 +11,182 @@ namespace Enyim.Caching.Memcached { - public partial class AsyncPooledSocket - { - /// - /// Supports exactly one reader and writer, but they can do IO concurrently - /// - private class AsyncSocketHelper - { - private const int ChunkSize = 65536; - - private AsyncPooledSocket socket; - private SlidingBuffer asyncBuffer; - - private SocketAsyncEventArgs readEvent; + public partial class AsyncPooledSocket + { + /// + /// Supports exactly one reader and writer, but they can do IO concurrently + /// + private class AsyncSocketHelper + { + private const int ChunkSize = 65536; + + private readonly PooledSocket socket; + private readonly SlidingBuffer asyncBuffer; + + private readonly SocketAsyncEventArgs readEvent; #if DEBUG_IO private int doingIO; #endif - private int remainingRead; - private int expectedToRead; - private AsyncIOArgs pendingArgs; - - private int isAborted; - private ManualResetEvent readInProgressEvent; - - public AsyncSocketHelper(AsyncPooledSocket socket) - { - this.socket = socket; - this.asyncBuffer = new SlidingBuffer(ChunkSize); - - this.readEvent = new SocketAsyncEventArgs(); - this.readEvent.Completed += new EventHandler(AsyncReadCompleted); - this.readEvent.SetBuffer(new byte[ChunkSize], 0, ChunkSize); - - this.readInProgressEvent = new ManualResetEvent(false); - } - - /// - /// returns true if io is pending - /// - /// - /// - public bool Read(AsyncIOArgs p) - { - var count = p.Count; - if (count < 1) throw new ArgumentOutOfRangeException("count", "count must be > 0"); + private int remainingRead; + private int expectedToRead; + private AsyncIOArgs pendingArgs; + + private int isAborted; + private readonly ManualResetEvent readInProgressEvent; + + public AsyncSocketHelper(PooledSocket socket) + { + this.socket = socket; + this.asyncBuffer = new SlidingBuffer(ChunkSize); + + this.readEvent = new SocketAsyncEventArgs(); + this.readEvent.Completed += new EventHandler(AsyncReadCompleted); + this.readEvent.SetBuffer(new byte[ChunkSize], 0, ChunkSize); + + this.readInProgressEvent = new ManualResetEvent(false); + } + + /// + /// returns true if io is pending + /// + /// + /// + public bool Read(AsyncIOArgs p) + { + var count = p.Count; + if (count < 1) throw new ArgumentOutOfRangeException("count", "count must be > 0"); #if DEBUG_IO if (Interlocked.CompareExchange(ref this.doingIO, 1, 0) != 0) throw new InvalidOperationException("Receive is already in progress"); #endif - this.expectedToRead = p.Count; - this.pendingArgs = p; - - p.Fail = false; - p.Result = null; - - if (this.asyncBuffer.Available >= count) - { - PublishResult(false); - - return false; - } - else - { - this.remainingRead = count - this.asyncBuffer.Available; - this.isAborted = 0; - - this.BeginReceive(); - - return true; - } - } - - public void DiscardBuffer() - { - this.asyncBuffer.UnsafeClear(); - } - - private void BeginReceive() - { - while (this.remainingRead > 0) - { - this.readInProgressEvent.Reset(); - - if (this.socket._socket.ReceiveAsync(this.readEvent)) - { - // wait until the timeout elapses, then abort this reading process - // EndREceive will be triggered sooner or later but its timeout - // may be higher than our read timeout, so it's not reliable - if (!readInProgressEvent.WaitOne(this.socket._socket.ReceiveTimeout)) - this.AbortReadAndTryPublishError(false); - - return; - } - - this.EndReceive(); - } - } - - void AsyncReadCompleted(object sender, SocketAsyncEventArgs e) - { - if (this.EndReceive()) - this.BeginReceive(); - } - - private void AbortReadAndTryPublishError(bool markAsDead) - { - if (markAsDead) - this.socket._isAlive = false; - - // we've been already aborted, so quit - // both the EndReceive and the wait on the event can abort the read - // but only one should of them should continue the async call chain - if (Interlocked.CompareExchange(ref this.isAborted, 1, 0) != 0) - return; - - this.remainingRead = 0; - var p = this.pendingArgs; + this.expectedToRead = p.Count; + this.pendingArgs = p; + + p.Fail = false; + p.Result = null; + + if (this.asyncBuffer.Available >= count) + { + PublishResult(false); + + return false; + } + else + { + this.remainingRead = count - this.asyncBuffer.Available; + this.isAborted = 0; + + this.BeginReceive(); + + return true; + } + } + + public void DiscardBuffer() + { + this.asyncBuffer.UnsafeClear(); + } + + private void BeginReceive() + { + throw new NotImplementedException(); + //while (this.remainingRead > 0) + //{ + // this.readInProgressEvent.Reset(); + + // if (this.socket.ReceiveAsync(this.readEvent)) + // { + // // wait until the timeout elapses, then abort this reading process + // // EndREceive will be triggered sooner or later but its timeout + // // may be higher than our read timeout, so it's not reliable + // if (!readInProgressEvent.WaitOne(this.socket._socket.ReceiveTimeout)) + // this.AbortReadAndTryPublishError(false); + + // return; + // } + + // this.EndReceive(); + //} + } + + void AsyncReadCompleted(object sender, SocketAsyncEventArgs e) + { + if (this.EndReceive()) + this.BeginReceive(); + } + + private void AbortReadAndTryPublishError(bool markAsDead) + { + if (markAsDead) + this.socket.IsAlive = false; + + // we've been already aborted, so quit + // both the EndReceive and the wait on the event can abort the read + // but only one should of them should continue the async call chain + if (Interlocked.CompareExchange(ref this.isAborted, 1, 0) != 0) + return; + + this.remainingRead = 0; + var p = this.pendingArgs; #if DEBUG_IO Thread.MemoryBarrier(); this.doingIO = 0; #endif - p.Fail = true; - p.Result = null; + p.Fail = true; + p.Result = null; - this.pendingArgs.Next(p); - } + this.pendingArgs.Next(p); + } - /// - /// returns true when io is pending - /// - /// - private bool EndReceive() - { - this.readInProgressEvent.Set(); + /// + /// returns true when io is pending + /// + /// + private bool EndReceive() + { + this.readInProgressEvent.Set(); - var read = this.readEvent.BytesTransferred; - if (this.readEvent.SocketError != SocketError.Success - || read == 0) - { - this.AbortReadAndTryPublishError(true);//new IOException("Remote end has been closed")); + var read = this.readEvent.BytesTransferred; + if (this.readEvent.SocketError != SocketError.Success + || read == 0) + { + this.AbortReadAndTryPublishError(true);//new IOException("Remote end has been closed")); - return false; - } + return false; + } - this.remainingRead -= read; - this.asyncBuffer.Append(this.readEvent.Buffer, 0, read); + this.remainingRead -= read; + this.asyncBuffer.Append(this.readEvent.Buffer, 0, read); - if (this.remainingRead <= 0) - { - this.PublishResult(true); + if (this.remainingRead <= 0) + { + this.PublishResult(true); - return false; - } + return false; + } - return true; - } + return true; + } - private void PublishResult(bool isAsync) - { - var retval = this.pendingArgs; + private void PublishResult(bool isAsync) + { + var retval = this.pendingArgs; - var data = new byte[this.expectedToRead]; - this.asyncBuffer.Read(data, 0, retval.Count); - pendingArgs.Result = data; + var data = new byte[this.expectedToRead]; + this.asyncBuffer.Read(data, 0, retval.Count); + pendingArgs.Result = data; #if DEBUG_IO Thread.MemoryBarrier(); this.doingIO = 0; #endif - if (isAsync) - pendingArgs.Next(pendingArgs); - } - } - } + if (isAsync) + pendingArgs.Next(pendingArgs); + } + } + } public partial class PooledSocket { @@ -196,10 +197,10 @@ private class AsyncSocketHelper { private const int ChunkSize = 65536; - private PooledSocket socket; - private SlidingBuffer asyncBuffer; + private readonly PooledSocket socket; + private readonly SlidingBuffer asyncBuffer; - private SocketAsyncEventArgs readEvent; + private readonly SocketAsyncEventArgs readEvent; #if DEBUG_IO private int doingIO; #endif diff --git a/Enyim.Caching/Memcached/MemcachedNode.cs b/Enyim.Caching/Memcached/MemcachedNode.cs index 0fdab3a0..b578d5cd 100755 --- a/Enyim.Caching/Memcached/MemcachedNode.cs +++ b/Enyim.Caching/Memcached/MemcachedNode.cs @@ -9,6 +9,7 @@ using System.Diagnostics; using System.IO; using System.Net; +using System.Net.Sockets; using System.Runtime.Serialization; using System.Security; using System.Threading; @@ -184,6 +185,7 @@ public async Task AcquireAsync() poolInitSemaphore.Release(); } } + try { return await this.internalPoolImpl.AcquireAsync(); @@ -442,7 +444,6 @@ public IPooledSocketResult Acquire() message = "Could not get a socket from the pool, Creating a new item. " + _endPoint; if (_isDebugEnabled) _logger.LogDebug(message); - try { // okay, create the new item @@ -805,9 +806,11 @@ protected virtual async Task ExecuteOperationAsync(IOperati //if Get, call BinaryRequest.CreateBuffer() var b = op.GetBuffer(); - await pooledSocket.WriteSync(b); + _logger.LogDebug("pooledSocket.WriteAsync..."); + await pooledSocket.WriteAsync(b); //if Get, call BinaryResponse + _logger.LogDebug($"{op}.ReadResponseAsync..."); var readResult = await op.ReadResponseAsync(pooledSocket); if (readResult.Success) { @@ -823,7 +826,14 @@ protected virtual async Task ExecuteOperationAsync(IOperati { _logger.LogError(nameof(MemcachedNode), e); - result.Fail("Exception reading response", e); + result.Fail("IOException reading response", e); + return result; + } + catch (SocketException e) + { + _logger.LogError(nameof(MemcachedNode), e); + + result.Fail("SocketException reading response", e); return result; } finally diff --git a/Enyim.Caching/Memcached/PooledSocket.cs b/Enyim.Caching/Memcached/PooledSocket.cs index 35e0a589..660969c0 100755 --- a/Enyim.Caching/Memcached/PooledSocket.cs +++ b/Enyim.Caching/Memcached/PooledSocket.cs @@ -172,6 +172,7 @@ public void Reset() public bool IsAlive { get { return _isAlive; } + set { _isAlive = value; } } /// @@ -253,9 +254,12 @@ public int ReadByte() { return _inputStream.ReadByte(); } - catch (IOException) + catch (Exception ex) { - _isAlive = false; + if (ex is IOException || ex is SocketException) + { + _isAlive = false; + } throw; } @@ -269,9 +273,12 @@ public int ReadByteAsync() { return _inputStream.ReadByte(); } - catch (IOException) + catch (Exception ex) { - _isAlive = false; + if (ex is IOException || ex is SocketException) + { + _isAlive = false; + } throw; } } @@ -295,9 +302,13 @@ public async Task ReadAsync(byte[] buffer, int offset, int count) offset += currentRead; shouldRead -= currentRead; } - catch (IOException) + catch (Exception ex) { - _isAlive = false; + if (ex is IOException || ex is SocketException) + { + _isAlive = false; + } + throw; } } @@ -329,9 +340,12 @@ public void Read(byte[] buffer, int offset, int count) offset += currentRead; shouldRead -= currentRead; } - catch (IOException) + catch (Exception ex) { - _isAlive = false; + if (ex is IOException || ex is SocketException) + { + _isAlive = false; + } throw; } } @@ -359,35 +373,48 @@ public void Write(IList> buffers) SocketError status; -#if DEBUG - int total = 0; - for (int i = 0, C = buffers.Count; i < C; i++) - total += buffers[i].Count; - - if (_socket.Send(buffers, SocketFlags.None, out status) != total) - System.Diagnostics.Debugger.Break(); -#else - _socket.Send(buffers, SocketFlags.None, out status); -#endif - - if (status != SocketError.Success) + try { - _isAlive = false; - - ThrowHelper.ThrowSocketWriteError(_endpoint, status); + _socket.Send(buffers, SocketFlags.None, out status); + if (status != SocketError.Success) + { + _isAlive = false; + ThrowHelper.ThrowSocketWriteError(_endpoint, status); + } + } + catch (Exception ex) + { + if (ex is IOException || ex is SocketException) + { + _isAlive = false; + } + _logger.LogError(ex, nameof(PooledSocket.Write)); + throw; } } - public async Task WriteSync(IList> buffers) + public async Task WriteAsync(IList> buffers) { + CheckDisposed(); + try { - await _socket.SendAsync(buffers, SocketFlags.None); + var bytesTransferred = await _socket.SendAsync(buffers, SocketFlags.None); + if (bytesTransferred <= 0) + { + _isAlive = false; + _logger.LogError($"Failed to {nameof(PooledSocket.WriteAsync)}. bytesTransferred: {bytesTransferred}"); + ThrowHelper.ThrowSocketWriteError(_endpoint); + } } catch (Exception ex) { - _isAlive = false; - _logger.LogError(ex, nameof(PooledSocket.WriteSync)); + if (ex is IOException || ex is SocketException) + { + _isAlive = false; + } + _logger.LogError(ex, nameof(PooledSocket.WriteAsync)); + throw; } } diff --git a/Enyim.Caching/Memcached/ThrowHelper.cs b/Enyim.Caching/Memcached/ThrowHelper.cs index 321e69b7..463074ef 100755 --- a/Enyim.Caching/Memcached/ThrowHelper.cs +++ b/Enyim.Caching/Memcached/ThrowHelper.cs @@ -1,17 +1,23 @@ using System; +using System.IO; using System.Net; using System.Net.Sockets; namespace Enyim.Caching.Memcached { - internal static class ThrowHelper - { - public static void ThrowSocketWriteError(EndPoint endpoint, SocketError error) - { - // move the string into resource file - throw new Exception(String.Format("Failed to write to the socket '{0}'. Error: {1}", endpoint, error)); - } - } + internal static class ThrowHelper + { + public static void ThrowSocketWriteError(EndPoint endpoint) + { + throw new IOException(string.Format("Failed to write to the socket '{0}'.", endpoint)); + } + + public static void ThrowSocketWriteError(EndPoint endpoint, SocketError error) + { + // move the string into resource file + throw new IOException(string.Format("Failed to write to the socket '{0}'. Error: {1}", endpoint, error)); + } + } } #region [ License information ]