Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions Enyim.Caching.Tests/Enyim.Caching.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@
<IsPackable>false</IsPackable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="2.1.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.7.2" />
<PackageReference Include="xunit" Version="2.3.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.1.1" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.1.1" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="2.1.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.8.0" />
<PackageReference Include="xunit" Version="2.4.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
</PackageReference>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Enyim.Caching\Enyim.Caching.csproj" />
Expand Down
13 changes: 7 additions & 6 deletions Enyim.Caching/Enyim.Caching.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,17 @@
<RepositoryType>git</RepositoryType>
<RepositoryUrl>https://github.com/cnblogs/EnyimMemcachedCore</RepositoryUrl>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<LangVersion>latest</LangVersion>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="2.1.1" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="2.1.1" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.1.1" />
<PackageReference Include="Newtonsoft.Json" Version="11.0.2" />
<PackageReference Include="Microsoft.Extensions.Options" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="2.1.0" />
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="2.1.1" />
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="2.1.2" />
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.1.1" />
<PackageReference Include="Newtonsoft.Json.Bson" Version="1.0.1" />
</ItemGroup>
</Project>
52 changes: 19 additions & 33 deletions Enyim.Caching/Memcached/AsyncPooledSocket.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
//#define DEBUG_IO
using Microsoft.Extensions.Logging;
using System;
using System.Linq;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Dawn.Net.Sockets;
using System.Runtime.InteropServices;
using System.Reflection;
using System.Runtime.CompilerServices;

namespace Enyim.Caching.Memcached
{
Expand All @@ -29,7 +28,7 @@ public partial class AsyncPooledSocket : IDisposable
public AsyncPooledSocket(ILogger logger)
{
_logger = logger;
_isAlive = true;
_isAlive = true;
}

private async Task CreateSocketAsync(DnsEndPoint endpoint, TimeSpan connectionTimeout, TimeSpan receiveTimeout)
Expand Down Expand Up @@ -65,7 +64,7 @@ private async Task CreateSocketAsync(DnsEndPoint endpoint, TimeSpan connectionTi
_socket.NoDelay = true;
_socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);

_inputStream = new NetworkStream(_socket, ownsSocket: true);
_inputStream = new NetworkStream(_socket, ownsSocket: true);
}

//From https://github.com/dotnet/corefx/blob/master/src/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ConnectHelper.cs
Expand All @@ -78,7 +77,7 @@ public void Initialize(CancellationToken cancellationToken)
{
CancellationToken = cancellationToken;
var b = new AsyncTaskMethodBuilder();
var ignored = b.Task;
var ignored = b.Task;
Builder = b;
}

Expand All @@ -104,7 +103,7 @@ protected override void OnCompleted(SocketAsyncEventArgs _)
break;
}
}
}
}

public Action<AsyncPooledSocket> CleanupCallback { get; set; }

Expand Down Expand Up @@ -230,12 +229,9 @@ public int ReadByte()

public async Task<byte[]> ReadBytesAsync(int count)
{
using (var awaitable = new SocketAwaitable())
{
awaitable.Buffer = new ArraySegment<byte>(new byte[count], 0, count);
await _socket.ReceiveAsync(awaitable);
return awaitable.Transferred.Array;
}
var buffer = new ArraySegment<byte>(new byte[count], 0, count);
await _socket.ReceiveAsync(buffer, SocketFlags.None);
return buffer.Array;
}

/// <summary>
Expand Down Expand Up @@ -315,24 +311,14 @@ public void Write(IList<ArraySegment<byte>> buffers)

public async Task WriteSync(IList<ArraySegment<byte>> buffers)
{
using (var awaitable = new SocketAwaitable())
try
{
awaitable.Arguments.BufferList = buffers;
try
{
await _socket.SendAsync(awaitable);
}
catch
{
_isAlive = false;
ThrowHelper.ThrowSocketWriteError(_socket.RemoteEndPoint, awaitable.Arguments.SocketError);
}

if (awaitable.Arguments.SocketError != SocketError.Success)
{
_isAlive = false;
ThrowHelper.ThrowSocketWriteError(_socket.RemoteEndPoint, awaitable.Arguments.SocketError);
}
await _socket.SendAsync(buffers, SocketFlags.None);
}
catch (Exception ex)
{
_isAlive = false;
_logger.LogError(ex, nameof(PooledSocket.WriteSync));
}
}

Expand Down
46 changes: 25 additions & 21 deletions Enyim.Caching/Memcached/MemcachedNode.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
using Enyim.Caching.Configuration;
using Enyim.Caching.Memcached.Protocol.Binary;
using Enyim.Caching.Memcached.Results;
using Enyim.Caching.Memcached.Results.Extensions;
using Enyim.Collections;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Threading;
using Enyim.Caching.Configuration;
using Enyim.Collections;
using System.Security;
using Enyim.Caching.Memcached.Protocol.Binary;
using System.Runtime.Serialization;
using System.IO;
using Enyim.Caching.Memcached.Results;
using Enyim.Caching.Memcached.Results.Extensions;
using System.Security;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace Enyim.Caching.Memcached
{
Expand All @@ -27,13 +27,13 @@ public class MemcachedNode : IMemcachedNode

private bool isDisposed;

private DnsEndPoint endPoint;
private ISocketPoolConfiguration config;
private readonly DnsEndPoint endPoint;
private readonly ISocketPoolConfiguration config;
private InternalPoolImpl internalPoolImpl;
private bool isInitialized;

public MemcachedNode(
DnsEndPoint endpoint,
DnsEndPoint endpoint,
ISocketPoolConfiguration socketPoolConfig,
ILogger logger)
{
Expand Down Expand Up @@ -186,7 +186,7 @@ void IDisposable.Dispose()
private class InternalPoolImpl : IDisposable
{
private readonly ILogger _logger;
private bool _isDebugEnabled;
private readonly bool _isDebugEnabled;

/// <summary>
/// A list of already connected but free to use sockets
Expand All @@ -197,18 +197,18 @@ private class InternalPoolImpl : IDisposable
private bool isAlive;
private DateTime markedAsDeadUtc;

private int minItems;
private int maxItems;
private readonly int minItems;
private readonly int maxItems;

private MemcachedNode ownerNode;
private EndPoint endPoint;
private TimeSpan queueTimeout;
private readonly EndPoint endPoint;
private readonly TimeSpan queueTimeout;
private Semaphore semaphore;

private object initLock = new Object();
private readonly object initLock = new Object();

internal InternalPoolImpl(
MemcachedNode ownerNode,
MemcachedNode ownerNode,
ISocketPoolConfiguration config,
ILogger logger)
{
Expand Down Expand Up @@ -519,9 +519,9 @@ protected internal virtual PooledSocket CreateSocket()
{
return new PooledSocket(this.endPoint, this.config.ConnectionTimeout, this.config.ReceiveTimeout, _logger);
}
catch(Exception ex)
catch (Exception ex)
{
_logger.LogError(new EventId (this.GetHashCode(), nameof(MemcachedNode) ), ex, $"Create {nameof(PooledSocket)}");
_logger.LogError(new EventId(this.GetHashCode(), nameof(MemcachedNode)), ex, $"Create {nameof(PooledSocket)}");
throw;
}
}
Expand Down Expand Up @@ -582,12 +582,16 @@ protected virtual IPooledSocketResult ExecuteOperation(IOperation op)

protected async virtual Task<IPooledSocketResult> ExecuteOperationAsync(IOperation op)
{
_logger.LogDebug($"ExecuteOperationAsync({op})");

var result = this.Acquire();
if (result.Success && result.HasValue)
{
try
{
var pooledSocket = result.Value;


//if Get, call BinaryRequest.CreateBuffer()
var b = op.GetBuffer();

Expand Down
55 changes: 20 additions & 35 deletions Enyim.Caching/Memcached/PooledSocket.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
//#define DEBUG_IO
using Microsoft.Extensions.Logging;
using System;
using System.Linq;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Reflection;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Dawn.Net.Sockets;
using System.Runtime.InteropServices;
using System.Reflection;

namespace Enyim.Caching.Memcached
{
Expand All @@ -34,7 +32,7 @@ public PooledSocket(DnsEndPoint endpoint, TimeSpan connectionTimeout, TimeSpan r

this.isAlive = true;

var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
socket.NoDelay = true;

var timeout = connectionTimeout == TimeSpan.MaxValue
Expand All @@ -46,14 +44,14 @@ public PooledSocket(DnsEndPoint endpoint, TimeSpan connectionTimeout, TimeSpan r
: (int)receiveTimeout.TotalMilliseconds;

socket.ReceiveTimeout = rcv;
socket.SendTimeout = rcv;
socket.SendTimeout = rcv;

ConnectWithTimeout(socket, endpoint, timeout);

this.socket = socket;
this.endpoint = endpoint;

this.inputStream = new NetworkStream(socket);
this.inputStream = new NetworkStream(socket);
}

private void ConnectWithTimeout(Socket socket, DnsEndPoint endpoint, int timeout)
Expand All @@ -80,13 +78,13 @@ private void ConnectWithTimeout(Socket socket, DnsEndPoint endpoint, int timeout
args.Completed += (s, e) => mres.Set();
if (socket.ConnectAsync(args))
{
if(!mres.Wait(timeout))
if (!mres.Wait(timeout))
{
throw new TimeoutException("Could not connect to " + endpoint);
}
}
}
}
}
}

public Action<PooledSocket> CleanupCallback { get; set; }

Expand Down Expand Up @@ -212,12 +210,9 @@ public int ReadByte()

public async Task<byte[]> ReadBytesAsync(int count)
{
using (var awaitable = new SocketAwaitable())
{
awaitable.Buffer = new ArraySegment<byte>(new byte[count], 0, count);
await this.socket.ReceiveAsync(awaitable);
return awaitable.Transferred.Array;
}
var buffer = new ArraySegment<byte>(new byte[count], 0, count);
await this.socket.ReceiveAsync(buffer, SocketFlags.None);
return buffer.Array;
}

/// <summary>
Expand Down Expand Up @@ -297,24 +292,14 @@ public void Write(IList<ArraySegment<byte>> buffers)

public async Task WriteSync(IList<ArraySegment<byte>> buffers)
{
using (var awaitable = new SocketAwaitable())
try
{
awaitable.Arguments.BufferList = buffers;
try
{
await this.socket.SendAsync(awaitable);
}
catch
{
this.isAlive = false;
ThrowHelper.ThrowSocketWriteError(this.endpoint, awaitable.Arguments.SocketError);
}

if (awaitable.Arguments.SocketError != SocketError.Success)
{
this.isAlive = false;
ThrowHelper.ThrowSocketWriteError(this.endpoint, awaitable.Arguments.SocketError);
}
await socket.SendAsync(buffers, SocketFlags.None);
}
catch (Exception ex)
{
isAlive = false;
_logger.LogError(ex, nameof(PooledSocket.WriteSync));
}
}

Expand Down
Loading