Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
301 changes: 151 additions & 150 deletions Enyim.Caching/Memcached/AsyncSocketHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,181 +11,182 @@

namespace Enyim.Caching.Memcached
{
public partial class AsyncPooledSocket
{
/// <summary>
/// Supports exactly one reader and writer, but they can do IO concurrently
/// </summary>
private class AsyncSocketHelper
{
private const int ChunkSize = 65536;

private AsyncPooledSocket socket;
private SlidingBuffer asyncBuffer;

private SocketAsyncEventArgs readEvent;
public partial class AsyncPooledSocket
{
/// <summary>
/// Supports exactly one reader and writer, but they can do IO concurrently
/// </summary>
private class AsyncSocketHelper
{
private const int ChunkSize = 65536;

private readonly PooledSocket socket;
private readonly SlidingBuffer asyncBuffer;

private readonly SocketAsyncEventArgs readEvent;
#if DEBUG_IO
private int doingIO;
#endif
private int remainingRead;
private int expectedToRead;
private AsyncIOArgs pendingArgs;

private int isAborted;
private ManualResetEvent readInProgressEvent;

public AsyncSocketHelper(AsyncPooledSocket socket)
{
this.socket = socket;
this.asyncBuffer = new SlidingBuffer(ChunkSize);

this.readEvent = new SocketAsyncEventArgs();
this.readEvent.Completed += new EventHandler<SocketAsyncEventArgs>(AsyncReadCompleted);
this.readEvent.SetBuffer(new byte[ChunkSize], 0, ChunkSize);

this.readInProgressEvent = new ManualResetEvent(false);
}

/// <summary>
/// returns true if io is pending
/// </summary>
/// <param name="p"></param>
/// <returns></returns>
public bool Read(AsyncIOArgs p)
{
var count = p.Count;
if (count < 1) throw new ArgumentOutOfRangeException("count", "count must be > 0");
private int remainingRead;
private int expectedToRead;
private AsyncIOArgs pendingArgs;

private int isAborted;
private readonly ManualResetEvent readInProgressEvent;

public AsyncSocketHelper(PooledSocket socket)
{
this.socket = socket;
this.asyncBuffer = new SlidingBuffer(ChunkSize);

this.readEvent = new SocketAsyncEventArgs();
this.readEvent.Completed += new EventHandler<SocketAsyncEventArgs>(AsyncReadCompleted);
this.readEvent.SetBuffer(new byte[ChunkSize], 0, ChunkSize);

this.readInProgressEvent = new ManualResetEvent(false);
}

/// <summary>
/// returns true if io is pending
/// </summary>
/// <param name="p"></param>
/// <returns></returns>
public bool Read(AsyncIOArgs p)
{
var count = p.Count;
if (count < 1) throw new ArgumentOutOfRangeException("count", "count must be > 0");
#if DEBUG_IO
if (Interlocked.CompareExchange(ref this.doingIO, 1, 0) != 0)
throw new InvalidOperationException("Receive is already in progress");
#endif
this.expectedToRead = p.Count;
this.pendingArgs = p;

p.Fail = false;
p.Result = null;

if (this.asyncBuffer.Available >= count)
{
PublishResult(false);

return false;
}
else
{
this.remainingRead = count - this.asyncBuffer.Available;
this.isAborted = 0;

this.BeginReceive();

return true;
}
}

public void DiscardBuffer()
{
this.asyncBuffer.UnsafeClear();
}

private void BeginReceive()
{
while (this.remainingRead > 0)
{
this.readInProgressEvent.Reset();

if (this.socket._socket.ReceiveAsync(this.readEvent))
{
// wait until the timeout elapses, then abort this reading process
// EndREceive will be triggered sooner or later but its timeout
// may be higher than our read timeout, so it's not reliable
if (!readInProgressEvent.WaitOne(this.socket._socket.ReceiveTimeout))
this.AbortReadAndTryPublishError(false);

return;
}

this.EndReceive();
}
}

void AsyncReadCompleted(object sender, SocketAsyncEventArgs e)
{
if (this.EndReceive())
this.BeginReceive();
}

private void AbortReadAndTryPublishError(bool markAsDead)
{
if (markAsDead)
this.socket._isAlive = false;

// we've been already aborted, so quit
// both the EndReceive and the wait on the event can abort the read
// but only one should of them should continue the async call chain
if (Interlocked.CompareExchange(ref this.isAborted, 1, 0) != 0)
return;

this.remainingRead = 0;
var p = this.pendingArgs;
this.expectedToRead = p.Count;
this.pendingArgs = p;

p.Fail = false;
p.Result = null;

if (this.asyncBuffer.Available >= count)
{
PublishResult(false);

return false;
}
else
{
this.remainingRead = count - this.asyncBuffer.Available;
this.isAborted = 0;

this.BeginReceive();

return true;
}
}

public void DiscardBuffer()
{
this.asyncBuffer.UnsafeClear();
}

private void BeginReceive()
{
throw new NotImplementedException();
//while (this.remainingRead > 0)
//{
// this.readInProgressEvent.Reset();

// if (this.socket.ReceiveAsync(this.readEvent))
// {
// // wait until the timeout elapses, then abort this reading process
// // EndREceive will be triggered sooner or later but its timeout
// // may be higher than our read timeout, so it's not reliable
// if (!readInProgressEvent.WaitOne(this.socket._socket.ReceiveTimeout))
// this.AbortReadAndTryPublishError(false);

// return;
// }

// this.EndReceive();
//}
}

void AsyncReadCompleted(object sender, SocketAsyncEventArgs e)
{
if (this.EndReceive())
this.BeginReceive();
}

private void AbortReadAndTryPublishError(bool markAsDead)
{
if (markAsDead)
this.socket.IsAlive = false;

// we've been already aborted, so quit
// both the EndReceive and the wait on the event can abort the read
// but only one should of them should continue the async call chain
if (Interlocked.CompareExchange(ref this.isAborted, 1, 0) != 0)
return;

this.remainingRead = 0;
var p = this.pendingArgs;
#if DEBUG_IO
Thread.MemoryBarrier();

this.doingIO = 0;
#endif

p.Fail = true;
p.Result = null;
p.Fail = true;
p.Result = null;

this.pendingArgs.Next(p);
}
this.pendingArgs.Next(p);
}

/// <summary>
/// returns true when io is pending
/// </summary>
/// <returns></returns>
private bool EndReceive()
{
this.readInProgressEvent.Set();
/// <summary>
/// returns true when io is pending
/// </summary>
/// <returns></returns>
private bool EndReceive()
{
this.readInProgressEvent.Set();

var read = this.readEvent.BytesTransferred;
if (this.readEvent.SocketError != SocketError.Success
|| read == 0)
{
this.AbortReadAndTryPublishError(true);//new IOException("Remote end has been closed"));
var read = this.readEvent.BytesTransferred;
if (this.readEvent.SocketError != SocketError.Success
|| read == 0)
{
this.AbortReadAndTryPublishError(true);//new IOException("Remote end has been closed"));

return false;
}
return false;
}

this.remainingRead -= read;
this.asyncBuffer.Append(this.readEvent.Buffer, 0, read);
this.remainingRead -= read;
this.asyncBuffer.Append(this.readEvent.Buffer, 0, read);

if (this.remainingRead <= 0)
{
this.PublishResult(true);
if (this.remainingRead <= 0)
{
this.PublishResult(true);

return false;
}
return false;
}

return true;
}
return true;
}

private void PublishResult(bool isAsync)
{
var retval = this.pendingArgs;
private void PublishResult(bool isAsync)
{
var retval = this.pendingArgs;

var data = new byte[this.expectedToRead];
this.asyncBuffer.Read(data, 0, retval.Count);
pendingArgs.Result = data;
var data = new byte[this.expectedToRead];
this.asyncBuffer.Read(data, 0, retval.Count);
pendingArgs.Result = data;
#if DEBUG_IO
Thread.MemoryBarrier();
this.doingIO = 0;
#endif

if (isAsync)
pendingArgs.Next(pendingArgs);
}
}
}
if (isAsync)
pendingArgs.Next(pendingArgs);
}
}
}

public partial class PooledSocket
{
Expand All @@ -196,10 +197,10 @@ private class AsyncSocketHelper
{
private const int ChunkSize = 65536;

private PooledSocket socket;
private SlidingBuffer asyncBuffer;
private readonly PooledSocket socket;
private readonly SlidingBuffer asyncBuffer;

private SocketAsyncEventArgs readEvent;
private readonly SocketAsyncEventArgs readEvent;
#if DEBUG_IO
private int doingIO;
#endif
Expand Down
16 changes: 13 additions & 3 deletions Enyim.Caching/Memcached/MemcachedNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Runtime.Serialization;
using System.Security;
using System.Threading;
Expand Down Expand Up @@ -184,6 +185,7 @@ public async Task<IPooledSocketResult> AcquireAsync()
poolInitSemaphore.Release();
}
}

try
{
return await this.internalPoolImpl.AcquireAsync();
Expand Down Expand Up @@ -442,7 +444,6 @@ public IPooledSocketResult Acquire()
message = "Could not get a socket from the pool, Creating a new item. " + _endPoint;
if (_isDebugEnabled) _logger.LogDebug(message);


try
{
// okay, create the new item
Expand Down Expand Up @@ -805,9 +806,11 @@ protected virtual async Task<IPooledSocketResult> ExecuteOperationAsync(IOperati
//if Get, call BinaryRequest.CreateBuffer()
var b = op.GetBuffer();

await pooledSocket.WriteSync(b);
_logger.LogDebug("pooledSocket.WriteAsync...");
await pooledSocket.WriteAsync(b);

//if Get, call BinaryResponse
_logger.LogDebug($"{op}.ReadResponseAsync...");
var readResult = await op.ReadResponseAsync(pooledSocket);
if (readResult.Success)
{
Expand All @@ -823,7 +826,14 @@ protected virtual async Task<IPooledSocketResult> ExecuteOperationAsync(IOperati
{
_logger.LogError(nameof(MemcachedNode), e);

result.Fail("Exception reading response", e);
result.Fail("IOException reading response", e);
return result;
}
catch (SocketException e)
{
_logger.LogError(nameof(MemcachedNode), e);

result.Fail("SocketException reading response", e);
return result;
}
finally
Expand Down
Loading