Skip to content

Commit

Permalink
reimplement proxy put based on zone n-ary replicas logic
Browse files Browse the repository at this point in the history
  • Loading branch information
vinothchandar committed May 8, 2013
1 parent 73802e0 commit 82f97c4
Show file tree
Hide file tree
Showing 14 changed files with 703 additions and 171 deletions.
31 changes: 21 additions & 10 deletions src/java/voldemort/VoldemortAdminTool.java
Expand Up @@ -100,8 +100,6 @@
public class VoldemortAdminTool {

private static final String ALL_METADATA = "all";
private static final String STORES_VERSION_KEY = "stores.xml";
private static final String CLUSTER_VERSION_KEY = "cluster.xml";

@SuppressWarnings("unchecked")
public static void main(String[] args) throws Exception {
Expand Down Expand Up @@ -169,7 +167,8 @@ public static void main(String[] args) throws Exception {
parser.accepts("check-metadata",
"retreive metadata information from all nodes and checks if they are consistent across [ "
+ MetadataStore.CLUSTER_KEY + " | " + MetadataStore.STORES_KEY
+ " | " + MetadataStore.SERVER_STATE_KEY + " ]")
+ " | " + MetadataStore.REBALANCING_SOURCE_CLUSTER_XML + " | "
+ MetadataStore.SERVER_STATE_KEY + " ]")
.withRequiredArg()
.describedAs("metadata-key")
.ofType(String.class);
Expand All @@ -185,13 +184,15 @@ public static void main(String[] args) throws Exception {
parser.accepts("set-metadata",
"Forceful setting of metadata [ " + MetadataStore.CLUSTER_KEY + " | "
+ MetadataStore.STORES_KEY + " | " + MetadataStore.SERVER_STATE_KEY
+ " | " + MetadataStore.REBALANCING_STEAL_INFO + " ]")
+ " | " + MetadataStore.REBALANCING_SOURCE_CLUSTER_XML + " | "
+ MetadataStore.REBALANCING_STEAL_INFO + " ]")
.withRequiredArg()
.describedAs("metadata-key")
.ofType(String.class);
parser.accepts("set-metadata-value",
"The value for the set-metadata [ " + MetadataStore.CLUSTER_KEY + " | "
+ MetadataStore.STORES_KEY + ", "
+ MetadataStore.REBALANCING_SOURCE_CLUSTER_XML + ", "
+ MetadataStore.REBALANCING_STEAL_INFO
+ " ] - xml file location, [ " + MetadataStore.SERVER_STATE_KEY
+ " ] - " + MetadataStore.VoldemortState.NORMAL_SERVER + ","
Expand Down Expand Up @@ -491,14 +492,15 @@ public static void main(String[] args) throws Exception {
throw new VoldemortException("Missing set-metadata-value");
} else {
String metadataValue = (String) options.valueOf("set-metadata-value");
if(metadataKey.compareTo(MetadataStore.CLUSTER_KEY) == 0) {
if(metadataKey.compareTo(MetadataStore.CLUSTER_KEY) == 0
|| metadataKey.compareTo(MetadataStore.REBALANCING_SOURCE_CLUSTER_XML) == 0) {
if(!Utils.isReadableFile(metadataValue))
throw new VoldemortException("Cluster xml file path incorrect");
ClusterMapper mapper = new ClusterMapper();
Cluster newCluster = mapper.readCluster(new File(metadataValue));
executeSetMetadata(nodeId,
adminClient,
MetadataStore.CLUSTER_KEY,
metadataKey,
mapper.writeCluster(newCluster));
} else if(metadataKey.compareTo(MetadataStore.SERVER_STATE_KEY) == 0) {
VoldemortState newState = VoldemortState.valueOf(metadataValue);
Expand Down Expand Up @@ -763,21 +765,27 @@ public static void printHelp(PrintStream stream, OptionParser parser) throws IOE
stream.println("\t5) Set metadata on all nodes");
stream.println("\t\t./bin/voldemort-admin-tool.sh --set-metadata ["
+ MetadataStore.CLUSTER_KEY + ", " + MetadataStore.SERVER_STATE_KEY + ", "
+ MetadataStore.STORES_KEY + ", " + MetadataStore.REBALANCING_STEAL_INFO
+ MetadataStore.STORES_KEY + ", "
+ MetadataStore.REBALANCING_SOURCE_CLUSTER_XML + ", "
+ MetadataStore.REBALANCING_STEAL_INFO
+ "] --set-metadata-value [metadata-value] --url [url]");
stream.println("\t6) Set metadata for a particular node");
stream.println("\t\t./bin/voldemort-admin-tool.sh --set-metadata ["
+ MetadataStore.CLUSTER_KEY + ", " + MetadataStore.SERVER_STATE_KEY + ", "
+ MetadataStore.STORES_KEY + ", " + MetadataStore.REBALANCING_STEAL_INFO
+ MetadataStore.STORES_KEY + ", "
+ MetadataStore.REBALANCING_SOURCE_CLUSTER_XML + ", "
+ MetadataStore.REBALANCING_STEAL_INFO
+ "] --set-metadata-value [metadata-value] --url [url] --node [node-id]");
stream.println("\t7) Check if metadata is same on all nodes");
stream.println("\t\t./bin/voldemort-admin-tool.sh --check-metadata ["
+ MetadataStore.CLUSTER_KEY + ", " + MetadataStore.SERVER_STATE_KEY + ", "
+ MetadataStore.STORES_KEY + "] --url [url]");
stream.println("\t8) Clear rebalancing metadata [" + MetadataStore.SERVER_STATE_KEY + ", "
+ ", " + MetadataStore.REBALANCING_SOURCE_CLUSTER_XML + ", "
+ MetadataStore.REBALANCING_STEAL_INFO + "] on all node ");
stream.println("\t\t./bin/voldemort-admin-tool.sh --clear-rebalancing-metadata --url [url]");
stream.println("\t9) Clear rebalancing metadata [" + MetadataStore.SERVER_STATE_KEY + ", "
+ ", " + MetadataStore.REBALANCING_SOURCE_CLUSTER_XML + ", "
+ MetadataStore.REBALANCING_STEAL_INFO + "] on a particular node ");
stream.println("\t\t./bin/voldemort-admin-tool.sh --clear-rebalancing-metadata --url [url] --node [node-id]");
stream.println();
Expand Down Expand Up @@ -924,7 +932,9 @@ private static void executeClearRebalancing(int nodeId, AdminClient adminClient)
adminClient,
MetadataStore.REBALANCING_STEAL_INFO,
state.toJsonString());

System.out.println("Cleaning up " + MetadataStore.REBALANCING_SOURCE_CLUSTER_XML
+ " to empty string");
executeSetMetadata(nodeId, adminClient, MetadataStore.REBALANCING_SOURCE_CLUSTER_XML, "");
}

private static void executeKeyDistribution(AdminClient adminClient) {
Expand All @@ -951,7 +961,8 @@ private static void executeCheckMetadata(AdminClient adminClient, String metadat
+ " was null");
} else {

if(metadataKey.compareTo(MetadataStore.CLUSTER_KEY) == 0) {
if(metadataKey.compareTo(MetadataStore.CLUSTER_KEY) == 0
|| metadataKey.compareTo(MetadataStore.REBALANCING_SOURCE_CLUSTER_XML) == 0) {
metadataValues.add(new ClusterMapper().readCluster(new StringReader(versioned.getValue())));
} else if(metadataKey.compareTo(MetadataStore.STORES_KEY) == 0) {
metadataValues.add(new StoreDefinitionsMapper().readStoreList(new StringReader(versioned.getValue())));
Expand Down
8 changes: 4 additions & 4 deletions src/java/voldemort/routing/StoreRoutingPlan.java
Expand Up @@ -201,7 +201,7 @@ public int getZoneReplicaType(int zoneId, int nodeId, byte[] key) {
return zoneReplicaType;
}
}
if(zoneReplicaType > 0) {
if(zoneReplicaType > -1) {
throw new VoldemortException("Node " + nodeId + " not a replica for the key "
+ ByteUtils.toHexString(key) + " in given zone " + zoneId);
} else {
Expand Down Expand Up @@ -233,14 +233,14 @@ public int getZoneReplicaNode(int zoneId, int zoneReplicaType, byte[] key) {
return node.getId();
}
}
if(zoneReplicaTypeCounter == 0) {
if(zoneReplicaTypeCounter == -1) {
throw new VoldemortException("Could not find any replicas for the key "
+ ByteUtils.toHexString(key) + " in given zone " + zoneId);
} else {
throw new VoldemortException("Could not find " + zoneReplicaType
throw new VoldemortException("Could not find " + (zoneReplicaType + 1)
+ " replicas for the key " + ByteUtils.toHexString(key)
+ " in given zone " + zoneId + ". Only found "
+ zoneReplicaTypeCounter);
+ (zoneReplicaTypeCounter + 1));
}
}

Expand Down
18 changes: 18 additions & 0 deletions src/java/voldemort/server/VoldemortConfig.java
Expand Up @@ -223,6 +223,7 @@ public class VoldemortConfig implements Serializable {
private int maxParallelStoresRebalancing;
private boolean rebalancingOptimization;
private boolean usePartitionScanForRebalance;
private int maxProxyPutThreads;
@Deprecated
// Should be removed once the proxy put implementation is stable.
private boolean proxyPutsDuringRebalance;
Expand Down Expand Up @@ -465,6 +466,7 @@ public VoldemortConfig(Props props) {
this.rebalancingOptimization = props.getBoolean("rebalancing.optimization", true);
this.usePartitionScanForRebalance = props.getBoolean("use.partition.scan.for.rebalance",
true);
this.maxProxyPutThreads = props.getInt("max.proxy.put.threads", 1);
this.proxyPutsDuringRebalance = props.getBoolean("proxy.puts.during.rebalance", false);

this.failureDetectorImplementation = props.getString("failuredetector.implementation",
Expand Down Expand Up @@ -2668,6 +2670,22 @@ public boolean usePartitionScanForRebalance() {
return usePartitionScanForRebalance;
}

/**
* Total number of threads needed to issue proxy puts during rebalancing
*
* <ul>
* <li>Property :"max.proxy.put.threads"</li>
* <li>Default : 1</li>
* </ul>
*/
public void setMaxProxyPutThreads(int maxProxyPutThreads) {
this.maxProxyPutThreads = maxProxyPutThreads;
}

public int getMaxProxyPutThreads() {
return this.maxProxyPutThreads;
}

/**
* If set to true, the puts to the new replicas will be relayed back to the
* original donor nodes, such that they exist if rebalance were to abort in
Expand Down
39 changes: 28 additions & 11 deletions src/java/voldemort/server/rebalance/Rebalancer.java
Expand Up @@ -159,11 +159,14 @@ public void rebalanceStateChange(Cluster cluster,
List<RebalancePartitionsInfo> completedRebalancePartitionsInfo = Lists.newArrayList();
List<String> swappedStoreNames = Lists.newArrayList();
boolean completedClusterChange = false;
boolean completedRebalanceSourceClusterChange = false;
Cluster previousRebalancingSourceCluster = null;

try {
// CHANGE CLUSTER METADATA
if(changeClusterMetadata) {
changeCluster(cluster);
logger.info("Switching metadata from " + currentCluster + " to " + cluster);
changeCluster(MetadataStore.CLUSTER_KEY, cluster);
completedClusterChange = true;
}

Expand All @@ -175,12 +178,21 @@ public void rebalanceStateChange(Cluster cluster,
// CHANGE REBALANCING STATE
if(changeRebalanceState) {
try {
previousRebalancingSourceCluster = metadataStore.getRebalancingSourceCluster();
if(!rollback) {
// Save up the current cluster for Redirecting store
changeCluster(MetadataStore.REBALANCING_SOURCE_CLUSTER_XML, currentCluster);
completedRebalanceSourceClusterChange = true;

for(RebalancePartitionsInfo info: rebalancePartitionsInfo) {
metadataStore.addRebalancingState(info);
completedRebalancePartitionsInfo.add(info);
}
} else {
// Reset the rebalancing source cluster back to null
changeCluster(MetadataStore.REBALANCING_SOURCE_CLUSTER_XML, null);
completedRebalanceSourceClusterChange = true;

for(RebalancePartitionsInfo info: rebalancePartitionsInfo) {
metadataStore.deleteRebalancingState(info);
completedRebalancePartitionsInfo.add(info);
Expand All @@ -197,7 +209,8 @@ public void rebalanceStateChange(Cluster cluster,
// ROLLBACK CLUSTER CHANGE
if(completedClusterChange) {
try {
changeCluster(currentCluster);
logger.info("Rolling back cluster.xml to " + currentCluster);
changeCluster(MetadataStore.CLUSTER_KEY, currentCluster);
} catch(Exception exception) {
logger.error("Error while rolling back cluster metadata to " + currentCluster,
exception);
Expand All @@ -215,7 +228,6 @@ public void rebalanceStateChange(Cluster cluster,

// CHANGE BACK ALL REBALANCING STATES FOR COMPLETED ONES
if(completedRebalancePartitionsInfo.size() > 0) {

if(!rollback) {
for(RebalancePartitionsInfo info: completedRebalancePartitionsInfo) {
try {
Expand All @@ -240,6 +252,14 @@ public void rebalanceStateChange(Cluster cluster,

}

// Revert changes to REBALANCING_SOURCE_CLUSTER_XML
if(completedRebalanceSourceClusterChange) {
logger.info("Reverting the REBALANCING_SOURCE_CLUSTER_XML back to "
+ previousRebalancingSourceCluster);
changeCluster(MetadataStore.REBALANCING_SOURCE_CLUSTER_XML,
previousRebalancingSourceCluster);
}

throw e;
}

Expand Down Expand Up @@ -293,24 +313,21 @@ private void swapROStores(List<String> swappedStoreNames, boolean useSwappedStor
*
* @param cluster The cluster metadata information
*/
private void changeCluster(final Cluster cluster) {
private void changeCluster(String clusterKey, final Cluster cluster) {
try {
metadataStore.writeLock.lock();
try {
VectorClock updatedVectorClock = ((VectorClock) metadataStore.get(MetadataStore.CLUSTER_KEY,
null)
// TODO why increment server 0 all the time?
VectorClock updatedVectorClock = ((VectorClock) metadataStore.get(clusterKey, null)
.get(0)
.getVersion()).incremented(0,
System.currentTimeMillis());
logger.info("Switching metadata from " + metadataStore.getCluster() + " to "
+ cluster + " [ " + updatedVectorClock + " ]");
metadataStore.put(MetadataStore.CLUSTER_KEY,
Versioned.value((Object) cluster, updatedVectorClock));
metadataStore.put(clusterKey, Versioned.value((Object) cluster, updatedVectorClock));
} finally {
metadataStore.writeLock.unlock();
}
} catch(Exception e) {
logger.info("Error while changing cluster to " + cluster);
logger.info("Error while changing cluster to " + cluster + "for key " + clusterKey);
throw new VoldemortException(e);
}
}
Expand Down
26 changes: 25 additions & 1 deletion src/java/voldemort/server/storage/StorageService.java
Expand Up @@ -77,6 +77,7 @@
import voldemort.store.nonblockingstore.NonblockingStore;
import voldemort.store.readonly.ReadOnlyStorageConfiguration;
import voldemort.store.readonly.ReadOnlyStorageEngine;
import voldemort.store.rebalancing.ProxyPutStats;
import voldemort.store.rebalancing.RebootstrappingStore;
import voldemort.store.rebalancing.RedirectingStore;
import voldemort.store.retention.RetentionEnforcingStore;
Expand All @@ -96,6 +97,7 @@
import voldemort.utils.ByteArray;
import voldemort.utils.ClosableIterator;
import voldemort.utils.ConfigurationException;
import voldemort.utils.DaemonThreadFactory;
import voldemort.utils.DynamicThrottleLimit;
import voldemort.utils.EventThrottler;
import voldemort.utils.JmxUtils;
Expand Down Expand Up @@ -137,6 +139,8 @@ public class StorageService extends AbstractService {
private final FailureDetector failureDetector;
private final StoreStats storeStats;
private final RoutedStoreFactory routedStoreFactory;
private final ExecutorService proxyPutWorkerPool;
private final ProxyPutStats aggregatedProxyPutStats;

public StorageService(StoreRepository storeRepository,
MetadataStore metadata,
Expand Down Expand Up @@ -179,6 +183,17 @@ public StorageService(StoreRepository storeRepository,
this.dynThrottleLimit = new DynamicThrottleLimit(rate);
} else
this.dynThrottleLimit = null;

// create the proxy put thread pool
this.proxyPutWorkerPool = Executors.newFixedThreadPool(config.getMaxProxyPutThreads(),
new DaemonThreadFactory("voldemort-proxy-put-thread"));
this.aggregatedProxyPutStats = new ProxyPutStats(null);
if(config.isJmxEnabled()) {
JmxUtils.registerMbean(this.aggregatedProxyPutStats,
JmxUtils.createObjectName("voldemort.store.rebalancing",
"aggregate-proxy-puts"));
}

}

private void initStorageConfig(String configClassName) {
Expand Down Expand Up @@ -752,12 +767,21 @@ public void registerEngine(StorageEngine<ByteArray, byte[], byte[]> engine,
}

if(voldemortConfig.isEnableRebalanceService()) {
ProxyPutStats proxyPutStats = new ProxyPutStats(aggregatedProxyPutStats);
if(voldemortConfig.isJmxEnabled()) {
JmxUtils.registerMbean(proxyPutStats,
JmxUtils.createObjectName("voldemort.store.rebalancing",
engine.getName()
+ "-proxy-puts"));
}
store = new RedirectingStore(store,
metadata,
storeRepository,
failureDetector,
storeFactory,
voldemortConfig.getProxyPutsDuringRebalance());
voldemortConfig.getProxyPutsDuringRebalance(),
proxyPutWorkerPool,
proxyPutStats);
if(voldemortConfig.isJmxEnabled()) {
MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
ObjectName name = null;
Expand Down

0 comments on commit 82f97c4

Please sign in to comment.