Skip to content

Commit

Permalink
Added the Voldemort Client automated re-bootstrap mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
Chinmay Soman authored and Lei Gao committed Jun 27, 2012
1 parent a3e9359 commit 441a936
Show file tree
Hide file tree
Showing 15 changed files with 716 additions and 33 deletions.
31 changes: 31 additions & 0 deletions src/java/voldemort/VoldemortAdminTool.java
Expand Up @@ -45,6 +45,7 @@
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.map.ObjectMapper;

import voldemort.client.SystemStore;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.protocol.admin.AdminClientConfig;
import voldemort.cluster.Cluster;
Expand Down Expand Up @@ -85,6 +86,7 @@
public class VoldemortAdminTool {

private static final String ALL_METADATA = "all";
private static SystemStore<String, Long> sysStoreVersion = null;

@SuppressWarnings("unchecked")
public static void main(String[] args) throws Exception {
Expand Down Expand Up @@ -250,6 +252,13 @@ public static void main(String[] args) throws Exception {

AdminClient adminClient = new AdminClient(url, new AdminClientConfig());

// Initialize the system store for stores.xml version
String[] bootstrapUrls = new String[1];
bootstrapUrls[0] = url;
sysStoreVersion = new SystemStore<String, Long>("voldsys$_metadata_version",
bootstrapUrls,
0);

String ops = "";
if(options.has("delete-partitions")) {
ops += "d";
Expand Down Expand Up @@ -433,6 +442,10 @@ public static void main(String[] args) throws Exception {
adminClient,
MetadataStore.STORES_KEY,
mapper.writeStoreList(storeDefs));

// Update the store metadata version
updateStoreMetadataversion();

} else if(metadataKey.compareTo(MetadataStore.REBALANCING_STEAL_INFO) == 0) {
if(!Utils.isReadableFile(metadataValue))
throw new VoldemortException("Rebalancing steal info file path incorrect");
Expand Down Expand Up @@ -722,6 +735,24 @@ private static void executeCheckMetadata(AdminClient adminClient, String metadat
}
}

/*
* TODO: For now write one version for the entire stores.xml When we split
* the stores.xml, make this more granular
*/
private static void updateStoreMetadataversion() {
String versionKey = "stores.xml";
Versioned<Long> storesVersion = sysStoreVersion.getSysStore(versionKey);
if(storesVersion == null) {
System.err.println("Current version is null. Assuming version 0.");
storesVersion = new Versioned<Long>((long) 1);
} else {
System.out.println("Version obtained = " + storesVersion.getValue());
long newValue = storesVersion.getValue() + 1;
storesVersion.setObject(newValue);
}
sysStoreVersion.putSysStore(versionKey, storesVersion);
}

private static void executeSetMetadata(Integer nodeId,
AdminClient adminClient,
String key,
Expand Down
3 changes: 2 additions & 1 deletion src/java/voldemort/client/AbstractStoreClientFactory.java
Expand Up @@ -151,7 +151,8 @@ public <K, V> StoreClient<K, V> getStoreClient(String storeName,
this,
3,
clientContextName,
sequencer.getAndIncrement());
sequencer.getAndIncrement(),
config);
}

@SuppressWarnings("unchecked")
Expand Down
125 changes: 125 additions & 0 deletions src/java/voldemort/client/AsyncMetadataVersionManager.java
@@ -0,0 +1,125 @@
package voldemort.client;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Random;
import java.util.concurrent.Callable;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import voldemort.versioning.Versioned;

/*
* The AsyncMetadataVersionManager is used to track the Metadata version on the
* cluster and if necessary Re-bootstrap the client.
*
* During initialization, it will retrieve the current version of the store (or
* the entire stores.xml depending upon granularity) and then periodically check
* whether this has been updated. During init if the initial version turns out
* to be null, it means that no change has been done to that store since it was
* created. In this case, we assume version '0'.
*/

public class AsyncMetadataVersionManager implements Runnable {

private final Logger logger = Logger.getLogger(this.getClass());
private Versioned<Long> currentVersion;
private final SystemStore<String, Long> sysStore;
private final String systemKey = "stores.xml";
private volatile boolean isRunning;
private final Callable<Void> storeClientThunk;
private long asyncMetadataCheckInterval;

// Random delta generator
final int DELTA_MAX = 1000;
Random randomGenerator = new Random(System.currentTimeMillis());

public AsyncMetadataVersionManager(SystemStore<String, Long> systemStore,
long asyncMetadataCheckInterval,
Callable<Void> storeClientThunk) {
this(null, systemStore, asyncMetadataCheckInterval, storeClientThunk);
}

public AsyncMetadataVersionManager(Versioned<Long> initialVersion,
SystemStore<String, Long> systemStore,
long asyncMetadataCheckInterval,
Callable<Void> storeClientThunk) {
this.sysStore = systemStore;
if(initialVersion == null) {
this.currentVersion = sysStore.getSysStore("stores.xml");

// If the received store version is null, assume version 0
if(currentVersion == null)
currentVersion = new Versioned<Long>((long) 0);
} else {
currentVersion = initialVersion;
}

// Initialize and start the background check thread
isRunning = true;

Thread checkVersionThread = new Thread(this, "AsyncVersionCheckThread");
checkVersionThread.setDaemon(true);
checkVersionThread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {

public void uncaughtException(Thread t, Throwable e) {
if(logger.isEnabledFor(Level.ERROR))
logger.error("Uncaught exception in Metadata Version check thread:", e);
}
});

this.storeClientThunk = storeClientThunk;
this.asyncMetadataCheckInterval = asyncMetadataCheckInterval;
checkVersionThread.start();

}

public void destroy() {
isRunning = false;
}

public void run() {
while(!Thread.currentThread().isInterrupted() && isRunning) {
try {
Thread.sleep(asyncMetadataCheckInterval);
} catch(InterruptedException e) {
break;
}

Versioned<Long> newVersion = this.sysStore.getSysStore(systemKey);

// If version obtained is null, the store is untouched. Continue
if(newVersion == null) {
logger.info("Metadata unchanged after creation ...");
continue;
}

logger.info("MetadataVersion check => Obtained " + systemKey + " version : "
+ newVersion);

if(!newVersion.equals(currentVersion)) {
logger.info("Metadata version mismatch detected.");

// Determine a random delta delay between 0 to 1000 (ms)
int delta = randomGenerator.nextInt(DELTA_MAX);

try {
logger.info("Sleeping for delta : " + delta + " (ms) before re-bootstrapping.");
Thread.sleep(delta);
} catch(InterruptedException e) {
break;
}

// Invoke callback for bootstrap
try {
this.storeClientThunk.call();
} catch(Exception e) {
e.printStackTrace();
}

// Update the current version
currentVersion = newVersion;
}
}
}
}
14 changes: 14 additions & 0 deletions src/java/voldemort/client/ClientConfig.java
Expand Up @@ -79,6 +79,7 @@ public class ClientConfig {

private volatile int maxBootstrapRetries = 2;
private volatile String clientContextName = "default";
private volatile long asyncCheckMetadataInterval = 5000;

public ClientConfig() {}

Expand Down Expand Up @@ -120,6 +121,7 @@ public ClientConfig() {}
public static final String FAILUREDETECTOR_REQUEST_LENGTH_THRESHOLD_PROPERTY = "failuredetector_request_length_threshold";
public static final String MAX_BOOTSTRAP_RETRIES = "max_bootstrap_retries";
public static final String CLIENT_CONTEXT_NAME = "voldemort_client_context";
public static final String ASYNC_CHECK_METADATA_INTERVAL = "check_metadata_interval";

/**
* Instantiate the client config using a properties file
Expand Down Expand Up @@ -282,6 +284,10 @@ private void setProperties(Properties properties) {
if(props.containsKey(CLIENT_CONTEXT_NAME)) {
this.setClientContextName(props.getString(CLIENT_CONTEXT_NAME, null));
}

if(props.containsKey(ASYNC_CHECK_METADATA_INTERVAL)) {
this.setAsyncCheckMetadataInterval(props.getLong(ASYNC_CHECK_METADATA_INTERVAL, 5000));
}
}

public int getMaxConnectionsPerNode() {
Expand Down Expand Up @@ -707,4 +713,12 @@ public ClientConfig setClientContextName(String clientContextName) {
return this;
}

public long getAsyncCheckMetadataInterval() {
return asyncCheckMetadataInterval;
}

public void setAsyncCheckMetadataInterval(long asyncCheckMetadataInterval) {
this.asyncCheckMetadataInterval = asyncCheckMetadataInterval;
}

}
64 changes: 54 additions & 10 deletions src/java/voldemort/client/DefaultStoreClient.java
Expand Up @@ -16,10 +16,12 @@

package voldemort.client;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.concurrent.Callable;

import org.apache.log4j.Logger;

Expand Down Expand Up @@ -60,26 +62,37 @@ public class DefaultStoreClient<K, V> implements StoreClient<K, V> {
private final Logger logger = Logger.getLogger(DefaultStoreClient.class);
private final StoreClientFactory storeFactory;

private final ClientConfig config;
private final int metadataRefreshAttempts;
private final String storeName;
private final InconsistencyResolver<Versioned<V>> resolver;
private volatile Store<K, V, Object> store;
private final UUID clientId;
private SystemStore<String, String> sysStore;
private final Map<String, SystemStore> sysStoreMap;
private AsyncMetadataVersionManager asyncCheckMetadata;

// Enumerate all the system stores
private final String METADATA_VERSION_STORE = "voldsys$_metadata_version";
private final String CLIENT_REGISTRY_STORE = "voldsys$_client_registry";
private final String STORE_DEFINITION_STORE = "voldsys$_client_store_definition";
private final String[] systemStoreNames = { METADATA_VERSION_STORE, CLIENT_REGISTRY_STORE,
STORE_DEFINITION_STORE };

public DefaultStoreClient(String storeName,
InconsistencyResolver<Versioned<V>> resolver,
StoreClientFactory storeFactory,
int maxMetadataRefreshAttempts) {
this(storeName, resolver, storeFactory, maxMetadataRefreshAttempts, null, 0);
this(storeName, resolver, storeFactory, maxMetadataRefreshAttempts, null, 0, null);
}

@SuppressWarnings("unchecked")
public DefaultStoreClient(String storeName,
InconsistencyResolver<Versioned<V>> resolver,
StoreClientFactory storeFactory,
int maxMetadataRefreshAttempts,
String clientContext,
int clientSequence) {
int clientSequence,
ClientConfig config) {

this.storeName = Utils.notNull(storeName);
this.resolver = resolver;
Expand All @@ -88,6 +101,8 @@ public DefaultStoreClient(String storeName,
this.clientId = AbstractStoreClientFactory.generateClientId(storeName,
clientContext,
clientSequence);
this.config = config;

// Registering self to be able to bootstrap client dynamically via JMX
JmxUtils.registerMbean(this,
JmxUtils.createObjectName(JmxUtils.getPackageName(this.getClass()),
Expand All @@ -96,21 +111,50 @@ public DefaultStoreClient(String storeName,
+ storeName + "."
+ clientId.toString()));
bootStrap();

// Initialize all the system stores
sysStoreMap = new HashMap<String, SystemStore>();
initializeSystemStores();

// Initialize the background thread for checking metadata version
if(config != null) {
SystemStore versionStore = this.sysStoreMap.get(METADATA_VERSION_STORE);
if(versionStore == null)
logger.info("Metadata version system store not found. Cannot run Metadata version check thread.");
else {
Callable<Void> bootstrapCallback = new Callable<Void>() {

public Void call() throws Exception {
bootStrap();
return null;
}
};

asyncCheckMetadata = new AsyncMetadataVersionManager(versionStore,
config.getAsyncCheckMetadataInterval(),
bootstrapCallback);
logger.info("Metadata version check thread started. Frequency = Every "
+ config.getAsyncCheckMetadataInterval() + " ms");
}
}

logger.info("Voldemort client created: clientContext=" + clientContext + " clientSequence="
+ clientSequence + " clientId=" + clientId.toString());
}

public void initializeSystemStores() {
for(String storeName: systemStoreNames) {
SystemStore<String, Long> sysStore = new SystemStore<String, Long>(storeName,
config.getBootstrapUrls(),
config.getClientZoneId());
this.sysStoreMap.put(storeName, sysStore);
}
}

@JmxOperation(description = "bootstrap metadata from the cluster.")
public void bootStrap() {
logger.info("Bootstrapping metadata for store " + this.storeName);
this.store = storeFactory.getRawStore(storeName, resolver, clientId);

logger.info("Creating System store");
String systemKey = storeName + "-client";
this.sysStore = new SystemStore<String, String>("voldsys$_client_registry",
this.storeFactory);
sysStore.putSysStore(systemKey, "Registered");
logger.info("Getting value - " + sysStore.getSysStore(systemKey));
}

public boolean delete(K key) {
Expand Down

0 comments on commit 441a936

Please sign in to comment.