Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Reconnect to configuration stream if disconnected. JCBC-19

With this change, a Bucket which is marked as disconnected will be
later reconnected when noticed by a calling thread.

The logic for this is a little convoluted at the moment, but tests
to be correct.  In the future, this whole section should be refactored
to be a better NodeLocator or something similar.  See JCBC-28.

Change-Id: I8541493fdf7e8c504c4cbd512ca43e3416b03829
Reviewed-on: http://review.couchbase.org/14574
Tested-by: Matt Ingenthron <matt@couchbase.com>
Reviewed-by: Matt Ingenthron <matt@couchbase.com>
  • Loading branch information...
commit 7c049c5290766789cc30098238d34858ea07cf14 1 parent ce7a084
@ingenthr ingenthr authored ingenthr committed
View
11 src/main/java/com/couchbase/client/CouchbaseClient.java
@@ -160,11 +160,18 @@ protected CouchbaseClient(CouchbaseConnectionFactory cf, boolean subscribe)
}
/**
- * This function is called when there is a topology change in the cluster.
- * This function is intended for internal use only.
+ * This method is called when there is a topology change in the cluster.
+ *
+ * This method is intended for internal use only.
*/
public void reconfigure(Bucket bucket) {
reconfiguring = true;
+ if (bucket.isNotUpdating()) {
+ getLogger().info("Bucket configuration is disconnected from cluster "
+ + "configuration updates, attempting to reconnect.");
+ CouchbaseConnectionFactory cbcf = (CouchbaseConnectionFactory)connFactory;
+ cbcf.requestConfigReconnect(cbcf.getBucketName(), this);
+ }
try {
((CouchbaseConnection)mconn).reconfigure(bucket);
} catch (IllegalArgumentException ex) {
View
14 src/main/java/com/couchbase/client/CouchbaseConnection.java
@@ -39,7 +39,6 @@
import java.util.List;
import java.util.Map;
-import net.spy.memcached.ConnectionFactory;
import net.spy.memcached.ConnectionObserver;
import net.spy.memcached.FailureMode;
import net.spy.memcached.MemcachedConnection;
@@ -56,13 +55,16 @@
public class CouchbaseConnection extends MemcachedConnection implements
Reconfigurable {
- public CouchbaseConnection(int bufSize, ConnectionFactory f,
+ protected volatile boolean reconfiguring = false;
+ private final CouchbaseConnectionFactory cf;
+
+ public CouchbaseConnection(int bufSize, CouchbaseConnectionFactory f,
List<InetSocketAddress> a, Collection<ConnectionObserver> obs,
FailureMode fm, OperationFactory opfactory) throws IOException {
super(bufSize, f, a, obs, fm, opfactory);
+ this.cf = f;
}
- protected volatile boolean reconfiguring = false;
public void reconfigure(Bucket bucket) {
reconfiguring = true;
@@ -157,8 +159,10 @@ public void addOperation(final String key, final Operation o) {
if (placeIn == null) {
placeIn = primary;
this.getLogger().warn(
- "Could not redistribute "
- + "to another node, retrying primary node for %s.", key);
+ "Node exepcted to receive data is inactive. This could be due to a"
+ + "failure within the cluster. Will check for updated "
+ + "configuration. Key without a configured node is: %s.", key);
+ cf.checkConfigUpdate();
}
}
View
60 src/main/java/com/couchbase/client/CouchbaseConnectionFactory.java
@@ -25,6 +25,7 @@
import com.couchbase.client.vbucket.ConfigurationException;
import com.couchbase.client.vbucket.ConfigurationProvider;
import com.couchbase.client.vbucket.ConfigurationProviderHTTP;
+import com.couchbase.client.vbucket.Reconfigurable;
import com.couchbase.client.vbucket.VBucketNodeLocator;
import com.couchbase.client.vbucket.config.Config;
import com.couchbase.client.vbucket.config.ConfigType;
@@ -32,7 +33,10 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
+import java.util.ArrayList;
import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import net.spy.memcached.BinaryConnectionFactory;
import net.spy.memcached.DefaultHashAlgorithm;
@@ -76,18 +80,26 @@
*/
public static final int DEFAULT_OP_QUEUE_LEN = 16384;
- private final ConfigurationProvider configurationProvider;
+ private volatile ConfigurationProvider configurationProvider;
private final String bucket;
private final String pass;
+ private final List<URI> storedBaseList;
+ private static final Logger LOGGER =
+ Logger.getLogger(CouchbaseConnectionFactory.class.getName());
+ private boolean needsReconnect;
+ private volatile long thresholdLastCheck = System.nanoTime();
+ private volatile int configThresholdCount = 0;
+ private final int maxConfigCheck = 10; //maximum checks in 10 sec interval
public CouchbaseConnectionFactory(final List<URI> baseList,
final String bucketName, final String password)
throws IOException {
- // ConnectionFactoryBuilder cfb = new ConnectionFactoryBuilder(cf);
+ storedBaseList = new ArrayList<URI>();
for (URI bu : baseList) {
if (!bu.isAbsolute()) {
throw new IllegalArgumentException("The base URI must be absolute");
}
+ storedBaseList.add(bu);
}
bucket = bucketName;
pass = password;
@@ -136,6 +148,16 @@ public String getBucketName() {
public Config getVBucketConfig() {
try {
+ // If we find the config provider associated with this bucket is
+ // disconnected and thus stale, we simply replace the configuration
+ // provider
+ if (configurationProvider.getBucketConfiguration(bucket)
+ .isNotUpdating()) {
+ LOGGER.warning("Noticed bucket configuration to be disconnected, "
+ + "will attempt to reconnect");
+ configurationProvider = new ConfigurationProviderHTTP(storedBaseList,
+ bucket, pass);
+ }
return configurationProvider.getBucketConfiguration(bucket).getConfig();
} catch (ConfigurationException e) {
return null;
@@ -145,4 +167,38 @@ public Config getVBucketConfig() {
public ConfigurationProvider getConfigurationProvider() {
return configurationProvider;
}
+
+ protected void requestConfigReconnect(String bucketName, Reconfigurable rec) {
+ configurationProvider.markForResubscribe(bucketName, rec);
+ needsReconnect = true;
+ }
+
+ void checkConfigUpdate() {
+ if (needsReconnect || pastReconnThreshold()) {
+ LOGGER.log(Level.INFO,
+ "Attempting to resubscribe for cluster config updates.");
+ configurationProvider =
+ new ConfigurationProviderHTTP(storedBaseList, bucket, pass);
+ configurationProvider.finishResubscribe();
+ } else {
+ LOGGER.log(Level.WARNING, "No reconnect required, though check requested."
+ + " Current config check is {0} out of a threshold of {1}.",
+ new Object[]{configThresholdCount, maxConfigCheck});
+ }
+ }
+
+ private boolean pastReconnThreshold() {
+ long currentTime = System.nanoTime();
+ if (currentTime - thresholdLastCheck > 100000000) { //if longer than 10 sec
+ configThresholdCount = 0;
+ }
+ configThresholdCount++;
+ thresholdLastCheck = currentTime;
+
+ if (configThresholdCount >= maxConfigCheck) {
+ return true;
+ }
+ return false;
+ }
+
}
View
30 src/main/java/com/couchbase/client/vbucket/BucketMonitor.java
@@ -49,7 +49,7 @@
/**
* The BucketMonitor will open an HTTP comet stream to monitor for changes to
- * the list of nodes. If the list of nodes changes
+ * the list of nodes. If the list of nodes changes, it will notify observers.
*/
public class BucketMonitor extends Observable {
@@ -64,6 +64,8 @@
private ConfigurationParser configParser;
private BucketUpdateResponseHandler handler;
private final HttpMessageHeaders headers;
+ private static final Logger LOGGER =
+ Logger.getLogger(BucketMonitor.class.getName());
/**
* The specification version which this client meets. This will be included in
* requests to the server.
@@ -88,7 +90,6 @@ public BucketMonitor(URI cometStreamURI, String bucketname, String username,
: cometStreamURI.getScheme();
if (!scheme.equals("http")) {
// an SslHandler is needed in the pipeline
- // System.err.println("Only HTTP is supported.");
throw new UnsupportedOperationException("Only http is supported.");
}
@@ -104,6 +105,19 @@ public BucketMonitor(URI cometStreamURI, String bucketname, String username,
}
/**
+ * Take any action required when the monitor appears to be disconnected.
+ */
+ protected void notifyDisconnected() {
+ this.bucket.setIsNotUpdating();
+ setChanged();
+ LOGGER.log(Level.FINE, "Marked bucket " + this.bucket.getName()
+ + " as not updating. Notifying observers.");
+ LOGGER.log(Level.FINER, "There appear to be " + this.countObservers()
+ + " observers waiting for notification");
+ notifyObservers(this.bucket);
+ }
+
+ /**
* A strategy that selects and invokes the appropriate setHeader method on
* the netty HttpHeader class, either setHeader(String, Object) or
* setHeader(String, String). This indirection is needed as with netty 3.2.0
@@ -176,8 +190,8 @@ public void startMonitor() {
"Invalid client configuration received from server. Staying with "
+ "existing configuration.", ex);
Logger.getLogger(BucketMonitor.class.getName()).log(Level.FINE,
- "Invalid client configuration received:\n" + handler.getLastResponse()
- + "\n");
+ "Invalid client configuration received:\n{0}",
+ handler.getLastResponse());
}
}
@@ -195,7 +209,8 @@ protected void createChannel() {
channel = future.awaitUninterruptibly().getChannel();
if (!future.isSuccess()) {
bootstrap.releaseExternalResources();
- throw new ConnectionException("Could not connect to any pool member.");
+ throw new ConnectionException("Could not connect to any cluster pool "
+ + "member.");
}
assert (channel != null);
}
@@ -282,7 +297,10 @@ public void shutdown(long timeout, TimeUnit unit) {
factory.releaseExternalResources();
}
- protected void invalidate() {
+ /**
+ * Replace the previously received configuration with the current one.
+ */
+ protected void replaceConfig() {
try {
String response = handler.getLastResponse();
Bucket updatedBucket = this.configParser.parseBucket(response);
View
35 src/main/java/com/couchbase/client/vbucket/BucketUpdateResponseHandler.java
@@ -32,6 +32,7 @@
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
@@ -77,7 +78,7 @@ public void messageReceived(final ChannelHandlerContext context,
partialResponse = null;
getLatch().countDown();
if (monitor != null) {
- monitor.invalidate();
+ monitor.replaceConfig();
}
} else {
finerLog(curChunk);
@@ -175,9 +176,23 @@ private void finerLog(String message) {
public void handleUpstream(ChannelHandlerContext context, ChannelEvent event)
throws Exception {
if (event instanceof ChannelStateEvent) {
- LOGGER.log(Level.FINEST, "Channel state changed: " + event + "\n\n");
+ ChannelStateEvent csEvent = (ChannelStateEvent)event;
+ LOGGER.log(Level.FINEST, "Channel state changed: {0}\n\n", csEvent);
+ if (csEvent.getValue() == null
+ && csEvent.getState() == ChannelState.CONNECTED) { // a disconnect
+ LOGGER.log(Level.FINE, "Channel has been disconnected on us, "
+ + "restarting the monitor.");
+ monitor.notifyDisconnected(); // connection has been dropped
+ } else {
+ LOGGER.log(Level.FINER, "Channel state change is not a disconnect. "
+ + "Event value is {0} and Channel State is {1}.",
+ new Object[]{csEvent.getValue().toString(),
+ csEvent.getState().toString()});
+ }
+ }
+ if (event.getChannel().isConnected()) {
+ super.handleUpstream(context, event);
}
- super.handleUpstream(context, event);
}
protected void setBucketMonitor(BucketMonitor newMonitor) {
@@ -187,15 +202,21 @@ protected void setBucketMonitor(BucketMonitor newMonitor) {
/*
* @todo we need to investigate why the exception occurs, and if there is a
* better solution to the problem than just shutting down the connection. For
- * now just invalidate the BucketMonitor, and the Client manager will recreate
- * the connection.
+ * now just invalidate the BucketMonitor, and we will recreate the connection.
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
- LOGGER.log(Level.INFO, "Exception occurred: ");
+ Throwable ex = e.getCause();
+ LOGGER.log(Level.WARNING, "Exception occurred: " + ex.getMessage() + "\n");
+ StringBuilder sb = new StringBuilder();
+ for (StackTraceElement one : ex.getStackTrace()) {
+ sb.append(one.toString());
+ sb.append("\n");
+ }
+ LOGGER.log(Level.WARNING, sb.toString());
if (monitor != null) {
- monitor.invalidate();
+ monitor.replaceConfig();
}
}
}
View
4 src/main/java/com/couchbase/client/vbucket/ConfigurationProvider.java
@@ -48,6 +48,8 @@
*/
void subscribe(String bucketName, Reconfigurable rec);
+ void markForResubscribe(String bucketName, Reconfigurable rec);
+
/**
* Unsubscribe from updates on a given bucket and given reconfigurable.
*
@@ -67,4 +69,6 @@
* @return the anonymous bucket's name i.e. 'default'
*/
String getAnonymousAuthBucket();
+
+ void finishResubscribe();
}
View
18 src/main/java/com/couchbase/client/vbucket/ConfigurationProviderHTTP.java
@@ -81,6 +81,8 @@
new ConfigurationParserJSON();
private Map<String, BucketMonitor> monitors =
new HashMap<String, BucketMonitor>();
+ private static String reSubBucket;
+ private static Reconfigurable reSubRec;
/**
* Constructs a configuration provider with disabled authentication for the
@@ -100,10 +102,9 @@ public ConfigurationProviderHTTP(List<URI> baseList) throws IOException {
* @param baseList list of urls to treat as base
* @param restUsr username
* @param restPwd password
- * @throws IOException
*/
public ConfigurationProviderHTTP(List<URI> baseList, String restUsr,
- String restPwd) throws IOException {
+ String restPwd) {
this.baseList = baseList;
this.restUsr = restUsr;
this.restPwd = restPwd;
@@ -209,6 +210,16 @@ private void readPools(String bucketToFind) {
return AddrUtil.getAddresses(serversString.toString());
}
+ public void finishResubscribe() {
+ monitors.clear();
+ subscribe(reSubBucket, reSubRec);
+ }
+
+ public void markForResubscribe(String bucketName, Reconfigurable rec) {
+ reSubBucket = bucketName; // can't subscribe here, must from user request
+ reSubRec = rec;
+ }
+
/**
* Subscribes for configuration updates.
*
@@ -218,6 +229,9 @@ private void readPools(String bucketToFind) {
public void subscribe(String bucketName, Reconfigurable rec) {
Bucket bucket = getBucketConfiguration(bucketName);
+ getLogger().debug("Subscribing an object for reconfiguration updates "
+ + rec.getClass().getName());
+
ReconfigurableObserver obs = new ReconfigurableObserver(rec);
BucketMonitor monitor = this.monitors.get(bucketName);
if (monitor == null) {
View
7 src/main/java/com/couchbase/client/vbucket/ReconfigurableObserver.java
@@ -26,12 +26,15 @@
import java.util.Observable;
import java.util.Observer;
+import java.util.logging.Logger;
/**
* An implementation of the observer for calling reconfigure.
*/
public class ReconfigurableObserver implements Observer {
private final Reconfigurable rec;
+ private static final Logger LOGGER = Logger.getLogger(
+ ReconfigurableObserver.class.getName());
public ReconfigurableObserver(Reconfigurable rec) {
this.rec = rec;
@@ -44,6 +47,10 @@ public ReconfigurableObserver(Reconfigurable rec) {
* @param arg
*/
public void update(Observable o, Object arg) {
+ LOGGER.finest("Received an update, notifying reconfigurables about a "
+ + arg.getClass().getName() + arg.toString());
+ LOGGER.finest("It says it is " + ((Bucket)arg).getName()
+ + " and it's talking to " + ((Bucket)arg).getStreamingURI());
rec.reconfigure((Bucket) arg);
}
View
27 src/main/java/com/couchbase/client/vbucket/config/Bucket.java
@@ -24,6 +24,7 @@
import java.net.URI;
import java.util.List;
+import java.util.logging.Logger;
/**
* Bucket configuration bean.
@@ -35,6 +36,9 @@
private final Config configuration;
// bucket's streaming uri
private final URI streamingURI;
+ private volatile boolean isNotUpdating;
+ private static final Logger LOGGER = Logger.getLogger(
+ Bucket.class.getName());
// nodes list
private final List<Node> nodes;
@@ -45,6 +49,7 @@ public Bucket(String name, Config configuration, URI streamingURI,
this.configuration = configuration;
this.streamingURI = streamingURI;
this.nodes = nodes;
+ this.isNotUpdating = false;
}
public String getName() {
@@ -90,4 +95,26 @@ public int hashCode() {
result = 31 * result + nodes.hashCode();
return result;
}
+
+ /**
+ * Indicates whether or not the bucket is being monitored for updates.
+ *
+ * @return the isNotUpdating
+ */
+ public boolean isNotUpdating() {
+ return isNotUpdating;
+ }
+
+ /**
+ * Mark this bucket as not receiving updates. This means the config
+ * could be stale.
+ *
+ * This is intended for internal use only.
+ *
+ */
+ public final void setIsNotUpdating() {
+ this.isNotUpdating = true;
+ LOGGER.finest("Marking bucket as not updating,"
+ + " disconnected from config stream");
+ }
}
Please sign in to comment.
Something went wrong with that request. Please try again.