Skip to content

Commit

Permalink
SPY-108: Verify connections are sound before completing them entirely.
Browse files Browse the repository at this point in the history
Without this change, the client considers a connection to be good as
soon as it has been established.  That assumption does not hold in
some critical situations, such as a hung process or a very busy
remote server.

The solution is to ping the server with a version op, since that can
happen before auth, and use the response to verify that the server is
actually alive.

Note that this commit also includes a refactoring that renames the
node variable (from `qa` to `node`).

Change-Id: Ie3c0bc967b0705df6e58bf8ef81b158db8576bf3
Reviewed-on: http://review.couchbase.org/23840
Reviewed-by: Matt Ingenthron <matt@couchbase.com>
Tested-by: Michael Nitschinger <michael.nitschinger@couchbase.com>
  • Loading branch information
ingenthr authored and Michael Nitschinger committed Jan 16, 2013
1 parent 10a4057 commit 39d1a8d
Showing 1 changed file with 86 additions and 26 deletions.
112 changes: 86 additions & 26 deletions src/main/java/net/spy/memcached/MemcachedConnection.java
Expand Up @@ -45,17 +45,24 @@
import java.util.Set; import java.util.Set;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;


import net.spy.memcached.compat.SpyThread; import net.spy.memcached.compat.SpyThread;
import net.spy.memcached.compat.log.LoggerFactory; import net.spy.memcached.compat.log.LoggerFactory;
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.ops.KeyedOperation; import net.spy.memcached.ops.KeyedOperation;
import net.spy.memcached.ops.NoopOperation;
import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationException; import net.spy.memcached.ops.OperationException;
import net.spy.memcached.ops.OperationState; import net.spy.memcached.ops.OperationState;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.TapOperation; import net.spy.memcached.ops.TapOperation;
import net.spy.memcached.ops.VBucketAware; import net.spy.memcached.ops.VBucketAware;
import net.spy.memcached.ops.VersionOperation;
import net.spy.memcached.protocol.binary.BinaryOperationFactory; import net.spy.memcached.protocol.binary.BinaryOperationFactory;
import net.spy.memcached.protocol.binary.TapAckOperationImpl; import net.spy.memcached.protocol.binary.TapAckOperationImpl;
import net.spy.memcached.util.StringUtils; import net.spy.memcached.util.StringUtils;
Expand Down Expand Up @@ -340,12 +347,12 @@ public boolean removeObserver(ConnectionObserver obs) {
return connObservers.remove(obs); return connObservers.remove(obs);
} }


private void connected(MemcachedNode qa) { private void connected(MemcachedNode node) {
assert qa.getChannel().isConnected() : "Not connected."; assert node.getChannel().isConnected() : "Not connected.";
int rt = qa.getReconnectCount(); int rt = node.getReconnectCount();
qa.connected(); node.connected();
for (ConnectionObserver observer : connObservers) { for (ConnectionObserver observer : connObservers) {
observer.connectionEstablished(qa.getSocketAddress(), rt); observer.connectionEstablished(node.getSocketAddress(), rt);
} }
} }


Expand All @@ -356,63 +363,116 @@ private void lostConnection(MemcachedNode qa) {
} }
} }


// Handle IO for a specific selector. Any IOException will cause a /**
// reconnect * Handle IO for a specific selector. Any IOException will cause a
* reconnect.
*
* Note that this code makes sure that the corresponding node is not only
* able to connect, but also able to respond in a correct fashion. This is
* handled by issuing a dummy version/noop call and making sure it returns in
* a correct and timely fashion.
*
* @param sk the selector to handle IO against.
*/
private void handleIO(SelectionKey sk) { private void handleIO(SelectionKey sk) {
MemcachedNode qa = (MemcachedNode) sk.attachment(); MemcachedNode node = (MemcachedNode) sk.attachment();
try { try {
getLogger().debug("Handling IO for: %s (r=%s, w=%s, c=%s, op=%s)", sk, getLogger().debug("Handling IO for: %s (r=%s, w=%s, c=%s, op=%s)", sk,
sk.isReadable(), sk.isWritable(), sk.isConnectable(), sk.isReadable(), sk.isWritable(), sk.isConnectable(),
sk.attachment()); sk.attachment());
if (sk.isConnectable()) { if (sk.isConnectable()) {
getLogger().info("Connection state changed for %s", sk); getLogger().info("Connection state changed for %s", sk);
final SocketChannel channel = qa.getChannel(); final SocketChannel channel = node.getChannel();
if (channel.finishConnect()) { if (channel.finishConnect()) {
connected(qa);
addedQueue.offer(qa); // Test to see if it's truly alive, could be a hung process, OS
if (qa.getWbuf().hasRemaining()) { final CountDownLatch latch = new CountDownLatch(1);
handleWrites(sk, qa); final OperationFuture<Boolean> rv =
new OperationFuture<Boolean>("noop", latch, 2500);
NoopOperation testOp = opFact.noop(new OperationCallback() {
public void receivedStatus(OperationStatus status) {
rv.set(status.isSuccess(), status);
}

@Override
public void complete() {
latch.countDown();
}
});

testOp.setHandlingNode(node);
testOp.initialize();

checkState();
insertOperation(node, testOp);
node.copyInputQueue();

boolean done = false;
if(sk.isValid()) {
long timeout = TimeUnit.MILLISECONDS.toNanos(
connectionFactory.getOperationTimeout());
for(long stop = System.nanoTime() + timeout;
stop > System.nanoTime();) {
handleWrites(sk, node);
handleReads(sk, node);
if(done = (latch.getCount() == 0)) {
break;
}
}
}

if (!done || testOp.isCancelled() || testOp.hasErrored() ||
testOp.isTimedOut()) {
throw new ConnectException("Could not send noop upon connect! "
+ "This may indicate a running, but not responding memcached "
+ "instance.");
}

connected(node);
addedQueue.offer(node);
if (node.getWbuf().hasRemaining()) {
handleWrites(sk, node);
} }
} else { } else {
assert !channel.isConnected() : "connected"; assert !channel.isConnected() : "connected";
} }
} else { } else {
if (sk.isValid() && sk.isReadable()) { if (sk.isValid() && sk.isReadable()) {
handleReads(sk, qa); handleReads(sk, node);
} }
if (sk.isValid() && sk.isWritable()) { if (sk.isValid() && sk.isWritable()) {
handleWrites(sk, qa); handleWrites(sk, node);
} }
} }
} catch (ClosedChannelException e) { } catch (ClosedChannelException e) {
// Note, not all channel closes end up here // Note, not all channel closes end up here
if (!shutDown) { if (!shutDown) {
getLogger().info("Closed channel and not shutting down. Queueing" getLogger().info("Closed channel and not shutting down. Queueing"
+ " reconnect on %s", qa, e); + " reconnect on %s", node, e);
lostConnection(qa); lostConnection(node);
} }
} catch (ConnectException e) { } catch (ConnectException e) {
// Failures to establish a connection should attempt a reconnect // Failures to establish a connection should attempt a reconnect
// without signaling the observers. // without signaling the observers.
getLogger().info("Reconnecting due to failure to connect to %s", qa, e); getLogger().info("Reconnecting due to failure to connect to %s", node, e);
queueReconnect(qa); queueReconnect(node);
} catch (OperationException e) { } catch (OperationException e) {
qa.setupForAuth(); // noop if !shouldAuth node.setupForAuth(); // noop if !shouldAuth
getLogger().info("Reconnection due to exception handling a memcached " getLogger().info("Reconnection due to exception handling a memcached "
+ "operation on %s. This may be due to an authentication failure.", + "operation on %s. This may be due to an authentication failure.",
qa, e); node, e);
lostConnection(qa); lostConnection(node);
} catch (Exception e) { } catch (Exception e) {
// Any particular error processing an item should simply // Any particular error processing an item should simply
// cause us to reconnect to the server. // cause us to reconnect to the server.
// //
// One cause is just network oddness or servers // One cause is just network oddness or servers
// restarting, which lead here with IOException // restarting, which lead here with IOException
qa.setupForAuth(); // noop if !shouldAuth node.setupForAuth(); // noop if !shouldAuth
getLogger().info("Reconnecting due to exception on %s", qa, e); getLogger().info("Reconnecting due to exception on %s", node, e);
lostConnection(qa); lostConnection(node);
} }
qa.fixupOps(); node.fixupOps();
} }


private void handleWrites(SelectionKey sk, MemcachedNode qa) private void handleWrites(SelectionKey sk, MemcachedNode qa)
Expand Down

0 comments on commit 39d1a8d

Please sign in to comment.