Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Enyim.Caching/Enyim.Caching.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="2.1.2" />
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.1.1" />
<PackageReference Include="Newtonsoft.Json.Bson" Version="1.0.1" />
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.1" />
</ItemGroup>
</Project>
15 changes: 7 additions & 8 deletions Enyim.Caching/Memcached/MemcachedNode.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,3 @@
using Enyim.Caching.Configuration;
using Enyim.Caching.Memcached.Protocol.Binary;
using Enyim.Caching.Memcached.Results;
using Enyim.Caching.Memcached.Results.Extensions;
using Enyim.Collections;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
Expand All @@ -13,6 +7,12 @@
using System.Security;
using System.Threading;
using System.Threading.Tasks;
using Enyim.Caching.Configuration;
using Enyim.Caching.Memcached.Protocol.Binary;
using Enyim.Caching.Memcached.Results;
using Enyim.Caching.Memcached.Results.Extensions;
using Enyim.Collections;
using Microsoft.Extensions.Logging;

namespace Enyim.Caching.Memcached
{
Expand Down Expand Up @@ -591,14 +591,13 @@ protected async virtual Task<IPooledSocketResult> ExecuteOperationAsync(IOperati
{
var pooledSocket = result.Value;


//if Get, call BinaryRequest.CreateBuffer()
var b = op.GetBuffer();

await pooledSocket.WriteSync(b);

//if Get, call BinaryResponse
var readResult = op.ReadResponse(pooledSocket);
var readResult = await op.ReadResponseAsync(pooledSocket);
if (readResult.Success)
{
result.Pass();
Expand Down
17 changes: 16 additions & 1 deletion Enyim.Caching/Memcached/PooledSocket.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
Expand All @@ -11,6 +10,7 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace Enyim.Caching.Memcached
{
Expand Down Expand Up @@ -208,6 +208,21 @@ public int ReadByte()
}
}

public int ReadByteAsync()
{
this.CheckDisposed();

try
{
return this.inputStream.ReadByte();
}
catch (IOException)
{
this.isAlive = false;
throw;
}
}

public async Task<byte[]> ReadBytesAsync(int count)
{
var buffer = new ArraySegment<byte>(new byte[count], 0, count);
Expand Down
108 changes: 28 additions & 80 deletions Enyim.Caching/Memcached/Protocol/Binary/BinaryConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,110 +5,58 @@ namespace Enyim.Caching.Memcached.Protocol.Binary
{
public static class BinaryConverter
{
public static unsafe ushort DecodeUInt16(byte[] buffer, int offset)
public static ushort DecodeUInt16(Span<byte> buffer, int offset)
{
return (ushort)((buffer[offset] << 8) + buffer[offset + 1]);
}

public static unsafe ushort DecodeUInt16(byte* buffer, int offset)
public static int DecodeInt32(Span<byte> buffer, int offset)
{
return (ushort)((buffer[offset] << 8) + buffer[offset + 1]);
}

public static unsafe int DecodeInt32(ArraySegment<byte> segment, int offset)
{
fixed (byte* buffer = segment.Array)
{
byte* ptr = buffer + segment.Offset + offset;

return DecodeInt32(buffer, 0);
}
}

public static unsafe int DecodeInt32(byte* buffer, int offset)
{
buffer += offset;

return (buffer[0] << 24) | (buffer[1] << 16) | (buffer[2] << 8) | buffer[3];
}

public static unsafe int DecodeInt32(byte[] buffer, int offset)
{
return (buffer[offset] << 24) | (buffer[offset + 1] << 16) | (buffer[offset + 2] << 8) | buffer[offset + 3];
}
var slice = buffer.Slice(offset);

public static unsafe ulong DecodeUInt64(byte[] buffer, int offset)
{
fixed (byte* ptr = buffer)
{
return DecodeUInt64(ptr, offset);
}
return (slice[0] << 24) | (slice[1] << 16) | (slice[2] << 8) | slice[3];
}

public static unsafe ulong DecodeUInt64(byte* buffer, int offset)
public static unsafe ulong DecodeUInt64(Span<byte> buffer, int offset)
{
buffer += offset;
var slice = buffer.Slice(offset);

var part1 = (uint)((buffer[0] << 24) | (buffer[1] << 16) | (buffer[2] << 8) | buffer[3]);
var part2 = (uint)((buffer[4] << 24) | (buffer[5] << 16) | (buffer[6] << 8) | buffer[7]);
var part1 = (uint)((slice[0] << 24) | (slice[1] << 16) | (slice[2] << 8) | slice[3]);
var part2 = (uint)((slice[4] << 24) | (slice[5] << 16) | (slice[6] << 8) | slice[7]);

return ((ulong)part1 << 32) | part2;
}

public static unsafe void EncodeUInt16(uint value, byte[] buffer, int offset)
{
fixed (byte* bufferPtr = buffer)
{
EncodeUInt16(value, bufferPtr, offset);
}
}

public static unsafe void EncodeUInt16(uint value, byte* buffer, int offset)
public static unsafe void EncodeUInt16(uint value, Span<byte> buffer, int offset)
{
byte* ptr = buffer + offset;
var slice = buffer.Slice(offset);

ptr[0] = (byte)(value >> 8);
ptr[1] = (byte)(value & 255);
slice[0] = (byte)(value >> 8);
slice[1] = (byte)(value & 255);
}

public static unsafe void EncodeUInt32(uint value, byte[] buffer, int offset)
public static unsafe void EncodeUInt32(uint value, Span<byte> buffer, int offset)
{
fixed (byte* bufferPtr = buffer)
{
EncodeUInt32(value, bufferPtr, offset);
}
}

public static unsafe void EncodeUInt32(uint value, byte* buffer, int offset)
{
byte* ptr = buffer + offset;
var slice = buffer.Slice(offset);

ptr[0] = (byte)(value >> 24);
ptr[1] = (byte)(value >> 16);
ptr[2] = (byte)(value >> 8);
ptr[3] = (byte)(value & 255);
slice[0] = (byte)(value >> 24);
slice[1] = (byte)(value >> 16);
slice[2] = (byte)(value >> 8);
slice[3] = (byte)(value & 255);
}

public static unsafe void EncodeUInt64(ulong value, byte[] buffer, int offset)
public static unsafe void EncodeUInt64(ulong value, Span<byte> buffer, int offset)
{
fixed (byte* bufferPtr = buffer)
{
EncodeUInt64(value, bufferPtr, offset);
}
}
var slice = buffer.Slice(offset);

public static unsafe void EncodeUInt64(ulong value, byte* buffer, int offset)
{
byte* ptr = buffer + offset;

ptr[0] = (byte)(value >> 56);
ptr[1] = (byte)(value >> 48);
ptr[2] = (byte)(value >> 40);
ptr[3] = (byte)(value >> 32);
ptr[4] = (byte)(value >> 24);
ptr[5] = (byte)(value >> 16);
ptr[6] = (byte)(value >> 8);
ptr[7] = (byte)(value & 255);
slice[0] = (byte)(value >> 56);
slice[1] = (byte)(value >> 48);
slice[2] = (byte)(value >> 40);
slice[3] = (byte)(value >> 32);
slice[4] = (byte)(value >> 24);
slice[5] = (byte)(value >> 16);
slice[6] = (byte)(value >> 8);
slice[7] = (byte)(value & 255);
}

public static byte[] EncodeKey(string key)
Expand Down
8 changes: 2 additions & 6 deletions Enyim.Caching/Memcached/Protocol/Binary/BinaryOperation.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Enyim.Caching.Memcached.Protocol.Binary
{
Expand All @@ -12,15 +13,10 @@ protected internal override IList<ArraySegment<byte>> GetBuffer()
return this.Build().CreateBuffer();
}

protected internal override System.Threading.Tasks.Task<Results.IOperationResult> ReadResponseAsync(PooledSocket socket)
{
throw new NotImplementedException();
}

protected internal override bool ReadResponseAsync(PooledSocket socket, System.Action<bool> next)
{
throw new System.NotSupportedException();
}
}
}
}

Expand Down
30 changes: 14 additions & 16 deletions Enyim.Caching/Memcached/Protocol/Binary/BinaryResponse.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System;
using System.Text;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;

namespace Enyim.Caching.Memcached.Protocol.Binary
Expand Down Expand Up @@ -64,7 +64,7 @@ public unsafe bool Read(PooledSocket socket)
if (dataLength > 0)
{
var data = new byte[dataLength];
socket.Read(data, 0, dataLength);
socket.Read(data, 0, dataLength);

this.Extra = new ArraySegment<byte>(data, 0, extraLength);
this.Data = new ArraySegment<byte>(data, extraLength, data.Length - extraLength);
Expand Down Expand Up @@ -207,24 +207,22 @@ private void DoDecodeBody(AsyncIOArgs asyncEvent)
if (this.shouldCallNext) this.next(true);
}

private unsafe void DeserializeHeader(byte[] header, out int dataLength, out int extraLength)

private void DeserializeHeader(Span<byte> header, out int dataLength, out int extraLength)
{
fixed (byte* buffer = header)
{
if (buffer[0] != MAGIC_VALUE)
throw new InvalidOperationException("Expected magic value " + MAGIC_VALUE + ", received: " + buffer[0]);
if (header[0] != MAGIC_VALUE)
throw new InvalidOperationException("Expected magic value " + MAGIC_VALUE + ", received: " + header[0]);

this.DataType = buffer[HEADER_DATATYPE];
this.Opcode = buffer[HEADER_OPCODE];
this.StatusCode = BinaryConverter.DecodeUInt16(buffer, HEADER_STATUS);
this.DataType = header[HEADER_DATATYPE];
this.Opcode = header[HEADER_OPCODE];
this.StatusCode = BinaryConverter.DecodeUInt16(header, HEADER_STATUS);

this.KeyLength = BinaryConverter.DecodeUInt16(buffer, HEADER_KEY);
this.CorrelationId = BinaryConverter.DecodeInt32(buffer, HEADER_OPAQUE);
this.CAS = BinaryConverter.DecodeUInt64(buffer, HEADER_CAS);
this.KeyLength = BinaryConverter.DecodeUInt16(header, HEADER_KEY);
this.CorrelationId = BinaryConverter.DecodeInt32(header, HEADER_OPAQUE);
this.CAS = BinaryConverter.DecodeUInt64(header, HEADER_CAS);

dataLength = BinaryConverter.DecodeInt32(buffer, HEADER_BODY);
extraLength = buffer[HEADER_EXTRA];
}
dataLength = BinaryConverter.DecodeInt32(header, HEADER_BODY);
extraLength = header[HEADER_EXTRA];
}

private void LogExecutionTime(string title, DateTime startTime, int thresholdMs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ protected internal override IOperationResult ReadResponse(PooledSocket socket)
return result;
}

protected internal override async Task<IOperationResult> ReadResponseAsync(PooledSocket socket)
protected internal override async ValueTask<IOperationResult> ReadResponseAsync(PooledSocket socket)
{
var response = new BinaryResponse();
var retval = await response.ReadAsync(socket);
Expand Down
71 changes: 44 additions & 27 deletions Enyim.Caching/Memcached/Protocol/Binary/FlushOperation.cs
Original file line number Diff line number Diff line change
@@ -1,37 +1,54 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Enyim.Caching.Memcached.Results;
using Enyim.Caching.Memcached.Results.Extensions;

namespace Enyim.Caching.Memcached.Protocol.Binary
{
public class FlushOperation : BinaryOperation, IFlushOperation
{
public FlushOperation() { }

protected override BinaryRequest Build()
{
var request = new BinaryRequest(OpCode.Flush);

return request;
}

protected internal override IOperationResult ReadResponse(PooledSocket socket)
{
var response = new BinaryResponse();
var retval = response.Read(socket);

this.StatusCode = StatusCode;
var result = new BinaryOperationResult()
{
Success = retval,
StatusCode = this.StatusCode
};

result.PassOrFail(retval, "Failed to read response");
return result;
}
}
public class FlushOperation : BinaryOperation, IFlushOperation
{
public FlushOperation() { }

protected override BinaryRequest Build()
{
var request = new BinaryRequest(OpCode.Flush);

return request;
}

protected internal override IOperationResult ReadResponse(PooledSocket socket)
{
var response = new BinaryResponse();
var retval = response.Read(socket);

this.StatusCode = StatusCode;
var result = new BinaryOperationResult()
{
Success = retval,
StatusCode = this.StatusCode
};

result.PassOrFail(retval, "Failed to read response");
return result;
}

protected internal override async ValueTask<IOperationResult> ReadResponseAsync(PooledSocket socket)
{
var response = new BinaryResponse();
var retval = await response.ReadAsync(socket);

this.StatusCode = StatusCode;
var result = new BinaryOperationResult()
{
Success = retval,
StatusCode = this.StatusCode
};

result.PassOrFail(retval, "Failed to read response");
return result;
}
}
}

#region [ License information ]
Expand Down
Loading