diff --git a/src/java/voldemort/store/rebalancing/RedirectingStore.java b/src/java/voldemort/store/rebalancing/RedirectingStore.java
index 4c5c0ad31a..9f73a4a47d 100644
--- a/src/java/voldemort/store/rebalancing/RedirectingStore.java
+++ b/src/java/voldemort/store/rebalancing/RedirectingStore.java
@@ -39,6 +39,7 @@ import voldemort.store.metadata.MetadataStore.VoldemortState;
 import voldemort.store.socket.SocketStoreFactory;
 import voldemort.utils.ByteArray;
+import voldemort.utils.ByteUtils;
 import voldemort.utils.Time;
 import voldemort.versioning.ObsoleteVersionException;
 import voldemort.versioning.Version;
@@ -107,7 +108,8 @@ private RebalancePartitionsInfo redirectingKey(ByteArray key) {
     @Override
     public List<Versioned<byte[]>> get(ByteArray key, byte[] transforms) throws VoldemortException {
         RebalancePartitionsInfo stealInfo = redirectingKey(key);
-
+        logger.info("BEGIN GET from stealer:" + metadata.getNodeId() + " key "
+                    + ByteUtils.toHexString(key.get()));
         /**
          * If I am rebalancing for this key, try to do remote get(), put it
          * locally first to get the correct version ignoring any
@@ -119,14 +121,16 @@ public List<Versioned<byte[]>> get(ByteArray key, byte[] transforms) throws Vold
         if(stealInfo != null) {
             proxyGetAndLocalPut(key, stealInfo.getDonorId(), transforms);
         }
-
+        logger.info("END GET from stealer:" + metadata.getNodeId() + " key "
+                    + ByteUtils.toHexString(key.get()));
         return getInnerStore().get(key, transforms);
     }

     @Override
     public List<Version> getVersions(ByteArray key) {
         RebalancePartitionsInfo stealInfo = redirectingKey(key);
-
+        logger.info("BEGIN GETVERSIONS from stealer:" + metadata.getNodeId() + " key "
+                    + ByteUtils.toHexString(key.get()));
         /**
          * If I am rebalancing for this key, try to do remote get(), put it
          * locally first to get the correct version ignoring any
@@ -138,7 +142,8 @@ public List<Version> getVersions(ByteArray key) {
         if(stealInfo != null) {
             proxyGetAndLocalPut(key, stealInfo.getDonorId(), null);
         }
-
+        logger.info("END GETVERSIONS from stealer:" + metadata.getNodeId() + " key "
+                    + ByteUtils.toHexString(key.get()));
         return getInnerStore().getVersions(key);
     }

@@ -167,7 +172,8 @@ public Map<ByteArray, List<Versioned<byte[]>>> getAll(Iterable<ByteArray> keys,
     public void put(ByteArray key, Versioned<byte[]> value, byte[] transforms)
             throws VoldemortException {
         RebalancePartitionsInfo stealInfo = redirectingKey(key);
-
+        logger.info("BEGIN PUT from stealer:" + metadata.getNodeId() + " key "
+                    + ByteUtils.toHexString(key.get()) + " value " + value);
         /**
          * If I am rebalancing for this key, try to do remote get() , put it
          * locally first to get the correct version ignoring any
@@ -181,6 +187,8 @@ public void put(ByteArray key, Versioned<byte[]> value, byte[] transforms)
         proxyPut(key, value, transforms, stealInfo.getDonorId());
         // TODO if I fail though for some reason, the aborting rebalance can
         // surface phantom writes
+        logger.info("END PUT from stealer:" + metadata.getNodeId() + " key "
+                    + ByteUtils.toHexString(key.get()) + " value " + value);
         getInnerStore().put(key, value, transforms);
     }

@@ -245,6 +253,8 @@ private List<Versioned<byte[]>> proxyGet(ByteArray key, int donorNodeId, byte[]
     private void proxyPut(ByteArray key, Versioned<byte[]> value, byte[] transforms, int donorNodeId) {
         Node donorNode = metadata.getCluster().getNodeById(donorNodeId);
         checkNodeAvailable(donorNode);
+        logger.info("BEGIN PROXY PUT for donor: " + donorNodeId + " from stealer:"
+                    + metadata.getNodeId() + " key " + ByteUtils.toHexString(key.get()));
         long startNs = System.nanoTime();
         try {
             Store<ByteArray, byte[], byte[]> redirectingStore = getRedirectingSocketStore(getName(),
@@ -254,7 +264,14 @@ private void proxyPut(ByteArray key, Versioned<byte[]> value, byte[] transforms,
         } catch(UnreachableStoreException e) {
             recordException(donorNode, startNs, e);
             throw new ProxyUnreachableException("Failed to reach proxy node " + donorNode, e);
+        } catch(ObsoleteVersionException ove) {
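+            // The donor already holds a version equal to or newer than the one
+            // being proxied, so the put is redundant; log it instead of failing.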
+            logger.error("OVE in proxy put for donor: " + donorNodeId + " from stealer:"
+                         + metadata.getNodeId() + " key "
+                         + ByteUtils.toHexString(key.get()),
+                         ove);
         }
+        logger.info("END PROXY PUT for donor: " + donorNodeId + " from stealer:"
+                    + metadata.getNodeId() + " key " + ByteUtils.toHexString(key.get()));
     }

     private void checkNodeAvailable(Node donorNode) {
@@ -328,14 +345,18 @@ private List<Versioned<byte[]>> proxyGetAndLocalPut(ByteArray key,
                                                         int donorId,
                                                         byte[] transforms)
             throws VoldemortException {
+        logger.info("BEGIN PROXY GET LOCAL PUT for donor: " + donorId + " from stealer:"
+                    + metadata.getNodeId() + " key " + ByteUtils.toHexString(key.get()));
         List<Versioned<byte[]>> proxyValues = proxyGet(key, donorId, transforms);
         for(Versioned<byte[]> proxyValue: proxyValues) {
             try {
                 getInnerStore().put(key, proxyValue, null);
             } catch(ObsoleteVersionException e) {
-                // ignore these
+                logger.info("OVE in proxy get local put", e);
             }
         }
+        logger.info("END PROXY GET LOCAL PUT for donor: " + donorId + " from stealer:"
+                    + metadata.getNodeId() + " key " + ByteUtils.toHexString(key.get()));
         return proxyValues;
     }

diff --git a/src/java/voldemort/store/routed/action/ConfigureNodesByZone.java b/src/java/voldemort/store/routed/action/ConfigureNodesByZone.java
index 6f1d15a1f0..fbc6888b4b 100644
--- a/src/java/voldemort/store/routed/action/ConfigureNodesByZone.java
+++ b/src/java/voldemort/store/routed/action/ConfigureNodesByZone.java
@@ -113,6 +113,9 @@ public List<Node> getNodes(ByteArray key, Operation op) {
         if(clientZoneNodes != null && clientZoneNodes.size() > 0)
             nodes.addAll(clientZoneNodes);
         // ...followed by other zones sorted by proximity list
+        // NOTE : it's imperative that the proximity list does not contain the
+        // client zone. If this happens, we will add those nodes twice to the
+        // list
         for(int index = 0; index < zoneProximityList.size(); index++) {
             List<Node> zoneNodes = zoneIdToNode.get(zoneProximityList.get(index));
             if(zoneNodes != null && zoneNodes.size() > 0) {
diff --git a/test/common/voldemort/ServerTestUtils.java b/test/common/voldemort/ServerTestUtils.java
index 06e3e57919..dc9330b441 100644
--- a/test/common/voldemort/ServerTestUtils.java
+++ b/test/common/voldemort/ServerTestUtils.java
@@ -347,7 +347,7 @@ public static Cluster getLocalZonedCluster(int numberOfNodes,
         for(int i = 0; i < numberOfZones; i++) {
             LinkedList<Integer> proximityList = Lists.newLinkedList();
             int zoneId = i + 1;
-            for(int j = 0; j < numberOfZones; j++) {
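+            // a zone's proximity list must hold only the other (numberOfZones - 1)
+            // zones; iterating numberOfZones times put the zone itself back into
+            // its own list (see the NOTE in ConfigureNodesByZone)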
+            for(int j = 0; j < numberOfZones - 1; j++) {
                 proximityList.add(zoneId % numberOfZones);
                 zoneId++;
             }
@@ -405,7 +405,7 @@ public static Cluster getLocalCluster(int numberOfNodes,
         for(int i = 0; i < numberOfZones; i++) {
             LinkedList<Integer> proximityList = Lists.newLinkedList();
             int zoneId = i + 1;
-            for(int j = 0; j < numberOfZones; j++) {
+            for(int j = 0; j < numberOfZones - 1; j++) {
                 proximityList.add(zoneId % numberOfZones);
                 zoneId++;
             }
@@ -840,7 +840,6 @@ public static boolean waitForAsyncOperationOnServer(VoldemortServer server,
             matchingOperationIds = service.getMatchingAsyncOperationList(asyncOperationPattern, true);
             if(matchingOperationIds.size() > 0) {
-                System.err.println(">>" + matchingOperationIds);
                 break;
             }
         }
@@ -849,7 +848,6 @@ public static boolean waitForAsyncOperationOnServer(VoldemortServer server,
         List<Integer> completedOps = new ArrayList<Integer>(matchingOperationIds.size());
         for(Integer op: matchingOperationIds) {
             if(service.isComplete(op)) {
-                System.err.println("Operation " + op + " is complete");
                 completedOps.add(op);
             }
         }
diff --git a/test/unit/voldemort/client/rebalance/AbstractNonZonedRebalanceTest.java b/test/unit/voldemort/client/rebalance/AbstractNonZonedRebalanceTest.java
index 0286dc180d..926280e162 100644
--- a/test/unit/voldemort/client/rebalance/AbstractNonZonedRebalanceTest.java
+++ b/test/unit/voldemort/client/rebalance/AbstractNonZonedRebalanceTest.java
@@ -26,6 +26,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -56,11 +57,14 @@ import voldemort.routing.RoutingStrategyType;
 import voldemort.serialization.SerializerDefinition;
 import voldemort.serialization.json.JsonReader;
+import voldemort.server.VoldemortServer;
 import voldemort.store.Store;
 import voldemort.store.StoreDefinition;
 import voldemort.store.StoreDefinitionBuilder;
 import voldemort.store.UnreachableStoreException;
 import voldemort.store.bdb.BdbStorageConfiguration;
+import voldemort.store.metadata.MetadataStore;
+import voldemort.store.metadata.MetadataStore.VoldemortState;
 import voldemort.store.readonly.JsonStoreBuilder;
 import voldemort.store.readonly.ReadOnlyStorageConfiguration;
 import voldemort.store.readonly.ReadOnlyStorageEngineTestInstance;
@@ -69,7 +73,9 @@ import voldemort.utils.ByteArray;
 import voldemort.utils.ByteUtils;
 import voldemort.utils.RebalanceUtils;
+import voldemort.versioning.ClockEntry;
 import voldemort.versioning.ObsoleteVersionException;
+import voldemort.versioning.VectorClock;
 import voldemort.versioning.Versioned;
 import voldemort.xml.StoreDefinitionsMapper;
@@ -273,7 +279,7 @@ public void testRORWRebalanceWithReplication() throws Exception {
         // start servers 0 , 1 only
         List<Integer> serverList = Arrays.asList(0, 1);
         Map<String, String> configProps = new HashMap<String, String>();
-        configProps.put("admin.max.threads", "50");
+        configProps.put("admin.max.threads", "5");
         currentCluster = startServers(currentCluster,
                                       storeDefFileWithReplication,
@@ -333,7 +339,7 @@ public void testRORebalanceWithReplication() throws Exception {
         // seems to only affect ThreadPoolBasedNonblockingStoreImpl tests rather
         // than Nio-based tests.
         Map<String, String> configProps = new HashMap<String, String>();
-        configProps.put("admin.max.threads", "50");
+        configProps.put("admin.max.threads", "5");
         currentCluster = startServers(currentCluster,
                                       roStoreDefFileWithReplication,
                                       serverList,
@@ -430,6 +436,7 @@ public void testRebalanceCleanPrimary() throws Exception {
         RebalanceClientConfig config = new RebalanceClientConfig();
         config.setDeleteAfterRebalancingEnabled(false);
+        config.setStealerBasedRebalancing(!useDonorBased);
         RebalanceController rebalanceClient = new RebalanceController(getBootstrapUrl(currentCluster, 0),
                                                                       config);
@@ -506,6 +513,7 @@ public void testRebalanceCleanSecondary() throws Exception {
         RebalanceClientConfig config = new RebalanceClientConfig();
         config.setDeleteAfterRebalancingEnabled(false);
+        config.setStealerBasedRebalancing(!useDonorBased);
         RebalanceController rebalanceClient = new RebalanceController(getBootstrapUrl(currentCluster, 0),
                                                                       config);
@@ -695,7 +703,7 @@ public void testProxyGetDuringRebalancing() throws Exception {
         // start servers 0 , 1 only
         final List<Integer> serverList = Arrays.asList(0, 1);
         Map<String, String> configProps = new HashMap<String, String>();
-        configProps.put("admin.max.threads", "50");
+        configProps.put("admin.max.threads", "5");
         final Cluster updatedCurrentCluster = startServers(currentCluster,
                                                            storeDefFileWithReplication,
                                                            serverList,
@@ -708,6 +716,7 @@ public void testProxyGetDuringRebalancing() throws Exception {
         RebalanceClientConfig rebalanceClientConfig = new RebalanceClientConfig();
         rebalanceClientConfig.setMaxParallelRebalancing(2);
+        rebalanceClientConfig.setStealerBasedRebalancing(!useDonorBased);
         final RebalanceController rebalanceClient = new RebalanceController(getBootstrapUrl(updatedCurrentCluster,
                                                                                             0),
@@ -782,6 +791,7 @@ public void run() {
         executors.execute(new Runnable() {

+            @Override
             public void run() {
                 try {
@@ -794,7 +804,6 @@ public void run() {
                     Thread.sleep(500);
                     rebalancingToken.set(true);
                     checkConsistentMetadata(updatedTargetCluster, serverList);
-
                 } catch(Exception e) {
                     exceptions.add(e);
                 } finally {
@@ -820,6 +829,194 @@ public void run() {
         }
     }

+    @Test(timeout = 600000)
+    public void testProxyPutDuringRebalancing() throws Exception {
+        logger.info("Starting testProxyPutDuringRebalancing");
+        Cluster currentCluster = ServerTestUtils.getLocalCluster(3, new int[][] { { 0 }, { 1, 3 },
+                { 2 } });
+
+        Cluster targetCluster = RebalanceUtils.createUpdatedCluster(currentCluster,
+                                                                    2,
+                                                                    Lists.newArrayList(3));
+
+        // start servers 0,1,2 only
+        final List<Integer> serverList = Arrays.asList(0, 1, 2);
+        Map<String, String> configProps = new HashMap<String, String>();
+        configProps.put("admin.max.threads", "5");
+        final Cluster updatedCurrentCluster = startServers(currentCluster,
+                                                           rwStoreDefFileWithReplication,
+                                                           serverList,
+                                                           configProps);
+        final Cluster updatedTargetCluster = updateCluster(targetCluster);
+
+        ExecutorService executors = Executors.newFixedThreadPool(2);
+        final AtomicBoolean rebalancingToken = new AtomicBoolean(false);
+        final List<Exception> exceptions = Collections.synchronizedList(new ArrayList<Exception>());
+
+        RebalanceClientConfig rebalanceClientConfig = new RebalanceClientConfig();
+        rebalanceClientConfig.setMaxParallelRebalancing(2);
+        rebalanceClientConfig.setStealerBasedRebalancing(!useDonorBased);
+
+        final RebalanceController rebalanceClient = new RebalanceController(getBootstrapUrl(updatedCurrentCluster,
+                                                                                            0),
+                                                                            rebalanceClientConfig);
+
+        populateData(updatedCurrentCluster,
+                     rwStoreDefWithReplication,
+                     rebalanceClient.getAdminClient(),
+                     false);
+
+        final AdminClient adminClient = rebalanceClient.getAdminClient();
+        // the plan would cause these partitions to move
+        // Partition : Donor -> Stealer
+        // p2 (SEC) : 1 -> 0
+        // p3 (PRI) : 1 -> 2
+        final List<ByteArray> movingKeysList = sampleKeysFromPartition(adminClient,
+                                                                       1,
+                                                                       rwStoreDefWithReplication.getName(),
+                                                                       Arrays.asList(2, 3),
+                                                                       20);
+        final AtomicBoolean rebalancingStarted = new AtomicBoolean(false);
+        final AtomicBoolean proxyWritesDone = new AtomicBoolean(false);
+        final HashMap<String, String> baselineTuples = new HashMap<String, String>(testEntries);
+        final HashMap<String, VectorClock> baselineVersions = new HashMap<String, VectorClock>();
+
+        for(String key: baselineTuples.keySet()) {
+            baselineVersions.put(key, new VectorClock());
+        }
+
+        // start get operation.
+        executors.execute(new Runnable() {
+
+            @Override
+            public void run() {
+                SocketStoreClientFactory factory = null;
+                try {
+                    // wait for the rebalancing to begin
+                    // TODO also need to somehow test how live writes handle the
+                    // metadata transition.
+                    List<VoldemortServer> serverList = Lists.newArrayList(serverMap.get(0),
+                                                                          serverMap.get(2));
+                    while(!rebalancingToken.get()) {
+                        Iterator<VoldemortServer> serverIterator = serverList.iterator();
+                        while(serverIterator.hasNext()) {
+                            VoldemortServer server = serverIterator.next();
+                            if(ByteUtils.getString(server.getMetadataStore()
+                                                         .get(MetadataStore.SERVER_STATE_KEY, null)
+                                                         .get(0)
+                                                         .getValue(),
+                                                   "UTF-8")
+                                        .compareTo(VoldemortState.REBALANCING_MASTER_SERVER.toString()) == 0) {
+                                serverIterator.remove();
+                            }
+                        }
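+                        // an empty list means every server has been observed in
+                        // the REBALANCING_MASTER_SERVER state, so proxy writes
+                        // can begin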
+                        if(serverList.size() == 0) {
+                            rebalancingStarted.set(true);
+                            break;
+                        }
+                    }
+                    factory = new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(getBootstrapUrl(updatedCurrentCluster,
+                                                                                                               0))
+                                                                             .setEnableLazy(false)
+                                                                             .setSocketTimeout(120,
+                                                                                               TimeUnit.SECONDS));
+
+                    final StoreClient<String, String> storeClientRW = new DefaultStoreClient<String, String>(testStoreNameRW,
+                                                                                                             null,
+                                                                                                             factory,
+                                                                                                             3);
+
+                    // Now perform some writes and determine the end state of
+                    // the changed keys. Initially, all data has a zero vector
+                    // clock
+                    if(!rebalancingToken.get()) {
+                        for(ByteArray movingKey: movingKeysList) {
+                            String keyStr = ByteUtils.getString(movingKey.get(), "UTF-8");
+                            String valStr = "proxy_write";
+                            storeClientRW.put(keyStr, valStr);
+                            baselineTuples.put(keyStr, valStr);
+                            // all these keys will have a [2:1] vector clock, as
+                            // node 2 is the pseudo master in both moves
+                            baselineVersions.get(keyStr)
+                                            .incrementVersion(2, System.currentTimeMillis());
+                            proxyWritesDone.set(true);
+                            if(rebalancingToken.get()) {
+                                break;
+                            }
+                        }
+                    }
+                } catch(Exception e) {
+                    exceptions.add(e);
+                } finally {
+                    if(factory != null)
+                        factory.close();
+                }
+            }
+
+        });
+
+        executors.execute(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    rebalanceClient.rebalance(updatedTargetCluster);
+                    Thread.sleep(500);
+                } catch(Exception e) {
+                    logger.error("Error in rebalancing... ", e);
", e); + exceptions.add(e); + } finally { + rebalancingToken.set(true); + } + } + }); + + executors.shutdown(); + executors.awaitTermination(300, TimeUnit.SECONDS); + + assertEquals("Client did not see all server transition into rebalancing state", + rebalancingStarted.get(), + true); + assertEquals("Not enough time to begin proxy writing", proxyWritesDone.get(), true); + checkEntriesPostRebalance(updatedCurrentCluster, + updatedTargetCluster, + Lists.newArrayList(rwStoreDefWithReplication), + Arrays.asList(0, 1, 2), + baselineTuples, + baselineVersions); + checkConsistentMetadata(updatedTargetCluster, serverList); + // check No Exception + if(exceptions.size() > 0) { + + for(Exception e: exceptions) { + e.printStackTrace(); + } + fail("Should not see any exceptions."); + } + // check that the proxy writes were made to the original donor, node 1 + List clockEntries = new ArrayList(serverList.size()); + for(Integer nodeid: serverList) + clockEntries.add(new ClockEntry(nodeid.shortValue(), System.currentTimeMillis())); + VectorClock clusterXmlClock = new VectorClock(clockEntries, System.currentTimeMillis()); + for(Integer nodeid: serverList) + adminClient.metadataMgmtOps.updateRemoteCluster(nodeid, currentCluster, clusterXmlClock); + + adminClient.setAdminClientCluster(currentCluster); + checkForTupleEquivalence(adminClient, + 1, + testStoreNameRW, + movingKeysList, + baselineTuples, + baselineVersions); + + // stop servers + try { + stopServer(serverList); + } catch(Exception e) { + throw new RuntimeException(e); + } + } + @Test(timeout = 600000) public void testServerSideRouting() throws Exception { logger.info("Starting testServerSideRouting"); diff --git a/test/unit/voldemort/client/rebalance/AbstractRebalanceTest.java b/test/unit/voldemort/client/rebalance/AbstractRebalanceTest.java index 7937dbde6e..6c4877cef1 100644 --- a/test/unit/voldemort/client/rebalance/AbstractRebalanceTest.java +++ b/test/unit/voldemort/client/rebalance/AbstractRebalanceTest.java @@ -180,7 +180,20 @@ protected void rebalanceAndCheck(Cluster currentCluster, RebalanceController rebalanceClient, List nodeCheckList) { rebalanceClient.rebalance(targetCluster); + checkEntriesPostRebalance(currentCluster, + targetCluster, + storeDefs, + nodeCheckList, + testEntries, + null); + } + protected void checkEntriesPostRebalance(Cluster currentCluster, + Cluster targetCluster, + List storeDefs, + List nodeCheckList, + HashMap baselineTuples, + HashMap baselineVersions) { for(StoreDefinition storeDef: storeDefs) { Map>> currentNodeToPartitionTuples = RebalanceUtils.getNodeIdToAllPartitions(currentCluster, storeDef, @@ -204,20 +217,23 @@ protected void rebalanceAndCheck(Cluster currentCluster, targetCluster, storeDef, store, - flattenedPresentTuples); + flattenedPresentTuples, + baselineTuples, + baselineVersions); } } - } protected void checkGetEntries(Node node, Cluster cluster, StoreDefinition def, Store store, - HashMap> flattenedPresentTuples) { + HashMap> flattenedPresentTuples, + HashMap baselineTuples, + HashMap baselineVersions) { RoutingStrategy routing = new RoutingStrategyFactory().updateRoutingStrategy(def, cluster); - for(Entry entry: testEntries.entrySet()) { + for(Entry entry: baselineTuples.entrySet()) { ByteArray keyBytes = new ByteArray(ByteUtils.getBytes(entry.getKey(), "UTF-8")); List partitions = routing.getPartitionList(keyBytes.get()); @@ -234,8 +250,18 @@ protected void checkGetEntries(Node node, } assertEquals("Expecting exactly one version", 1, values.size()); Versioned value = values.get(0); - // 
+        while(positiveTestResultsItr.hasNext()) {
+            Pair<ByteArray, Pair<List<Versioned<byte[]>>, Exception>> item = positiveTestResultsItr.next();
+            ByteArray key = item.getFirst();
+            List<Versioned<byte[]>> vals = item.getSecond().getFirst();
+            Exception e = item.getSecond().getSecond();
+
+            assertEquals("Error fetching key " + key, null, e);
+            assertEquals("Value not found for key " + key, true, vals != null && vals.size() != 0);
+
+            String keyStr = ByteUtils.getString(key.get(), "UTF-8");
+            if(baselineTuples != null)
+                assertEquals("Value does not match up ",
+                             baselineTuples.get(keyStr),
+                             ByteUtils.getString(vals.get(0).getValue(), "UTF-8"));
+            if(baselineVersions != null)
+                assertEquals("Version does not match up",
+                             baselineVersions.get(keyStr),
+                             vals.get(0).getVersion());
+        }
+    }
+
     /**
      * REFACTOR: these should belong AdminClient so existence checks can be done
      * easily across the board
diff --git a/test/unit/voldemort/client/rebalance/AbstractZonedRebalanceTest.java b/test/unit/voldemort/client/rebalance/AbstractZonedRebalanceTest.java
index c4cbd1890c..8a8fa898d4 100644
--- a/test/unit/voldemort/client/rebalance/AbstractZonedRebalanceTest.java
+++ b/test/unit/voldemort/client/rebalance/AbstractZonedRebalanceTest.java
@@ -16,13 +16,24 @@
 package voldemort.client.rebalance;

+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.fail;
+
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;

 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
@@ -31,7 +42,11 @@
 import org.junit.Test;

 import voldemort.ServerTestUtils;
+import voldemort.client.ClientConfig;
+import voldemort.client.DefaultStoreClient;
 import voldemort.client.RoutingTier;
+import voldemort.client.SocketStoreClientFactory;
+import voldemort.client.StoreClient;
 import voldemort.client.protocol.admin.AdminClient;
 import voldemort.cluster.Cluster;
 import voldemort.cluster.Node;
@@ -39,15 +54,20 @@
 import voldemort.routing.RoutingStrategyFactory;
 import voldemort.routing.RoutingStrategyType;
 import voldemort.serialization.SerializerDefinition;
+import voldemort.server.VoldemortServer;
 import voldemort.store.Store;
 import voldemort.store.StoreDefinition;
 import voldemort.store.StoreDefinitionBuilder;
 import voldemort.store.bdb.BdbStorageConfiguration;
+import voldemort.store.metadata.MetadataStore;
+import voldemort.store.metadata.MetadataStore.VoldemortState;
 import voldemort.store.slop.strategy.HintedHandoffStrategyType;
 import voldemort.utils.ByteArray;
 import voldemort.utils.ByteUtils;
 import voldemort.utils.RebalanceUtils;
+import voldemort.versioning.ClockEntry;
 import voldemort.versioning.ObsoleteVersionException;
+import voldemort.versioning.VectorClock;
 import voldemort.versioning.Versioned;
 import voldemort.xml.StoreDefinitionsMapper;
@@ -97,8 +117,8 @@ public void setUp() throws IOException {
                                               .setRequiredReads(1)
                                               .setPreferredWrites(1)
                                               .setRequiredWrites(1)
-                                              .setZoneCountReads(1)
-                                              .setZoneCountWrites(1)
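+                                              // a zone-count requirement of 0 means
+                                              // no secondary-zone response is needed
+                                              // for an operation to succeed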
+                                              .setZoneCountReads(0)
+                                              .setZoneCountWrites(0)
                                               .setZoneReplicationFactor(zrfRWStoreWithoutReplication)
                                               .setHintedHandoffStrategy(HintedHandoffStrategyType.PROXIMITY_STRATEGY)
                                               .build();
@@ -124,8 +144,8 @@ public void setUp() throws IOException {
                                            .setRequiredReads(1)
                                            .setPreferredWrites(1)
                                            .setRequiredWrites(1)
-                                           .setZoneCountReads(1)
-                                           .setZoneCountWrites(1)
+                                           .setZoneCountReads(0)
+                                           .setZoneCountWrites(0)
                                            .setZoneReplicationFactor(zrfRWStoreWithReplication)
                                            .setHintedHandoffStrategy(HintedHandoffStrategyType.PROXIMITY_STRATEGY)
                                            .build();
@@ -140,8 +160,8 @@ public void setUp() throws IOException {
                                        .setRequiredReads(1)
                                        .setPreferredWrites(1)
                                        .setRequiredWrites(1)
-                                       .setZoneCountReads(1)
-                                       .setZoneCountWrites(1)
+                                       .setZoneCountReads(0)
+                                       .setZoneCountWrites(0)
                                        .setZoneReplicationFactor(zrfRWStoreWithReplication)
                                        .setHintedHandoffStrategy(HintedHandoffStrategyType.PROXIMITY_STRATEGY)
                                        .build();
@@ -192,7 +212,7 @@ public void testRWRebalance() throws Exception {
         // start all the servers
         List<Integer> serverList = Arrays.asList(0, 1, 2, 3);
         Map<String, String> configProps = new HashMap<String, String>();
-        configProps.put("admin.max.threads", "50");
+        configProps.put("admin.max.threads", "5");
         currentCluster = startServers(currentCluster,
                                       storeDefFileWithoutReplication,
                                       serverList,
@@ -202,6 +222,7 @@ public void testRWRebalance() throws Exception {
         RebalanceClientConfig config = new RebalanceClientConfig();
         config.setDeleteAfterRebalancingEnabled(true);
+        config.setStealerBasedRebalancing(!useDonorBased);
         RebalanceController rebalanceClient = new RebalanceController(getBootstrapUrl(currentCluster, 0),
                                                                       config);
@@ -221,8 +242,7 @@ public void testRWRebalance() throws Exception {
         }
     }

-    @Test(timeout = 600000)
-    public void testRWRebalanceWithReplication() throws Exception {
+    public void testRWRebalanceWithReplication(boolean serial) throws Exception {
         logger.info("Starting testRWRebalanceWithReplication");

         Cluster currentCluster = ServerTestUtils.getLocalZonedCluster(4,
@@ -239,8 +259,9 @@ public void testRWRebalanceWithReplication() throws Exception {
         // start servers
         List<Integer> serverList = Arrays.asList(0, 1, 2, 3);
         Map<String, String> configProps = new HashMap<String, String>();
-        configProps.put("admin.max.threads", "50");
-
+        configProps.put("admin.max.threads", "5");
+        if(serial)
+            configProps.put("max.parallel.stores.rebalancing", String.valueOf(1));
         currentCluster = startServers(currentCluster,
                                       storeDefFileWithReplication,
                                       serverList,
@@ -250,6 +271,9 @@ public void testRWRebalanceWithReplication() throws Exception {
         RebalanceClientConfig config = new RebalanceClientConfig();
         config.setDeleteAfterRebalancingEnabled(true);
+        config.setStealerBasedRebalancing(!useDonorBased);
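+        // batch the primary partition moves (100 per batch) and allow up to 5
+        // parallel rebalancing tasks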
+        config.setPrimaryPartitionBatchSize(100);
+        config.setMaxParallelRebalancing(5);
         RebalanceController rebalanceClient = new RebalanceController(getBootstrapUrl(currentCluster, 0),
                                                                       config);
@@ -269,6 +293,16 @@ public void testRWRebalanceWithReplication() throws Exception {
         }
     }

+    @Test(timeout = 600000)
+    public void testRWRebalanceWithReplication() throws Exception {
+        testRWRebalanceWithReplication(false);
+    }
+
+    @Test(timeout = 600000)
+    public void testRWRebalanceWithReplicationSerial() throws Exception {
+        testRWRebalanceWithReplication(true);
+    }
+
     @Test(timeout = 600000)
     public void testRebalanceCleanPrimarySecondary() throws Exception {
         logger.info("Starting testRebalanceCleanPrimary");
@@ -279,6 +313,18 @@ public void testRebalanceCleanPrimarySecondary() throws Exception {
                                                                     Lists.newArrayList(7));
         targetCluster = RebalanceUtils.createUpdatedCluster(targetCluster, 5, Lists.newArrayList(6));

+        /**
+         * original server partition ownership
+         *
+         * [s0 : p0,p3,p4,p5,p6,p7] [s1 : p1-p7] [s2 : p1,p2] [s3 :
+         * p0,p1,p2,p3,p6,p7] [s4 : p1-p7] [s5 : p4,p5]
+         *
+         * target server partition ownership
+         *
+         * [s0 : p0,p2,p3,p4,p5,p6,p7] [s1 : p0,p1] [s2 : p1-p7] [s3 :
+         * p0,p1,p2,p3,p5,p6,p7] [s4 : p0,p1,p2,p3,p4,p7] [s5 : p4,p5,p6]
+         */
+
         // start servers
         List<Integer> serverList = Arrays.asList(0, 1, 2, 3, 4, 5);
         Map<String, String> configProps = new HashMap<String, String>();
@@ -292,6 +338,7 @@ public void testRebalanceCleanPrimarySecondary() throws Exception {
         RebalanceClientConfig config = new RebalanceClientConfig();
         config.setDeleteAfterRebalancingEnabled(false);
+        config.setStealerBasedRebalancing(!useDonorBased);
         RebalanceController rebalanceClient = new RebalanceController(getBootstrapUrl(currentCluster, 0),
                                                                       config);
@@ -369,6 +416,329 @@ public void testRebalanceCleanPrimarySecondary() throws Exception {
         }
     }

+    @Test(timeout = 600000)
+    public void testProxyGetDuringRebalancing() throws Exception {
+        logger.info("Starting testProxyGetDuringRebalancing");
+        Cluster currentCluster = ServerTestUtils.getLocalZonedCluster(4,
+                                                                      2,
+                                                                      new int[] { 0, 0, 1, 1 },
+                                                                      new int[][] { { 0, 2, 4 },
+                                                                              { 6 }, { 1, 3, 5 },
+                                                                              { 7 } });
+        Cluster targetCluster = RebalanceUtils.createUpdatedCluster(currentCluster,
+                                                                    3,
+                                                                    Lists.newArrayList(2));
+        targetCluster = RebalanceUtils.createUpdatedCluster(targetCluster, 1, Lists.newArrayList(3));
+
+        final List<Integer> serverList = Arrays.asList(0, 1, 2, 3);
+        Map<String, String> configProps = new HashMap<String, String>();
+        configProps.put("admin.max.threads", "5");
+        final Cluster updatedCurrentCluster = startServers(currentCluster,
+                                                           storeDefFileWithReplication,
+                                                           serverList,
+                                                           configProps);
+        // Update the cluster information based on the node information
+        final Cluster updatedTargetCluster = updateCluster(targetCluster);
+
+        ExecutorService executors = Executors.newFixedThreadPool(2);
+        final AtomicBoolean rebalancingToken = new AtomicBoolean(false);
+        final List<Exception> exceptions = Collections.synchronizedList(new ArrayList<Exception>());
+
+        RebalanceClientConfig rebalanceClientConfig = new RebalanceClientConfig();
+        rebalanceClientConfig.setMaxParallelRebalancing(2);
+        rebalanceClientConfig.setStealerBasedRebalancing(!useDonorBased);
+
+        final RebalanceController rebalanceClient = new RebalanceController(getBootstrapUrl(updatedCurrentCluster,
+                                                                                            0),
+                                                                            rebalanceClientConfig);
+        try {
+
+            populateData(currentCluster, rwStoreDefWithReplication);
+
+            final SocketStoreClientFactory factory = new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(getBootstrapUrl(currentCluster,
+                                                                                                                                      0))
+                                                                                                    .setEnableLazy(false)
+                                                                                                    .setSocketTimeout(120,
+                                                                                                                      TimeUnit.SECONDS));
+
+            final StoreClient<String, String> storeClientRW = new DefaultStoreClient<String, String>(rwStoreDefWithReplication.getName(),
+                                                                                                     null,
+                                                                                                     factory,
+                                                                                                     3);
+
+            // start get operation.
+            executors.execute(new Runnable() {
+
+                public void run() {
+                    try {
+                        List<String> keys = new ArrayList<String>(testEntries.keySet());
+
+                        while(!rebalancingToken.get()) {
+                            // should always be able to get values.
+                            int index = (int) (Math.random() * keys.size());
+
+                            // should get a valid value
+                            try {
+                                Versioned<String> value = storeClientRW.get(keys.get(index));
+                                assertNotSame("StoreClient get() should not return null.",
+                                              null,
+                                              value);
+                                assertEquals("Value returned should be good",
+                                             new Versioned<String>(testEntries.get(keys.get(index))),
+                                             value);
+                            } catch(Exception e) {
+                                e.printStackTrace();
+                                exceptions.add(e);
+                            }
+                        }
+
+                    } catch(Exception e) {
+                        exceptions.add(e);
+                    } finally {
+                        factory.close();
+                    }
+                }
+
+            });
+
+            executors.execute(new Runnable() {
+
+                public void run() {
+                    try {
+
+                        Thread.sleep(500);
+                        rebalanceAndCheck(updatedCurrentCluster,
+                                          updatedTargetCluster,
+                                          storeDefWithReplication,
+                                          rebalanceClient,
+                                          Arrays.asList(0, 1, 2, 3));
+                        Thread.sleep(500);
+                        rebalancingToken.set(true);
+                        checkConsistentMetadata(updatedTargetCluster, serverList);
+
+                    } catch(Exception e) {
+                        exceptions.add(e);
+                    } finally {
+                        // stop servers
+                        try {
+                            stopServer(serverList);
+                        } catch(Exception e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                }
+            });
+
+            executors.shutdown();
+            executors.awaitTermination(300, TimeUnit.SECONDS);
+
+            // check No Exception
+            if(exceptions.size() > 0) {
+                for(Exception e: exceptions) {
+                    e.printStackTrace();
+                }
+                fail("Should not see any exceptions.");
+            }
+        } finally {
+            // stop servers
+            stopServer(serverList);
+        }
+    }
+
+    @Test(timeout = 600000)
+    public void testProxyPutDuringRebalancing() throws Exception {
+        logger.info("Starting testProxyPutDuringRebalancing");
+        Cluster currentCluster = ServerTestUtils.getLocalZonedCluster(6, 2, new int[] { 0, 0, 0, 1,
+                1, 1 }, new int[][] { { 0 }, { 1, 6 }, { 2 }, { 3 }, { 4, 7 }, { 5 } });
+        Cluster targetCluster = RebalanceUtils.createUpdatedCluster(currentCluster,
+                                                                    2,
+                                                                    Lists.newArrayList(7));
+        targetCluster = RebalanceUtils.createUpdatedCluster(targetCluster, 5, Lists.newArrayList(6));
+
+        /**
+         * original server partition ownership
+         *
+         * [s0 : p0,p3,p4,p5,p6,p7] [s1 : p1-p7] [s2 : p1,p2] [s3 :
+         * p0,p1,p2,p3,p6,p7] [s4 : p1-p7] [s5 : p4,p5]
+         *
+         * target server partition ownership
+         *
+         * [s0 : p0,p2,p3,p4,p5,p6,p7] [s1 : p0,p1] [s2 : p1-p7] [s3 :
+         * p0,p1,p2,p3,p5,p6,p7] [s4 : p0,p1,p2,p3,p4,p7] [s5 : p4,p5,p6]
+         */
+        List<Integer> serverList = Arrays.asList(0, 1, 2, 3, 4, 5);
+        Map<String, String> configProps = new HashMap<String, String>();
+        configProps.put("admin.max.threads", "5");
+        final Cluster updatedCurrentCluster = startServers(currentCluster,
+                                                           rwStoreDefFileWithReplication,
+                                                           serverList,
+                                                           configProps);
+        // Update the cluster information based on the node information
+        final Cluster updatedTargetCluster = updateCluster(targetCluster);
+
+        ExecutorService executors = Executors.newFixedThreadPool(2);
+        final AtomicBoolean rebalancingToken = new AtomicBoolean(false);
+        final List<Exception> exceptions = Collections.synchronizedList(new ArrayList<Exception>());
+
+        RebalanceClientConfig rebalanceClientConfig = new RebalanceClientConfig();
+        rebalanceClientConfig.setMaxParallelRebalancing(2);
+        rebalanceClientConfig.setStealerBasedRebalancing(!useDonorBased);
+
+        final RebalanceController rebalanceClient = new RebalanceController(getBootstrapUrl(updatedCurrentCluster,
+                                                                                            0),
+                                                                            rebalanceClientConfig);
+
+        populateData(currentCluster, rwStoreDefWithReplication);
+        final AdminClient adminClient = rebalanceClient.getAdminClient();
+        // the plan would cause the following cross zone move
+        // Partition : Donor -> Stealer
+        // p6 (PRI) : 1 -> 5
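+        // sample 20 keys that live in the moving partition on the donor (node 1);
+        // the proxied writes against them are verified at the end of the test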
+        final List<ByteArray> movingKeysList = sampleKeysFromPartition(adminClient,
+                                                                       1,
+                                                                       rwStoreDefWithReplication.getName(),
+                                                                       Arrays.asList(6),
+                                                                       20);
+        final AtomicBoolean rebalancingStarted = new AtomicBoolean(false);
+        final HashMap<String, String> baselineTuples = new HashMap<String, String>(testEntries);
+        final HashMap<String, VectorClock> baselineVersions = new HashMap<String, VectorClock>();
+
+        for(String key: baselineTuples.keySet()) {
+            baselineVersions.put(key, new VectorClock());
+        }
+
+        // start get operation.
+        executors.execute(new Runnable() {
+
+            @Override
+            public void run() {
+                SocketStoreClientFactory factory = null;
+                try {
+                    // wait for the rebalancing to begin
+                    // TODO also need to somehow test how live writes handle the
+                    // metadata transition.
+                    List<VoldemortServer> serverList = Lists.newArrayList(serverMap.get(4),
+                                                                          serverMap.get(2),
+                                                                          serverMap.get(3),
+                                                                          serverMap.get(5));
+                    while(!rebalancingToken.get()) {
+                        Iterator<VoldemortServer> serverIterator = serverList.iterator();
+                        while(serverIterator.hasNext()) {
+                            VoldemortServer server = serverIterator.next();
+                            if(ByteUtils.getString(server.getMetadataStore()
+                                                         .get(MetadataStore.SERVER_STATE_KEY, null)
+                                                         .get(0)
+                                                         .getValue(),
+                                                   "UTF-8")
+                                        .compareTo(VoldemortState.REBALANCING_MASTER_SERVER.toString()) == 0) {
+                                serverIterator.remove();
+                            }
+                        }
+                        if(serverList.size() == 0) {
+                            rebalancingStarted.set(true);
+                            break;
+                        }
+                    }
+                    factory = new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(getBootstrapUrl(updatedCurrentCluster,
+                                                                                                               0))
+                                                                             .setEnableLazy(false)
+                                                                             .setSocketTimeout(120,
+                                                                                               TimeUnit.SECONDS)
+                                                                             .setClientZoneId(1));
+
+                    final StoreClient<String, String> storeClientRW = new DefaultStoreClient<String, String>(testStoreNameRW,
+                                                                                                             null,
+                                                                                                             factory,
+                                                                                                             3);
+                    // Now perform some writes and determine the end state of
+                    // the changed keys. Initially, all data has a zero vector
+                    // clock
+                    for(ByteArray movingKey: movingKeysList) {
+                        try {
+                            String keyStr = ByteUtils.getString(movingKey.get(), "UTF-8");
+                            String valStr = "proxy_write";
+                            storeClientRW.put(keyStr, valStr);
+                            baselineTuples.put(keyStr, valStr);
+                            // all these keys will have a [5:1] vector clock, as
+                            // node 5 is the new pseudo master
+                            baselineVersions.get(keyStr)
+                                            .incrementVersion(5, System.currentTimeMillis());
+                            if(rebalancingToken.get()) {
+                                break;
+                            }
+                        } catch(Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+                } catch(Exception e) {
+                    exceptions.add(e);
+                } finally {
+                    if(factory != null)
+                        factory.close();
+                }
+            }
+
+        });
+
+        executors.execute(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    rebalanceClient.rebalance(updatedTargetCluster);
+                    Thread.sleep(500);
+                } catch(Exception e) {
+                    logger.error("Error in rebalancing... ", e);
", e); + exceptions.add(e); + } finally { + rebalancingToken.set(true); + + } + } + }); + + executors.shutdown(); + executors.awaitTermination(300, TimeUnit.SECONDS); + + assertEquals("Client did not see all server transition into rebalancing state", + rebalancingStarted.get(), + true); + checkEntriesPostRebalance(updatedCurrentCluster, + updatedTargetCluster, + Lists.newArrayList(rwStoreDefWithReplication), + Arrays.asList(0, 1, 2, 3, 4, 5), + baselineTuples, + baselineVersions); + checkConsistentMetadata(updatedTargetCluster, serverList); + // check No Exception + if(exceptions.size() > 0) { + for(Exception e: exceptions) { + e.printStackTrace(); + } + fail("Should not see any exceptions."); + } + // check that the proxy writes were made to the original donor, node 1 + List clockEntries = new ArrayList(serverList.size()); + for(Integer nodeid: serverList) + clockEntries.add(new ClockEntry(nodeid.shortValue(), System.currentTimeMillis())); + VectorClock clusterXmlClock = new VectorClock(clockEntries, System.currentTimeMillis()); + for(Integer nodeid: serverList) + adminClient.metadataMgmtOps.updateRemoteCluster(nodeid, currentCluster, clusterXmlClock); + adminClient.setAdminClientCluster(currentCluster); + checkForTupleEquivalence(adminClient, + 1, + testStoreNameRW, + movingKeysList, + baselineTuples, + baselineVersions); + + // stop servers + try { + stopServer(serverList); + } catch(Exception e) { + throw new RuntimeException(e); + } + } + protected void populateData(Cluster cluster, StoreDefinition storeDef) throws Exception { // Create SocketStores for each Node first @@ -376,11 +746,6 @@ protected void populateData(Cluster cluster, StoreDefinition storeDef) throws Ex for(Node node: cluster.getNodes()) { storeMap.put(node.getId(), getSocketStore(storeDef.getName(), node.getHost(), node.getSocketPort())); - System.err.printf("%d,%s,%s,%s\n", - node.getId(), - storeDef.getName(), - node.getSocketPort(), - node.getAdminPort()); } RoutingStrategy routing = new RoutingStrategyFactory().updateRoutingStrategy(storeDef, @@ -388,9 +753,6 @@ protected void populateData(Cluster cluster, StoreDefinition storeDef) throws Ex for(Entry entry: testEntries.entrySet()) { ByteArray keyBytes = new ByteArray(ByteUtils.getBytes(entry.getKey(), "UTF-8")); List preferenceNodes = RebalanceUtils.getNodeIds(routing.routeRequest(keyBytes.get())); - - System.err.println(preferenceNodes); - // Go over every node for(int nodeId: preferenceNodes) { try {