Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 25 additions & 10 deletions Enyim.Caching/Memcached/MemcachedNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class MemcachedNode : IMemcachedNode
private bool isDisposed;

private readonly EndPoint _endPoint;
private readonly ISocketPoolConfiguration config;
private readonly ISocketPoolConfiguration _config;
private InternalPoolImpl internalPoolImpl;
private bool isInitialized = false;
private SemaphoreSlim poolInitSemaphore = new SemaphoreSlim(1, 1);
Expand All @@ -42,7 +42,7 @@ public MemcachedNode(
{
_endPoint = endpoint;
EndPointString = endpoint?.ToString().Replace("Unspecified/", string.Empty);
this.config = socketPoolConfig;
_config = socketPoolConfig;

if (socketPoolConfig.ConnectionTimeout.TotalMilliseconds >= Int32.MaxValue)
throw new InvalidOperationException("ConnectionTimeout must be < Int32.MaxValue");
Expand All @@ -65,7 +65,7 @@ public MemcachedNode(

protected INodeFailurePolicy FailurePolicy
{
get { return this.failurePolicy ?? (this.failurePolicy = this.config.FailurePolicyFactory.Create(this)); }
get { return this.failurePolicy ?? (this.failurePolicy = _config.FailurePolicyFactory.Create(this)); }
}

/// <summary>
Expand Down Expand Up @@ -121,7 +121,7 @@ public bool Ping()
// it's easier to create a new pool than reinitializing a dead one
// rewrite-then-dispose to avoid a race condition with Acquire (which does no locking)
var oldPool = this.internalPoolImpl;
var newPool = new InternalPoolImpl(this, this.config, _logger);
var newPool = new InternalPoolImpl(this, _config, _logger);

Interlocked.Exchange(ref this.internalPoolImpl, newPool);

Expand Down Expand Up @@ -774,7 +774,7 @@ protected internal virtual PooledSocket CreateSocket()
{
try
{
var ps = new PooledSocket(_endPoint, this.config.ConnectionTimeout, this.config.ReceiveTimeout, _logger);
var ps = new PooledSocket(_endPoint, _config.ConnectionTimeout, _config.ReceiveTimeout, _logger);
ps.Connect();
return ps;
}
Expand All @@ -790,7 +790,7 @@ protected internal virtual async Task<PooledSocket> CreateSocketAsync()
{
try
{
var ps = new PooledSocket(_endPoint, this.config.ConnectionTimeout, this.config.ReceiveTimeout, _logger);
var ps = new PooledSocket(_endPoint, _config.ConnectionTimeout, _config.ReceiveTimeout, _logger);
await ps.ConnectAsync();
return ps;
}
Expand Down Expand Up @@ -872,11 +872,26 @@ protected virtual async Task<IPooledSocketResult> ExecuteOperationAsync(IOperati
var b = op.GetBuffer();

_logger.LogDebug("pooledSocket.WriteAsync...");
await pooledSocket.WriteAsync(b);

var writeSocketTask = pooledSocket.WriteAsync(b);
if(await Task.WhenAny(writeSocketTask, Task.Delay(_config.ConnectionTimeout)) != writeSocketTask)
{
result.Fail("Timeout to pooledSocket.WriteAsync");
return result;
}
await writeSocketTask;

//if Get, call BinaryResponse
_logger.LogDebug($"{op}.ReadResponseAsync...");
var readResult = await op.ReadResponseAsync(pooledSocket);

var readResponseTask = op.ReadResponseAsync(pooledSocket);
if (await Task.WhenAny(readResponseTask, Task.Delay(_config.ConnectionTimeout)) != readResponseTask)
{
result.Fail($"Timeout to ReadResponseAsync(pooledSocket) for {op}");
return result;
}

var readResult = await readResponseTask;
if (readResult.Success)
{
result.Pass();
Expand Down Expand Up @@ -974,12 +989,12 @@ bool IMemcachedNode.Ping()

IOperationResult IMemcachedNode.Execute(IOperation op)
{
return this.ExecuteOperation(op);
return ExecuteOperation(op);
}

async Task<IOperationResult> IMemcachedNode.ExecuteAsync(IOperation op)
{
return await this.ExecuteOperationAsync(op);
return await ExecuteOperationAsync(op);
}

async Task<bool> IMemcachedNode.ExecuteAsync(IOperation op, Action<bool> next)
Expand Down