Permalink
Browse files

NCBC-1307: SDK fails to connect to nodes after restart

Motivation
----------
If the client detects that the entire cluster has gone offline, the client
should send NodeUnavailableExceptions after detecting that connectivity
has been lost. When the cluster becomes available, the client should
reconnect and continue to process requests without manual intervention.

Modifications
-------------
- Added INFO level logging to MultiplexingConnection and
  MultiplexingIOService and related classes
- Added synchronization to critical areas
- Improved exception handling

Results
-------
After a cluster goes offline, once the cluster comes back online the
client should recover.

Change-Id: I8ec571a653b7dd4fee8861b295ef9e69d3fefc3a
Reviewed-on: http://review.couchbase.org/73423
Reviewed-by: Jeffry Morris <jeffrymorris@gmail.com>
Tested-by: Jeffry Morris <jeffrymorris@gmail.com>
  • Loading branch information...
jeffrymorris committed Feb 9, 2017
1 parent a03eb29 commit e49584477c4b62e78f4e275125f9310eb2f1ee1d
@@ -280,7 +280,7 @@ await op.ReadAsync(response, 0, response.Length)
else
{
const string msg = "Cannot find callback object for operation: {0}";
tcs.SetException(new InvalidOperationException(string.Format(msg, s.Opaque)));
tcs.TrySetException(new InvalidOperationException(string.Format(msg, s.Opaque)));
}
};
return func;
@@ -371,7 +371,7 @@ await op.ReadAsync(response, 0, response.Length)
else
{
const string msg = "Cannot find callback object for operation: {0}";
tcs.SetException(new InvalidOperationException(string.Format(msg, s.Opaque)));
tcs.TrySetException(new InvalidOperationException(string.Format(msg, s.Opaque)));
}
};
return func;
@@ -425,7 +425,7 @@ await op.ReadAsync(response, 0, response.Length)
else
{
const string msg = "Cannot find callback object for operation: {0}";
tcs.SetException(new InvalidOperationException(string.Format(msg, s.Opaque)));
tcs.TrySetException(new InvalidOperationException(string.Format(msg, s.Opaque)));
}
};
return func;
@@ -478,7 +478,7 @@ await op.ReadAsync(response, 0, response.Length)
else
{
const string msg = "Cannot find callback object for operation: {0}";
tcs.SetException(new InvalidOperationException(string.Format(msg, s.Opaque)));
tcs.TrySetException(new InvalidOperationException(string.Format(msg, s.Opaque)));
}
};
return func;
@@ -301,6 +301,10 @@ public uint Revision
/// </summary>
private void _heartBeatTimer_Elapsed(object state)
{
if (Thread.CurrentThread.Name == null)
{
Thread.CurrentThread.Name = "HB-" + Thread.CurrentThread.ManagedThreadId;
}
Log.Info("Checking if node {0} is down: {1}", EndPoint, _isDown);
if (_isDown && !_disposed)
{
@@ -147,7 +147,7 @@ public T Acquire()
if (_count < _configuration.MaxSize && !_disposed)
{
Log.Info("Trying to acquire new connection!");
Log.Info("Trying to acquire new connection! Refs={0}", _refs.Count);
connection = _factory(this, _converter, _bufferAllocator);
_refs.TryAdd(connection.Identity, connection);
@@ -183,8 +183,8 @@ private T AcquireFromPool()
if (_store.TryDequeue(out connection) && !_disposed)
{
Interlocked.Exchange(ref _acquireFailedCount, 0);
Log.Debug("Acquire existing: {0} | {1} | [{2}, {3}] - {4} - Disposed: {5}",
connection.Identity, EndPoint, _store.Count, _count, _identity, _disposed);
Log.Debug("Acquire existing: {0} | {1} | [{2}, {3}] - {4} - Disposed: {5} - Refs={6}",
connection.Identity, EndPoint, _store.Count, _count, _identity, _disposed, _refs.Count);
connection.MarkUsed(true);
return connection;
@@ -199,7 +199,7 @@ private T AcquireFromPool()
/// <param name="connection">The <see cref="IConnection"/> to release back into the pool.</param>
public void Release(T connection)
{
Log.Debug("Releasing: {0} on {1} - {2}", connection.Identity, EndPoint, _identity);
Log.Info("Releasing: {0} on {1} - {2} - Refs={3}", connection.Identity, EndPoint, _identity, _refs.Count);
connection.MarkUsed(false);
if (connection.IsDead)
{
@@ -226,6 +226,7 @@ public void Release(T connection)
{
_store.Enqueue(connection);
}
Log.Info("Released: {0} on {1} - {2} - Refs={3}", connection.Identity, EndPoint, _identity, _refs.Count);
_autoResetEvent.Set();
}
@@ -21,6 +21,7 @@ public class MultiplexingConnection : ConnectionBase
private readonly Thread _receiveThread;
private byte[] _receiveBuffer;
private int _receiveBufferLength;
private readonly object _syncObj = new object();
public MultiplexingConnection(IConnectionPool connectionPool, Socket socket, IByteConverter converter,
BufferAllocator allocator)
@@ -78,11 +79,19 @@ public override void SendAsync(byte[] request, Func<SocketAsyncState, Task> call
var sentBytesCount = 0;
lock (Socket)
{
do
try
{
sentBytesCount += Socket.Send(request, sentBytesCount, request.Length - sentBytesCount, SocketFlags.None);
do
{
sentBytesCount += Socket.Send(request, sentBytesCount, request.Length - sentBytesCount,
SocketFlags.None);
} while (sentBytesCount < request.Length);
} while (sentBytesCount < request.Length);
}
catch (Exception e)
{
HandleDisconnect(e);
}
}
}
@@ -106,11 +115,19 @@ public override byte[] Send(byte[] request)
var sentBytesCount = 0;
lock (Socket)
{
do
try
{
sentBytesCount += Socket.Send(request, sentBytesCount, request.Length - sentBytesCount, SocketFlags.None);
do
{
sentBytesCount += Socket.Send(request, sentBytesCount, request.Length - sentBytesCount,
SocketFlags.None);
} while (sentBytesCount < request.Length);
} while (sentBytesCount < request.Length);
}
catch (Exception e)
{
HandleDisconnect(e);
}
}
var didComplete = state.SyncWait.WaitOne(Configuration.SendTimeout);
@@ -261,60 +278,52 @@ private void HandleDisconnect(Exception exception)
}
public void Close()
{
if (Socket != null)
if (Disposed) return;
lock (_syncObj)
{
try
Log.Info("Closing connection {0}", Identity);
Disposed = true;
IsDead = true;
MarkUsed(false);
if (Socket != null)
{
if (Socket.Connected)
try
{
Socket.Shutdown(SocketShutdown.Both);
if (Socket.Connected)
{
Socket.Shutdown(SocketShutdown.Both);
}
//base.Dispose(); ignore base
}
catch (Exception e)
{
Log.Info(e);
}
finally
{
Socket.Dispose();
}
}
catch (Exception) { }
finally
{
IsDead = true;
Socket.Dispose();
}
//free up all states in flight
lock (_statesInFlight)
{
foreach (IState state in _statesInFlight.Values)
//free up all states in flight
lock (_statesInFlight)
{
//this hould have a correct handling where some kind of exception is thrown in the unblocked method
var state1 = state;
Task.Run(() => state1.Complete(null));
foreach (IState state in _statesInFlight.Values)
{
//this hould have a correct handling where some kind of exception is thrown in the unblocked method
var state1 = state;
Task.Run(() => state1.Complete(null));
}
}
}
}
}
public override void Dispose()
{
if (Disposed || InUse && !IsDead) return;
if (Disposed) return;
Log.Debug("Disposing {0}", _identity);
Disposed = true;
IsDead = true;
try
{
if (Socket != null)
{
if (Socket.Connected)
{
Socket.Shutdown(SocketShutdown.Both);
}
Socket.Dispose();
}
//call the bases dispose to cleanup the timer
base.Dispose();
}
catch (Exception e)
{
Log.Info(e);
}
Close();
}
}
}
Oops, something went wrong.

0 comments on commit e495844

Please sign in to comment.