Skip to content
Browse files

Added MembaseClient Object

The MembaseClient object should be used when making connections
to Membase clusters. This commit also includes a new
MembaseConnectionFactory and adds adds the coresponding builder
code to ConnectionFactoryBuilder. Test support is also added
for connections specific to Membase clusters

Change-Id: I7209adf74c871a9ad6d7cf3e46a9c2c377b588ec
Reviewed-on: http://review.couchbase.org/7969
Tested-by: Michael Wiederhold <mike@couchbase.com>
Reviewed-by: Michael Wiederhold <mike@couchbase.com>
  • Loading branch information...
1 parent 5f01535 commit b27a441ee76f00e49fe3383f68dfa8cf641952dd Mike Wiederhold committed with mikewied
View
130 src/main/java/net/spy/memcached/ConnectionFactoryBuilder.java
@@ -1,5 +1,7 @@
package net.spy.memcached;
+import java.io.IOException;
+import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -367,6 +369,134 @@ public int getTimeoutExceptionThreshold() {
}
/**
+ * Get the MembaseConnectionFactory set up with the provided parameters.
+ * Note that a MembaseConnectionFactory requires the failure mode is set
+ * to retry, and the locator type is discovered dynamically based on the
+ * cluster you are connecting to. As a result, these values will be
+ * overridden upon calling this function.
+ *
+ * @param baseList a list of URI's that will be used to connect to the cluster
+ * @param bucketName the name of the bucket to connect to
+ * @param usr the username for the bucket
+ * @param pass the password for the bucket
+ * @return a MembaseConnectionFactory object
+ * @throws IOException
+ */
+ public MembaseConnectionFactory buildMembaseConnection(final List<URI> baseList,
+ final String bucketName, final String usr, final String pwd) throws IOException {
+ return new MembaseConnectionFactory(baseList, bucketName, usr, pwd) {
+
+ @Override
+ public BlockingQueue<Operation> createOperationQueue() {
+ return opQueueFactory == null ?
+ super.createOperationQueue() : opQueueFactory.create();
+ }
+
+ @Override
+ public BlockingQueue<Operation> createReadOperationQueue() {
+ return readQueueFactory == null ?
+ super.createReadOperationQueue()
+ : readQueueFactory.create();
+ }
+
+ @Override
+ public BlockingQueue<Operation> createWriteOperationQueue() {
+ return writeQueueFactory == null ?
+ super.createReadOperationQueue()
+ : writeQueueFactory.create();
+ }
+
+ @Override
+ public NodeLocator createLocator(List<MemcachedNode> nodes) {
+ switch(getLocator()) {
+ case CONSISTENT:
+ return new KetamaNodeLocator(nodes, getHashAlg());
+ case VBUCKET:
+ return new VBucketNodeLocator(nodes, getVBucketConfig());
+ default: throw new IllegalStateException(
+ "Unhandled locator type: " + locator);
+ }
+ }
+
+ @Override
+ public Transcoder<Object> getDefaultTranscoder() {
+ return transcoder == null ?
+ super.getDefaultTranscoder() : transcoder;
+ }
+
+ @Override
+ public FailureMode getFailureMode() {
+ return failureMode;
+ }
+
+ @Override
+ public HashAlgorithm getHashAlg() {
+ return hashAlg;
+ }
+
+ @Override
+ public Collection<ConnectionObserver> getInitialObservers() {
+ return initialObservers;
+ }
+
+ @Override
+ public OperationFactory getOperationFactory() {
+ return opFact == null ? super.getOperationFactory() : opFact;
+ }
+
+ @Override
+ public long getOperationTimeout() {
+ return opTimeout == -1 ?
+ super.getOperationTimeout() : opTimeout;
+ }
+
+ @Override
+ public int getReadBufSize() {
+ return readBufSize == -1 ?
+ super.getReadBufSize() : readBufSize;
+ }
+
+ @Override
+ public boolean isDaemon() {
+ return isDaemon;
+ }
+
+ @Override
+ public boolean shouldOptimize() {
+ return shouldOptimize;
+ }
+
+ @Override
+ public boolean useNagleAlgorithm() {
+ return useNagle;
+ }
+
+ @Override
+ public long getMaxReconnectDelay() {
+ return maxReconnectDelay;
+ }
+
+ @Override
+ public AuthDescriptor getAuthDescriptor() {
+ return authDescriptor;
+ }
+
+ @Override
+ public long getOpQueueMaxBlockTime() {
+ return opQueueMaxBlockTime > -1 ? opQueueMaxBlockTime
+ : super.getOpQueueMaxBlockTime();
+ }
+
+ @Override
+ public int getTimeoutExceptionThreshold() {
+ return timeoutExceptionThreshold;
+ }
+
+ };
+
+ }
+
+ /**
* Type of protocol to use for connections.
*/
public static enum Protocol {
View
242 src/main/java/net/spy/memcached/MembaseClient.java
@@ -0,0 +1,242 @@
+package net.spy.memcached;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.ClosedSelectorException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import net.spy.memcached.internal.OperationFuture;
+import net.spy.memcached.ops.GetlOperation;
+import net.spy.memcached.ops.Operation;
+import net.spy.memcached.ops.OperationStatus;
+import net.spy.memcached.transcoders.Transcoder;
+import net.spy.memcached.vbucket.ConfigurationException;
+import net.spy.memcached.vbucket.Reconfigurable;
+import net.spy.memcached.vbucket.config.Bucket;
+
+public class MembaseClient extends MemcachedClient implements MembaseClientIF, Reconfigurable {
+
+ private volatile boolean reconfiguring = false;
+
+ /**
+ * Get a MemcachedClient based on the REST response from a Membase server.
+ *
+ * This constructor is merely a convenience for situations where the bucket
+ * name is the same as the user name. This is commonly the case.
+ *
+ * To connect to the "default" special bucket for a given cluster, use an
+ * empty string as the password.
+ *
+ * If a password has not been assigned to the bucket, it is typically an
+ * empty string.
+ *
+ * @param baseList the URI list of one or more servers from the cluster
+ * @param bucketName the bucket name in the cluster you wish to use
+ * @param pwd the password for the bucket
+ * @throws IOException if connections could not be made
+ * @throws ConfigurationException if the configuration provided by the
+ * server has issues or is not compatible
+ */
+ public MembaseClient(List<URI> baseList, String bucketName, String pwd)
+ throws IOException, ConfigurationException {
+ this(new MembaseConnectionFactory(baseList, bucketName, bucketName, pwd));
+ }
+
+ /**
+ * Get a MemcachedClient based on the REST response from a Membase server
+ * where the username is different than the bucket name.
+ *
+ * To connect to the "default" special bucket for a given cluster, use an
+ * empty string as the password.
+ *
+ * If a password has not been assigned to the bucket, it is typically an
+ * empty string.
+ *
+ * @param baseList the URI list of one or more servers from the cluster
+ * @param bucketName the bucket name in the cluster you wish to use
+ * @param usr the username for the bucket; this nearly always be the same
+ * as the bucket name
+ * @param pwd the password for the bucket
+ * @throws IOException if connections could not be made
+ * @throws ConfigurationException if the configuration provided by the
+ * server has issues or is not compatible
+ */
+ public MembaseClient(final List<URI> baseList, final String bucketName,
+ final String usr, final String pwd) throws IOException, ConfigurationException {
+ this(new MembaseConnectionFactory(baseList, bucketName, usr, pwd));
+ }
+
+ /**
+ * Get a MemcachedClient based on the REST response from a Membase server
+ * where the username is different than the bucket name.
+ *
+ * Note that when specifying a ConnectionFactory you must specify a
+ * BinaryConnectionFactory. Also the ConnectionFactory's protocol
+ * and locator values are always overwritten. The protocol will always
+ * be binary and the locator will be chosen based on the bucket type you
+ * are connecting to.
+ *
+ * To connect to the "default" special bucket for a given cluster, use an
+ * empty string as the password.
+ *
+ * If a password has not been assigned to the bucket, it is typically an
+ * empty string.
+ *
+ * @param cf the ConnectionFactory to use to create connections
+ * @param baseList the URI list of one or more servers from the cluster
+ * @param bucketName the bucket name in the cluster you wish to use
+ * @param usr the username for the bucket; this nearly always be the same
+ * as the bucket name
+ * @param pwd the password for the bucket
+ * @throws IOException if connections could not be made
+ * @throws ConfigurationException if the configuration provided by the
+ * server has issues or is not compatible
+ */
+ public MembaseClient(MembaseConnectionFactory cf) throws IOException, ConfigurationException {
+ super(cf, AddrUtil.getAddresses(cf.getVBucketConfig().getServers()), false);
+ start();
+ }
+
+ public void reconfigure(Bucket bucket) {
+ reconfiguring = true;
+ try {
+ conn.reconfigure(bucket);
+ } catch (IllegalArgumentException ex) {
+ getLogger().warn("Failed to reconfigure client, staying with previous configuration.", ex);
+ } finally {
+ reconfiguring = false;
+ }
+ }
+
+ /**
+ * Gets and locks the given key asynchronously. By default the maximum allowed
+ * timeout is 30 seconds. Timeouts greater than this will be set to 30 seconds.
+ *
+ * @param key the key to fetch and lock
+ * @param exp the amount of time the lock should be valid for in seconds.
+ * @param tc the transcoder to serialize and unserialize value
+ * @return a future that will hold the return value of the fetch
+ * @throws IllegalStateException in the rare circumstance where queue
+ * is too full to accept any more requests
+ */
+ public <T> OperationFuture<CASValue<T>> asyncGetAndLock(final String key, int exp,
+ final Transcoder<T> tc) {
+ final CountDownLatch latch=new CountDownLatch(1);
+ final OperationFuture<CASValue<T>> rv=
+ new OperationFuture<CASValue<T>>(key, latch, operationTimeout);
+
+ Operation op=opFact.getl(key, exp,
+ new GetlOperation.Callback() {
+ private CASValue<T> val=null;
+ public void receivedStatus(OperationStatus status) {
+ rv.set(val, status);
+ }
+ public void gotData(String k, int flags, long cas, byte[] data) {
+ assert key.equals(k) : "Wrong key returned";
+ assert cas > 0 : "CAS was less than zero: " + cas;
+ val=new CASValue<T>(cas, tc.decode(
+ new CachedData(flags, data, tc.getMaxSize())));
+ }
+ public void complete() {
+ latch.countDown();
+ }});
+ rv.setOperation(op);
+ addOp(key, op);
+ return rv;
+ }
+
+ /**
+ * Get and lock the given key asynchronously and decode with the default
+ * transcoder. By default the maximum allowed timeout is 30 seconds.
+ * Timeouts greater than this will be set to 30 seconds.
+ *
+ * @param key the key to fetch and lock
+ * @param exp the amount of time the lock should be valid for in seconds.
+ * @return a future that will hold the return value of the fetch
+ * @throws IllegalStateException in the rare circumstance where queue
+ * is too full to accept any more requests
+ */
+ public OperationFuture<CASValue<Object>> asyncGetAndLock(final String key, int exp) {
+ return asyncGetAndLock(key, exp, transcoder);
+ }
+
+ /**
+ * Getl with a single key. By default the maximum allowed timeout is 30
+ * seconds. Timeouts greater than this will be set to 30 seconds.
+ *
+ * @param key the key to get and lock
+ * @param exp the amount of time the lock should be valid for in seconds.
+ * @param tc the transcoder to serialize and unserialize value
+ * @return the result from the cache (null if there is none)
+ * @throws OperationTimeoutException if the global operation timeout is
+ * exceeded
+ * @throws IllegalStateException in the rare circumstance where queue
+ * is too full to accept any more requests
+ */
+ public <T> CASValue<T> getAndLock(String key, int exp, Transcoder<T> tc) {
+ try {
+ return asyncGetAndLock(key, exp, tc).get(
+ operationTimeout, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted waiting for value", e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException("Exception waiting for value", e);
+ } catch (TimeoutException e) {
+ throw new OperationTimeoutException("Timeout waiting for value", e);
+ }
+ }
+
+ /**
+ * Get and lock with a single key and decode using the default transcoder.
+ * By default the maximum allowed timeout is 30 seconds. Timeouts greater
+ * than this will be set to 30 seconds.
+ * @param key the key to get and lock
+ * @param exp the amount of time the lock should be valid for in seconds.
+ * @return the result from the cache (null if there is none)
+ * @throws OperationTimeoutException if the global operation timeout is
+ * exceeded
+ * @throws IllegalStateException in the rare circumstance where queue
+ * is too full to accept any more requests
+ */
+ public CASValue<Object> getAndLock(String key, int exp) {
+ return getAndLock(key, exp, transcoder);
+ }
+
+ /**
+ * Infinitely loop processing IO.
+ */
+ @Override
+ public void run() {
+ while(running) {
+ if (!reconfiguring) {
+ try {
+ conn.handleIO();
+ } catch (IOException e) {
+ logRunException(e);
+ } catch (CancelledKeyException e) {
+ logRunException(e);
+ } catch (ClosedSelectorException e) {
+ logRunException(e);
+ } catch (IllegalStateException e) {
+ logRunException(e);
+ }
+ }
+ }
+ getLogger().info("Shut down memcached client");
+ }
+
+ @Override
+ public boolean shutdown(long timeout, TimeUnit unit) {
+ boolean shutdownResult = super.shutdown(timeout, unit);
+ MembaseConnectionFactory cf = (MembaseConnectionFactory)connFactory;
+ if (cf.getConfigurationProvider() != null) {
+ cf.getConfigurationProvider().shutdown();
+ }
+ return shutdownResult;
+ }
+}
View
20 src/main/java/net/spy/memcached/MembaseClientIF.java
@@ -0,0 +1,20 @@
+package net.spy.memcached;
+
+import java.util.concurrent.Future;
+
+import net.spy.memcached.transcoders.Transcoder;
+
+/**
+ * This interface is provided as a helper for testing clients of the MembaseClient.
+ */
+public interface MembaseClientIF extends MemcachedClientIF {
+
+ Future<CASValue<Object>> asyncGetAndLock(final String key, int exp);
+
+ <T> Future<CASValue<T>> asyncGetAndLock(final String key, int exp,
+ final Transcoder<T> tc);
+
+ public <T> CASValue<T> getAndLock(String key, int exp, Transcoder<T> tc);
+
+ CASValue<Object> getAndLock(String key, int exp);
+}
View
112 src/main/java/net/spy/memcached/MembaseConnectionFactory.java
@@ -0,0 +1,112 @@
+package net.spy.memcached;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+import net.spy.memcached.ConnectionFactoryBuilder.Locator;
+import net.spy.memcached.auth.AuthDescriptor;
+import net.spy.memcached.auth.PlainCallbackHandler;
+import net.spy.memcached.vbucket.ConfigurationException;
+import net.spy.memcached.vbucket.ConfigurationProvider;
+import net.spy.memcached.vbucket.ConfigurationProviderHTTP;
+import net.spy.memcached.vbucket.VBucketNodeLocator;
+import net.spy.memcached.vbucket.config.Bucket;
+import net.spy.memcached.vbucket.config.Config;
+import net.spy.memcached.vbucket.config.ConfigType;
+
+/**
+ * Membase implementation of ConnectionFactory.
+ *
+ * <p>
+ * This implementation creates connections where the operation queue is an
+ * ArrayBlockingQueue and the read and write queues are unbounded
+ * LinkedBlockingQueues. The <code>Retry</code> FailureMode and <code>
+ * KetamaHash</code> VBucket hashing mechanism are always used. If other
+ * configurations are needed, look at the ConnectionFactoryBuilder.
+ *
+ * </p>
+ */
+public class MembaseConnectionFactory extends BinaryConnectionFactory {
+
+ /**
+ * Default failure mode.
+ */
+ public static final FailureMode DEFAULT_FAILURE_MODE =
+ FailureMode.Retry;
+
+ /**
+ * Default hash algorithm.
+ */
+ public static final HashAlgorithm DEFAULT_HASH = HashAlgorithm.KETAMA_HASH;
+
+ /**
+ * Maximum length of the operation queue returned by this connection
+ * factory.
+ */
+ public static final int DEFAULT_OP_QUEUE_LEN=16384;
+
+ private final Locator locator;
+ private final AuthDescriptor ad;
+ private final ConfigurationProvider configurationProvider;
+ private final Config vbConfig;
+
+ public MembaseConnectionFactory(final List<URI> baseList,
+ final String bucketName, final String usr, final String pwd) throws IOException {
+ //ConnectionFactoryBuilder cfb = new ConnectionFactoryBuilder(cf);
+ for (URI bu : baseList) {
+ if (!bu.isAbsolute()) {
+ throw new IllegalArgumentException("The base URI must be absolute");
+ }
+ }
+ this.configurationProvider = new ConfigurationProviderHTTP(baseList, usr, pwd);
+ Bucket bucket = this.configurationProvider.getBucketConfiguration(bucketName);
+ Config config = bucket.getConfig();
+
+ if (config.getConfigType() == ConfigType.MEMBASE) {
+ locator = Locator.VBUCKET;
+ vbConfig = bucket.getConfig();
+ } else if (config.getConfigType() == ConfigType.MEMCACHE) {
+ locator = Locator.CONSISTENT;
+ vbConfig = null;
+ } else {
+ locator = null;
+ vbConfig = null;
+ throw new ConfigurationException("Bucket type not supported or JSON response unexpected");
+ }
+
+ if (!this.configurationProvider.getAnonymousAuthBucket().equals(bucketName) && usr != null) {
+ ad = new AuthDescriptor(new String[]{"PLAIN"},new PlainCallbackHandler(usr, pwd));
+ } else {
+ ad = null;
+ }
+ }
+
+ @Override
+ public NodeLocator createLocator(List<MemcachedNode> nodes) {
+ switch(locator) {
+ case CONSISTENT:
+ return new KetamaNodeLocator(nodes, getHashAlg());
+ case VBUCKET:
+ return new VBucketNodeLocator(nodes, getVBucketConfig());
+ default: throw new IllegalStateException(
+ "Unhandled locator type: " + locator);
+ }
+ }
+
+ public AuthDescriptor getAuthDescriptor() {
+ return ad;
+ }
+
+ public Config getVBucketConfig() {
+ return vbConfig;
+ }
+
+ public ConfigurationProvider getConfigurationProvider() {
+ return configurationProvider;
+ }
+
+ public Locator getLocator() {
+ return locator;
+ }
+}
View
321 src/main/java/net/spy/memcached/MemcachedClient.java
@@ -5,7 +5,6 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.net.URI;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedSelectorException;
import java.util.ArrayList;
@@ -29,7 +28,6 @@
import net.spy.memcached.auth.AuthDescriptor;
import net.spy.memcached.auth.AuthThreadMonitor;
-import net.spy.memcached.auth.PlainCallbackHandler;
import net.spy.memcached.compat.SpyThread;
import net.spy.memcached.internal.BulkFuture;
import net.spy.memcached.internal.BulkGetFuture;
@@ -42,7 +40,6 @@
import net.spy.memcached.ops.DeleteOperation;
import net.spy.memcached.ops.GetAndTouchOperation;
import net.spy.memcached.ops.GetOperation;
-import net.spy.memcached.ops.GetlOperation;
import net.spy.memcached.ops.GetsOperation;
import net.spy.memcached.ops.Mutator;
import net.spy.memcached.ops.Operation;
@@ -53,13 +50,6 @@
import net.spy.memcached.ops.StoreType;
import net.spy.memcached.transcoders.TranscodeService;
import net.spy.memcached.transcoders.Transcoder;
-import net.spy.memcached.vbucket.ConfigurationException;
-import net.spy.memcached.vbucket.ConfigurationProvider;
-import net.spy.memcached.vbucket.ConfigurationProviderHTTP;
-import net.spy.memcached.vbucket.Reconfigurable;
-import net.spy.memcached.vbucket.config.Bucket;
-import net.spy.memcached.vbucket.config.Config;
-import net.spy.memcached.vbucket.config.ConfigType;
/**
* Client to a memcached server.
@@ -112,25 +102,25 @@
* </pre>
*/
public class MemcachedClient extends SpyThread
- implements MemcachedClientIF, ConnectionObserver, Reconfigurable {
+ implements MemcachedClientIF, ConnectionObserver {
- private volatile boolean running=true;
+ protected volatile boolean running=true;
private volatile boolean shuttingDown=false;
- private final long operationTimeout;
+ protected final long operationTimeout;
- private final MemcachedConnection conn;
- final OperationFactory opFact;
+ protected final MemcachedConnection conn;
+ protected final OperationFactory opFact;
- final Transcoder<Object> transcoder;
+ protected final Transcoder<Object> transcoder;
- final TranscodeService tcService;
+ protected final TranscodeService tcService;
- final AuthDescriptor authDescriptor;
+ protected final AuthDescriptor authDescriptor;
- private final AuthThreadMonitor authMonitor = new AuthThreadMonitor();
- private volatile boolean reconfiguring = false;
- private ConfigurationProvider configurationProvider;
+ protected final ConnectionFactory connFactory;
+
+ protected final AuthThreadMonitor authMonitor = new AuthThreadMonitor();
/**
* Get a memcache client operating on the specified memcached locations.
@@ -162,128 +152,11 @@ public MemcachedClient(List<InetSocketAddress> addrs)
*/
public MemcachedClient(ConnectionFactory cf, List<InetSocketAddress> addrs)
throws IOException {
- if(cf == null) {
- throw new NullPointerException("Connection factory required");
- }
- if(addrs == null) {
- throw new NullPointerException("Server list required");
- }
- if(addrs.isEmpty()) {
- throw new IllegalArgumentException(
- "You must have at least one server to connect to");
- }
- if(cf.getOperationTimeout() <= 0) {
- throw new IllegalArgumentException(
- "Operation timeout must be positive.");
- }
- tcService = new TranscodeService(cf.isDaemon());
- transcoder=cf.getDefaultTranscoder();
- opFact=cf.getOperationFactory();
- assert opFact != null : "Connection factory failed to make op factory";
- conn=cf.createConnection(addrs);
- assert conn != null : "Connection factory failed to make a connection";
- operationTimeout = cf.getOperationTimeout();
- authDescriptor = cf.getAuthDescriptor();
- if(authDescriptor != null) {
- addObserver(this);
- }
- setName("Memcached IO over " + conn);
- setDaemon(cf.isDaemon());
- start();
- }
-
- /**
- * Get a MemcachedClient based on the REST response from a Membase server
- * where the username is different than the bucket name.
- *
- * To connect to the "default" special bucket for a given cluster, use an
- * empty string as the password.
- *
- * If a password has not been assigned to the bucket, it is typically an
- * empty string.
- *
- * @param baseList the URI list of one or more servers from the cluster
- * @param bucketName the bucket name in the cluster you wish to use
- * @param usr the username for the bucket; this nearly always be the same
- * as the bucket name
- * @param pwd the password for the bucket
- * @throws IOException if connections could not be made
- * @throws ConfigurationException if the configuration provided by the
- * server has issues or is not compatible
- */
- public MemcachedClient(final List<URI> baseList, final String bucketName,
- final String usr, final String pwd) throws IOException, ConfigurationException {
- this(new BinaryConnectionFactory(), baseList, bucketName, usr, pwd);
+ this(cf, addrs, true);
}
- /**
- * Get a MemcachedClient based on the REST response from a Membase server
- * where the username is different than the bucket name.
- *
- * Note that when specifying a ConnectionFactory you must specify a
- * BinaryConnectionFactory. Also the ConnectionFactory's protocol
- * and locator values are always overwritten. The protocol will always
- * be binary and the locator will be chosen based on the bucket type you
- * are connecting to.
- *
- * To connect to the "default" special bucket for a given cluster, use an
- * empty string as the password.
- *
- * If a password has not been assigned to the bucket, it is typically an
- * empty string.
- *
- * @param cf the ConnectionFactory to use to create connections
- * @param baseList the URI list of one or more servers from the cluster
- * @param bucketName the bucket name in the cluster you wish to use
- * @param usr the username for the bucket; this nearly always be the same
- * as the bucket name
- * @param pwd the password for the bucket
- * @throws IOException if connections could not be made
- * @throws ConfigurationException if the configuration provided by the
- * server has issues or is not compatible
- */
- public MemcachedClient(ConnectionFactory cf, final List<URI> baseList,
- final String bucketName, final String usr, final String pwd)
- throws IOException, ConfigurationException {
- ConnectionFactoryBuilder cfb = new ConnectionFactoryBuilder(cf);
- for (URI bu : baseList) {
- if (!bu.isAbsolute()) {
- throw new IllegalArgumentException("The base URI must be absolute");
- }
- }
- this.configurationProvider = new ConfigurationProviderHTTP(baseList, usr, pwd);
- Bucket bucket = this.configurationProvider.getBucketConfiguration(bucketName);
- Config config = bucket.getConfig();
-
- if (cf != null && !(cf instanceof BinaryConnectionFactory)) {
- throw new IllegalArgumentException("ConnectionFactory must be of type " +
- "BinaryConnectionFactory");
- }
-
- if (config.getConfigType() == ConfigType.MEMBASE) {
- cfb.setFailureMode(FailureMode.Retry)
- .setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
- .setHashAlg(HashAlgorithm.KETAMA_HASH)
- .setLocatorType(ConnectionFactoryBuilder.Locator.VBUCKET)
- .setVBucketConfig(bucket.getConfig());
- } else if (config.getConfigType() == ConfigType.MEMCACHE) {
- cfb.setFailureMode(FailureMode.Retry)
- .setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
- .setHashAlg(HashAlgorithm.KETAMA_HASH)
- .setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT);
- } else {
- throw new ConfigurationException("Bucket type not supported or JSON response unexpected");
- }
-
- if (!this.configurationProvider.getAnonymousAuthBucket().equals(bucketName) && usr != null) {
- AuthDescriptor ad = new AuthDescriptor(new String[]{"PLAIN"},
- new PlainCallbackHandler(usr, pwd));
- cfb.setAuthDescriptor(ad);
- }
-
- cf = cfb.build();
-
- List<InetSocketAddress> addrs = AddrUtil.getAddresses(bucket.getConfig().getServers());
+ protected MemcachedClient(ConnectionFactory cf, List<InetSocketAddress> addrs,
+ boolean startIOThread) throws IOException {
if(cf == null) {
throw new NullPointerException("Connection factory required");
}
@@ -292,12 +165,13 @@ public MemcachedClient(ConnectionFactory cf, final List<URI> baseList,
}
if(addrs.isEmpty()) {
throw new IllegalArgumentException(
- "You must have at least one server to connect to");
+ "You must have at least one server to connect to");
}
if(cf.getOperationTimeout() <= 0) {
throw new IllegalArgumentException(
"Operation timeout must be positive.");
}
+ connFactory = cf;
tcService = new TranscodeService(cf.isDaemon());
transcoder=cf.getDefaultTranscoder();
opFact=cf.getOperationFactory();
@@ -311,42 +185,8 @@ public MemcachedClient(ConnectionFactory cf, final List<URI> baseList,
}
setName("Memcached IO over " + conn);
setDaemon(cf.isDaemon());
- this.configurationProvider.subscribe(bucketName, this);
- start();
- }
-
- /**
- * Get a MemcachedClient based on the REST response from a Membase server.
- *
- * This constructor is merely a convenience for situations where the bucket
- * name is the same as the user name. This is commonly the case.
- *
- * To connect to the "default" special bucket for a given cluster, use an
- * empty string as the password.
- *
- * If a password has not been assigned to the bucket, it is typically an
- * empty string.
- *
- * @param baseList the URI list of one or more servers from the cluster
- * @param bucketName the bucket name in the cluster you wish to use
- * @param pwd the password for the bucket
- * @throws IOException if connections could not be made
- * @throws ConfigurationException if the configuration provided by the
- * server has issues or is not compatible
- */
- public MemcachedClient(List<URI> baseList,
- String bucketName,
- String pwd) throws IOException, ConfigurationException {
- this(baseList, bucketName, bucketName, pwd);
- }
- public void reconfigure(Bucket bucket) {
- reconfiguring = true;
- try {
- conn.reconfigure(bucket);
- } catch (IllegalArgumentException ex) {
- getLogger().warn("Failed to reconfigure client, staying with previous configuration.", ex);
- } finally {
- reconfiguring = false;
+ if (startIOThread) {
+ start();
}
}
@@ -1198,58 +1038,6 @@ public Object get(String key) {
}
/**
- * Gets and locks the given key asynchronously. By default the maximum allowed
- * timeout is 30 seconds. Timeouts greater than this will be set to 30 seconds.
- *
- * @param key the key to fetch and lock
- * @param exp the amount of time the lock should be valid for in seconds.
- * @param tc the transcoder to serialize and unserialize value
- * @return a future that will hold the return value of the fetch
- * @throws IllegalStateException in the rare circumstance where queue
- * is too full to accept any more requests
- */
- public <T> OperationFuture<CASValue<T>> asyncGetAndLock(final String key, int exp,
- final Transcoder<T> tc) {
- final CountDownLatch latch=new CountDownLatch(1);
- final OperationFuture<CASValue<T>> rv=
- new OperationFuture<CASValue<T>>(key, latch, operationTimeout);
-
- Operation op=opFact.getl(key, exp,
- new GetlOperation.Callback() {
- private CASValue<T> val=null;
- public void receivedStatus(OperationStatus status) {
- rv.set(val, status);
- }
- public void gotData(String k, int flags, long cas, byte[] data) {
- assert key.equals(k) : "Wrong key returned";
- assert cas > 0 : "CAS was less than zero: " + cas;
- val=new CASValue<T>(cas, tc.decode(
- new CachedData(flags, data, tc.getMaxSize())));
- }
- public void complete() {
- latch.countDown();
- }});
- rv.setOperation(op);
- addOp(key, op);
- return rv;
- }
-
- /**
- * Get and lock the given key asynchronously and decode with the default
- * transcoder. By default the maximum allowed timeout is 30 seconds.
- * Timeouts greater than this will be set to 30 seconds.
- *
- * @param key the key to fetch and lock
- * @param exp the amount of time the lock should be valid for in seconds.
- * @return a future that will hold the return value of the fetch
- * @throws IllegalStateException in the rare circumstance where queue
- * is too full to accept any more requests
- */
- public OperationFuture<CASValue<Object>> asyncGetAndLock(final String key, int exp) {
- return asyncGetAndLock(key, exp, transcoder);
- }
-
- /**
* Asynchronously get a bunch of objects from the cache.
*
* @param <T>
@@ -1632,48 +1420,6 @@ public void complete() {
}
/**
- * Getl with a single key. By default the maximum allowed timeout is 30
- * seconds. Timeouts greater than this will be set to 30 seconds.
- *
- * @param key the key to get and lock
- * @param exp the amount of time the lock should be valid for in seconds.
- * @param tc the transcoder to serialize and unserialize value
- * @return the result from the cache (null if there is none)
- * @throws OperationTimeoutException if the global operation timeout is
- * exceeded
- * @throws IllegalStateException in the rare circumstance where queue
- * is too full to accept any more requests
- */
- public <T> CASValue<T> getAndLock(String key, int exp, Transcoder<T> tc) {
- try {
- return asyncGetAndLock(key, exp, tc).get(
- operationTimeout, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- throw new RuntimeException("Interrupted waiting for value", e);
- } catch (ExecutionException e) {
- throw new RuntimeException("Exception waiting for value", e);
- } catch (TimeoutException e) {
- throw new OperationTimeoutException("Timeout waiting for value", e);
- }
- }
-
- /**
- * Get and lock with a single key and decode using the default transcoder.
- * By default the maximum allowed timeout is 30 seconds. Timeouts greater
- * than this will be set to 30 seconds.
- * @param key the key to get and lock
- * @param exp the amount of time the lock should be valid for in seconds.
- * @return the result from the cache (null if there is none)
- * @throws OperationTimeoutException if the global operation timeout is
- * exceeded
- * @throws IllegalStateException in the rare circumstance where queue
- * is too full to accept any more requests
- */
- public CASValue<Object> getAndLock(String key, int exp) {
- return getAndLock(key, exp, transcoder);
- }
-
- /**
* Increment the given key by the given amount.
*
* Due to the way the memcached server operates on items, incremented
@@ -2009,7 +1755,7 @@ public void complete() {
return rv.keySet();
}
- private void logRunException(Exception e) {
+ protected void logRunException(Exception e) {
if(shuttingDown) {
// There are a couple types of errors that occur during the
// shutdown sequence that are considered OK. Log at debug.
@@ -2025,19 +1771,17 @@ private void logRunException(Exception e) {
@Override
public void run() {
while(running) {
- if (!reconfiguring) {
- try {
- conn.handleIO();
- } catch (IOException e) {
- logRunException(e);
- } catch (CancelledKeyException e) {
- logRunException(e);
- } catch (ClosedSelectorException e) {
- logRunException(e);
- } catch (IllegalStateException e) {
- logRunException(e);
- }
- }
+ try {
+ conn.handleIO();
+ } catch (IOException e) {
+ logRunException(e);
+ } catch (CancelledKeyException e) {
+ logRunException(e);
+ } catch (ClosedSelectorException e) {
+ logRunException(e);
+ } catch (IllegalStateException e) {
+ logRunException(e);
+ }
}
getLogger().info("Shut down memcached client");
}
@@ -2080,11 +1824,8 @@ public boolean shutdown(long timeout, TimeUnit unit) {
conn.shutdown();
setName(baseName + " - SHUTTING DOWN (informed client)");
tcService.shutdown();
- if (configurationProvider != null) {
- configurationProvider.shutdown();
- }
} catch (IOException e) {
- getLogger().warn("exception while shutting down configuration provider", e);
+ getLogger().warn("exception while shutting down", e);
}
}
return rv;
View
9 src/main/java/net/spy/memcached/MemcachedClientIF.java
@@ -66,15 +66,6 @@ CASResponse cas(String key, long casId, Object value)
Future<Object> asyncGet(String key);
- Future<CASValue<Object>> asyncGetAndLock(final String key, int exp);
-
- <T> Future<CASValue<T>> asyncGetAndLock(final String key, int exp,
- final Transcoder<T> tc);
-
- CASValue<Object> getAndLock(String key, int exp);
-
- <T> CASValue<T> getAndLock(String key, int exp, Transcoder<T> tc);
-
Future<CASValue<Object>> asyncGetAndTouch(final String key, final int exp);
<T> Future<CASValue<T>> asyncGetAndTouch(final String key, final int exp,
View
16 src/test/java/net/spy/memcached/BinaryClientTest.java
@@ -1,6 +1,5 @@
package net.spy.memcached;
-
/**
* This test assumes a binary server is running on localhost:11211.
*/
@@ -86,4 +85,19 @@ public void testTouchTimeout() throws Exception {
assertFalse(client.touch("touchkey", 3).get().booleanValue());
}
}
+
+ @Override
+ protected void syncGetTimeoutsInitClient() throws Exception {
+ initClient(new BinaryConnectionFactory() {
+ @Override
+ public long getOperationTimeout() {
+ return 2;
+ }
+
+ @Override
+ public int getTimeoutExceptionThreshold() {
+ return 1000000;
+ }
+ });
+ }
}
View
48 src/test/java/net/spy/memcached/MembaseClientTest.java
@@ -0,0 +1,48 @@
+package net.spy.memcached;
+
+import java.net.URI;
+import java.util.Arrays;
+
+public class MembaseClientTest extends BinaryClientTest {
+ @Override
+ protected void initClient() throws Exception {
+ initClient(new MembaseConnectionFactory(Arrays.asList(URI.create("http://localhost:8091/pools")),
+ "default","default", ""));
+ }
+
+ @Override
+ protected String getExpectedVersionSource() {
+ return "localhost/127.0.0.1:11210";
+ }
+
+ @Override
+ protected void initClient(ConnectionFactory cf) throws Exception{
+ client=new MembaseClient((MembaseConnectionFactory)cf);
+ }
+
+ @Override
+ public void testAvailableServers() {
+ // MembaseClient tracks hostname and ip address of servers need to
+ // make sure the available server list is 2 * (num servers)
+ try {
+ Thread.sleep(10); // Let the client warm up
+ } catch (InterruptedException e) {
+ }
+ assert client.getAvailableServers().size() == 2;
+ }
+
+ protected void syncGetTimeoutsInitClient() throws Exception {
+ initClient(new MembaseConnectionFactory(Arrays.asList(URI.create("http://localhost:8091/pools")),
+ "default","default", "") {
+ @Override
+ public long getOperationTimeout() {
+ return 2;
+ }
+
+ @Override
+ public int getTimeoutExceptionThreshold() {
+ return 1000000;
+ }
+ });
+ }
+}
View
29 src/test/java/net/spy/memcached/ProtocolBaseCase.java
@@ -582,6 +582,20 @@ public void testGracefulShutdown() throws Exception {
}
}
+ protected void syncGetTimeoutsInitClient() throws Exception {
+ initClient(new DefaultConnectionFactory() {
+ @Override
+ public long getOperationTimeout() {
+ return 2;
+ }
+
+ @Override
+ public int getTimeoutExceptionThreshold() {
+ return 1000000;
+ }
+ });
+ }
+
public void testSyncGetTimeouts() throws Exception {
final String key="timeoutTestKey";
final String value="timeoutTestValue";
@@ -598,18 +612,7 @@ public void testSyncGetTimeouts() throws Exception {
assertTrue("Couldn't shut down within five seconds",
client.shutdown(5, TimeUnit.SECONDS));
- initClient(new DefaultConnectionFactory() {
- @Override
- public long getOperationTimeout() {
- return 2;
- }
-
- @Override
- public int getTimeoutExceptionThreshold() {
- return 1000000;
- }
- });
-
+ syncGetTimeoutsInitClient();
Thread.sleep(100); // allow connections to be established
int i = 0;
@@ -637,7 +640,7 @@ public int getTimeoutExceptionThreshold() {
}
}
- private void debugNodeInfo(Collection<MemcachedNode> nodes) {
+ protected void debugNodeInfo(Collection<MemcachedNode> nodes) {
System.err.println("Debug nodes:");
for (MemcachedNode node : nodes) {
System.err.println(node);
View
7 src/test/java/net/spy/memcached/VBucketMemcachedClientTest.java
@@ -12,15 +12,12 @@
import java.net.URI;
import java.io.IOException;
-/**
- * @author Alexander Sokolovsky
- */
public class VBucketMemcachedClientTest extends TestCase {
public void testOps() throws Exception {
- MemcachedClient mc = null;
+ MembaseClient mc = null;
try {
URI base = new URI("http://localhost:8091/pools");
- mc = new MemcachedClient(Arrays.asList(base), "default", "Administrator", "password");
+ mc = new MembaseClient(Arrays.asList(base), "default", "Administrator", "password");
} catch (IOException ex) {
Logger.getLogger(VBucketMemcachedClientTest.class.getName()).log(Level.SEVERE, null, ex);
} catch (ConfigurationException ex) {
View
4 src/test/manual/net/spy/memcached/test/TapBucketTest.java
@@ -6,7 +6,7 @@
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
-import net.spy.memcached.MemcachedClient;
+import net.spy.memcached.MembaseClient;
import net.spy.memcached.TapClient;
import net.spy.memcached.tapmessage.ResponseMessage;
@@ -26,7 +26,7 @@
public static void main(String[] args) throws Exception {
boolean failed = false;
- MemcachedClient mc = new MemcachedClient(
+ MembaseClient mc = new MembaseClient(
Arrays.asList(new URI("http://localhost:8091/pools")),
"bucket", "bucket", "password");
TapClient tc = new TapClient(

0 comments on commit b27a441

Please sign in to comment.
Something went wrong with that request. Please try again.