From 82f97c46056ec8d9a24cf919a4b6f1fa9f393c4a Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Wed, 1 May 2013 18:22:00 -0700 Subject: [PATCH] reimplement proxy put based on zone n-ary replicas logic --- src/java/voldemort/VoldemortAdminTool.java | 31 +- .../voldemort/routing/StoreRoutingPlan.java | 8 +- .../voldemort/server/VoldemortConfig.java | 18 + .../server/rebalance/Rebalancer.java | 39 ++- .../server/storage/StorageService.java | 26 +- .../store/metadata/MetadataStore.java | 19 +- .../store/rebalancing/AsyncProxyPutTask.java | 115 +++++++ .../store/rebalancing/ProxyPutStats.java | 71 ++++ .../store/rebalancing/RedirectingStore.java | 321 ++++++++++-------- .../AbstractNonZonedRebalanceTest.java | 5 + .../rebalance/AbstractZonedRebalanceTest.java | 1 + .../client/rebalance/AdminRebalanceTest.java | 12 + .../store/metadata/MetadataStoreTest.java | 47 ++- .../rebalancing/RedirectingStoreTest.java | 161 ++++++++- 14 files changed, 703 insertions(+), 171 deletions(-) create mode 100644 src/java/voldemort/store/rebalancing/AsyncProxyPutTask.java create mode 100644 src/java/voldemort/store/rebalancing/ProxyPutStats.java diff --git a/src/java/voldemort/VoldemortAdminTool.java b/src/java/voldemort/VoldemortAdminTool.java index c462150b65..5e94052dd4 100644 --- a/src/java/voldemort/VoldemortAdminTool.java +++ b/src/java/voldemort/VoldemortAdminTool.java @@ -100,8 +100,6 @@ public class VoldemortAdminTool { private static final String ALL_METADATA = "all"; - private static final String STORES_VERSION_KEY = "stores.xml"; - private static final String CLUSTER_VERSION_KEY = "cluster.xml"; @SuppressWarnings("unchecked") public static void main(String[] args) throws Exception { @@ -169,7 +167,8 @@ public static void main(String[] args) throws Exception { parser.accepts("check-metadata", "retreive metadata information from all nodes and checks if they are consistent across [ " + MetadataStore.CLUSTER_KEY + " | " + MetadataStore.STORES_KEY - + " | " + MetadataStore.SERVER_STATE_KEY + " ]") + + " | " + MetadataStore.REBALANCING_SOURCE_CLUSTER_XML + " | " + + MetadataStore.SERVER_STATE_KEY + " ]") .withRequiredArg() .describedAs("metadata-key") .ofType(String.class); @@ -185,13 +184,15 @@ public static void main(String[] args) throws Exception { parser.accepts("set-metadata", "Forceful setting of metadata [ " + MetadataStore.CLUSTER_KEY + " | " + MetadataStore.STORES_KEY + " | " + MetadataStore.SERVER_STATE_KEY - + " | " + MetadataStore.REBALANCING_STEAL_INFO + " ]") + + " | " + MetadataStore.REBALANCING_SOURCE_CLUSTER_XML + " | " + + MetadataStore.REBALANCING_STEAL_INFO + " ]") .withRequiredArg() .describedAs("metadata-key") .ofType(String.class); parser.accepts("set-metadata-value", "The value for the set-metadata [ " + MetadataStore.CLUSTER_KEY + " | " + MetadataStore.STORES_KEY + ", " + + MetadataStore.REBALANCING_SOURCE_CLUSTER_XML + ", " + MetadataStore.REBALANCING_STEAL_INFO + " ] - xml file location, [ " + MetadataStore.SERVER_STATE_KEY + " ] - " + MetadataStore.VoldemortState.NORMAL_SERVER + "," @@ -491,14 +492,15 @@ public static void main(String[] args) throws Exception { throw new VoldemortException("Missing set-metadata-value"); } else { String metadataValue = (String) options.valueOf("set-metadata-value"); - if(metadataKey.compareTo(MetadataStore.CLUSTER_KEY) == 0) { + if(metadataKey.compareTo(MetadataStore.CLUSTER_KEY) == 0 + || metadataKey.compareTo(MetadataStore.REBALANCING_SOURCE_CLUSTER_XML) == 0) { if(!Utils.isReadableFile(metadataValue)) throw new VoldemortException("Cluster xml file path incorrect"); ClusterMapper mapper = new ClusterMapper(); Cluster newCluster = mapper.readCluster(new File(metadataValue)); executeSetMetadata(nodeId, adminClient, - MetadataStore.CLUSTER_KEY, + metadataKey, mapper.writeCluster(newCluster)); } else if(metadataKey.compareTo(MetadataStore.SERVER_STATE_KEY) == 0) { VoldemortState newState = VoldemortState.valueOf(metadataValue); @@ -763,21 +765,27 @@ public static void printHelp(PrintStream stream, OptionParser parser) throws IOE stream.println("\t5) Set metadata on all nodes"); stream.println("\t\t./bin/voldemort-admin-tool.sh --set-metadata [" + MetadataStore.CLUSTER_KEY + ", " + MetadataStore.SERVER_STATE_KEY + ", " - + MetadataStore.STORES_KEY + ", " + MetadataStore.REBALANCING_STEAL_INFO + + MetadataStore.STORES_KEY + ", " + + MetadataStore.REBALANCING_SOURCE_CLUSTER_XML + ", " + + MetadataStore.REBALANCING_STEAL_INFO + "] --set-metadata-value [metadata-value] --url [url]"); stream.println("\t6) Set metadata for a particular node"); stream.println("\t\t./bin/voldemort-admin-tool.sh --set-metadata [" + MetadataStore.CLUSTER_KEY + ", " + MetadataStore.SERVER_STATE_KEY + ", " - + MetadataStore.STORES_KEY + ", " + MetadataStore.REBALANCING_STEAL_INFO + + MetadataStore.STORES_KEY + ", " + + MetadataStore.REBALANCING_SOURCE_CLUSTER_XML + ", " + + MetadataStore.REBALANCING_STEAL_INFO + "] --set-metadata-value [metadata-value] --url [url] --node [node-id]"); stream.println("\t7) Check if metadata is same on all nodes"); stream.println("\t\t./bin/voldemort-admin-tool.sh --check-metadata [" + MetadataStore.CLUSTER_KEY + ", " + MetadataStore.SERVER_STATE_KEY + ", " + MetadataStore.STORES_KEY + "] --url [url]"); stream.println("\t8) Clear rebalancing metadata [" + MetadataStore.SERVER_STATE_KEY + ", " + + ", " + MetadataStore.REBALANCING_SOURCE_CLUSTER_XML + ", " + MetadataStore.REBALANCING_STEAL_INFO + "] on all node "); stream.println("\t\t./bin/voldemort-admin-tool.sh --clear-rebalancing-metadata --url [url]"); stream.println("\t9) Clear rebalancing metadata [" + MetadataStore.SERVER_STATE_KEY + ", " + + ", " + MetadataStore.REBALANCING_SOURCE_CLUSTER_XML + ", " + MetadataStore.REBALANCING_STEAL_INFO + "] on a particular node "); stream.println("\t\t./bin/voldemort-admin-tool.sh --clear-rebalancing-metadata --url [url] --node [node-id]"); stream.println(); @@ -924,7 +932,9 @@ private static void executeClearRebalancing(int nodeId, AdminClient adminClient) adminClient, MetadataStore.REBALANCING_STEAL_INFO, state.toJsonString()); - + System.out.println("Cleaning up " + MetadataStore.REBALANCING_SOURCE_CLUSTER_XML + + " to empty string"); + executeSetMetadata(nodeId, adminClient, MetadataStore.REBALANCING_SOURCE_CLUSTER_XML, ""); } private static void executeKeyDistribution(AdminClient adminClient) { @@ -951,7 +961,8 @@ private static void executeCheckMetadata(AdminClient adminClient, String metadat + " was null"); } else { - if(metadataKey.compareTo(MetadataStore.CLUSTER_KEY) == 0) { + if(metadataKey.compareTo(MetadataStore.CLUSTER_KEY) == 0 + || metadataKey.compareTo(MetadataStore.REBALANCING_SOURCE_CLUSTER_XML) == 0) { metadataValues.add(new ClusterMapper().readCluster(new StringReader(versioned.getValue()))); } else if(metadataKey.compareTo(MetadataStore.STORES_KEY) == 0) { metadataValues.add(new StoreDefinitionsMapper().readStoreList(new StringReader(versioned.getValue()))); diff --git a/src/java/voldemort/routing/StoreRoutingPlan.java b/src/java/voldemort/routing/StoreRoutingPlan.java index dc798c86db..8e890e7c31 100644 --- a/src/java/voldemort/routing/StoreRoutingPlan.java +++ b/src/java/voldemort/routing/StoreRoutingPlan.java @@ -201,7 +201,7 @@ public int getZoneReplicaType(int zoneId, int nodeId, byte[] key) { return zoneReplicaType; } } - if(zoneReplicaType > 0) { + if(zoneReplicaType > -1) { throw new VoldemortException("Node " + nodeId + " not a replica for the key " + ByteUtils.toHexString(key) + " in given zone " + zoneId); } else { @@ -233,14 +233,14 @@ public int getZoneReplicaNode(int zoneId, int zoneReplicaType, byte[] key) { return node.getId(); } } - if(zoneReplicaTypeCounter == 0) { + if(zoneReplicaTypeCounter == -1) { throw new VoldemortException("Could not find any replicas for the key " + ByteUtils.toHexString(key) + " in given zone " + zoneId); } else { - throw new VoldemortException("Could not find " + zoneReplicaType + throw new VoldemortException("Could not find " + (zoneReplicaType + 1) + " replicas for the key " + ByteUtils.toHexString(key) + " in given zone " + zoneId + ". Only found " - + zoneReplicaTypeCounter); + + (zoneReplicaTypeCounter + 1)); } } diff --git a/src/java/voldemort/server/VoldemortConfig.java b/src/java/voldemort/server/VoldemortConfig.java index 8ca5134aad..063d3bf797 100644 --- a/src/java/voldemort/server/VoldemortConfig.java +++ b/src/java/voldemort/server/VoldemortConfig.java @@ -223,6 +223,7 @@ public class VoldemortConfig implements Serializable { private int maxParallelStoresRebalancing; private boolean rebalancingOptimization; private boolean usePartitionScanForRebalance; + private int maxProxyPutThreads; @Deprecated // Should be removed once the proxy put implementation is stable. private boolean proxyPutsDuringRebalance; @@ -465,6 +466,7 @@ public VoldemortConfig(Props props) { this.rebalancingOptimization = props.getBoolean("rebalancing.optimization", true); this.usePartitionScanForRebalance = props.getBoolean("use.partition.scan.for.rebalance", true); + this.maxProxyPutThreads = props.getInt("max.proxy.put.threads", 1); this.proxyPutsDuringRebalance = props.getBoolean("proxy.puts.during.rebalance", false); this.failureDetectorImplementation = props.getString("failuredetector.implementation", @@ -2668,6 +2670,22 @@ public boolean usePartitionScanForRebalance() { return usePartitionScanForRebalance; } + /** + * Total number of threads needed to issue proxy puts during rebalancing + * + * + */ + public void setMaxProxyPutThreads(int maxProxyPutThreads) { + this.maxProxyPutThreads = maxProxyPutThreads; + } + + public int getMaxProxyPutThreads() { + return this.maxProxyPutThreads; + } + /** * If set to true, the puts to the new replicas will be relayed back to the * original donor nodes, such that they exist if rebalance were to abort in diff --git a/src/java/voldemort/server/rebalance/Rebalancer.java b/src/java/voldemort/server/rebalance/Rebalancer.java index 9cc2635b1f..2c55f5f414 100644 --- a/src/java/voldemort/server/rebalance/Rebalancer.java +++ b/src/java/voldemort/server/rebalance/Rebalancer.java @@ -159,11 +159,14 @@ public void rebalanceStateChange(Cluster cluster, List completedRebalancePartitionsInfo = Lists.newArrayList(); List swappedStoreNames = Lists.newArrayList(); boolean completedClusterChange = false; + boolean completedRebalanceSourceClusterChange = false; + Cluster previousRebalancingSourceCluster = null; try { // CHANGE CLUSTER METADATA if(changeClusterMetadata) { - changeCluster(cluster); + logger.info("Switching metadata from " + currentCluster + " to " + cluster); + changeCluster(MetadataStore.CLUSTER_KEY, cluster); completedClusterChange = true; } @@ -175,12 +178,21 @@ public void rebalanceStateChange(Cluster cluster, // CHANGE REBALANCING STATE if(changeRebalanceState) { try { + previousRebalancingSourceCluster = metadataStore.getRebalancingSourceCluster(); if(!rollback) { + // Save up the current cluster for Redirecting store + changeCluster(MetadataStore.REBALANCING_SOURCE_CLUSTER_XML, currentCluster); + completedRebalanceSourceClusterChange = true; + for(RebalancePartitionsInfo info: rebalancePartitionsInfo) { metadataStore.addRebalancingState(info); completedRebalancePartitionsInfo.add(info); } } else { + // Reset the rebalancing source cluster back to null + changeCluster(MetadataStore.REBALANCING_SOURCE_CLUSTER_XML, null); + completedRebalanceSourceClusterChange = true; + for(RebalancePartitionsInfo info: rebalancePartitionsInfo) { metadataStore.deleteRebalancingState(info); completedRebalancePartitionsInfo.add(info); @@ -197,7 +209,8 @@ public void rebalanceStateChange(Cluster cluster, // ROLLBACK CLUSTER CHANGE if(completedClusterChange) { try { - changeCluster(currentCluster); + logger.info("Rolling back cluster.xml to " + currentCluster); + changeCluster(MetadataStore.CLUSTER_KEY, currentCluster); } catch(Exception exception) { logger.error("Error while rolling back cluster metadata to " + currentCluster, exception); @@ -215,7 +228,6 @@ public void rebalanceStateChange(Cluster cluster, // CHANGE BACK ALL REBALANCING STATES FOR COMPLETED ONES if(completedRebalancePartitionsInfo.size() > 0) { - if(!rollback) { for(RebalancePartitionsInfo info: completedRebalancePartitionsInfo) { try { @@ -240,6 +252,14 @@ public void rebalanceStateChange(Cluster cluster, } + // Revert changes to REBALANCING_SOURCE_CLUSTER_XML + if(completedRebalanceSourceClusterChange) { + logger.info("Reverting the REBALANCING_SOURCE_CLUSTER_XML back to " + + previousRebalancingSourceCluster); + changeCluster(MetadataStore.REBALANCING_SOURCE_CLUSTER_XML, + previousRebalancingSourceCluster); + } + throw e; } @@ -293,24 +313,21 @@ private void swapROStores(List swappedStoreNames, boolean useSwappedStor * * @param cluster The cluster metadata information */ - private void changeCluster(final Cluster cluster) { + private void changeCluster(String clusterKey, final Cluster cluster) { try { metadataStore.writeLock.lock(); try { - VectorClock updatedVectorClock = ((VectorClock) metadataStore.get(MetadataStore.CLUSTER_KEY, - null) + // TODO why increment server 0 all the time? + VectorClock updatedVectorClock = ((VectorClock) metadataStore.get(clusterKey, null) .get(0) .getVersion()).incremented(0, System.currentTimeMillis()); - logger.info("Switching metadata from " + metadataStore.getCluster() + " to " - + cluster + " [ " + updatedVectorClock + " ]"); - metadataStore.put(MetadataStore.CLUSTER_KEY, - Versioned.value((Object) cluster, updatedVectorClock)); + metadataStore.put(clusterKey, Versioned.value((Object) cluster, updatedVectorClock)); } finally { metadataStore.writeLock.unlock(); } } catch(Exception e) { - logger.info("Error while changing cluster to " + cluster); + logger.info("Error while changing cluster to " + cluster + "for key " + clusterKey); throw new VoldemortException(e); } } diff --git a/src/java/voldemort/server/storage/StorageService.java b/src/java/voldemort/server/storage/StorageService.java index 275243ecab..5222e0f15c 100644 --- a/src/java/voldemort/server/storage/StorageService.java +++ b/src/java/voldemort/server/storage/StorageService.java @@ -77,6 +77,7 @@ import voldemort.store.nonblockingstore.NonblockingStore; import voldemort.store.readonly.ReadOnlyStorageConfiguration; import voldemort.store.readonly.ReadOnlyStorageEngine; +import voldemort.store.rebalancing.ProxyPutStats; import voldemort.store.rebalancing.RebootstrappingStore; import voldemort.store.rebalancing.RedirectingStore; import voldemort.store.retention.RetentionEnforcingStore; @@ -96,6 +97,7 @@ import voldemort.utils.ByteArray; import voldemort.utils.ClosableIterator; import voldemort.utils.ConfigurationException; +import voldemort.utils.DaemonThreadFactory; import voldemort.utils.DynamicThrottleLimit; import voldemort.utils.EventThrottler; import voldemort.utils.JmxUtils; @@ -137,6 +139,8 @@ public class StorageService extends AbstractService { private final FailureDetector failureDetector; private final StoreStats storeStats; private final RoutedStoreFactory routedStoreFactory; + private final ExecutorService proxyPutWorkerPool; + private final ProxyPutStats aggregatedProxyPutStats; public StorageService(StoreRepository storeRepository, MetadataStore metadata, @@ -179,6 +183,17 @@ public StorageService(StoreRepository storeRepository, this.dynThrottleLimit = new DynamicThrottleLimit(rate); } else this.dynThrottleLimit = null; + + // create the proxy put thread pool + this.proxyPutWorkerPool = Executors.newFixedThreadPool(config.getMaxProxyPutThreads(), + new DaemonThreadFactory("voldemort-proxy-put-thread")); + this.aggregatedProxyPutStats = new ProxyPutStats(null); + if(config.isJmxEnabled()) { + JmxUtils.registerMbean(this.aggregatedProxyPutStats, + JmxUtils.createObjectName("voldemort.store.rebalancing", + "aggregate-proxy-puts")); + } + } private void initStorageConfig(String configClassName) { @@ -752,12 +767,21 @@ public void registerEngine(StorageEngine engine, } if(voldemortConfig.isEnableRebalanceService()) { + ProxyPutStats proxyPutStats = new ProxyPutStats(aggregatedProxyPutStats); + if(voldemortConfig.isJmxEnabled()) { + JmxUtils.registerMbean(proxyPutStats, + JmxUtils.createObjectName("voldemort.store.rebalancing", + engine.getName() + + "-proxy-puts")); + } store = new RedirectingStore(store, metadata, storeRepository, failureDetector, storeFactory, - voldemortConfig.getProxyPutsDuringRebalance()); + voldemortConfig.getProxyPutsDuringRebalance(), + proxyPutWorkerPool, + proxyPutStats); if(voldemortConfig.isJmxEnabled()) { MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer(); ObjectName name = null; diff --git a/src/java/voldemort/store/metadata/MetadataStore.java b/src/java/voldemort/store/metadata/MetadataStore.java index c6f1f1b46f..c9751b94d5 100644 --- a/src/java/voldemort/store/metadata/MetadataStore.java +++ b/src/java/voldemort/store/metadata/MetadataStore.java @@ -78,6 +78,7 @@ public class MetadataStore extends AbstractStorageEngine GOSSIP_KEYS = ImmutableSet.of(CLUSTER_KEY, STORES_KEY); @@ -85,7 +86,8 @@ public class MetadataStore extends AbstractStorageEngine OPTIONAL_KEYS = ImmutableSet.of(SERVER_STATE_KEY, NODE_ID_KEY, - REBALANCING_STEAL_INFO); + REBALANCING_STEAL_INFO, + REBALANCING_SOURCE_CLUSTER_XML); public static final Set METADATA_KEYS = ImmutableSet.builder() .addAll(REQUIRED_KEYS) @@ -343,6 +345,10 @@ public RebalancerState getRebalancerState() { } } + public Cluster getRebalancingSourceCluster() { + return (Cluster) metadataCache.get(REBALANCING_SOURCE_CLUSTER_XML).getValue(); + } + /* * First check in the map of regular stores. If not present, check in the * system stores map. @@ -544,6 +550,7 @@ private void init(int nodeId) { initCache(REBALANCING_STEAL_INFO, new RebalancerState(new ArrayList())); initCache(SERVER_STATE_KEY, VoldemortState.NORMAL_SERVER.toString()); + initCache(REBALANCING_SOURCE_CLUSTER_XML, null); // set transient values updateRoutingStrategies(getCluster(), getStoreDefList()); @@ -594,7 +601,7 @@ private HashMap createRoutingStrategyMap(Cluster cluste */ @SuppressWarnings("unchecked") private Versioned convertObjectToString(String key, Versioned value) { - String valueStr = value.getValue().toString(); + String valueStr = ""; if(CLUSTER_KEY.equals(key)) { valueStr = clusterMapper.writeCluster((Cluster) value.getValue()); @@ -605,6 +612,10 @@ private Versioned convertObjectToString(String key, Versioned va valueStr = rebalancerState.toJsonString(); } else if(SERVER_STATE_KEY.equals(key) || NODE_ID_KEY.equals(key)) { valueStr = value.getValue().toString(); + } else if(REBALANCING_SOURCE_CLUSTER_XML.equals(key)) { + if(value.getValue() != null) { + valueStr = clusterMapper.writeCluster((Cluster) value.getValue()); + } } else { throw new VoldemortException("Unhandled key:'" + key + "' for Object to String serialization."); @@ -641,6 +652,10 @@ private Versioned convertStringToObject(String key, Versioned va } else { valueObject = new RebalancerState(Arrays.asList(RebalancePartitionsInfo.create(valueString))); } + } else if(REBALANCING_SOURCE_CLUSTER_XML.equals(key)) { + if(value.getValue() != null && value.getValue().length() > 0) { + valueObject = clusterMapper.readCluster(new StringReader(value.getValue())); + } } else { throw new VoldemortException("Unhandled key:'" + key + "' for String to Object serialization."); diff --git a/src/java/voldemort/store/rebalancing/AsyncProxyPutTask.java b/src/java/voldemort/store/rebalancing/AsyncProxyPutTask.java new file mode 100644 index 0000000000..eea8056da1 --- /dev/null +++ b/src/java/voldemort/store/rebalancing/AsyncProxyPutTask.java @@ -0,0 +1,115 @@ +/* + * Copyright 2013 LinkedIn, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package voldemort.store.rebalancing; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +import voldemort.cluster.Node; +import voldemort.store.Store; +import voldemort.store.UnreachableStoreException; +import voldemort.store.metadata.MetadataStore; +import voldemort.utils.ByteArray; +import voldemort.utils.ByteUtils; +import voldemort.versioning.ObsoleteVersionException; +import voldemort.versioning.Versioned; + +/** + * Task that issues the proxy put against the old replica, based on the old + * cluster metadata. This is best effort async replication. Failures will be + * logged and the server log will be post processed in case the rebalancing + * fails and we move back to old topology + * + * NOTE : There is no need for any special ordering of the proxy puts in the + * async thread pool (although the threadpool will have a queue of pending proxy + * puts internally), since a later version being proxy put before an earlier + * version would simply result in an OVE for the earlier proxy put. Online + * traffic will not be affected since the proxy node is not a replica and hence + * no client will be reading from it (if we are at all wondering about read + * consistency) + * + */ +public class AsyncProxyPutTask implements Runnable { + + private final static Logger logger = Logger.getLogger(AsyncProxyPutTask.class); + + private final RedirectingStore redirectingStore; + private final ByteArray key; + private final Versioned value; + private final byte[] transforms; + private final int destinationNode; + + AsyncProxyPutTask(RedirectingStore redirectingStore, + ByteArray key, + Versioned value, + byte[] transforms, + int destinationNode) { + this.key = key; + this.value = value; + this.transforms = transforms; + this.redirectingStore = redirectingStore; + this.destinationNode = destinationNode; + logger.setLevel(Level.TRACE); + } + + @Override + public void run() { + + MetadataStore metadata = redirectingStore.getMetadataStore(); + Node donorNode = metadata.getCluster().getNodeById(destinationNode); + long startNs = System.nanoTime(); + try { + // TODO there are no retries now if the node we want to write to is + // unavailable + redirectingStore.checkNodeAvailable(donorNode); + Store socketStore = redirectingStore.getRedirectingSocketStore(redirectingStore.getName(), + destinationNode); + + socketStore.put(key, value, transforms); + redirectingStore.recordSuccess(donorNode, startNs); + if(logger.isTraceEnabled()) { + logger.trace("Proxy write for store " + redirectingStore.getName() + " key " + + ByteUtils.toBinaryString(key.get()) + " to destinationNode:" + + destinationNode); + } + } catch(UnreachableStoreException e) { + redirectingStore.recordException(donorNode, startNs, e); + logger.error("Failed to reach proxy node " + donorNode, e); + redirectingStore.reporteProxyPutFailure(); + } catch(ObsoleteVersionException ove) { + // Proxy puts can get an OVE if somehow there are two stealers for + // the same donor and the other stealer's proxy put already got to + // the donor.. This will not result from online put winning, since + // we don't issue proxy puts if the donor is still a replica + if(logger.isTraceEnabled()) { + logger.trace("OVE in proxy put for destinationNode: " + destinationNode + + " from node:" + metadata.getNodeId() + " on key " + + ByteUtils.toHexString(key.get()) + " Version:" + + value.getVersion(), + ove); + } + redirectingStore.reporteProxyPutFailure(); + } catch(Exception e) { + // Just log the key.. Not sure having values in the log is a good + // idea. + logger.error("Unexpected exception in proxy put for destinationNode: " + + destinationNode + " from node:" + metadata.getNodeId() + " on key " + + ByteUtils.toHexString(key.get()) + " Version:" + value.getVersion(), e); + redirectingStore.reporteProxyPutFailure(); + } + } +} \ No newline at end of file diff --git a/src/java/voldemort/store/rebalancing/ProxyPutStats.java b/src/java/voldemort/store/rebalancing/ProxyPutStats.java new file mode 100644 index 0000000000..280e0fff19 --- /dev/null +++ b/src/java/voldemort/store/rebalancing/ProxyPutStats.java @@ -0,0 +1,71 @@ +/* + * Copyright 2013 LinkedIn, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package voldemort.store.rebalancing; + +import java.util.concurrent.atomic.AtomicLong; + +import voldemort.annotations.jmx.JmxGetter; + +/** + * Statistics on Proxy puts issued from the redirecting store + * + */ +public class ProxyPutStats { + + private AtomicLong numProxyPutFailures; + + private AtomicLong numPendingProxyPuts; + + private ProxyPutStats parent; + + public ProxyPutStats(ProxyPutStats parent) { + this.numPendingProxyPuts = new AtomicLong(); + this.numProxyPutFailures = new AtomicLong(); + this.parent = parent; + } + + public void reportProxyPutSubmission() { + this.numPendingProxyPuts.incrementAndGet(); + if(this.parent != null) { + this.parent.reportProxyPutSubmission(); + } + } + + public void reportProxyPutCompletion() { + this.numPendingProxyPuts.decrementAndGet(); + if(this.parent != null) { + this.parent.reportProxyPutCompletion(); + } + } + + public void reportProxyPutFailure() { + this.numProxyPutFailures.incrementAndGet(); + if(this.parent != null) { + this.parent.reportProxyPutFailure(); + } + } + + @JmxGetter(name = "numProxyPutFailures") + public long getNumProxyPutFailures() { + return numProxyPutFailures.get(); + } + + @JmxGetter(name = "numPendingProxyPuts") + public long getNumPendingProxyPuts() { + return numPendingProxyPuts.get(); + } +} \ No newline at end of file diff --git a/src/java/voldemort/store/rebalancing/RedirectingStore.java b/src/java/voldemort/store/rebalancing/RedirectingStore.java index f51bae6295..9260bb3705 100644 --- a/src/java/voldemort/store/rebalancing/RedirectingStore.java +++ b/src/java/voldemort/store/rebalancing/RedirectingStore.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.log4j.Logger; @@ -26,19 +27,20 @@ import voldemort.annotations.jmx.JmxGetter; import voldemort.annotations.jmx.JmxSetter; import voldemort.client.protocol.RequestFormatType; -import voldemort.client.rebalance.RebalancePartitionsInfo; +import voldemort.cluster.Cluster; import voldemort.cluster.Node; import voldemort.cluster.failuredetector.FailureDetector; -import voldemort.routing.RoutingStrategy; -import voldemort.routing.RoutingStrategyFactory; +import voldemort.routing.StoreRoutingPlan; import voldemort.server.RequestRoutingType; import voldemort.server.StoreRepository; import voldemort.store.DelegatingStore; import voldemort.store.Store; +import voldemort.store.StoreDefinition; import voldemort.store.StoreUtils; import voldemort.store.UnreachableStoreException; import voldemort.store.metadata.MetadataStore; import voldemort.store.metadata.MetadataStore.VoldemortState; +import voldemort.store.readonly.ReadOnlyStorageConfiguration; import voldemort.store.socket.SocketStoreFactory; import voldemort.utils.ByteArray; import voldemort.utils.ByteUtils; @@ -69,13 +71,19 @@ public class RedirectingStore extends DelegatingStore private FailureDetector failureDetector; private AtomicBoolean isRedirectingStoreEnabled; private boolean isProxyPutEnabled; + private final ExecutorService proxyPutWorkerPool; + + // statistics on proxy put tasks + private ProxyPutStats proxyPutStats; public RedirectingStore(Store innerStore, MetadataStore metadata, StoreRepository storeRepository, FailureDetector detector, SocketStoreFactory storeFactory, - boolean isProxyPutEnabled) { + boolean isProxyPutEnabled, + ExecutorService proxyPutWorkerPool, + ProxyPutStats proxyPutStats) { super(innerStore); this.metadata = metadata; this.storeRepository = storeRepository; @@ -83,6 +91,8 @@ public RedirectingStore(Store innerStore, this.failureDetector = detector; this.isRedirectingStoreEnabled = new AtomicBoolean(true); this.isProxyPutEnabled = isProxyPutEnabled; + this.proxyPutWorkerPool = proxyPutWorkerPool; + this.proxyPutStats = proxyPutStats; } @JmxSetter(name = "setRedirectingStoreEnabled", description = "Enable the redirecting store for this store") @@ -97,58 +107,44 @@ public boolean getIsRedirectingStoreEnabled() { return this.isRedirectingStoreEnabled.get(); } - private RebalancePartitionsInfo redirectingKey(ByteArray key) { - if(VoldemortState.REBALANCING_MASTER_SERVER.equals(metadata.getServerState()) - && isRedirectingStoreEnabled.get()) { - return metadata.getRebalancerState().find(getName(), - metadata.getRoutingStrategy(getName()) - .getPartitionList(key.get()), - metadata.getCluster() - .getNodeById(metadata.getNodeId()) - .getPartitionIds()); - } - return null; - } - @Override public List> get(ByteArray key, byte[] transforms) throws VoldemortException { - RebalancePartitionsInfo stealInfo = redirectingKey(key); + Integer redirectNode = getProxyNode(key.get()); /** * If I am rebalancing for this key, try to do remote get(), put it * locally first to get the correct version ignoring any * {@link ObsoleteVersionException} */ - // TODO These is some unneccessary performance hit here. keys already - // moved over will always result in OVE and time spent waiting on this - // (esp for cross zone moves) would be a total waste. Need to rework to - // this logic to incur this when necessary - if(stealInfo != null) { + // FIXME AR There is some unneccessary performance hit here. keys + // already moved over will always result in OVE and time spent waiting + // on this (esp for cross zone moves) would be a total waste. Need to + // rework to this logic to incur this when necessary + if(redirectNode != null) { if(logger.isTraceEnabled()) { logger.trace("Proxying GET on stealer:" + metadata.getNodeId() + " for key " - + ByteUtils.toHexString(key.get()) + " to donor:" - + stealInfo.getDonorId()); + + ByteUtils.toHexString(key.get()) + " to node:" + redirectNode); } - proxyGetAndLocalPut(key, stealInfo.getDonorId(), transforms); + proxyGetAndLocalPut(key, redirectNode, transforms); } return getInnerStore().get(key, transforms); } @Override public List getVersions(ByteArray key) { - RebalancePartitionsInfo stealInfo = redirectingKey(key); + Integer redirectNode = getProxyNode(key.get()); /** * If I am rebalancing for this key, try to do remote get(), put it * locally first to get the correct version ignoring any * {@link ObsoleteVersionException}. */ // TODO same fixes apply here as in get(..) above - if(stealInfo != null) { + if(redirectNode != null) { if(logger.isTraceEnabled()) { logger.trace("Proxying GETVERSIONS on stealer:" + metadata.getNodeId() - + " for key " + ByteUtils.toHexString(key.get()) + " to donor:" - + stealInfo.getDonorId()); + + " for key " + ByteUtils.toHexString(key.get()) + " to node:" + + redirectNode); } - proxyGetAndLocalPut(key, stealInfo.getDonorId(), null); + proxyGetAndLocalPut(key, redirectNode, null); } return getInnerStore().getVersions(key); } @@ -157,16 +153,16 @@ public List getVersions(ByteArray key) { public Map>> getAll(Iterable keys, Map transforms) throws VoldemortException { - Map rebalancePartitionsInfoPerKey = Maps.newHashMapWithExpectedSize(Iterables.size(keys)); + Map keyToProxyNodeMap = Maps.newHashMapWithExpectedSize(Iterables.size(keys)); for(ByteArray key: keys) { - RebalancePartitionsInfo info = redirectingKey(key); - if(info != null) { - rebalancePartitionsInfoPerKey.put(key, info); + Integer redirectNode = getProxyNode(key.get()); + if(redirectNode != null) { + keyToProxyNodeMap.put(key, redirectNode); } } - // TODO Same optimizations. Go to the proxy only for keys that this node - // does not have.. - if(!rebalancePartitionsInfoPerKey.isEmpty()) { + // FIXME AR Same optimizations. Go to the proxy only for keys that this + // node does not have + if(!keyToProxyNodeMap.isEmpty()) { if(logger.isTraceEnabled()) { String keyStr = ""; for(ByteArray key: keys) @@ -174,7 +170,7 @@ public Map>> getAll(Iterable keys, logger.trace("Proxying GETALL on stealer:" + metadata.getNodeId() + " for keys " + keyStr); } - proxyGetAllAndLocalPut(rebalancePartitionsInfoPerKey, transforms); + proxyGetAllAndLocalPut(keyToProxyNodeMap, transforms); } return getInnerStore().getAll(keys, transforms); @@ -183,31 +179,54 @@ public Map>> getAll(Iterable keys, @Override public void put(ByteArray key, Versioned value, byte[] transforms) throws VoldemortException { - RebalancePartitionsInfo stealInfo = redirectingKey(key); + + Cluster currentCluster = metadata.getCluster(); + // FIXME AR O(n) linear lookup of storedef + StoreDefinition storeDef = metadata.getStoreDef(getName()); + + // defensively, error out if this is a read-only store and someone is + // doing puts against it. We don't to do extra work and fill the log + // with errors in that case. + if(storeDef.getType().compareTo(ReadOnlyStorageConfiguration.TYPE_NAME) == 0) { + throw new UnsupportedOperationException("put() not supported on read-only store"); + } + StoreRoutingPlan currentRoutingPlan = new StoreRoutingPlan(currentCluster, storeDef); + Integer redirectNode = getProxyNode(currentRoutingPlan, storeDef, key.get()); + /** * If I am rebalancing for this key, try to do remote get() , put it * locally first to get the correct version ignoring any * {@link ObsoleteVersionException} */ - // TODO same optimizations apply here.. If the key already exists skip - // this - if(stealInfo != null) { + // FIXME AR same optimizations apply here.. If the key already exists + // skip this + if(redirectNode != null) { if(logger.isTraceEnabled()) { logger.trace("Proxying GET (before PUT) on stealer:" + metadata.getNodeId() - + " for key " + ByteUtils.toHexString(key.get()) + " to donor:" - + stealInfo.getDonorId()); + + " for key " + ByteUtils.toHexString(key.get()) + " to node:" + + redirectNode); } - proxyGetAndLocalPut(key, stealInfo.getDonorId(), transforms); + proxyGetAndLocalPut(key, redirectNode, transforms); } // put the data locally, if this step fails, there will be no proxy puts getInnerStore().put(key, value, transforms); - // TODO make this best effort async replication. Failures will be - // logged and the server log will be post processed in case the - // rebalancing fails and we move back to old topology - if(isProxyPutEnabled && stealInfo != null) { - proxyPut(key, value, transforms, stealInfo.getDonorId()); + // submit an async task to issue proxy puts to the redirectNode + // NOTE : if the redirect node is also a current replica for the key (we + // could have a situation where the online replicated write could lose + // out to the proxy put and hence fail the client operation with an + // OVE). So do not send proxy puts in those cases. + if(isProxyPutEnabled && redirectNode != null + && !currentRoutingPlan.getReplicationNodeList(key.get()).contains(redirectNode)) { + AsyncProxyPutTask asyncProxyPutTask = new AsyncProxyPutTask(this, + key, + value, + transforms, + redirectNode); + proxyPutStats.reportProxyPutSubmission(); + proxyPutWorkerPool.submit(asyncProxyPutTask); + asyncProxyPutTask.run(); } } @@ -234,6 +253,95 @@ public boolean delete(ByteArray key, Version version) throws VoldemortException return getInnerStore().delete(key, version); } + /** + * Checks if the server has to do any proxying of gets/puts to another + * server, as a part of an ongoing rebalance operation. + * + * Basic idea : Any given node which is a stealer of a partition, as the ith + * replica of a given zone, will proxy to the old ith replica of the + * partition in the given zone, as per the source cluster metadata. + * Exception : if this amounts to proxying to itself. + * + * Note on Zone Expansion : For zone expansion, there will be no proxying + * within the new zone. This is a practical assumption since if we fail, we + * fallback to a cluster topology without the new zone. As a result, reads + * from the new zone are not guaranteed to return some values during the + * course of zone expansion. This is a also reasonable since any + * organization undertaking such effort would need to have the data in place + * in the new zone, before the client apps are moved over. + * + * + * @param currentRoutingPlan routing plan object based on cluster's current + * topology + * @param storeDef definition of the store being redirected + * @param key to decide where to proxy to + * @return Null if no proxying is required else node id of the server to + * proxy to + */ + private Integer getProxyNode(StoreRoutingPlan currentRoutingPlan, + StoreDefinition storeDef, + byte[] key) { + // get out if not rebalancing or if redirecting is disabled. + if(!VoldemortState.REBALANCING_MASTER_SERVER.equals(metadata.getServerState()) + || !isRedirectingStoreEnabled.get()) { + return null; + } + + // TODO a better design would be to get these state changes from + // metadata listener callbacks, so we need not allocate these objects + // all the time + Cluster sourceCluster = metadata.getRebalancingSourceCluster(); + if(sourceCluster == null) { + /* + * This is more for defensive coding purposes. The update of the + * source cluster key happens before the server is put in + * REBALANCING mode and is reset to null after the server goes back + * to NORMAL mode. + */ + if(logger.isTraceEnabled()) { + logger.trace("Old Cluster is null.. bail"); + } + return null; + } + + Integer nodeId = metadata.getNodeId(); + Integer zoneId = currentRoutingPlan.getCluster().getNodeById(nodeId).getZoneId(); + + StoreRoutingPlan oldRoutingPlan = new StoreRoutingPlan(sourceCluster, storeDef); + // Check the current node's relationship to the key. + int zoneReplicaType = currentRoutingPlan.getZoneReplicaType(zoneId, nodeId, key); + // Determine which node held the key with the same relationship in the + // old cluster. That is your man! + Integer redirectNodeId; + try { + redirectNodeId = oldRoutingPlan.getZoneReplicaNode(zoneId, zoneReplicaType, key); + } catch(VoldemortException ve) { + // If the zone does not exist, as in the case of Zone Expansion, + // there will be no proxy bridges built + return null; + } + // Unless he is the same as this node (where this is meaningless effort) + if(redirectNodeId == nodeId) { + return null; + } + return redirectNodeId; + } + + /** + * Wrapper around + * {@link RedirectingStore#getProxyNode(StoreRoutingPlan, StoreDefinition, byte[])} + * + * @param key + * @return + */ + private Integer getProxyNode(byte[] key) { + Cluster currentCluster = metadata.getCluster(); + // FIXME AR O(n) linear lookup of storedef + StoreDefinition storeDef = metadata.getStoreDef(getName()); + StoreRoutingPlan currentRoutingPlan = new StoreRoutingPlan(currentCluster, storeDef); + return getProxyNode(currentRoutingPlan, storeDef, key); + } + /** * Performs a back-door proxy get to * {@link voldemort.client.rebalance.RebalancePartitionsInfo#getDonorId() @@ -259,77 +367,7 @@ private List> proxyGet(ByteArray key, int donorNodeId, byte[] } } - /** - * Replay the put to the donor proxy node so we will have the data available - * at the proxy host, in case the rebalancing fails. - * - * NOTE: This logic depends on the assumption that all the replicas for this - * partition in the old topology are now either donors during rebalancing or - * still active replicas. Otherwise, some old replica might not have any - * incoming proxy puts. As a result, if the rebalancing fails, the updates - * during the rebalancing window would not have made it to all the old - * replicas. - * - * @param key - * @param value - * @param transforms - * @param donorNodeId - * @throws ProxyUnreachableException if donor node can't be reached - */ - private void proxyPut(ByteArray key, Versioned value, byte[] transforms, int donorNodeId) { - Node donorNode = metadata.getCluster().getNodeById(donorNodeId); - - // Check if the donor is still a replica for the key. If we send proxy - // puts there, then we could have a situation where the online - // replicated write could lose out to the proxy put and hence fail the - // client operation with an OVE - // TODO not sure constructing this object everytime is a good idea. But - // the current design lack the ability to be able to clearly detect when - // the server goes back to NORMAL mode - RoutingStrategy routingStrategy = new RoutingStrategyFactory().updateRoutingStrategy(metadata.getStoreDef(getName()), - metadata.getCluster()); - if(routingStrategy.routeRequest(key.get()).contains(donorNode)) { - if(logger.isTraceEnabled()) { - logger.trace("Donor " + donorNode.getId() - + " still a replica in the updated cluster for the key " - + ByteUtils.toHexString(key.get()) + ". Skipping proxy put"); - } - return; - } - - checkNodeAvailable(donorNode); - - long startNs = System.nanoTime(); - try { - Store redirectingStore = getRedirectingSocketStore(getName(), - donorNodeId); - redirectingStore.put(key, value, transforms); - recordSuccess(donorNode, startNs); - } catch(UnreachableStoreException e) { - recordException(donorNode, startNs, e); - logger.error("Failed to reach proxy node " + donorNode, e); - } catch(ObsoleteVersionException ove) { - // Proxy puts can get an OVE if somehow there are two stealers for - // the same donor and the other stealer's proxy put already got to - // the donor.. This will not result from online put winning, since - // we don't issue proxy puts if the donor is still a replica - if(logger.isTraceEnabled()) { - logger.trace("OVE in proxy put for donor: " + donorNodeId + " from stealer:" - + metadata.getNodeId() + " key " - + ByteUtils.toHexString(key.get()), - ove); - } - } catch(Exception e) { - // Just log the key.. Not sure having values in the log is a good - // idea. - logger.error("Unexpected exception in proxy put for key:" - + ByteUtils.toHexString(key.get()) + " against donor node " - + donorNodeId, - e); - } - } - - private void checkNodeAvailable(Node donorNode) { + protected void checkNodeAvailable(Node donorNode) { if(!failureDetector.isAvailable(donorNode)) throw new ProxyUnreachableException("Failed to reach proxy node " + donorNode + " is marked down by failure detector."); @@ -340,21 +378,21 @@ private void checkNodeAvailable(Node donorNode) { * {@link voldemort.client.rebalance.RebalancePartitionsInfo#getDonorId() * getDonorId} * - * @param rebalancePartitionsInfoPerKey Map of keys to corresponding - * partition info + * @param keyToProxyNodeMap Map of keys to corresponding proxy nodes housing + * the keys in source cluster * @param transforms Map of keys to their corresponding transforms * @throws ProxyUnreachableException if donor node can't be reached */ - private Map>> proxyGetAll(Map rebalancePartitionsInfoPerKey, + private Map>> proxyGetAll(Map keyToProxyNodeMap, Map transforms) throws VoldemortException { Multimap donorNodeToKeys = HashMultimap.create(); int numKeys = 0; // Transform the map of key to plan to a map of donor node id to keys - for(Map.Entry entry: rebalancePartitionsInfoPerKey.entrySet()) { + for(Map.Entry entry: keyToProxyNodeMap.entrySet()) { numKeys++; - donorNodeToKeys.put(entry.getValue().getDonorId(), entry.getKey()); + donorNodeToKeys.put(entry.getValue(), entry.getKey()); } Map>> gatherMap = Maps.newHashMapWithExpectedSize(numKeys); @@ -422,16 +460,16 @@ private List> proxyGetAndLocalPut(ByteArray key, * Similar to {@link #proxyGetAndLocalPut(ByteArray, int)} but meant for * {@link #getAll(Iterable)} * - * @param rebalancePartitionsInfoPerKey Map of keys which are being routed - * to their corresponding plan + * @param keyToProxyNodeMap Map of keys which are being routed to their + * corresponding proxy nodes * @param transforms Map of key to their corresponding transforms * @return Returns a map of key to its corresponding list of values * @throws VoldemortException if {@link #proxyGetAll(List, List)} fails */ - private Map>> proxyGetAllAndLocalPut(Map rebalancePartitionsInfoPerKey, + private Map>> proxyGetAllAndLocalPut(Map keyToProxyNodeMap, Map transforms) throws VoldemortException { - Map>> proxyKeyValues = proxyGetAll(rebalancePartitionsInfoPerKey, + Map>> proxyKeyValues = proxyGetAll(keyToProxyNodeMap, transforms); for(Map.Entry>> keyValuePair: proxyKeyValues.entrySet()) { for(Versioned proxyValue: keyValuePair.getValue()) { @@ -454,8 +492,8 @@ private Map>> proxyGetAllAndLocalPut(MapSocketStore object for storeName and * donorNodeId */ - private Store getRedirectingSocketStore(String storeName, - int donorNodeId) { + protected Store getRedirectingSocketStore(String storeName, + int donorNodeId) { if(!storeRepository.hasRedirectingSocketStore(storeName, donorNodeId)) { synchronized(storeRepository) { if(!storeRepository.hasRedirectingSocketStore(storeName, donorNodeId)) { @@ -485,11 +523,24 @@ private Node getNodeIfPresent(int donorId) { } } - private void recordException(Node node, long startNs, UnreachableStoreException e) { + protected void recordException(Node node, long startNs, UnreachableStoreException e) { failureDetector.recordException(node, (System.nanoTime() - startNs) / Time.NS_PER_MS, e); } - private void recordSuccess(Node node, long startNs) { + protected void recordSuccess(Node node, long startNs) { + proxyPutStats.reportProxyPutCompletion(); failureDetector.recordSuccess(node, (System.nanoTime() - startNs) / Time.NS_PER_MS); } + + protected MetadataStore getMetadataStore() { + return metadata; + } + + protected void reporteProxyPutFailure() { + proxyPutStats.reportProxyPutFailure(); + } + + public ProxyPutStats getProxyPutStats() { + return this.proxyPutStats; + } } diff --git a/test/unit/voldemort/client/rebalance/AbstractNonZonedRebalanceTest.java b/test/unit/voldemort/client/rebalance/AbstractNonZonedRebalanceTest.java index 3665173147..b783d3c9df 100644 --- a/test/unit/voldemort/client/rebalance/AbstractNonZonedRebalanceTest.java +++ b/test/unit/voldemort/client/rebalance/AbstractNonZonedRebalanceTest.java @@ -901,6 +901,7 @@ public void run() { } } + @Test(timeout = 600000) public void testProxyPutDuringRebalancing() throws Exception { logger.info("Starting testProxyPutDuringRebalancing"); try { @@ -958,6 +959,7 @@ public void testProxyPutDuringRebalancing() throws Exception { baselineVersions.put(key, new VectorClock()); } + final CountDownLatch latch = new CountDownLatch(2); // start get operation. executors.execute(new Runnable() { @@ -1033,6 +1035,7 @@ public void run() { } finally { if(factory != null) factory.close(); + latch.countDown(); } } @@ -1049,10 +1052,12 @@ public void run() { exceptions.add(e); } finally { rebalancingComplete.set(true); + latch.countDown(); } } }); + latch.await(); executors.shutdown(); executors.awaitTermination(300, TimeUnit.SECONDS); diff --git a/test/unit/voldemort/client/rebalance/AbstractZonedRebalanceTest.java b/test/unit/voldemort/client/rebalance/AbstractZonedRebalanceTest.java index eb7664912c..aead8763bf 100644 --- a/test/unit/voldemort/client/rebalance/AbstractZonedRebalanceTest.java +++ b/test/unit/voldemort/client/rebalance/AbstractZonedRebalanceTest.java @@ -582,6 +582,7 @@ public void run() { } } + @Test(timeout = 600000) public void testProxyPutDuringRebalancing() throws Exception { logger.info("Starting testProxyPutDuringRebalancing"); try { diff --git a/test/unit/voldemort/client/rebalance/AdminRebalanceTest.java b/test/unit/voldemort/client/rebalance/AdminRebalanceTest.java index 3e20bb96b7..f9d6b072c1 100644 --- a/test/unit/voldemort/client/rebalance/AdminRebalanceTest.java +++ b/test/unit/voldemort/client/rebalance/AdminRebalanceTest.java @@ -399,6 +399,9 @@ public void testRebalanceNodeRW() throws IOException { getServer(partitionPlan.getStealerId()).getMetadataStore() .put(MetadataStore.SERVER_STATE_KEY, MetadataStore.VoldemortState.REBALANCING_MASTER_SERVER); + getServer(partitionPlan.getStealerId()).getMetadataStore() + .put(MetadataStore.REBALANCING_SOURCE_CLUSTER_XML, + partitionPlan.getInitialCluster()); } try { @@ -571,6 +574,9 @@ public void testRebalanceNodeRW2() throws IOException { getServer(partitionPlan.getStealerId()).getMetadataStore() .put(MetadataStore.REBALANCING_STEAL_INFO, new RebalancerState(Lists.newArrayList(RebalancePartitionsInfo.create(partitionPlan.toJsonString())))); + getServer(partitionPlan.getStealerId()).getMetadataStore() + .put(MetadataStore.REBALANCING_SOURCE_CLUSTER_XML, + partitionPlan.getInitialCluster()); } // Update the cluster metadata on all three nodes @@ -745,6 +751,9 @@ public void testRebalanceNodeRO() throws IOException { getServer(partitionPlan.getStealerId()).getMetadataStore() .put(MetadataStore.REBALANCING_STEAL_INFO, new RebalancerState(Lists.newArrayList(RebalancePartitionsInfo.create(partitionPlan.toJsonString())))); + getServer(partitionPlan.getStealerId()).getMetadataStore() + .put(MetadataStore.REBALANCING_SOURCE_CLUSTER_XML, + partitionPlan.getInitialCluster()); } // Actually run it @@ -899,6 +908,9 @@ public void testRebalanceNodeRORW() throws IOException, InterruptedException { getServer(partitionPlan.getStealerId()).getMetadataStore() .put(MetadataStore.REBALANCING_STEAL_INFO, new RebalancerState(Lists.newArrayList(RebalancePartitionsInfo.create(partitionPlan.toJsonString())))); + getServer(partitionPlan.getStealerId()).getMetadataStore() + .put(MetadataStore.REBALANCING_SOURCE_CLUSTER_XML, + partitionPlan.getInitialCluster()); } // Actually run it diff --git a/test/unit/voldemort/store/metadata/MetadataStoreTest.java b/test/unit/voldemort/store/metadata/MetadataStoreTest.java index 83702b7601..4ed6050845 100644 --- a/test/unit/voldemort/store/metadata/MetadataStoreTest.java +++ b/test/unit/voldemort/store/metadata/MetadataStoreTest.java @@ -16,14 +16,21 @@ package voldemort.store.metadata; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; -import junit.framework.TestCase; +import org.junit.Before; +import org.junit.Test; + import voldemort.ServerTestUtils; import voldemort.client.rebalance.RebalancePartitionsInfo; +import voldemort.cluster.Cluster; import voldemort.server.rebalance.RebalancerState; import voldemort.store.metadata.MetadataStore.VoldemortState; import voldemort.utils.ByteArray; @@ -36,7 +43,7 @@ import com.google.common.collect.Maps; -public class MetadataStoreTest extends TestCase { +public class MetadataStoreTest { private static int TEST_RUNS = 100; @@ -44,11 +51,11 @@ public class MetadataStoreTest extends TestCase { private List TEST_KEYS = Arrays.asList(MetadataStore.CLUSTER_KEY, MetadataStore.STORES_KEY, MetadataStore.REBALANCING_STEAL_INFO, - MetadataStore.SERVER_STATE_KEY); + MetadataStore.SERVER_STATE_KEY, + MetadataStore.REBALANCING_SOURCE_CLUSTER_XML); - @Override + @Before public void setUp() throws Exception { - super.setUp(); metadataStore = ServerTestUtils.createMetadataStore(ServerTestUtils.getLocalCluster(1), ServerTestUtils.getStoreDefs(1)); } @@ -61,7 +68,8 @@ public ByteArray getValidKey() { public byte[] getValidValue(ByteArray key) { String keyString = ByteUtils.getString(key.get(), "UTF-8"); - if(MetadataStore.CLUSTER_KEY.equals(keyString)) { + if(MetadataStore.CLUSTER_KEY.equals(keyString) + || MetadataStore.REBALANCING_SOURCE_CLUSTER_XML.equals(keyString)) { return ByteUtils.getBytes(new ClusterMapper().writeCluster(ServerTestUtils.getLocalCluster(1)), "UTF-8"); } else if(MetadataStore.STORES_KEY.equals(keyString)) { @@ -96,6 +104,7 @@ public byte[] getValidValue(ByteArray key) { throw new RuntimeException("Unhandled key:" + keyString + " passed"); } + @Test public void testSimpleGetAndPut() { for(int i = 0; i <= TEST_RUNS; i++) { ByteArray key = getValidKey(); @@ -108,6 +117,7 @@ public void testSimpleGetAndPut() { } } + @Test public void testRepeatedPuts() { for(int i = 0; i <= TEST_RUNS; i++) { for(int j = 0; j <= 5; j++) { @@ -123,6 +133,7 @@ public void testRepeatedPuts() { } } + @Test public void testObsoletePut() { for(int i = 0; i <= TEST_RUNS; i++) { ByteArray key = getValidKey(); @@ -141,6 +152,7 @@ public void testObsoletePut() { } } + @Test public void testSynchronousPut() { for(int i = 0; i <= TEST_RUNS; i++) { ByteArray key = getValidKey(); @@ -160,6 +172,7 @@ public void testSynchronousPut() { } } + @Test public void testCleanAllStates() { // put state entries. incrementVersionAndPut(metadataStore, @@ -179,6 +192,28 @@ public void testCleanAllStates() { VoldemortState.NORMAL_SERVER); } + @Test + public void testRebalacingSourceClusterXmlKey() { + metadataStore.cleanAllRebalancingState(); + + assertTrue("Should be null", null == metadataStore.getRebalancingSourceCluster()); + + Cluster dummyCluster = ServerTestUtils.getLocalCluster(2); + metadataStore.put(MetadataStore.REBALANCING_SOURCE_CLUSTER_XML, dummyCluster); + assertEquals("Should be equal", dummyCluster, metadataStore.getRebalancingSourceCluster()); + + metadataStore.put(MetadataStore.REBALANCING_SOURCE_CLUSTER_XML, (Object) null); + assertTrue("Should be null", null == metadataStore.getRebalancingSourceCluster()); + + List> sourceClusterVersions = metadataStore.get(MetadataStore.REBALANCING_SOURCE_CLUSTER_XML, + null); + assertTrue("Just one version expected", 1 == sourceClusterVersions.size()); + assertEquals("Empty string should map to null", + "", + new String(sourceClusterVersions.get(0).getValue())); + + } + private void checkValues(Versioned value, List> list, ByteArray key) { assertEquals("should return exactly one value ", 1, list.size()); diff --git a/test/unit/voldemort/store/rebalancing/RedirectingStoreTest.java b/test/unit/voldemort/store/rebalancing/RedirectingStoreTest.java index 821d748b9d..7db7fb0a06 100644 --- a/test/unit/voldemort/store/rebalancing/RedirectingStoreTest.java +++ b/test/unit/voldemort/store/rebalancing/RedirectingStoreTest.java @@ -18,6 +18,7 @@ import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -26,6 +27,7 @@ import java.util.Map.Entry; import java.util.Properties; import java.util.Set; +import java.util.concurrent.Executors; import junit.framework.TestCase; @@ -56,6 +58,7 @@ import voldemort.server.VoldemortServer; import voldemort.server.rebalance.RebalancerState; import voldemort.store.InvalidMetadataException; +import voldemort.store.Store; import voldemort.store.StoreDefinition; import voldemort.store.StoreDefinitionBuilder; import voldemort.store.memory.InMemoryStorageConfiguration; @@ -63,6 +66,7 @@ import voldemort.store.socket.SocketStoreFactory; import voldemort.store.socket.clientrequest.ClientRequestExecutorPool; import voldemort.utils.ByteArray; +import voldemort.utils.DaemonThreadFactory; import voldemort.utils.RebalanceUtils; import voldemort.versioning.ObsoleteVersionException; import voldemort.versioning.VectorClock; @@ -82,6 +86,8 @@ public class RedirectingStoreTest extends TestCase { private List secondaryPartitionsMoved; private HashMap primaryEntriesMoved; private HashMap secondaryEntriesMoved; + private HashMap proxyPutTestPrimaryEntries; + private HashMap proxyPutTestSecondaryEntries; private final boolean useNio; private StoreDefinition storeDef; private final SocketStoreFactory storeFactory = new ClientRequestExecutorPool(2, @@ -142,6 +148,8 @@ public void setUp() throws IOException, InterruptedException { this.primaryEntriesMoved = Maps.newHashMap(); this.secondaryEntriesMoved = Maps.newHashMap(); + this.proxyPutTestPrimaryEntries = Maps.newHashMap(); + this.proxyPutTestSecondaryEntries = Maps.newHashMap(); RoutingStrategy strategy = new RoutingStrategyFactory().updateRoutingStrategy(storeDef, currentCluster); @@ -159,6 +167,32 @@ public void setUp() throws IOException, InterruptedException { // Hope the 'God of perfect timing' is on our side Thread.sleep(500); + // steal a few primary key-value pairs for testing proxy put logic + int cnt = 0; + for(Entry entry: primaryEntriesMoved.entrySet()) { + if(cnt > 3) + break; + this.proxyPutTestPrimaryEntries.put(entry.getKey(), entry.getValue()); + cnt++; + } + for(ByteArray key: this.proxyPutTestPrimaryEntries.keySet()) { + this.primaryEntriesMoved.remove(key); + } + assertTrue("Not enough primary entries", primaryEntriesMoved.size() > 1); + + // steal a few secondary key-value pairs for testing proxy put logic + cnt = 0; + for(Entry entry: secondaryEntriesMoved.entrySet()) { + if(cnt > 3) + break; + this.proxyPutTestSecondaryEntries.put(entry.getKey(), entry.getValue()); + cnt++; + } + for(ByteArray key: this.proxyPutTestSecondaryEntries.keySet()) { + this.secondaryEntriesMoved.remove(key); + } + assertTrue("Not enough secondary entries", primaryEntriesMoved.size() > 1); + RebalanceClusterPlan plan = new RebalanceClusterPlan(currentCluster, targetCluster, Lists.newArrayList(storeDef), @@ -173,6 +207,9 @@ public void setUp() throws IOException, InterruptedException { servers[partitionPlan.getStealerId()].getMetadataStore() .put(MetadataStore.REBALANCING_STEAL_INFO, new RebalancerState(Lists.newArrayList(partitionPlan))); + servers[partitionPlan.getStealerId()].getMetadataStore() + .put(MetadataStore.REBALANCING_SOURCE_CLUSTER_XML, + currentCluster); } // Update the cluster metadata on all three nodes @@ -226,7 +263,10 @@ private RedirectingStore getRedirectingStore(int nodeId, servers[nodeId].getStoreRepository(), new NoopFailureDetector(), storeFactory, - false); + true, + Executors.newFixedThreadPool(1, + new DaemonThreadFactory("voldemort-proxy-put-thread")), + new ProxyPutStats(null)); } @Test @@ -317,7 +357,7 @@ public void testProxyGetAll() { } @Test - public void testProxyPut() { + public void testProxyGetDuringPut() { final RedirectingStore storeNode2 = getRedirectingStore(2, servers[2].getMetadataStore(), @@ -355,7 +395,124 @@ public void testProxyPut() { } } + } + /** + * This exits out immediately if the node is not proxy putting. + * + * @param store + */ + private void waitForProxyPutsToDrain(RedirectingStore store) { + // wait for the proxy write to complete + while(store.getProxyPutStats().getNumPendingProxyPuts() > 0) { + try { + Thread.sleep(50); + } catch(InterruptedException e) { + e.printStackTrace(); + } + } } + @Test + public void testProxyPuts() { + + List testPrimaryKeys = new ArrayList(this.proxyPutTestPrimaryEntries.keySet()); + List testSecondaryKeys = new ArrayList(this.proxyPutTestSecondaryEntries.keySet()); + + final RedirectingStore redirectingStoreNode2 = getRedirectingStore(2, + servers[2].getMetadataStore(), + "test"); + final RedirectingStore redirectingStoreNode0 = getRedirectingStore(0, + servers[0].getMetadataStore(), + "test"); + final Store socketStoreNode2 = redirectingStoreNode2.getRedirectingSocketStore("test", + 2); + final Store socketStoreNode0 = redirectingStoreNode0.getRedirectingSocketStore("test", + 0); + + // 1. Make sure the vector clocks make sense.. Read through Node 2 and + // proxy getting from Node 0 and issue a write based off that, + // incrementing the clock for Node 2 and make sure there is no + // ObsoleteVersionException at both Node 0 and + // Node 2. + ByteArray key1 = testSecondaryKeys.get(0); + VectorClock clock1 = ((VectorClock) redirectingStoreNode2.getVersions(key1).get(0)).incremented(2, + System.currentTimeMillis()); + try { + redirectingStoreNode2.put(key1, + Versioned.value("write-through".getBytes("UTF-8"), clock1), + null); + } catch(Exception e) { + fail("Unexpected error in testing write through proxy put"); + e.printStackTrace(); + } + waitForProxyPutsToDrain(redirectingStoreNode2); + + assertTrue("Unexpected failures in proxy put", + redirectingStoreNode2.getProxyPutStats().getNumProxyPutFailures() == 0); + assertEquals("Unexpected value in Node 2", + "write-through", + new String(socketStoreNode2.get(key1, null).get(0).getValue())); + assertTrue("Proxy write not seen on proxy node 0", + "write-through".equals(new String(socketStoreNode0.get(key1, null) + .get(0) + .getValue()))); + + // Also test that if put fails locally, proxy put is not attempted. + try { + redirectingStoreNode2.put(key1, + Versioned.value("write-through-updated".getBytes("UTF-8"), + clock1), + null); + fail("Should have thrown OVE"); + } catch(ObsoleteVersionException ove) { + // Expected + } catch(Exception e) { + fail("Unexpected error in testing write through proxy put"); + e.printStackTrace(); + } + waitForProxyPutsToDrain(redirectingStoreNode2); + assertFalse("Proxy write not seen on proxy node 0", + "write-through-updated".equals(new String(socketStoreNode0.get(key1, null) + .get(0) + .getValue()))); + + // 2. Make sure if the proxy node is still a replica, we don't issue + // proxy puts. Node 2 -> Node 0 on partition 0, for which Node 0 is + // still a replica + ByteArray key2 = testPrimaryKeys.get(0); + VectorClock clock2 = ((VectorClock) redirectingStoreNode2.getVersions(key2).get(0)).incremented(2, + System.currentTimeMillis()); + try { + redirectingStoreNode2.put(key2, + Versioned.value("write-through".getBytes("UTF-8"), clock2), + null); + } catch(Exception e) { + fail("Unexpected error in testing write through proxy put"); + e.printStackTrace(); + } + waitForProxyPutsToDrain(redirectingStoreNode2); + assertEquals("Unexpected value in Node 2", + "write-through", + new String(socketStoreNode2.get(key2, null).get(0).getValue())); + assertFalse("Proxy write seen on proxy node which is a replica", + "write-through".equals(new String(socketStoreNode0.get(key2, null) + .get(0) + .getValue()))); + + // 3. If the same entry reaches Node 2 again from Node 0, via partition + // fetch, it will + // generate OVE. + try { + redirectingStoreNode2.put(key2, + Versioned.value("write-through".getBytes("UTF-8"), clock2), + null); + fail("Should have thrown OVE"); + } catch(ObsoleteVersionException ove) { + // Expected + } catch(Exception e) { + fail("Unexpected error in testing write through proxy put"); + e.printStackTrace(); + } + } }