diff --git a/Enyim.Caching/Enyim.Caching.csproj b/Enyim.Caching/Enyim.Caching.csproj index 6fff766f..bac53732 100755 --- a/Enyim.Caching/Enyim.Caching.csproj +++ b/Enyim.Caching/Enyim.Caching.csproj @@ -25,5 +25,6 @@ + diff --git a/Enyim.Caching/Memcached/MemcachedNode.cs b/Enyim.Caching/Memcached/MemcachedNode.cs index 1185194a..5d36532b 100755 --- a/Enyim.Caching/Memcached/MemcachedNode.cs +++ b/Enyim.Caching/Memcached/MemcachedNode.cs @@ -1,9 +1,3 @@ -using Enyim.Caching.Configuration; -using Enyim.Caching.Memcached.Protocol.Binary; -using Enyim.Caching.Memcached.Results; -using Enyim.Caching.Memcached.Results.Extensions; -using Enyim.Collections; -using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; using System.Diagnostics; @@ -13,6 +7,12 @@ using System.Security; using System.Threading; using System.Threading.Tasks; +using Enyim.Caching.Configuration; +using Enyim.Caching.Memcached.Protocol.Binary; +using Enyim.Caching.Memcached.Results; +using Enyim.Caching.Memcached.Results.Extensions; +using Enyim.Collections; +using Microsoft.Extensions.Logging; namespace Enyim.Caching.Memcached { @@ -591,14 +591,13 @@ protected async virtual Task ExecuteOperationAsync(IOperati { var pooledSocket = result.Value; - //if Get, call BinaryRequest.CreateBuffer() var b = op.GetBuffer(); await pooledSocket.WriteSync(b); //if Get, call BinaryResponse - var readResult = op.ReadResponse(pooledSocket); + var readResult = await op.ReadResponseAsync(pooledSocket); if (readResult.Success) { result.Pass(); diff --git a/Enyim.Caching/Memcached/PooledSocket.cs b/Enyim.Caching/Memcached/PooledSocket.cs index 96e0e67c..101ed03c 100755 --- a/Enyim.Caching/Memcached/PooledSocket.cs +++ b/Enyim.Caching/Memcached/PooledSocket.cs @@ -1,4 +1,3 @@ -using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; using System.Diagnostics; @@ -11,6 +10,7 @@ using System.Text; using System.Threading; using System.Threading.Tasks; +using Microsoft.Extensions.Logging; namespace Enyim.Caching.Memcached { @@ -208,6 +208,21 @@ public int ReadByte() } } + public int ReadByteAsync() + { + this.CheckDisposed(); + + try + { + return this.inputStream.ReadByte(); + } + catch (IOException) + { + this.isAlive = false; + throw; + } + } + public async Task ReadBytesAsync(int count) { var buffer = new ArraySegment(new byte[count], 0, count); diff --git a/Enyim.Caching/Memcached/Protocol/Binary/BinaryConverter.cs b/Enyim.Caching/Memcached/Protocol/Binary/BinaryConverter.cs index ba454e0d..0ea94814 100644 --- a/Enyim.Caching/Memcached/Protocol/Binary/BinaryConverter.cs +++ b/Enyim.Caching/Memcached/Protocol/Binary/BinaryConverter.cs @@ -5,110 +5,58 @@ namespace Enyim.Caching.Memcached.Protocol.Binary { public static class BinaryConverter { - public static unsafe ushort DecodeUInt16(byte[] buffer, int offset) + public static ushort DecodeUInt16(Span buffer, int offset) { return (ushort)((buffer[offset] << 8) + buffer[offset + 1]); } - public static unsafe ushort DecodeUInt16(byte* buffer, int offset) + public static int DecodeInt32(Span buffer, int offset) { - return (ushort)((buffer[offset] << 8) + buffer[offset + 1]); - } - - public static unsafe int DecodeInt32(ArraySegment segment, int offset) - { - fixed (byte* buffer = segment.Array) - { - byte* ptr = buffer + segment.Offset + offset; - - return DecodeInt32(buffer, 0); - } - } - - public static unsafe int DecodeInt32(byte* buffer, int offset) - { - buffer += offset; - - return (buffer[0] << 24) | (buffer[1] << 16) | (buffer[2] << 8) | buffer[3]; - } - - public static unsafe int DecodeInt32(byte[] buffer, int offset) - { - return (buffer[offset] << 24) | (buffer[offset + 1] << 16) | (buffer[offset + 2] << 8) | buffer[offset + 3]; - } + var slice = buffer.Slice(offset); - public static unsafe ulong DecodeUInt64(byte[] buffer, int offset) - { - fixed (byte* ptr = buffer) - { - return DecodeUInt64(ptr, offset); - } + return (slice[0] << 24) | (slice[1] << 16) | (slice[2] << 8) | slice[3]; } - public static unsafe ulong DecodeUInt64(byte* buffer, int offset) + public static unsafe ulong DecodeUInt64(Span buffer, int offset) { - buffer += offset; + var slice = buffer.Slice(offset); - var part1 = (uint)((buffer[0] << 24) | (buffer[1] << 16) | (buffer[2] << 8) | buffer[3]); - var part2 = (uint)((buffer[4] << 24) | (buffer[5] << 16) | (buffer[6] << 8) | buffer[7]); + var part1 = (uint)((slice[0] << 24) | (slice[1] << 16) | (slice[2] << 8) | slice[3]); + var part2 = (uint)((slice[4] << 24) | (slice[5] << 16) | (slice[6] << 8) | slice[7]); return ((ulong)part1 << 32) | part2; } - public static unsafe void EncodeUInt16(uint value, byte[] buffer, int offset) - { - fixed (byte* bufferPtr = buffer) - { - EncodeUInt16(value, bufferPtr, offset); - } - } - - public static unsafe void EncodeUInt16(uint value, byte* buffer, int offset) + public static unsafe void EncodeUInt16(uint value, Span buffer, int offset) { - byte* ptr = buffer + offset; + var slice = buffer.Slice(offset); - ptr[0] = (byte)(value >> 8); - ptr[1] = (byte)(value & 255); + slice[0] = (byte)(value >> 8); + slice[1] = (byte)(value & 255); } - public static unsafe void EncodeUInt32(uint value, byte[] buffer, int offset) + public static unsafe void EncodeUInt32(uint value, Span buffer, int offset) { - fixed (byte* bufferPtr = buffer) - { - EncodeUInt32(value, bufferPtr, offset); - } - } - - public static unsafe void EncodeUInt32(uint value, byte* buffer, int offset) - { - byte* ptr = buffer + offset; + var slice = buffer.Slice(offset); - ptr[0] = (byte)(value >> 24); - ptr[1] = (byte)(value >> 16); - ptr[2] = (byte)(value >> 8); - ptr[3] = (byte)(value & 255); + slice[0] = (byte)(value >> 24); + slice[1] = (byte)(value >> 16); + slice[2] = (byte)(value >> 8); + slice[3] = (byte)(value & 255); } - public static unsafe void EncodeUInt64(ulong value, byte[] buffer, int offset) + public static unsafe void EncodeUInt64(ulong value, Span buffer, int offset) { - fixed (byte* bufferPtr = buffer) - { - EncodeUInt64(value, bufferPtr, offset); - } - } + var slice = buffer.Slice(offset); - public static unsafe void EncodeUInt64(ulong value, byte* buffer, int offset) - { - byte* ptr = buffer + offset; - - ptr[0] = (byte)(value >> 56); - ptr[1] = (byte)(value >> 48); - ptr[2] = (byte)(value >> 40); - ptr[3] = (byte)(value >> 32); - ptr[4] = (byte)(value >> 24); - ptr[5] = (byte)(value >> 16); - ptr[6] = (byte)(value >> 8); - ptr[7] = (byte)(value & 255); + slice[0] = (byte)(value >> 56); + slice[1] = (byte)(value >> 48); + slice[2] = (byte)(value >> 40); + slice[3] = (byte)(value >> 32); + slice[4] = (byte)(value >> 24); + slice[5] = (byte)(value >> 16); + slice[6] = (byte)(value >> 8); + slice[7] = (byte)(value & 255); } public static byte[] EncodeKey(string key) diff --git a/Enyim.Caching/Memcached/Protocol/Binary/BinaryOperation.cs b/Enyim.Caching/Memcached/Protocol/Binary/BinaryOperation.cs index df312d72..95f92872 100644 --- a/Enyim.Caching/Memcached/Protocol/Binary/BinaryOperation.cs +++ b/Enyim.Caching/Memcached/Protocol/Binary/BinaryOperation.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Threading.Tasks; namespace Enyim.Caching.Memcached.Protocol.Binary { @@ -12,15 +13,10 @@ protected internal override IList> GetBuffer() return this.Build().CreateBuffer(); } - protected internal override System.Threading.Tasks.Task ReadResponseAsync(PooledSocket socket) - { - throw new NotImplementedException(); - } - protected internal override bool ReadResponseAsync(PooledSocket socket, System.Action next) { throw new System.NotSupportedException(); - } + } } } diff --git a/Enyim.Caching/Memcached/Protocol/Binary/BinaryResponse.cs b/Enyim.Caching/Memcached/Protocol/Binary/BinaryResponse.cs index bbadc8ca..df575c82 100644 --- a/Enyim.Caching/Memcached/Protocol/Binary/BinaryResponse.cs +++ b/Enyim.Caching/Memcached/Protocol/Binary/BinaryResponse.cs @@ -1,6 +1,6 @@ using System; -using System.Text; using System.Diagnostics; +using System.Text; using System.Threading.Tasks; namespace Enyim.Caching.Memcached.Protocol.Binary @@ -64,7 +64,7 @@ public unsafe bool Read(PooledSocket socket) if (dataLength > 0) { var data = new byte[dataLength]; - socket.Read(data, 0, dataLength); + socket.Read(data, 0, dataLength); this.Extra = new ArraySegment(data, 0, extraLength); this.Data = new ArraySegment(data, extraLength, data.Length - extraLength); @@ -207,24 +207,22 @@ private void DoDecodeBody(AsyncIOArgs asyncEvent) if (this.shouldCallNext) this.next(true); } - private unsafe void DeserializeHeader(byte[] header, out int dataLength, out int extraLength) + + private void DeserializeHeader(Span header, out int dataLength, out int extraLength) { - fixed (byte* buffer = header) - { - if (buffer[0] != MAGIC_VALUE) - throw new InvalidOperationException("Expected magic value " + MAGIC_VALUE + ", received: " + buffer[0]); + if (header[0] != MAGIC_VALUE) + throw new InvalidOperationException("Expected magic value " + MAGIC_VALUE + ", received: " + header[0]); - this.DataType = buffer[HEADER_DATATYPE]; - this.Opcode = buffer[HEADER_OPCODE]; - this.StatusCode = BinaryConverter.DecodeUInt16(buffer, HEADER_STATUS); + this.DataType = header[HEADER_DATATYPE]; + this.Opcode = header[HEADER_OPCODE]; + this.StatusCode = BinaryConverter.DecodeUInt16(header, HEADER_STATUS); - this.KeyLength = BinaryConverter.DecodeUInt16(buffer, HEADER_KEY); - this.CorrelationId = BinaryConverter.DecodeInt32(buffer, HEADER_OPAQUE); - this.CAS = BinaryConverter.DecodeUInt64(buffer, HEADER_CAS); + this.KeyLength = BinaryConverter.DecodeUInt16(header, HEADER_KEY); + this.CorrelationId = BinaryConverter.DecodeInt32(header, HEADER_OPAQUE); + this.CAS = BinaryConverter.DecodeUInt64(header, HEADER_CAS); - dataLength = BinaryConverter.DecodeInt32(buffer, HEADER_BODY); - extraLength = buffer[HEADER_EXTRA]; - } + dataLength = BinaryConverter.DecodeInt32(header, HEADER_BODY); + extraLength = header[HEADER_EXTRA]; } private void LogExecutionTime(string title, DateTime startTime, int thresholdMs) diff --git a/Enyim.Caching/Memcached/Protocol/Binary/BinarySingleItemOperation.cs b/Enyim.Caching/Memcached/Protocol/Binary/BinarySingleItemOperation.cs index 5f74d366..5dd6c9a1 100644 --- a/Enyim.Caching/Memcached/Protocol/Binary/BinarySingleItemOperation.cs +++ b/Enyim.Caching/Memcached/Protocol/Binary/BinarySingleItemOperation.cs @@ -44,7 +44,7 @@ protected internal override IOperationResult ReadResponse(PooledSocket socket) return result; } - protected internal override async Task ReadResponseAsync(PooledSocket socket) + protected internal override async ValueTask ReadResponseAsync(PooledSocket socket) { var response = new BinaryResponse(); var retval = await response.ReadAsync(socket); diff --git a/Enyim.Caching/Memcached/Protocol/Binary/FlushOperation.cs b/Enyim.Caching/Memcached/Protocol/Binary/FlushOperation.cs index 50bd4acf..4c9d5d7e 100644 --- a/Enyim.Caching/Memcached/Protocol/Binary/FlushOperation.cs +++ b/Enyim.Caching/Memcached/Protocol/Binary/FlushOperation.cs @@ -1,37 +1,54 @@ using System; using System.Collections.Generic; +using System.Threading.Tasks; using Enyim.Caching.Memcached.Results; using Enyim.Caching.Memcached.Results.Extensions; namespace Enyim.Caching.Memcached.Protocol.Binary { - public class FlushOperation : BinaryOperation, IFlushOperation - { - public FlushOperation() { } - - protected override BinaryRequest Build() - { - var request = new BinaryRequest(OpCode.Flush); - - return request; - } - - protected internal override IOperationResult ReadResponse(PooledSocket socket) - { - var response = new BinaryResponse(); - var retval = response.Read(socket); - - this.StatusCode = StatusCode; - var result = new BinaryOperationResult() - { - Success = retval, - StatusCode = this.StatusCode - }; - - result.PassOrFail(retval, "Failed to read response"); - return result; - } - } + public class FlushOperation : BinaryOperation, IFlushOperation + { + public FlushOperation() { } + + protected override BinaryRequest Build() + { + var request = new BinaryRequest(OpCode.Flush); + + return request; + } + + protected internal override IOperationResult ReadResponse(PooledSocket socket) + { + var response = new BinaryResponse(); + var retval = response.Read(socket); + + this.StatusCode = StatusCode; + var result = new BinaryOperationResult() + { + Success = retval, + StatusCode = this.StatusCode + }; + + result.PassOrFail(retval, "Failed to read response"); + return result; + } + + protected internal override async ValueTask ReadResponseAsync(PooledSocket socket) + { + var response = new BinaryResponse(); + var retval = await response.ReadAsync(socket); + + this.StatusCode = StatusCode; + var result = new BinaryOperationResult() + { + Success = retval, + StatusCode = this.StatusCode + }; + + result.PassOrFail(retval, "Failed to read response"); + return result; + } + } } #region [ License information ] diff --git a/Enyim.Caching/Memcached/Protocol/Binary/GetOperation.cs b/Enyim.Caching/Memcached/Protocol/Binary/GetOperation.cs index 6439ce30..7ddad097 100644 --- a/Enyim.Caching/Memcached/Protocol/Binary/GetOperation.cs +++ b/Enyim.Caching/Memcached/Protocol/Binary/GetOperation.cs @@ -1,3 +1,4 @@ +using System; using System.Collections.Generic; using System.Text; using Enyim.Caching.Memcached.Results; @@ -37,7 +38,7 @@ protected override IOperationResult ProcessResponse(BinaryResponse response) if (status == 0) { - int flags = BinaryConverter.DecodeInt32(response.Extra, 0); + int flags = BinaryConverter.DecodeInt32(response.Extra.AsSpan(), 0); this.result = new CacheItem((ushort)flags, response.Data); this.Cas = response.CAS; diff --git a/Enyim.Caching/Memcached/Protocol/Binary/MultiGetOperation.cs b/Enyim.Caching/Memcached/Protocol/Binary/MultiGetOperation.cs index 51ee95d9..5dd7da08 100644 --- a/Enyim.Caching/Memcached/Protocol/Binary/MultiGetOperation.cs +++ b/Enyim.Caching/Memcached/Protocol/Binary/MultiGetOperation.cs @@ -1,206 +1,207 @@ using System; -using System.Linq; using System.Collections.Generic; using System.Diagnostics; +using System.Linq; using System.Threading; +using System.Threading.Tasks; using Enyim.Caching.Memcached.Results; using Enyim.Caching.Memcached.Results.Extensions; namespace Enyim.Caching.Memcached.Protocol.Binary { - public class MultiGetOperation : BinaryMultiItemOperation, IMultiGetOperation - { - private static readonly Enyim.Caching.ILog log = Enyim.Caching.LogManager.GetLogger(typeof(MultiGetOperation)); + public class MultiGetOperation : BinaryMultiItemOperation, IMultiGetOperation + { + private static readonly Enyim.Caching.ILog log = Enyim.Caching.LogManager.GetLogger(typeof(MultiGetOperation)); + + private Dictionary result; + private Dictionary idToKey; + private int noopId; - private Dictionary result; - private Dictionary idToKey; - private int noopId; + public MultiGetOperation(IList keys) : base(keys) { } - public MultiGetOperation(IList keys) : base(keys) { } + protected override BinaryRequest Build(string key) + { + var request = new BinaryRequest(OpCode.GetQ) + { + Key = key + }; - protected override BinaryRequest Build(string key) - { - var request = new BinaryRequest(OpCode.GetQ) - { - Key = key - }; + return request; + } - return request; - } + protected internal override IList> GetBuffer() + { + var keys = this.Keys; - protected internal override IList> GetBuffer() - { - var keys = this.Keys; + if (keys == null || keys.Count == 0) + { + if (log.IsWarnEnabled) log.Warn("Empty multiget!"); - if (keys == null || keys.Count == 0) - { - if (log.IsWarnEnabled) log.Warn("Empty multiget!"); + return new ArraySegment[0]; + } - return new ArraySegment[0]; - } + if (log.IsDebugEnabled) + log.DebugFormat("Building multi-get for {0} keys", keys.Count); - if (log.IsDebugEnabled) - log.DebugFormat("Building multi-get for {0} keys", keys.Count); + // map the command's correlationId to the item key, + // so we can use GetQ (which only returns the item data) + this.idToKey = new Dictionary(); - // map the command's correlationId to the item key, - // so we can use GetQ (which only returns the item data) - this.idToKey = new Dictionary(); + // get ops have 2 segments, header + key + var buffers = new List>(keys.Count * 2); - // get ops have 2 segments, header + key - var buffers = new List>(keys.Count * 2); + foreach (var key in keys) + { + var request = this.Build(key); - foreach (var key in keys) - { - var request = this.Build(key); + request.CreateBuffer(buffers); - request.CreateBuffer(buffers); + // we use this to map the responses to the keys + idToKey[request.CorrelationId] = key; + } - // we use this to map the responses to the keys - idToKey[request.CorrelationId] = key; - } + // uncork the server + var noop = new BinaryRequest(OpCode.NoOp); + this.noopId = noop.CorrelationId; - // uncork the server - var noop = new BinaryRequest(OpCode.NoOp); - this.noopId = noop.CorrelationId; + noop.CreateBuffer(buffers); - noop.CreateBuffer(buffers); + return buffers; + } + + + private PooledSocket currentSocket; + private BinaryResponse asyncReader; + private bool? asyncLoopState; + private Action afterAsyncRead; + + protected internal override async ValueTask ReadResponseAsync(PooledSocket socket) + { + return ReadResponse(socket); + } - return buffers; - } + protected internal override bool ReadResponseAsync(PooledSocket socket, Action next) + { + this.result = new Dictionary(); + this.Cas = new Dictionary(); + this.currentSocket = socket; + this.asyncReader = new BinaryResponse(); + this.asyncLoopState = null; + this.afterAsyncRead = next; - private PooledSocket currentSocket; - private BinaryResponse asyncReader; - private bool? asyncLoopState; - private Action afterAsyncRead; + return this.DoReadAsync(); + } - protected internal override System.Threading.Tasks.Task ReadResponseAsync(PooledSocket socket) + private bool DoReadAsync() { - throw new NotImplementedException(); + bool ioPending; + + var reader = this.asyncReader; + + while (this.asyncLoopState == null) + { + var readSuccess = reader.ReadAsync(this.currentSocket, this.EndReadAsync, out ioPending); + this.StatusCode = reader.StatusCode; + + if (ioPending) return readSuccess; + + if (!readSuccess) + this.asyncLoopState = false; + else if (reader.CorrelationId == this.noopId) + this.asyncLoopState = true; + else + this.StoreResult(reader); + } + + this.afterAsyncRead((bool)this.asyncLoopState); + + return true; } - protected internal override bool ReadResponseAsync(PooledSocket socket, Action next) - { - this.result = new Dictionary(); - this.Cas = new Dictionary(); - - this.currentSocket = socket; - this.asyncReader = new BinaryResponse(); - this.asyncLoopState = null; - this.afterAsyncRead = next; - - return this.DoReadAsync(); - } - - private bool DoReadAsync() - { - bool ioPending; - - var reader = this.asyncReader; - - while (this.asyncLoopState == null) - { - var readSuccess = reader.ReadAsync(this.currentSocket, this.EndReadAsync, out ioPending); - this.StatusCode = reader.StatusCode; - - if (ioPending) return readSuccess; - - if (!readSuccess) - this.asyncLoopState = false; - else if (reader.CorrelationId == this.noopId) - this.asyncLoopState = true; - else - this.StoreResult(reader); - } - - this.afterAsyncRead((bool)this.asyncLoopState); - - return true; - } - - private void EndReadAsync(bool readSuccess) - { - if (!readSuccess) - this.asyncLoopState = false; - else if (this.asyncReader.CorrelationId == this.noopId) - this.asyncLoopState = true; - else - StoreResult(this.asyncReader); - - this.DoReadAsync(); - } - - private void StoreResult(BinaryResponse reader) - { - string key; - - // find the key to the response - if (!this.idToKey.TryGetValue(reader.CorrelationId, out key)) - { - // we're not supposed to get here tho - log.WarnFormat("Found response with CorrelationId {0}, but no key is matching it.", reader.CorrelationId); - } - else - { - if (log.IsDebugEnabled) log.DebugFormat("Reading item {0}", key); - - // deserialize the response - var flags = (ushort)BinaryConverter.DecodeInt32(reader.Extra, 0); - - this.result[key] = new CacheItem(flags, reader.Data); - this.Cas[key] = reader.CAS; - } - } - - protected internal override IOperationResult ReadResponse(PooledSocket socket) - { - this.result = new Dictionary(); - this.Cas = new Dictionary(); - var result = new TextOperationResult(); - - var response = new BinaryResponse(); - - while (response.Read(socket)) - { - this.StatusCode = response.StatusCode; - - // found the noop, quit - if (response.CorrelationId == this.noopId) - return result.Pass(); - - string key; - - // find the key to the response - if (!this.idToKey.TryGetValue(response.CorrelationId, out key)) - { - // we're not supposed to get here tho - log.WarnFormat("Found response with CorrelationId {0}, but no key is matching it.", response.CorrelationId); - continue; - } - - if (log.IsDebugEnabled) log.DebugFormat("Reading item {0}", key); - - // deserialize the response - int flags = BinaryConverter.DecodeInt32(response.Extra, 0); - - this.result[key] = new CacheItem((ushort)flags, response.Data); - this.Cas[key] = response.CAS; - } - - // finished reading but we did not find the NOOP - return result.Fail("Found response with CorrelationId {0}, but no key is matching it."); - } - - public Dictionary Result - { - get { return this.result; } - } - - Dictionary IMultiGetOperation.Result - { - get { return this.result; } - } - } + private void EndReadAsync(bool readSuccess) + { + if (!readSuccess) + this.asyncLoopState = false; + else if (this.asyncReader.CorrelationId == this.noopId) + this.asyncLoopState = true; + else + StoreResult(this.asyncReader); + + this.DoReadAsync(); + } + + private void StoreResult(BinaryResponse reader) + { + string key; + + // find the key to the response + if (!this.idToKey.TryGetValue(reader.CorrelationId, out key)) + { + // we're not supposed to get here tho + log.WarnFormat("Found response with CorrelationId {0}, but no key is matching it.", reader.CorrelationId); + } + else + { + if (log.IsDebugEnabled) log.DebugFormat("Reading item {0}", key); + + // deserialize the response + var flags = (ushort)BinaryConverter.DecodeInt32(reader.Extra, 0); + + this.result[key] = new CacheItem(flags, reader.Data); + this.Cas[key] = reader.CAS; + } + } + + protected internal override IOperationResult ReadResponse(PooledSocket socket) + { + this.result = new Dictionary(); + this.Cas = new Dictionary(); + var result = new TextOperationResult(); + + var response = new BinaryResponse(); + + while (response.Read(socket)) + { + this.StatusCode = response.StatusCode; + + // found the noop, quit + if (response.CorrelationId == this.noopId) + return result.Pass(); + + string key; + + // find the key to the response + if (!this.idToKey.TryGetValue(response.CorrelationId, out key)) + { + // we're not supposed to get here tho + log.WarnFormat("Found response with CorrelationId {0}, but no key is matching it.", response.CorrelationId); + continue; + } + + if (log.IsDebugEnabled) log.DebugFormat("Reading item {0}", key); + + // deserialize the response + int flags = BinaryConverter.DecodeInt32(response.Extra, 0); + + this.result[key] = new CacheItem((ushort)flags, response.Data); + this.Cas[key] = response.CAS; + } + + // finished reading but we did not find the NOOP + return result.Fail("Found response with CorrelationId {0}, but no key is matching it."); + } + + public Dictionary Result + { + get { return this.result; } + } + + Dictionary IMultiGetOperation.Result + { + get { return this.result; } + } + } } #region [ License information ] diff --git a/Enyim.Caching/Memcached/Protocol/Binary/MutatorOperation.cs b/Enyim.Caching/Memcached/Protocol/Binary/MutatorOperation.cs index 0af7d6b3..43d3e6e7 100644 --- a/Enyim.Caching/Memcached/Protocol/Binary/MutatorOperation.cs +++ b/Enyim.Caching/Memcached/Protocol/Binary/MutatorOperation.cs @@ -1,88 +1,82 @@ using System; +using Enyim.Caching.Memcached.Results; using Enyim.Caching.Memcached.Results.Extensions; using Enyim.Caching.Memcached.Results.Helpers; -using Enyim.Caching.Memcached.Results; namespace Enyim.Caching.Memcached.Protocol.Binary { - public class MutatorOperation : BinarySingleItemOperation, IMutatorOperation - { - private ulong defaultValue; - private ulong delta; - private uint expires; - private MutationMode mode; - private ulong result; - - public MutatorOperation(MutationMode mode, string key, ulong defaultValue, ulong delta, uint expires) - : base(key) - { - if (delta < 0) throw new ArgumentOutOfRangeException("delta", "delta must be >= 0"); - - this.defaultValue = defaultValue; - this.delta = delta; - this.expires = expires; - this.mode = mode; - } - - protected unsafe void UpdateExtra(BinaryRequest request) - { - byte[] extra = new byte[20]; - - fixed (byte* buffer = extra) - { - BinaryConverter.EncodeUInt64(this.delta, buffer, 0); - - BinaryConverter.EncodeUInt64(this.defaultValue, buffer, 8); - BinaryConverter.EncodeUInt32(this.expires, buffer, 16); - } - - request.Extra = new ArraySegment(extra); - } - - protected override BinaryRequest Build() - { - var request = new BinaryRequest((OpCode)this.mode) - { - Key = this.Key, - Cas = this.Cas - }; - - this.UpdateExtra(request); - - return request; - } - - protected override IOperationResult ProcessResponse(BinaryResponse response) - { - var result = new BinaryOperationResult(); - var status = response.StatusCode; - this.StatusCode = status; - - if (status == 0) - { - var data = response.Data; - if (data.Count != 8) - return result.Fail("Result must be 8 bytes long, received: " + data.Count, new InvalidOperationException()); - - this.result = BinaryConverter.DecodeUInt64(data.Array, data.Offset); - - return result.Pass(); - } - - var message = ResultHelper.ProcessResponseData(response.Data); - return result.Fail(message); - } - - MutationMode IMutatorOperation.Mode - { - get { return this.mode; } - } - - ulong IMutatorOperation.Result - { - get { return this.result; } - } - } + public class MutatorOperation : BinarySingleItemOperation, IMutatorOperation + { + private readonly ulong defaultValue; + private readonly ulong delta; + private readonly uint expires; + private readonly MutationMode mode; + private ulong result; + + public MutatorOperation(MutationMode mode, string key, ulong defaultValue, ulong delta, uint expires) + : base(key) + { + if (delta < 0) throw new ArgumentOutOfRangeException("delta", "delta must be >= 0"); + + this.defaultValue = defaultValue; + this.delta = delta; + this.expires = expires; + this.mode = mode; + } + + protected void UpdateExtra(BinaryRequest request) + { + Span buffer = stackalloc byte[20]; + BinaryConverter.EncodeUInt64(this.delta, buffer, 0); + BinaryConverter.EncodeUInt64(this.defaultValue, buffer, 8); + BinaryConverter.EncodeUInt32(this.expires, buffer, 16); + request.Extra = new ArraySegment(buffer.ToArray()); + } + + protected override BinaryRequest Build() + { + var request = new BinaryRequest((OpCode)this.mode) + { + Key = this.Key, + Cas = this.Cas + }; + + this.UpdateExtra(request); + + return request; + } + + protected override IOperationResult ProcessResponse(BinaryResponse response) + { + var result = new BinaryOperationResult(); + var status = response.StatusCode; + this.StatusCode = status; + + if (status == 0) + { + var data = response.Data; + if (data.Count != 8) + return result.Fail("Result must be 8 bytes long, received: " + data.Count, new InvalidOperationException()); + + this.result = BinaryConverter.DecodeUInt64(data.Array, data.Offset); + + return result.Pass(); + } + + var message = ResultHelper.ProcessResponseData(response.Data); + return result.Fail(message); + } + + MutationMode IMutatorOperation.Mode + { + get { return this.mode; } + } + + ulong IMutatorOperation.Result + { + get { return this.result; } + } + } } #region [ License information ] diff --git a/Enyim.Caching/Memcached/Protocol/Binary/SaslStep.cs b/Enyim.Caching/Memcached/Protocol/Binary/SaslStep.cs index a12c6221..dc294ea0 100644 --- a/Enyim.Caching/Memcached/Protocol/Binary/SaslStep.cs +++ b/Enyim.Caching/Memcached/Protocol/Binary/SaslStep.cs @@ -1,45 +1,59 @@ using System; using System.Collections.Generic; using System.Text; +using System.Threading.Tasks; using Enyim.Caching.Memcached.Results; using Enyim.Caching.Memcached.Results.Extensions; namespace Enyim.Caching.Memcached.Protocol.Binary { - public abstract class SaslStep : BinaryOperation - { - protected SaslStep(ISaslAuthenticationProvider provider) - { - this.Provider = provider; - } + public abstract class SaslStep : BinaryOperation + { + protected SaslStep(ISaslAuthenticationProvider provider) + { + this.Provider = provider; + } - protected ISaslAuthenticationProvider Provider { get; private set; } + protected ISaslAuthenticationProvider Provider { get; private set; } - protected internal override IOperationResult ReadResponse(PooledSocket socket) - { - var response = new BinaryResponse(); + protected internal override IOperationResult ReadResponse(PooledSocket socket) + { + var response = new BinaryResponse(); - var retval = response.Read(socket); + var retval = response.Read(socket); - this.StatusCode = response.StatusCode; - this.Data = response.Data.Array; + this.StatusCode = response.StatusCode; + this.Data = response.Data.Array; - var result = new BinaryOperationResult - { - StatusCode = this.StatusCode - }; + var result = new BinaryOperationResult + { + StatusCode = this.StatusCode + }; - result.PassOrFail(retval, "Failed to read response"); - return result; - } + result.PassOrFail(retval, "Failed to read response"); + return result; + } - protected internal override System.Threading.Tasks.Task ReadResponseAsync(PooledSocket socket) + protected internal override async ValueTask ReadResponseAsync(PooledSocket socket) { - throw new NotImplementedException(); + var response = new BinaryResponse(); + + var retval = await response.ReadAsync(socket); + + this.StatusCode = response.StatusCode; + this.Data = response.Data.Array; + + var result = new BinaryOperationResult + { + StatusCode = this.StatusCode + }; + + result.PassOrFail(retval, "Failed to read response"); + return result; } - public byte[] Data { get; private set; } - } + public byte[] Data { get; private set; } + } } #region [ License information ] diff --git a/Enyim.Caching/Memcached/Protocol/Binary/StatsOperation.cs b/Enyim.Caching/Memcached/Protocol/Binary/StatsOperation.cs index af4bbd40..264f3cbd 100644 --- a/Enyim.Caching/Memcached/Protocol/Binary/StatsOperation.cs +++ b/Enyim.Caching/Memcached/Protocol/Binary/StatsOperation.cs @@ -1,72 +1,97 @@ using System; using System.Collections.Generic; -using System.Text; using System.Net; +using System.Text; +using System.Threading.Tasks; using Enyim.Caching.Memcached.Results; using Enyim.Caching.Memcached.Results.Extensions; namespace Enyim.Caching.Memcached.Protocol.Binary { - public class StatsOperation : BinaryOperation, IStatsOperation - { - private static readonly Enyim.Caching.ILog log = Enyim.Caching.LogManager.GetLogger(typeof(StatsOperation)); + public class StatsOperation : BinaryOperation, IStatsOperation + { + private static readonly Enyim.Caching.ILog log = Enyim.Caching.LogManager.GetLogger(typeof(StatsOperation)); - private string type; - private Dictionary result; + private readonly string type; + private Dictionary result; - public StatsOperation(string type) - { - this.type = type; - } + public StatsOperation(string type) + { + this.type = type; + } - protected override BinaryRequest Build() - { - var request = new BinaryRequest(OpCode.Stat); - if (!String.IsNullOrEmpty(this.type)) - request.Key = this.type; + protected override BinaryRequest Build() + { + var request = new BinaryRequest(OpCode.Stat); + if (!String.IsNullOrEmpty(this.type)) + request.Key = this.type; - return request; - } + return request; + } - protected internal override IOperationResult ReadResponse(PooledSocket socket) - { - var response = new BinaryResponse(); - var serverData = new Dictionary(); - var retval = false; + protected internal override IOperationResult ReadResponse(PooledSocket socket) + { + var response = new BinaryResponse(); + var serverData = new Dictionary(); + var retval = false; - while (response.Read(socket) && response.KeyLength > 0) - { - retval = true; + while (response.Read(socket) && response.KeyLength > 0) + { + retval = true; - var data = response.Data; - var key = BinaryConverter.DecodeKey(data.Array, data.Offset, response.KeyLength); - var value = BinaryConverter.DecodeKey(data.Array, data.Offset + response.KeyLength, data.Count - response.KeyLength); + var data = response.Data; + var key = BinaryConverter.DecodeKey(data.Array, data.Offset, response.KeyLength); + var value = BinaryConverter.DecodeKey(data.Array, data.Offset + response.KeyLength, data.Count - response.KeyLength); - serverData[key] = value; - } + serverData[key] = value; + } - this.result = serverData; - this.StatusCode = response.StatusCode; + this.result = serverData; + this.StatusCode = response.StatusCode; - var result = new BinaryOperationResult() - { - StatusCode = StatusCode - }; + var result = new BinaryOperationResult() + { + StatusCode = StatusCode + }; - result.PassOrFail(retval, "Failed to read response"); - return result; - } + result.PassOrFail(retval, "Failed to read response"); + return result; + } - protected internal override System.Threading.Tasks.Task ReadResponseAsync(PooledSocket socket) + protected internal override async ValueTask ReadResponseAsync(PooledSocket socket) { - throw new NotImplementedException(); + var response = new BinaryResponse(); + var serverData = new Dictionary(); + var retval = false; + + while ((await response.ReadAsync(socket)) && response.KeyLength > 0) + { + retval = true; + + var data = response.Data; + var key = BinaryConverter.DecodeKey(data.Array, data.Offset, response.KeyLength); + var value = BinaryConverter.DecodeKey(data.Array, data.Offset + response.KeyLength, data.Count - response.KeyLength); + + serverData[key] = value; + } + + this.result = serverData; + this.StatusCode = response.StatusCode; + + var result = new BinaryOperationResult() + { + StatusCode = StatusCode + }; + + result.PassOrFail(retval, "Failed to read response"); + return result; } - Dictionary IStatsOperation.Result - { - get { return this.result; } - } - } + Dictionary IStatsOperation.Result + { + get { return this.result; } + } + } } #region [ License information ] diff --git a/Enyim.Caching/Memcached/Protocol/Operation.cs b/Enyim.Caching/Memcached/Protocol/Operation.cs index 9aca9d58..26c144b8 100644 --- a/Enyim.Caching/Memcached/Protocol/Operation.cs +++ b/Enyim.Caching/Memcached/Protocol/Operation.cs @@ -14,7 +14,7 @@ protected Operation() { } internal protected abstract IList> GetBuffer(); internal protected abstract IOperationResult ReadResponse(PooledSocket socket); - internal protected abstract Task ReadResponseAsync(PooledSocket socket); + internal protected abstract ValueTask ReadResponseAsync(PooledSocket socket); internal protected abstract bool ReadResponseAsync(PooledSocket socket, Action next); IList> IOperation.GetBuffer() diff --git a/Enyim.Caching/Memcached/Protocol/Text/DeleteOperation.cs b/Enyim.Caching/Memcached/Protocol/Text/DeleteOperation.cs index a6d926a1..97ea035a 100644 --- a/Enyim.Caching/Memcached/Protocol/Text/DeleteOperation.cs +++ b/Enyim.Caching/Memcached/Protocol/Text/DeleteOperation.cs @@ -1,38 +1,42 @@ using System; +using System.Threading.Tasks; using Enyim.Caching.Memcached.Results; using Enyim.Caching.Memcached.Results.Extensions; namespace Enyim.Caching.Memcached.Protocol.Text { - public class DeleteOperation : SingleItemOperation, IDeleteOperation - { - internal DeleteOperation(string key) : base(key) { } + public class DeleteOperation : SingleItemOperation, IDeleteOperation + { + internal DeleteOperation(string key) : base(key) { } - protected internal override System.Collections.Generic.IList> GetBuffer() - { - var command = "delete " + this.Key + TextSocketHelper.CommandTerminator; + protected internal override System.Collections.Generic.IList> GetBuffer() + { + var command = "delete " + this.Key + TextSocketHelper.CommandTerminator; - return TextSocketHelper.GetCommandBuffer(command); - } + return TextSocketHelper.GetCommandBuffer(command); + } - protected internal override IOperationResult ReadResponse(PooledSocket socket) - { - return new TextOperationResult - { - Success = String.Compare(TextSocketHelper.ReadResponse(socket), "DELETED", StringComparison.Ordinal) == 0 - }; - } + protected internal override IOperationResult ReadResponse(PooledSocket socket) + { + return new TextOperationResult + { + Success = String.Compare(TextSocketHelper.ReadResponse(socket), "DELETED", StringComparison.Ordinal) == 0 + }; + } - protected internal override System.Threading.Tasks.Task ReadResponseAsync(PooledSocket socket) + protected internal override async ValueTask ReadResponseAsync(PooledSocket socket) { - throw new NotImplementedException(); + return new TextOperationResult + { + Success = String.Compare(TextSocketHelper.ReadResponse(socket), "DELETED", StringComparison.Ordinal) == 0 + }; } - protected internal override bool ReadResponseAsync(PooledSocket socket, System.Action next) - { - throw new System.NotSupportedException(); - } - } + protected internal override bool ReadResponseAsync(PooledSocket socket, System.Action next) + { + throw new System.NotSupportedException(); + } + } } #region [ License information ] diff --git a/Enyim.Caching/Memcached/Protocol/Text/FlushOperation.cs b/Enyim.Caching/Memcached/Protocol/Text/FlushOperation.cs index 35c39e73..3a15b3af 100644 --- a/Enyim.Caching/Memcached/Protocol/Text/FlushOperation.cs +++ b/Enyim.Caching/Memcached/Protocol/Text/FlushOperation.cs @@ -1,34 +1,36 @@ using System.Collections.Generic; +using System.Threading.Tasks; using Enyim.Caching.Memcached.Results; using Enyim.Caching.Memcached.Results.Extensions; namespace Enyim.Caching.Memcached.Protocol.Text { - public class FlushOperation : Operation, IFlushOperation - { - public FlushOperation() { } + public class FlushOperation : Operation, IFlushOperation + { + public FlushOperation() { } - protected internal override IList> GetBuffer() - { - return TextSocketHelper.GetCommandBuffer("flush_all" + TextSocketHelper.CommandTerminator); - } + protected internal override IList> GetBuffer() + { + return TextSocketHelper.GetCommandBuffer("flush_all" + TextSocketHelper.CommandTerminator); + } - protected internal override IOperationResult ReadResponse(PooledSocket socket) - { - TextSocketHelper.ReadResponse(socket); - return new TextOperationResult().Pass(); - } + protected internal override IOperationResult ReadResponse(PooledSocket socket) + { + TextSocketHelper.ReadResponse(socket); + return new TextOperationResult().Pass(); + } - protected internal override System.Threading.Tasks.Task ReadResponseAsync(PooledSocket socket) + protected internal override async ValueTask ReadResponseAsync(PooledSocket socket) { - throw new System.NotImplementedException(); + TextSocketHelper.ReadResponse(socket); + return new TextOperationResult().Pass(); } - protected internal override bool ReadResponseAsync(PooledSocket socket, System.Action next) - { - throw new System.NotSupportedException(); - } - } + protected internal override bool ReadResponseAsync(PooledSocket socket, System.Action next) + { + throw new System.NotSupportedException(); + } + } } #region [ License information ] diff --git a/Enyim.Caching/Memcached/Protocol/Text/GetOperation.cs b/Enyim.Caching/Memcached/Protocol/Text/GetOperation.cs index 58edfa7d..74413931 100644 --- a/Enyim.Caching/Memcached/Protocol/Text/GetOperation.cs +++ b/Enyim.Caching/Memcached/Protocol/Text/GetOperation.cs @@ -1,51 +1,62 @@ +using System.Threading.Tasks; using Enyim.Caching.Memcached.Results; using Enyim.Caching.Memcached.Results.Extensions; namespace Enyim.Caching.Memcached.Protocol.Text { - public class GetOperation : SingleItemOperation, IGetOperation - { - private CacheItem result; + public class GetOperation : SingleItemOperation, IGetOperation + { + private CacheItem result; - internal GetOperation(string key) : base(key) { } + internal GetOperation(string key) : base(key) { } - protected internal override System.Collections.Generic.IList> GetBuffer() - { - var command = "gets " + this.Key + TextSocketHelper.CommandTerminator; + protected internal override System.Collections.Generic.IList> GetBuffer() + { + var command = "gets " + this.Key + TextSocketHelper.CommandTerminator; - return TextSocketHelper.GetCommandBuffer(command); - } + return TextSocketHelper.GetCommandBuffer(command); + } - protected internal override IOperationResult ReadResponse(PooledSocket socket) - { - GetResponse r = GetHelper.ReadItem(socket); - var result = new TextOperationResult(); + protected internal override IOperationResult ReadResponse(PooledSocket socket) + { + GetResponse r = GetHelper.ReadItem(socket); + var result = new TextOperationResult(); - if (r == null) return result.Fail("Failed to read response"); + if (r == null) return result.Fail("Failed to read response"); - this.result = r.Item; - this.Cas = r.CasValue; + this.result = r.Item; + this.Cas = r.CasValue; - GetHelper.FinishCurrent(socket); + GetHelper.FinishCurrent(socket); - return result.Pass(); - } + return result.Pass(); + } - CacheItem IGetOperation.Result - { - get { return this.result; } - } + CacheItem IGetOperation.Result + { + get { return this.result; } + } - protected internal override System.Threading.Tasks.Task ReadResponseAsync(PooledSocket socket) + protected internal override async ValueTask ReadResponseAsync(PooledSocket socket) { - throw new System.NotImplementedException(); + GetResponse r = GetHelper.ReadItem(socket); + var result = new TextOperationResult(); + + if (r == null) return result.Fail("Failed to read response"); + + this.result = r.Item; + this.Cas = r.CasValue; + + GetHelper.FinishCurrent(socket); + + return result.Pass(); } - protected internal override bool ReadResponseAsync(PooledSocket socket, System.Action next) - { - throw new System.NotSupportedException(); - } - } + protected internal override bool ReadResponseAsync(PooledSocket socket, System.Action next) + { + throw new System.NotSupportedException(); + } + } } #region [ License information ] diff --git a/Enyim.Caching/Memcached/Protocol/Text/MultiGetOperation.cs b/Enyim.Caching/Memcached/Protocol/Text/MultiGetOperation.cs index 9d7001f0..2fe640ba 100644 --- a/Enyim.Caching/Memcached/Protocol/Text/MultiGetOperation.cs +++ b/Enyim.Caching/Memcached/Protocol/Text/MultiGetOperation.cs @@ -1,76 +1,104 @@ using System; -using System.Linq; using System.Collections.Generic; +using System.Linq; using System.Text; +using System.Threading.Tasks; using Enyim.Caching.Memcached.Results; using Enyim.Caching.Memcached.Results.Extensions; namespace Enyim.Caching.Memcached.Protocol.Text { - public class MultiGetOperation : MultiItemOperation, IMultiGetOperation - { - private static readonly Enyim.Caching.ILog log = Enyim.Caching.LogManager.GetLogger(typeof(MultiGetOperation)); - - private Dictionary result; - - public MultiGetOperation(IList keys) : base(keys) { } + public class MultiGetOperation : MultiItemOperation, IMultiGetOperation + { + private static readonly Enyim.Caching.ILog log = Enyim.Caching.LogManager.GetLogger(typeof(MultiGetOperation)); - protected internal override IList> GetBuffer() - { - // gets key1 key2 key3 ... keyN\r\n + private Dictionary result; - var command = "gets " + String.Join(" ", Keys.ToArray()) + TextSocketHelper.CommandTerminator; + public MultiGetOperation(IList keys) : base(keys) { } - return TextSocketHelper.GetCommandBuffer(command); - } - - protected internal override IOperationResult ReadResponse(PooledSocket socket) - { - var retval = new Dictionary(); - var cas = new Dictionary(); - - try - { - GetResponse r; - - while ((r = GetHelper.ReadItem(socket)) != null) - { - var key = r.Key; + protected internal override IList> GetBuffer() + { + // gets key1 key2 key3 ... keyN\r\n - retval[key] = r.Item; - cas[key] = r.CasValue; - } - } - catch (NotSupportedException) - { - throw; - } - catch (Exception e) - { - log.Error(e); - } + var command = "gets " + String.Join(" ", Keys.ToArray()) + TextSocketHelper.CommandTerminator; - this.result = retval; - this.Cas = cas; + return TextSocketHelper.GetCommandBuffer(command); + } - return new TextOperationResult().Pass(); - } + protected internal override IOperationResult ReadResponse(PooledSocket socket) + { + var retval = new Dictionary(); + var cas = new Dictionary(); + + try + { + GetResponse r; + + while ((r = GetHelper.ReadItem(socket)) != null) + { + var key = r.Key; + + retval[key] = r.Item; + cas[key] = r.CasValue; + } + } + catch (NotSupportedException) + { + throw; + } + catch (Exception e) + { + log.Error(e); + } + + this.result = retval; + this.Cas = cas; + + return new TextOperationResult().Pass(); + } - Dictionary IMultiGetOperation.Result - { - get { return this.result; } - } + Dictionary IMultiGetOperation.Result + { + get { return this.result; } + } - protected internal override System.Threading.Tasks.Task ReadResponseAsync(PooledSocket socket) + protected internal override async ValueTask ReadResponseAsync(PooledSocket socket) { - throw new NotImplementedException(); + var retval = new Dictionary(); + var cas = new Dictionary(); + + try + { + GetResponse r; + + while ((r = GetHelper.ReadItem(socket)) != null) + { + var key = r.Key; + + retval[key] = r.Item; + cas[key] = r.CasValue; + } + } + catch (NotSupportedException) + { + throw; + } + catch (Exception e) + { + log.Error(e); + } + + this.result = retval; + this.Cas = cas; + + return new TextOperationResult().Pass(); } - protected internal override bool ReadResponseAsync(PooledSocket socket, System.Action next) - { - throw new System.NotSupportedException(); - } - } + protected internal override bool ReadResponseAsync(PooledSocket socket, System.Action next) + { + throw new System.NotSupportedException(); + } + } } #region [ License information ] diff --git a/Enyim.Caching/Memcached/Protocol/Text/MutatorOperation.cs b/Enyim.Caching/Memcached/Protocol/Text/MutatorOperation.cs index a15058f8..d36279e0 100644 --- a/Enyim.Caching/Memcached/Protocol/Text/MutatorOperation.cs +++ b/Enyim.Caching/Memcached/Protocol/Text/MutatorOperation.cs @@ -1,75 +1,85 @@ using System; +using System.Collections.Generic; using System.Globalization; using System.Text; -using System.Collections.Generic; +using System.Threading.Tasks; using Enyim.Caching.Memcached.Results; using Enyim.Caching.Memcached.Results.Extensions; namespace Enyim.Caching.Memcached.Protocol.Text { - public class MutatorOperation : SingleItemOperation, IMutatorOperation - { - private MutationMode mode; - private ulong delta; - private ulong result; + public class MutatorOperation : SingleItemOperation, IMutatorOperation + { + private readonly MutationMode mode; + private readonly ulong delta; + private ulong result; - internal MutatorOperation(MutationMode mode, string key, ulong delta) - : base(key) - { - this.delta = delta; - this.mode = mode; - } + internal MutatorOperation(MutationMode mode, string key, ulong delta) + : base(key) + { + this.delta = delta; + this.mode = mode; + } - public ulong Result - { - get { return this.result; } - } + public ulong Result + { + get { return this.result; } + } - protected internal override IList> GetBuffer() - { - var command = (this.mode == MutationMode.Increment ? "incr " : "decr ") - + this.Key - + " " - + this.delta.ToString(CultureInfo.InvariantCulture) - + TextSocketHelper.CommandTerminator; + protected internal override IList> GetBuffer() + { + var command = (this.mode == MutationMode.Increment ? "incr " : "decr ") + + this.Key + + " " + + this.delta.ToString(CultureInfo.InvariantCulture) + + TextSocketHelper.CommandTerminator; - return TextSocketHelper.GetCommandBuffer(command); - } + return TextSocketHelper.GetCommandBuffer(command); + } - protected internal override IOperationResult ReadResponse(PooledSocket socket) - { - string response = TextSocketHelper.ReadResponse(socket); - var result = new TextOperationResult(); + protected internal override IOperationResult ReadResponse(PooledSocket socket) + { + string response = TextSocketHelper.ReadResponse(socket); + var result = new TextOperationResult(); - //maybe we should throw an exception when the item is not found? - if (String.Compare(response, "NOT_FOUND", StringComparison.Ordinal) == 0) - return result.Fail("Failed to read response. Item not found"); + //maybe we should throw an exception when the item is not found? + if (String.Compare(response, "NOT_FOUND", StringComparison.Ordinal) == 0) + return result.Fail("Failed to read response. Item not found"); - result.Success = - UInt64.TryParse(response, NumberStyles.AllowLeadingWhite | NumberStyles.AllowTrailingWhite, CultureInfo.InvariantCulture, out this.result); - return result; - } + result.Success = + UInt64.TryParse(response, NumberStyles.AllowLeadingWhite | NumberStyles.AllowTrailingWhite, CultureInfo.InvariantCulture, out this.result); + return result; + } - MutationMode IMutatorOperation.Mode - { - get { return this.mode; } - } + MutationMode IMutatorOperation.Mode + { + get { return this.mode; } + } - ulong IMutatorOperation.Result - { - get { return this.result; } - } + ulong IMutatorOperation.Result + { + get { return this.result; } + } - protected internal override System.Threading.Tasks.Task ReadResponseAsync(PooledSocket socket) + protected internal override async ValueTask ReadResponseAsync(PooledSocket socket) { - throw new NotImplementedException(); + string response = TextSocketHelper.ReadResponse(socket); + var result = new TextOperationResult(); + + //maybe we should throw an exception when the item is not found? + if (String.Compare(response, "NOT_FOUND", StringComparison.Ordinal) == 0) + return result.Fail("Failed to read response. Item not found"); + + result.Success = + UInt64.TryParse(response, NumberStyles.AllowLeadingWhite | NumberStyles.AllowTrailingWhite, CultureInfo.InvariantCulture, out this.result); + return result; } - protected internal override bool ReadResponseAsync(PooledSocket socket, System.Action next) - { - throw new System.NotSupportedException(); - } - } + protected internal override bool ReadResponseAsync(PooledSocket socket, System.Action next) + { + throw new System.NotSupportedException(); + } + } } #region [ License information ] diff --git a/Enyim.Caching/Memcached/Protocol/Text/StatsOperation.cs b/Enyim.Caching/Memcached/Protocol/Text/StatsOperation.cs index 538765d3..5851d078 100644 --- a/Enyim.Caching/Memcached/Protocol/Text/StatsOperation.cs +++ b/Enyim.Caching/Memcached/Protocol/Text/StatsOperation.cs @@ -1,87 +1,123 @@ using System; using System.Collections.Generic; using System.Net; +using System.Threading.Tasks; using Enyim.Caching.Memcached.Results; using Enyim.Caching.Memcached.Results.Extensions; namespace Enyim.Caching.Memcached.Protocol.Text { - public class StatsOperation : Operation, IStatsOperation - { - private static Enyim.Caching.ILog log = Enyim.Caching.LogManager.GetLogger(typeof(StatsOperation)); + public class StatsOperation : Operation, IStatsOperation + { + private static Enyim.Caching.ILog log = Enyim.Caching.LogManager.GetLogger(typeof(StatsOperation)); - private string type; - private Dictionary result; + private readonly string type; + private Dictionary result; - public StatsOperation(string type) - { - this.type = type; - } + public StatsOperation(string type) + { + this.type = type; + } - protected internal override IList> GetBuffer() - { - var command = String.IsNullOrEmpty(this.type) - ? "stats" + TextSocketHelper.CommandTerminator - : "stats " + this.type + TextSocketHelper.CommandTerminator; + protected internal override IList> GetBuffer() + { + var command = String.IsNullOrEmpty(this.type) + ? "stats" + TextSocketHelper.CommandTerminator + : "stats " + this.type + TextSocketHelper.CommandTerminator; - return TextSocketHelper.GetCommandBuffer(command); - } + return TextSocketHelper.GetCommandBuffer(command); + } - protected internal override IOperationResult ReadResponse(PooledSocket socket) - { - var serverData = new Dictionary(); + protected internal override IOperationResult ReadResponse(PooledSocket socket) + { + var serverData = new Dictionary(); - while (true) - { - string line = TextSocketHelper.ReadResponse(socket); + while (true) + { + string line = TextSocketHelper.ReadResponse(socket); - // stat values are terminated by END - if (String.Compare(line, "END", StringComparison.Ordinal) == 0) - break; + // stat values are terminated by END + if (String.Compare(line, "END", StringComparison.Ordinal) == 0) + break; - // expected response is STAT item_name item_value - if (line.Length < 6 || String.Compare(line, 0, "STAT ", 0, 5, StringComparison.Ordinal) != 0) - { - if (log.IsWarnEnabled) - log.Warn("Unknow response: " + line); + // expected response is STAT item_name item_value + if (line.Length < 6 || String.Compare(line, 0, "STAT ", 0, 5, StringComparison.Ordinal) != 0) + { + if (log.IsWarnEnabled) + log.Warn("Unknow response: " + line); - continue; - } + continue; + } - // get the key&value - string[] parts = line.Remove(0, 5).Split(' '); - if (parts.Length != 2) - { - if (log.IsWarnEnabled) - log.Warn("Unknow response: " + line); + // get the key&value + string[] parts = line.Remove(0, 5).Split(' '); + if (parts.Length != 2) + { + if (log.IsWarnEnabled) + log.Warn("Unknow response: " + line); - continue; - } + continue; + } - // store the stat item - serverData[parts[0]] = parts[1]; - } + // store the stat item + serverData[parts[0]] = parts[1]; + } - this.result = serverData; + this.result = serverData; - return new TextOperationResult().Pass(); - } + return new TextOperationResult().Pass(); + } - Dictionary IStatsOperation.Result - { - get { return result; } - } + Dictionary IStatsOperation.Result + { + get { return result; } + } - protected internal override System.Threading.Tasks.Task ReadResponseAsync(PooledSocket socket) + protected internal override async ValueTask ReadResponseAsync(PooledSocket socket) { - throw new NotImplementedException(); + var serverData = new Dictionary(); + + while (true) + { + string line = TextSocketHelper.ReadResponse(socket); + + // stat values are terminated by END + if (String.Compare(line, "END", StringComparison.Ordinal) == 0) + break; + + // expected response is STAT item_name item_value + if (line.Length < 6 || String.Compare(line, 0, "STAT ", 0, 5, StringComparison.Ordinal) != 0) + { + if (log.IsWarnEnabled) + log.Warn("Unknow response: " + line); + + continue; + } + + // get the key&value + string[] parts = line.Remove(0, 5).Split(' '); + if (parts.Length != 2) + { + if (log.IsWarnEnabled) + log.Warn("Unknow response: " + line); + + continue; + } + + // store the stat item + serverData[parts[0]] = parts[1]; + } + + this.result = serverData; + + return new TextOperationResult().Pass(); } - protected internal override bool ReadResponseAsync(PooledSocket socket, System.Action next) - { - throw new System.NotSupportedException(); - } - } + protected internal override bool ReadResponseAsync(PooledSocket socket, System.Action next) + { + throw new System.NotSupportedException(); + } + } } #region [ License information ] diff --git a/Enyim.Caching/Memcached/Protocol/Text/StoreOperationBase.cs b/Enyim.Caching/Memcached/Protocol/Text/StoreOperationBase.cs index 1b815e96..79d617e2 100644 --- a/Enyim.Caching/Memcached/Protocol/Text/StoreOperationBase.cs +++ b/Enyim.Caching/Memcached/Protocol/Text/StoreOperationBase.cs @@ -1,88 +1,92 @@ using System; +using System.Collections.Generic; using System.Globalization; using System.Text; -using System.Collections.Generic; +using System.Threading.Tasks; using Enyim.Caching.Memcached.Results; namespace Enyim.Caching.Memcached.Protocol.Text { - public class StoreOperationBase : SingleItemOperation - { - private static readonly ArraySegment DataTerminator = new ArraySegment(new byte[2] { (byte)'\r', (byte)'\n' }); - private StoreCommand command; - private CacheItem value; - private uint expires; - private ulong cas; + public class StoreOperationBase : SingleItemOperation + { + private static readonly ArraySegment DataTerminator = new ArraySegment(new byte[2] { (byte)'\r', (byte)'\n' }); + private readonly StoreCommand command; + private CacheItem value; + private readonly uint expires; + private readonly ulong cas; - internal StoreOperationBase(StoreCommand mode, string key, CacheItem value, uint expires, ulong cas) - : base(key) - { - this.command = mode; - this.value = value; - this.expires = expires; - this.cas = cas; - } + internal StoreOperationBase(StoreCommand mode, string key, CacheItem value, uint expires, ulong cas) + : base(key) + { + this.command = mode; + this.value = value; + this.expires = expires; + this.cas = cas; + } - protected internal override System.Collections.Generic.IList> GetBuffer() - { - // todo adjust the size to fit a request using a fnv hashed key - var sb = new StringBuilder(128); - var buffers = new List>(3); + protected internal override System.Collections.Generic.IList> GetBuffer() + { + // todo adjust the size to fit a request using a fnv hashed key + var sb = new StringBuilder(128); + var buffers = new List>(3); - switch (this.command) - { - case StoreCommand.Add: sb.Append("add "); break; - case StoreCommand.Replace: sb.Append("replace "); break; - case StoreCommand.Set: sb.Append("set "); break; - case StoreCommand.Append: sb.Append("append "); break; - case StoreCommand.Prepend: sb.Append("prepend "); break; - case StoreCommand.CheckAndSet: sb.Append("cas "); break; - default: throw new MemcachedClientException(command + " is not supported."); - } + switch (this.command) + { + case StoreCommand.Add: sb.Append("add "); break; + case StoreCommand.Replace: sb.Append("replace "); break; + case StoreCommand.Set: sb.Append("set "); break; + case StoreCommand.Append: sb.Append("append "); break; + case StoreCommand.Prepend: sb.Append("prepend "); break; + case StoreCommand.CheckAndSet: sb.Append("cas "); break; + default: throw new MemcachedClientException(command + " is not supported."); + } - sb.Append(this.Key); - sb.Append(" "); - sb.Append(this.value.Flags.ToString(CultureInfo.InvariantCulture)); - sb.Append(" "); - sb.Append(this.expires.ToString(CultureInfo.InvariantCulture)); - sb.Append(" "); + sb.Append(this.Key); + sb.Append(" "); + sb.Append(this.value.Flags.ToString(CultureInfo.InvariantCulture)); + sb.Append(" "); + sb.Append(this.expires.ToString(CultureInfo.InvariantCulture)); + sb.Append(" "); - var data = this.value.Data; - sb.Append(Convert.ToString(data.Count, CultureInfo.InvariantCulture)); + var data = this.value.Data; + sb.Append(Convert.ToString(data.Count, CultureInfo.InvariantCulture)); - if (command == StoreCommand.CheckAndSet) - { - sb.Append(" "); - sb.Append(Convert.ToString(this.cas, CultureInfo.InvariantCulture)); - } + if (command == StoreCommand.CheckAndSet) + { + sb.Append(" "); + sb.Append(Convert.ToString(this.cas, CultureInfo.InvariantCulture)); + } - sb.Append(TextSocketHelper.CommandTerminator); + sb.Append(TextSocketHelper.CommandTerminator); - TextSocketHelper.GetCommandBuffer(sb.ToString(), buffers); - buffers.Add(data); - buffers.Add(StoreOperationBase.DataTerminator); + TextSocketHelper.GetCommandBuffer(sb.ToString(), buffers); + buffers.Add(data); + buffers.Add(StoreOperationBase.DataTerminator); - return buffers; - } + return buffers; + } - protected internal override IOperationResult ReadResponse(PooledSocket socket) - { - return new TextOperationResult - { - Success = String.Compare(TextSocketHelper.ReadResponse(socket), "STORED", StringComparison.Ordinal) == 0 - }; - } + protected internal override IOperationResult ReadResponse(PooledSocket socket) + { + return new TextOperationResult + { + Success = String.Compare(TextSocketHelper.ReadResponse(socket), "STORED", StringComparison.Ordinal) == 0 + }; + } - protected internal override System.Threading.Tasks.Task ReadResponseAsync(PooledSocket socket) + protected internal override async ValueTask ReadResponseAsync(PooledSocket socket) { - throw new NotImplementedException(); + return new TextOperationResult + { + Success = String.Compare(TextSocketHelper.ReadResponse(socket), "STORED", StringComparison.Ordinal) == 0 + }; } - protected internal override bool ReadResponseAsync(PooledSocket socket, System.Action next) - { - throw new System.NotSupportedException(); - } - } + protected internal override bool ReadResponseAsync(PooledSocket socket, System.Action next) + { + throw new System.NotSupportedException(); + } + } } #region [ License information ]