Skip to content

Commit

Permalink
Add a configuration provider for bucket updates.
Browse files Browse the repository at this point in the history
Change-Id: I202aa38d5c600f1e40febca02f303398a4600977
Reviewed-on: http://review.couchbase.org/6760
Reviewed-by: Michael Wiederhold <mike@couchbase.com>
Tested-by: Matt Ingenthron <matt@couchbase.com>
Reviewed-by: Matt Ingenthron <matt@couchbase.com>
  • Loading branch information
bairon authored and ingenthr committed Jun 8, 2011
1 parent 6b24c54 commit 329f6e3
Show file tree
Hide file tree
Showing 14 changed files with 4,728 additions and 29 deletions.
@@ -0,0 +1,20 @@
package net.spy.memcached.vbucket;

public class ConfigurationException extends RuntimeException {

public ConfigurationException() {
super();
}

public ConfigurationException(String message) {
super(message);
}

public ConfigurationException(String message, Throwable cause) {
super(message, cause);
}

public ConfigurationException(Throwable cause) {
super(cause);
}
}
16 changes: 16 additions & 0 deletions src/main/java/net/spy/memcached/vbucket/ConfigurationProvider.java
@@ -0,0 +1,16 @@
package net.spy.memcached.vbucket;


import net.spy.memcached.vbucket.config.Bucket;

public interface ConfigurationProvider {
Bucket getBucketConfiguration(String bucketname) throws ConfigurationException;

void subscribe(String bucketName, Reconfigurable rec) throws ConfigurationException;

void unsubscribe(String bucketName, Reconfigurable rec);

void shutdown();

String getAnonymousAuthBucket();
}
235 changes: 235 additions & 0 deletions src/main/java/net/spy/memcached/vbucket/ConfigurationProviderHTTP.java
@@ -0,0 +1,235 @@
package net.spy.memcached.vbucket;

import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.net.URI;
import java.net.InetSocketAddress;
import java.net.URLConnection;
import java.net.HttpURLConnection;
import java.net.Authenticator;
import java.net.URL;
import java.io.IOException;
import java.io.InputStream;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.text.ParseException;

import net.spy.memcached.AddrUtil;
import net.spy.memcached.compat.SpyObject;
import net.spy.memcached.vbucket.config.Bucket;
import net.spy.memcached.vbucket.config.Config;
import net.spy.memcached.vbucket.config.Pool;
import net.spy.memcached.vbucket.config.ConfigurationParserJSON;
import net.spy.memcached.vbucket.config.ConfigurationParser;

public class ConfigurationProviderHTTP extends SpyObject implements ConfigurationProvider {
private static final String DEFAULT_POOL_NAME = "default";
private static final String ANONYMOUS_AUTH_BUCKET = "default";
/**
* The specification version which this client meets. This will be included
* in requests to the server.
*/
public static final String CLIENT_SPEC_VER = "1.0";
private List<URI> baseList;
private String restUsr;
private String restPwd;
private URI loadedBaseUri;
// map of <bucketname, bucket> currently loaded
private Map<String, Bucket> buckets = new ConcurrentHashMap<String, Bucket>();

// map of <poolname, pool> currently loaded
//private Map<String, Pool> pools = new ConcurrentHashMap<String, Pool>();
private ConfigurationParser configurationParser = new ConfigurationParserJSON();
private Map<String, BucketMonitor> monitors = new HashMap<String, BucketMonitor>();

public ConfigurationProviderHTTP(List<URI> baseList) throws IOException {
this(baseList, null, null);
}

public ConfigurationProviderHTTP(List<URI> baseList, String restUsr, String restPwd) throws IOException {
this.baseList = baseList;
this.restUsr = restUsr;
this.restPwd = restPwd;
}

public Bucket getBucketConfiguration(final String bucketname) throws ConfigurationException {
if (bucketname == null || bucketname.isEmpty()) {
throw new IllegalArgumentException("Bucket name can not be blank.");
}
Bucket bucket = this.buckets.get(bucketname);
if (bucket == null) {
readPools(bucketname);
}
return this.buckets.get(bucketname);
}

/**
* For a given bucket to be found, walk the URIs in the baselist until the
* bucket needed is found.
*
* @param bucketToFind
* @throws ConfigurationException
*/
private void readPools(String bucketToFind) throws ConfigurationException {
// the intent with this method is to encapsulate all of the walking of URIs
// and populating an internal object model of the configuration to one place
for (URI baseUri : baseList) {
try {
// get and parse the response from the current base uri
URLConnection baseConnection = urlConnBuilder(null, baseUri);
String base = readToString(baseConnection);
if ("".equals(base)) {
getLogger().warn("Provided URI " + baseUri + " has an empty response... skipping");
continue;
}
Map<String, Pool> pools = this.configurationParser.parseBase(base);

// check for the default pool name
if (!pools.containsKey(DEFAULT_POOL_NAME)) {
getLogger().warn("Provided URI " + baseUri + " has no default pool... skipping");
continue;
}
// load pools
for (Pool pool : pools.values()) {
URLConnection poolConnection = urlConnBuilder(baseUri, pool.getUri());
String poolString = readToString(poolConnection);
configurationParser.loadPool(pool, poolString);
URLConnection poolBucketsConnection = urlConnBuilder(baseUri, pool.getBucketsUri());
String sBuckets = readToString(poolBucketsConnection);
Map<String, Bucket> bucketsForPool = configurationParser.parseBuckets(sBuckets);
pool.replaceBuckets(bucketsForPool);

}
// did we found our bucket?
boolean bucketFound = false;
for (Pool pool : pools.values()) {
if (pool.hasBucket(bucketToFind)) {
bucketFound = true;
break;
}
}
if (bucketFound) {
for (Pool pool : pools.values()) {
for (Map.Entry<String, Bucket> bucketEntry : pool.getROBuckets().entrySet()) {
this.buckets.put(bucketEntry.getKey(), bucketEntry.getValue());
}
}
this.loadedBaseUri = baseUri;
return;
}
} catch (ParseException e) {
getLogger().warn("Provided URI " + baseUri + " has an unparsable response...skipping", e);
} catch (IOException e) {
getLogger().warn("Connection problems with URI " + baseUri + " ...skipping", e);
}
throw new ConfigurationException("Configuration for bucket " + bucketToFind + " was not found.");
}
}

public List<InetSocketAddress> getServerList(final String bucketname) throws ConfigurationException {
Bucket bucket = getBucketConfiguration(bucketname);
List<String> servers = bucket.getVbuckets().getServers();
StringBuilder serversString = new StringBuilder();
for (String server : servers) {
serversString.append(server).append(' ');
}
return AddrUtil.getAddresses(serversString.toString());
}

public void subscribe(String bucketName, Reconfigurable rec) throws ConfigurationException {

Bucket bucket = getBucketConfiguration(bucketName);

ReconfigurableObserver obs = new ReconfigurableObserver(rec);
BucketMonitor monitor = this.monitors.get(bucketName);
if (monitor == null) {
URI streamingURI = bucket.getStreamingURI();
monitor = new BucketMonitor(this.loadedBaseUri.resolve(streamingURI), bucketName, this.restUsr, this.restPwd, configurationParser);
this.monitors.put(bucketName, monitor);
monitor.addObserver(obs);
monitor.startMonitor();
} else {
monitor.addObserver(obs);
}
}

public void unsubscribe(String vbucketName, Reconfigurable rec) {
BucketMonitor monitor = this.monitors.get(vbucketName);
if (monitor != null) {
monitor.deleteObserver(new ReconfigurableObserver(rec));
}
}

public Config getLatestConfig(String bucketname) throws ConfigurationException {
Bucket bucket = getBucketConfiguration(bucketname);
return bucket.getVbuckets();
}

public String getAnonymousAuthBucket() {
return ANONYMOUS_AUTH_BUCKET;
}

public void shutdown() {
for (BucketMonitor monitor : this.monitors.values()) {
monitor.shutdown();
}
}

/**
* Create a URL which has the appropriate headers to interact with the
* service. Most exception handling is up to the caller.
*
* @param resource the URI either absolute or relative to the base for this ClientManager
* @return
* @throws java.io.IOException
*/
private URLConnection urlConnBuilder(URI base, URI resource) throws IOException {
if (!resource.isAbsolute() && base != null) {
resource = base.resolve(resource);
}
if (restUsr != null) {
Authenticator.setDefault(new PoolAuthenticator(this.restUsr, this.restPwd));
} else {
Authenticator.setDefault(null);
}
URL specURL = resource.toURL();
URLConnection connection = specURL.openConnection();
connection.setRequestProperty("Accept", "application/json");
connection.setRequestProperty("user-agent", "spymemcached vbucket client");
connection.setRequestProperty("X-memcachekv-Store-Client-Specification-Version", CLIENT_SPEC_VER);

return connection;

}

private String readToString(URLConnection connection) throws IOException {
BufferedReader reader = null;
try {
InputStream inStream = connection.getInputStream();
if (connection instanceof java.net.HttpURLConnection) {
HttpURLConnection httpConnection = (HttpURLConnection) connection;
if (httpConnection.getResponseCode() == 403) {
throw new IOException("Service does not accept the authentication credentials: "
+ httpConnection.getResponseCode() + httpConnection.getResponseMessage());
} else if (httpConnection.getResponseCode() >= 400) {
throw new IOException("Service responded with a failure code: "
+ httpConnection.getResponseCode() + httpConnection.getResponseMessage());
}
} else {
throw new IOException("Unexpected URI type encountered");
}
reader = new BufferedReader(new InputStreamReader(inStream));
String str;
StringBuilder buffer = new StringBuilder();
while ((str = reader.readLine()) != null) {
buffer.append(str);
}
return buffer.toString();
} finally {
reader.close();
}
}

}
24 changes: 24 additions & 0 deletions src/main/java/net/spy/memcached/vbucket/PoolAuthenticator.java
@@ -0,0 +1,24 @@
package net.spy.memcached.vbucket;

import java.net.PasswordAuthentication;

public class PoolAuthenticator extends java.net.Authenticator {

private final PasswordAuthentication auth;

public PoolAuthenticator(String username, String password) {
super();
if (username == null || password == null) {
throw new IllegalArgumentException("Username or Password is not defined.");
} else {
this.auth = new PasswordAuthentication(username, password.toCharArray());
}
}

@Override
protected PasswordAuthentication getPasswordAuthentication() {
return auth;
}


}
7 changes: 7 additions & 0 deletions src/main/java/net/spy/memcached/vbucket/Reconfigurable.java
@@ -0,0 +1,7 @@
package net.spy.memcached.vbucket;

import net.spy.memcached.vbucket.config.Bucket;

public interface Reconfigurable {
void reconfigure(Bucket bucket);
}
@@ -0,0 +1,35 @@
package net.spy.memcached.vbucket;

import net.spy.memcached.vbucket.config.Bucket;

import java.util.Observer;
import java.util.Observable;

public class ReconfigurableObserver implements Observer {
private final Reconfigurable rec;

public ReconfigurableObserver(Reconfigurable rec) {
this.rec = rec;
}

public void update(Observable o, Object arg) {
rec.reconfigure((Bucket) arg);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

ReconfigurableObserver that = (ReconfigurableObserver) o;

if (!rec.equals(that.rec)) return false;

return true;
}

@Override
public int hashCode() {
return rec.hashCode();
}
}
Expand Up @@ -89,7 +89,7 @@ private Bucket parseBucketFromJSON(JSONObject bucketJO) throws ParseException {
String bucketname = bucketJO.get("name").toString();
String streamingUri = bucketJO.get("streamingUri").toString();
ConfigFactory cf = new DefaultConfigFactory();
Config config = cf.createConfigFromJSON(bucketJO);
Config config = cf.create(bucketJO);
List<Node> nodes = new ArrayList<Node>();
JSONArray nodesJA = bucketJO.getJSONArray("nodes");
for(int i = 0; i < nodesJA.length(); ++i) {
Expand Down
Expand Up @@ -103,10 +103,11 @@ private Config parseJSON(JSONObject jsonObject) throws JSONException {
"Number of buckets must be a power of two, > 0 and <= "
+ VBucket.MAX_BUCKETS);
}

List<String> populateServers = populateServers(servers);
List<VBucket> populateVbuckets = populateVbuckets(vbuckets);
Config config = new DefaultConfig(hashAlgorithm, serversCount,
replicasCount, vbucketsCount, populateServers(servers),
populateVbuckets(servers));
replicasCount, vbucketsCount, populateServers,
populateVbuckets);

return config;
}
Expand Down

0 comments on commit 329f6e3

Please sign in to comment.