diff --git a/Enyim.Caching/Configuration/ConfigurationHelper.cs b/Enyim.Caching/Configuration/ConfigurationHelper.cs index 2c09b641..f3aafdde 100755 --- a/Enyim.Caching/Configuration/ConfigurationHelper.cs +++ b/Enyim.Caching/Configuration/ConfigurationHelper.cs @@ -73,7 +73,7 @@ public static void CheckForInterface(Type type, Type interfaceType) // throw new System.Configuration.ConfigurationErrorsException("The type " + type.AssemblyQualifiedName + " must implement " + interfaceType.AssemblyQualifiedName); } - public static EndPoint ResolveToEndPoint(string value) + public static DnsEndPoint ResolveToEndPoint(string value) { if (String.IsNullOrEmpty(value)) throw new ArgumentNullException("value"); @@ -86,25 +86,8 @@ public static EndPoint ResolveToEndPoint(string value) if (!Int32.TryParse(parts[1], out port)) throw new ArgumentException("Cannot parse port: " + parts[1], "value"); - return ResolveToEndPoint(parts[0], port); - } - - public static EndPoint ResolveToEndPoint(string host, int port) - { - if (String.IsNullOrEmpty(host)) - throw new ArgumentNullException("host"); - - IPAddress address; - // parse as an IP address - if (!IPAddress.TryParse(host, out address)) - { - var addresses = Dns.GetHostAddresses(host); - address = addresses.FirstOrDefault(ip => ip.AddressFamily == System.Net.Sockets.AddressFamily.InterNetwork); - if (address == null) - throw new ArgumentException(String.Format("Could not resolve host '{0}'.", host)); - } - return new IPEndPoint(address, port); - } + return new DnsEndPoint(parts[0], port); + } } } diff --git a/Enyim.Caching/Configuration/IMemcachedClientConfiguration.cs b/Enyim.Caching/Configuration/IMemcachedClientConfiguration.cs index c16d25bd..397bc676 100755 --- a/Enyim.Caching/Configuration/IMemcachedClientConfiguration.cs +++ b/Enyim.Caching/Configuration/IMemcachedClientConfiguration.cs @@ -13,7 +13,7 @@ public interface IMemcachedClientConfiguration /// /// Gets a list of each representing a Memcached server in the pool. /// - IList Servers { get; } + IList Servers { get; } /// /// Gets the configuration of the socket pool. diff --git a/Enyim.Caching/Configuration/MemcachedClientConfiguration.cs b/Enyim.Caching/Configuration/MemcachedClientConfiguration.cs index f4add1d3..bc7ee2ee 100755 --- a/Enyim.Caching/Configuration/MemcachedClientConfiguration.cs +++ b/Enyim.Caching/Configuration/MemcachedClientConfiguration.cs @@ -41,21 +41,22 @@ public MemcachedClientConfiguration( var options = optionsAccessor.Value; if ((options == null || options.Servers.Count == 0) && configuration != null) { - configuration.GetSection("enyimMemcached").Bind(options); - } - - Servers = new List(); - foreach (var server in options.Servers) - { - IPAddress address; - if (IPAddress.TryParse(server.Address, out address)) + var section = configuration.GetSection("enyimMemcached"); + if (section.Exists()) { - Servers.Add(new IPEndPoint(address, server.Port)); + section.Bind(options); } else { - Servers.Add(new DnsEndPoint(server.Address, server.Port)); - } + _logger.LogWarning($"No enyimMemcached setting in appsetting.json. Use default configuration"); + options.AddDefaultServer(); + } + } + + Servers = new List(); + foreach (var server in options.Servers) + { + Servers.Add(new DnsEndPoint(server.Address, server.Port)); } SocketPool = new SocketPoolConfiguration(); @@ -187,13 +188,13 @@ public void AddServer(string address) /// The port number of the memcached instance. public void AddServer(string host, int port) { - this.Servers.Add(ConfigurationHelper.ResolveToEndPoint(host, port)); + this.Servers.Add(new DnsEndPoint(host, port)); } /// /// Gets a list of each representing a Memcached server in the pool. /// - public IList Servers { get; private set; } + public IList Servers { get; private set; } /// /// Gets the configuration of the socket pool. @@ -250,7 +251,7 @@ public ITranscoder Transcoder #region [ interface ] - IList IMemcachedClientConfiguration.Servers + IList IMemcachedClientConfiguration.Servers { get { return this.Servers; } } diff --git a/Enyim.Caching/Configuration/MemcachedClientOptions.cs b/Enyim.Caching/Configuration/MemcachedClientOptions.cs index 59f64f39..7d25d5b9 100644 --- a/Enyim.Caching/Configuration/MemcachedClientOptions.cs +++ b/Enyim.Caching/Configuration/MemcachedClientOptions.cs @@ -30,6 +30,8 @@ public void AddServer(string address, int port) Servers.Add(new Server { Address = address, Port = port }); } + public void AddDefaultServer() => AddServer("memcached", 11211); + public void AddPlainTextAuthenticator(string zone, string userName, string password) { Authentication = new Authentication diff --git a/Enyim.Caching/Enyim.Caching.csproj b/Enyim.Caching/Enyim.Caching.csproj index 12e20edd..4fd77f69 100755 --- a/Enyim.Caching/Enyim.Caching.csproj +++ b/Enyim.Caching/Enyim.Caching.csproj @@ -2,7 +2,7 @@ EnyimMemcachedCore is a Memcached client library for .NET Core. Usage: Add services.AddEnyimMemcached(...) and app.UseEnyimMemcached() in Startup. Add IMemcachedClient into constructor. - 2.1.6 + 2.1.7 cnblogs.com netstandard2.0 true diff --git a/Enyim.Caching/EnyimMemcachedServiceCollectionExtensions.cs b/Enyim.Caching/EnyimMemcachedServiceCollectionExtensions.cs index a2c6a290..c83fab53 100644 --- a/Enyim.Caching/EnyimMemcachedServiceCollectionExtensions.cs +++ b/Enyim.Caching/EnyimMemcachedServiceCollectionExtensions.cs @@ -49,6 +49,11 @@ public static IServiceCollection AddEnyimMemcached(this IServiceCollection servi throw new ArgumentNullException(nameof(configurationSection)); } + if(!configurationSection.Exists()) + { + throw new ArgumentNullException($"{configurationSection.Key} in appsettings.json"); + } + return AddEnyimMemcachedInternal(services, s => s.Configure(configurationSection)); } @@ -64,7 +69,13 @@ public static IServiceCollection AddEnyimMemcached(this IServiceCollection servi throw new ArgumentNullException(nameof(configuration)); } - return AddEnyimMemcachedInternal(services, s => s.Configure(configuration.GetSection(sectionKey))); + var section = configuration.GetSection(sectionKey); + if (!section.Exists()) + { + throw new ArgumentNullException($"{sectionKey} in appsettings.json"); + } + + return AddEnyimMemcachedInternal(services, s => s.Configure(section)); } private static IServiceCollection AddEnyimMemcachedInternal(IServiceCollection services, Action configure) @@ -81,6 +92,6 @@ private static IServiceCollection AddEnyimMemcachedInternal(IServiceCollection s services.AddSingleton(factory => factory.GetService()); return services; - } + } } } diff --git a/Enyim.Caching/Memcached/AsyncPooledSocket.cs b/Enyim.Caching/Memcached/AsyncPooledSocket.cs new file mode 100644 index 00000000..86e70962 --- /dev/null +++ b/Enyim.Caching/Memcached/AsyncPooledSocket.cs @@ -0,0 +1,381 @@ +//#define DEBUG_IO +using System; +using System.Linq; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Dawn.Net.Sockets; +using System.Runtime.InteropServices; +using System.Reflection; +using System.Runtime.CompilerServices; + +namespace Enyim.Caching.Memcached +{ + [DebuggerDisplay("[ Address: {endpoint}, IsAlive = {IsAlive} ]")] + public partial class AsyncPooledSocket : IDisposable + { + private readonly ILogger _logger; + private bool _isAlive; + private Socket _socket; + private Stream _inputStream; + private AsyncSocketHelper _helper; + + public AsyncPooledSocket(ILogger logger) + { + _logger = logger; + _isAlive = true; + } + + private async Task CreateSocketAsync(DnsEndPoint endpoint, TimeSpan connectionTimeout, TimeSpan receiveTimeout) + { + CancellationTokenSource cancellationConnTimeout = null; + if (connectionTimeout != TimeSpan.MaxValue) + { + cancellationConnTimeout = new CancellationTokenSource(connectionTimeout); + } + + var args = new ConnectEventArgs(); + args.RemoteEndPoint = endpoint; + + if (Socket.ConnectAsync(SocketType.Stream, ProtocolType.Tcp, args)) + { + if (cancellationConnTimeout != null) + { + using (cancellationConnTimeout.Token.Register(s => Socket.CancelConnectAsync((SocketAsyncEventArgs)s), args)) + { + await args.Builder.Task.ConfigureAwait(false); + } + } + } + else if (args.SocketError != SocketError.Success) + { + throw new SocketException((int)args.SocketError); + } + + _socket = args.ConnectSocket; + _socket.ReceiveTimeout = receiveTimeout == TimeSpan.MaxValue + ? Timeout.Infinite + : (int)receiveTimeout.TotalMilliseconds; + _socket.NoDelay = true; + _socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true); + + _inputStream = new NetworkStream(_socket, ownsSocket: true); + } + + //From https://github.com/dotnet/corefx/blob/master/src/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ConnectHelper.cs + private sealed class ConnectEventArgs : SocketAsyncEventArgs + { + public AsyncTaskMethodBuilder Builder { get; private set; } + public CancellationToken CancellationToken { get; private set; } + + public void Initialize(CancellationToken cancellationToken) + { + CancellationToken = cancellationToken; + var b = new AsyncTaskMethodBuilder(); + var ignored = b.Task; + Builder = b; + } + + protected override void OnCompleted(SocketAsyncEventArgs _) + { + switch (SocketError) + { + case SocketError.Success: + Builder.SetResult(); + break; + + case SocketError.OperationAborted: + case SocketError.ConnectionAborted: + if (CancellationToken.IsCancellationRequested) + { + Builder.SetException(new TaskCanceledException()); + break; + } + goto default; + + default: + Builder.SetException(new SocketException((int)SocketError)); + break; + } + } + } + + public Action CleanupCallback { get; set; } + + public int Available + { + get { return _socket.Available; } + } + + public void Reset() + { + // discard any buffered data + _inputStream.Flush(); + + if (_helper != null) _helper.DiscardBuffer(); + + int available = _socket.Available; + + if (available > 0) + { + if (_logger.IsEnabled(LogLevel.Warning)) + _logger.LogWarning("Socket bound to {0} has {1} unread data! This is probably a bug in the code. InstanceID was {2}.", _socket.RemoteEndPoint, available, InstanceId); + + byte[] data = new byte[available]; + + Read(data, 0, available); + + if (_logger.IsEnabled(LogLevel.Warning)) + _logger.LogWarning(Encoding.ASCII.GetString(data)); + } + + if (_logger.IsEnabled(LogLevel.Debug)) + _logger.LogDebug("Socket {0} was reset", InstanceId); + } + + /// + /// The ID of this instance. Used by the to identify the instance in its inner lists. + /// + public readonly Guid InstanceId = Guid.NewGuid(); + + public bool IsAlive + { + get { return _isAlive; } + } + + /// + /// Releases all resources used by this instance and shuts down the inner . This instance will not be usable anymore. + /// + /// Use the IDisposable.Dispose method if you want to release this instance back into the pool. + public void Destroy() + { + Dispose(true); + } + + ~AsyncPooledSocket() + { + try { Dispose(true); } + catch { } + } + + protected void Dispose(bool disposing) + { + if (disposing) + { + GC.SuppressFinalize(this); + + try + { + if (_socket != null) + try { _socket.Dispose(); } + catch { } + + if (_inputStream != null) + _inputStream.Dispose(); + + _inputStream = null; + _socket = null; + CleanupCallback = null; + } + catch (Exception e) + { + _logger.LogError(nameof(PooledSocket), e); + } + } + else + { + Action cc = CleanupCallback; + + if (cc != null) + cc(this); + } + } + + void IDisposable.Dispose() + { + Dispose(false); + } + + private void CheckDisposed() + { + if (_socket == null) + throw new ObjectDisposedException("PooledSocket"); + } + + /// + /// Reads the next byte from the server's response. + /// + /// This method blocks and will not return until the value is read. + public int ReadByte() + { + CheckDisposed(); + + try + { + return _inputStream.ReadByte(); + } + catch (IOException) + { + _isAlive = false; + + throw; + } + } + + public async Task ReadBytesAsync(int count) + { + using (var awaitable = new SocketAwaitable()) + { + awaitable.Buffer = new ArraySegment(new byte[count], 0, count); + await _socket.ReceiveAsync(awaitable); + return awaitable.Transferred.Array; + } + } + + /// + /// Reads data from the server into the specified buffer. + /// + /// An array of that is the storage location for the received data. + /// The location in buffer to store the received data. + /// The number of bytes to read. + /// This method blocks and will not return until the specified amount of bytes are read. + public void Read(byte[] buffer, int offset, int count) + { + CheckDisposed(); + + int read = 0; + int shouldRead = count; + + while (read < count) + { + try + { + int currentRead = _inputStream.Read(buffer, offset, shouldRead); + if (currentRead < 1) + continue; + + read += currentRead; + offset += currentRead; + shouldRead -= currentRead; + } + catch (IOException) + { + _isAlive = false; + throw; + } + } + } + + public void Write(byte[] data, int offset, int length) + { + CheckDisposed(); + + SocketError status; + + _socket.Send(data, offset, length, SocketFlags.None, out status); + + if (status != SocketError.Success) + { + _isAlive = false; + + ThrowHelper.ThrowSocketWriteError(_socket.RemoteEndPoint, status); + } + } + + public void Write(IList> buffers) + { + CheckDisposed(); + + SocketError status; + +#if DEBUG + int total = 0; + for (int i = 0, C = buffers.Count; i < C; i++) + total += buffers[i].Count; + + if (_socket.Send(buffers, SocketFlags.None, out status) != total) + System.Diagnostics.Debugger.Break(); +#else + _socket.Send(buffers, SocketFlags.None, out status); +#endif + + if (status != SocketError.Success) + { + _isAlive = false; + + ThrowHelper.ThrowSocketWriteError(_socket.RemoteEndPoint, status); + } + } + + public async Task WriteSync(IList> buffers) + { + using (var awaitable = new SocketAwaitable()) + { + awaitable.Arguments.BufferList = buffers; + try + { + await _socket.SendAsync(awaitable); + } + catch + { + _isAlive = false; + ThrowHelper.ThrowSocketWriteError(_socket.RemoteEndPoint, awaitable.Arguments.SocketError); + } + + if (awaitable.Arguments.SocketError != SocketError.Success) + { + _isAlive = false; + ThrowHelper.ThrowSocketWriteError(_socket.RemoteEndPoint, awaitable.Arguments.SocketError); + } + } + } + + /// + /// Receives data asynchronously. Returns true if the IO is pending. Returns false if the socket already failed or the data was available in the buffer. + /// p.Next will only be called if the call completes asynchronously. + /// + public bool ReceiveAsync(AsyncIOArgs p) + { + CheckDisposed(); + + if (!_isAlive) + { + p.Fail = true; + p.Result = null; + + return false; + } + + if (_helper == null) + _helper = new AsyncSocketHelper(this); + + return _helper.Read(p); + } + } +} + +#region [ License information ] +/* ************************************************************ + * + * Copyright (c) 2010 Attila Kisk? enyim.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ************************************************************/ +#endregion diff --git a/Enyim.Caching/Memcached/AsyncSocketHelper.cs b/Enyim.Caching/Memcached/AsyncSocketHelper.cs index 51f1d15e..3c4c42fd 100644 --- a/Enyim.Caching/Memcached/AsyncSocketHelper.cs +++ b/Enyim.Caching/Memcached/AsyncSocketHelper.cs @@ -11,7 +11,7 @@ namespace Enyim.Caching.Memcached { - public partial class PooledSocket + public partial class AsyncPooledSocket { /// /// Supports exactly one reader and writer, but they can do IO concurrently @@ -20,7 +20,7 @@ private class AsyncSocketHelper { private const int ChunkSize = 65536; - private PooledSocket socket; + private AsyncPooledSocket socket; private SlidingBuffer asyncBuffer; private SocketAsyncEventArgs readEvent; @@ -34,7 +34,7 @@ private class AsyncSocketHelper private int isAborted; private ManualResetEvent readInProgressEvent; - public AsyncSocketHelper(PooledSocket socket) + public AsyncSocketHelper(AsyncPooledSocket socket) { this.socket = socket; this.asyncBuffer = new SlidingBuffer(ChunkSize); @@ -93,12 +93,12 @@ private void BeginReceive() { this.readInProgressEvent.Reset(); - if (this.socket.socket.ReceiveAsync(this.readEvent)) + if (this.socket._socket.ReceiveAsync(this.readEvent)) { // wait until the timeout elapses, then abort this reading process // EndREceive will be triggered sooner or later but its timeout // may be higher than our read timeout, so it's not reliable - if (!readInProgressEvent.WaitOne(this.socket.socket.ReceiveTimeout)) + if (!readInProgressEvent.WaitOne(this.socket._socket.ReceiveTimeout)) this.AbortReadAndTryPublishError(false); return; @@ -117,7 +117,7 @@ void AsyncReadCompleted(object sender, SocketAsyncEventArgs e) private void AbortReadAndTryPublishError(bool markAsDead) { if (markAsDead) - this.socket.isAlive = false; + this.socket._isAlive = false; // we've been already aborted, so quit // both the EndReceive and the wait on the event can abort the read @@ -186,6 +186,182 @@ private void PublishResult(bool isAsync) } } } + + public partial class PooledSocket + { + /// + /// Supports exactly one reader and writer, but they can do IO concurrently + /// + private class AsyncSocketHelper + { + private const int ChunkSize = 65536; + + private PooledSocket socket; + private SlidingBuffer asyncBuffer; + + private SocketAsyncEventArgs readEvent; +#if DEBUG_IO + private int doingIO; +#endif + private int remainingRead; + private int expectedToRead; + private AsyncIOArgs pendingArgs; + + private int isAborted; + private ManualResetEvent readInProgressEvent; + + public AsyncSocketHelper(PooledSocket socket) + { + this.socket = socket; + this.asyncBuffer = new SlidingBuffer(ChunkSize); + + this.readEvent = new SocketAsyncEventArgs(); + this.readEvent.Completed += new EventHandler(AsyncReadCompleted); + this.readEvent.SetBuffer(new byte[ChunkSize], 0, ChunkSize); + + this.readInProgressEvent = new ManualResetEvent(false); + } + + /// + /// returns true if io is pending + /// + /// + /// + public bool Read(AsyncIOArgs p) + { + var count = p.Count; + if (count < 1) throw new ArgumentOutOfRangeException("count", "count must be > 0"); +#if DEBUG_IO + if (Interlocked.CompareExchange(ref this.doingIO, 1, 0) != 0) + throw new InvalidOperationException("Receive is already in progress"); +#endif + this.expectedToRead = p.Count; + this.pendingArgs = p; + + p.Fail = false; + p.Result = null; + + if (this.asyncBuffer.Available >= count) + { + PublishResult(false); + + return false; + } + else + { + this.remainingRead = count - this.asyncBuffer.Available; + this.isAborted = 0; + + this.BeginReceive(); + + return true; + } + } + + public void DiscardBuffer() + { + this.asyncBuffer.UnsafeClear(); + } + + private void BeginReceive() + { + while (this.remainingRead > 0) + { + this.readInProgressEvent.Reset(); + + if (this.socket.socket.ReceiveAsync(this.readEvent)) + { + // wait until the timeout elapses, then abort this reading process + // EndREceive will be triggered sooner or later but its timeout + // may be higher than our read timeout, so it's not reliable + if (!readInProgressEvent.WaitOne(this.socket.socket.ReceiveTimeout)) + this.AbortReadAndTryPublishError(false); + + return; + } + + this.EndReceive(); + } + } + + void AsyncReadCompleted(object sender, SocketAsyncEventArgs e) + { + if (this.EndReceive()) + this.BeginReceive(); + } + + private void AbortReadAndTryPublishError(bool markAsDead) + { + if (markAsDead) + this.socket.isAlive = false; + + // we've been already aborted, so quit + // both the EndReceive and the wait on the event can abort the read + // but only one should of them should continue the async call chain + if (Interlocked.CompareExchange(ref this.isAborted, 1, 0) != 0) + return; + + this.remainingRead = 0; + var p = this.pendingArgs; +#if DEBUG_IO + Thread.MemoryBarrier(); + + this.doingIO = 0; +#endif + + p.Fail = true; + p.Result = null; + + this.pendingArgs.Next(p); + } + + /// + /// returns true when io is pending + /// + /// + private bool EndReceive() + { + this.readInProgressEvent.Set(); + + var read = this.readEvent.BytesTransferred; + if (this.readEvent.SocketError != SocketError.Success + || read == 0) + { + this.AbortReadAndTryPublishError(true);//new IOException("Remote end has been closed")); + + return false; + } + + this.remainingRead -= read; + this.asyncBuffer.Append(this.readEvent.Buffer, 0, read); + + if (this.remainingRead <= 0) + { + this.PublishResult(true); + + return false; + } + + return true; + } + + private void PublishResult(bool isAsync) + { + var retval = this.pendingArgs; + + var data = new byte[this.expectedToRead]; + this.asyncBuffer.Read(data, 0, retval.Count); + pendingArgs.Result = data; +#if DEBUG_IO + Thread.MemoryBarrier(); + this.doingIO = 0; +#endif + + if (isAsync) + pendingArgs.Next(pendingArgs); + } + } + } } #region [ License information ] diff --git a/Enyim.Caching/Memcached/BasicNetworkStream.cs b/Enyim.Caching/Memcached/BasicNetworkStream.cs deleted file mode 100644 index 5877069a..00000000 --- a/Enyim.Caching/Memcached/BasicNetworkStream.cs +++ /dev/null @@ -1,134 +0,0 @@ -//#define DEBUG_IO -using System; -using System.Linq; -using System.Collections.Generic; -using System.Diagnostics; -using System.IO; -using System.Net; -using System.Net.Sockets; -using System.Text; -using System.Threading; - -namespace Enyim.Caching.Memcached -{ - public partial class PooledSocket - { - #region [ BasicNetworkStream ] - - private class BasicNetworkStream : Stream - { - private Socket socket; - - public BasicNetworkStream(Socket socket) - { - this.socket = socket; - } - - public override bool CanRead - { - get { return true; } - } - - public override bool CanSeek - { - get { return false; } - } - - public override bool CanWrite - { - get { return false; } - } - - public override void Flush() - { - } - - public override long Length - { - get { throw new NotSupportedException(); } - } - - public override long Position - { - get { throw new NotSupportedException(); } - set { throw new NotSupportedException(); } - } - - - //public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) - //{ - // SocketError errorCode; - - // var retval = this.socket.BeginReceive(buffer, offset, count, SocketFlags.None, out errorCode, callback, state); - - // if (errorCode == SocketError.Success) - // return retval; - - // throw new System.IO.IOException(String.Format("Failed to read from the socket '{0}'. Error: {1}", this.socket.RemoteEndPoint, errorCode)); - //} - - //public override int EndRead(IAsyncResult asyncResult) - //{ - // SocketError errorCode; - - // var retval = this.socket.EndReceive(asyncResult, out errorCode); - - // // actually "0 bytes read" could mean an error as well - // if (errorCode == SocketError.Success && retval > 0) - // return retval; - - // throw new System.IO.IOException(String.Format("Failed to read from the socket '{0}'. Error: {1}", this.socket.RemoteEndPoint, errorCode)); - //} - - public override int Read(byte[] buffer, int offset, int count) - { - SocketError errorCode; - - int retval = this.socket.Receive(buffer, offset, count, SocketFlags.None, out errorCode); - - // actually "0 bytes read" could mean an error as well - if (errorCode == SocketError.Success && retval > 0) - return retval; - - throw new System.IO.IOException(String.Format("Failed to read from the socket '{0}'. Error: {1}", this.socket.RemoteEndPoint, errorCode == SocketError.Success ? "?" : errorCode.ToString())); - } - - public override long Seek(long offset, SeekOrigin origin) - { - throw new NotSupportedException(); - } - - public override void SetLength(long value) - { - throw new NotSupportedException(); - } - - public override void Write(byte[] buffer, int offset, int count) - { - throw new NotSupportedException(); - } - } - - #endregion - } -} - -#region [ License information ] -/* ************************************************************ - * - * Copyright (c) 2010 Attila Kisk? enyim.com - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ************************************************************/ -#endregion diff --git a/Enyim.Caching/Memcached/DefaultServerPool.cs b/Enyim.Caching/Memcached/DefaultServerPool.cs index 87565713..0810e9df 100644 --- a/Enyim.Caching/Memcached/DefaultServerPool.cs +++ b/Enyim.Caching/Memcached/DefaultServerPool.cs @@ -48,7 +48,7 @@ public DefaultServerPool( catch { } } - protected virtual IMemcachedNode CreateNode(EndPoint endpoint) + protected virtual IMemcachedNode CreateNode(DnsEndPoint endpoint) { return new MemcachedNode(endpoint, this.configuration.SocketPool, _logger); } @@ -207,9 +207,9 @@ IEnumerable IServerPool.GetWorkingNodes() void IServerPool.Start() { this.allNodes = this.configuration.Servers. - Select(ip => + Select(ep => { - var node = this.CreateNode(ip); + var node = this.CreateNode(ep); node.Failed += this.NodeFail; return node; diff --git a/Enyim.Caching/Memcached/MemcachedNode.cs b/Enyim.Caching/Memcached/MemcachedNode.cs index fcd0c38d..d5ed4ae8 100755 --- a/Enyim.Caching/Memcached/MemcachedNode.cs +++ b/Enyim.Caching/Memcached/MemcachedNode.cs @@ -27,16 +27,15 @@ public class MemcachedNode : IMemcachedNode private bool isDisposed; - private EndPoint endPoint; + private DnsEndPoint endPoint; private ISocketPoolConfiguration config; private InternalPoolImpl internalPoolImpl; private bool isInitialized; public MemcachedNode( - EndPoint endpoint, + DnsEndPoint endpoint, ISocketPoolConfiguration socketPoolConfig, - ILogger logger - ) + ILogger logger) { this.endPoint = endpoint; this.config = socketPoolConfig; diff --git a/Enyim.Caching/Memcached/PooledSocket.cs b/Enyim.Caching/Memcached/PooledSocket.cs index 5a9ba89f..ad0511f0 100755 --- a/Enyim.Caching/Memcached/PooledSocket.cs +++ b/Enyim.Caching/Memcached/PooledSocket.cs @@ -28,16 +28,14 @@ public partial class PooledSocket : IDisposable private Stream inputStream; private AsyncSocketHelper helper; - public PooledSocket(EndPoint endpoint, TimeSpan connectionTimeout, TimeSpan receiveTimeout, ILogger logger) + public PooledSocket(DnsEndPoint endpoint, TimeSpan connectionTimeout, TimeSpan receiveTimeout, ILogger logger) { _logger = logger; this.isAlive = true; - var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); - // TODO test if we're better off using nagle - //PHP: OPT_TCP_NODELAY - //socket.NoDelay = true; + var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + socket.NoDelay = true; var timeout = connectionTimeout == TimeSpan.MaxValue ? Timeout.Infinite @@ -48,77 +46,46 @@ public PooledSocket(EndPoint endpoint, TimeSpan connectionTimeout, TimeSpan rece : (int)receiveTimeout.TotalMilliseconds; socket.ReceiveTimeout = rcv; - socket.SendTimeout = rcv; - socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true); + socket.SendTimeout = rcv; ConnectWithTimeout(socket, endpoint, timeout); this.socket = socket; this.endpoint = endpoint; - this.inputStream = new BasicNetworkStream(socket); + this.inputStream = new NetworkStream(socket); } - private void ConnectWithTimeout(Socket socket, EndPoint endpoint, int timeout) + private void ConnectWithTimeout(Socket socket, DnsEndPoint endpoint, int timeout) { - //var task = socket.ConnectAsync(endpoint); - //if(!task.Wait(timeout)) - //{ - // using (socket) - // { - // throw new TimeoutException("Could not connect to " + endpoint); - // } - //} - - if (endpoint is DnsEndPoint && !RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true); + var args = new SocketAsyncEventArgs(); + + //Workaround for https://github.com/dotnet/corefx/issues/26840 + if (!IPAddress.TryParse(endpoint.Host, out var address)) { - var dnsEndPoint = ((DnsEndPoint)endpoint); - var host = dnsEndPoint.Host; - var addresses = Dns.GetHostAddresses(dnsEndPoint.Host); - var address = addresses.FirstOrDefault(ip => ip.AddressFamily == System.Net.Sockets.AddressFamily.InterNetwork); + address = Dns.GetHostAddresses(endpoint.Host).FirstOrDefault(ip => ip.AddressFamily == AddressFamily.InterNetwork); if (address == null) - { - throw new ArgumentException(String.Format("Could not resolve host '{0}'.", host)); - } - _logger.LogDebug($"Resolved '{host}' to '{address}'"); - endpoint = new IPEndPoint(address, dnsEndPoint.Port); + throw new ArgumentException(String.Format("Could not resolve host '{0}'.", endpoint.Host)); + args.RemoteEndPoint = new IPEndPoint(address, endpoint.Port); } - - var completed = new AutoResetEvent(false); - var args = new SocketAsyncEventArgs(); - args.RemoteEndPoint = endpoint; - args.Completed += OnConnectCompleted; - args.UserToken = completed; - socket.ConnectAsync(args); - if (!completed.WaitOne(timeout) || !socket.Connected) + else { - using (socket) - { - throw new TimeoutException("Could not connect to " + endpoint); - } + args.RemoteEndPoint = endpoint; } - /* - var mre = new ManualResetEvent(false); - socket.Connect(endpoint, iar => + using (var mres = new ManualResetEventSlim()) { - try { using (iar.AsyncWaitHandle) socket.EndConnect(iar); } - catch { } - - mre.Set(); - }, null); - - if (!mre.WaitOne(timeout) || !socket.Connected) - using (socket) - throw new TimeoutException("Could not connect to " + endpoint); - */ - } - - private void OnConnectCompleted(object sender, SocketAsyncEventArgs args) - { - EventWaitHandle handle = (EventWaitHandle)args.UserToken; - handle.Set(); - } + args.Completed += (s, e) => mres.Set(); + if (socket.ConnectAsync(args)) + { + if(!mres.Wait(timeout)) + { + throw new TimeoutException("Could not connect to " + endpoint); + } + } + } + } public Action CleanupCallback { get; set; } diff --git a/Enyim.Caching/Memcached/Protocol/Binary/BinaryNode.cs b/Enyim.Caching/Memcached/Protocol/Binary/BinaryNode.cs index 68bf0f07..904e1f23 100644 --- a/Enyim.Caching/Memcached/Protocol/Binary/BinaryNode.cs +++ b/Enyim.Caching/Memcached/Protocol/Binary/BinaryNode.cs @@ -20,7 +20,7 @@ public class BinaryNode : MemcachedNode ISaslAuthenticationProvider authenticationProvider; public BinaryNode( - EndPoint endpoint, + DnsEndPoint endpoint, ISocketPoolConfiguration config, ISaslAuthenticationProvider authenticationProvider, ILogger logger) diff --git a/Enyim.Caching/Memcached/Protocol/Binary/BinaryPool.cs b/Enyim.Caching/Memcached/Protocol/Binary/BinaryPool.cs index c6236d41..8fbc3253 100644 --- a/Enyim.Caching/Memcached/Protocol/Binary/BinaryPool.cs +++ b/Enyim.Caching/Memcached/Protocol/Binary/BinaryPool.cs @@ -27,7 +27,7 @@ public BinaryPool(IMemcachedClientConfiguration configuration, ILogger logger) _logger = logger; } - protected override IMemcachedNode CreateNode(EndPoint endpoint) + protected override IMemcachedNode CreateNode(DnsEndPoint endpoint) { return new BinaryNode(endpoint, this.configuration.SocketPool, this.authenticationProvider, _logger); }