From 560bd67828e328631ffec002b66adad7e88d95fb Mon Sep 17 00:00:00 2001
From: Vinoth Chandar
Date: Wed, 10 Apr 2013 18:41:30 -0700
Subject: [PATCH] Adding unit test for fork lift tool

---
 .../utils/AbstractConsistencyFixer.java        |  10 +
 .../voldemort/utils/ClusterForkLiftTool.java   |  56 ++--
 .../config/two-stores-replicated.xml           |  39 +++
 .../utils/ClusterForkLiftToolTest.java         | 302 ++++++++++++++++++
 4 files changed, 386 insertions(+), 21 deletions(-)
 create mode 100644 test/common/voldemort/config/two-stores-replicated.xml
 create mode 100644 test/unit/voldemort/utils/ClusterForkLiftToolTest.java

diff --git a/src/java/voldemort/utils/AbstractConsistencyFixer.java b/src/java/voldemort/utils/AbstractConsistencyFixer.java
index b14da40e3a..c13dc1529a 100644
--- a/src/java/voldemort/utils/AbstractConsistencyFixer.java
+++ b/src/java/voldemort/utils/AbstractConsistencyFixer.java
@@ -35,6 +35,16 @@
 import com.google.common.collect.Lists;
 
+/**
+ * Base class that contains logic to read the state of a key on a cluster and
+ * resolve it using read repair.
+ *
+ * NOTE: For now, there is only one subclass extending this class, to perform
+ * consistency fixing by writing the resolved versions back to the cluster. Any
+ * future tool that needs similar functionality can extend this class and
+ * implement its own logic for handling the resolved versions.
+ *
+ */
 abstract class AbstractConsistencyFixer {
 
     private static final Logger logger = Logger.getLogger(AbstractConsistencyFixer.class);
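For reference, a minimal, self-contained sketch of the concurrent-version state that read repair has to resolve. It is not part of the patch itself; it reuses only the voldemort.versioning classes that the new unit test below also uses, and the class name and timestamp-based tie-break are illustrative assumptions that mirror the resolution the test expects.

import com.google.common.collect.Lists;

import voldemort.versioning.ClockEntry;
import voldemort.versioning.Occurred;
import voldemort.versioning.VectorClock;

public class ConcurrentVersionSketch {

    public static void main(String[] args) {
        long now = System.currentTimeMillis();

        // node 0 and node 1 each accepted an independent write for the same key
        VectorClock losing = new VectorClock(Lists.newArrayList(new ClockEntry((short) 0, 5)), now);
        VectorClock winning = new VectorClock(Lists.newArrayList(new ClockEntry((short) 1, 5)), now + 1);

        // neither clock dominates the other, so the two versions conflict
        Occurred relation = losing.compare(winning);
        System.out.println(relation); // CONCURRENTLY

        // a timestamp-based resolver keeps the version with the later wall-clock time
        VectorClock resolved = (losing.getTimestamp() >= winning.getTimestamp()) ? losing : winning;
        System.out.println(resolved.getTimestamp() == now + 1); // true
    }
}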
diff --git a/src/java/voldemort/utils/ClusterForkLiftTool.java b/src/java/voldemort/utils/ClusterForkLiftTool.java
index 827a859e41..67a7e42538 100644
--- a/src/java/voldemort/utils/ClusterForkLiftTool.java
+++ b/src/java/voldemort/utils/ClusterForkLiftTool.java
@@ -81,6 +81,29 @@
  * forklift window. Of course, after the forklift window, the destination
  * cluster resumes normal operation.
  *
+ * 3) For now, we will fall back to fetching the key from the primary replica,
+ * fetch the values out manually, resolve and write them back. Pitfall: the
+ * primary somehow does not have the key.
+ *
+ * Two scenarios:
+ *
+ * 1) Key active after double writes: the situation is the result of slop not
+ * propagating to the primary. But double writes would write the key back to
+ * the destination cluster anyway. We are good.
+ *
+ * 2) Key inactive after double writes: this indicates a problem elsewhere.
+ * This is a base guarantee Voldemort should offer.
+ *
+ * 4) Zoned <-> Non-zoned forklift implications.
+ *
+ * When forklifting data from a non-zoned to a zoned cluster, both destination
+ * zones will be populated with data, by simply running the tool once with the
+ * respective bootstrap URLs. If you need to forklift data from a zoned to a
+ * non-zoned cluster (i.e. your replication between datacenters is not handled
+ * by Voldemort), then you need to run the tool once for each destination
+ * non-zoned cluster. Zoned -> Zoned and Non-zoned -> Non-zoned forklifts are
+ * trivial.
+ *
  */
 public class ClusterForkLiftTool implements Runnable {
 
@@ -113,10 +136,10 @@ public ClusterForkLiftTool(String srcBootstrapUrl,
                                                          new ClientConfig());
 
         // set up streaming client to the destination cluster
-        Props property = new Props();
-        property.put("streaming.platform.bootstrapURL", dstBootstrapUrl);
-        property.put("streaming.platform.throttle.qps", maxPutsPerSecond);
-        StreamingClientConfig config = new StreamingClientConfig(property);
+        Props props = new Props();
+        props.put("streaming.platform.bootstrapURL", dstBootstrapUrl);
+        props.put("streaming.platform.throttle.qps", maxPutsPerSecond);
+        StreamingClientConfig config = new StreamingClientConfig(props);
         this.dstStreamingClient = new StreamingClient(config);
 
         // determine and verify final list of stores to be forklifted over
@@ -173,6 +196,12 @@ private HashMap<String, StoreDefinition> checkStoresOnBothSides() {
         return srcStoreDefMap;
     }
 
+    /**
+     * TODO this base class can potentially provide some framework of execution
+     * for the subclasses, to yield a better object-oriented design (progress
+     * tracking etc.)
+     *
+     */
     abstract class SinglePartitionForkLiftTask {
 
         protected int partitionId;
@@ -209,21 +238,6 @@ class SinglePartitionGloballyResolvingForkLiftTask extends SinglePartitionForkLi
             super(storeInstance, partitionId, latch);
         }
 
-        /**
-         * For now, we will fallback to fetching the key from the primary
-         * replica, fetch the values out manually, resolve and write it back.
-         * PitFalls : primary somehow does not have the key.
-         *
-         * Two scenarios.
-         *
-         * 1) Key active after double writes: the situation is the result of
-         * slop not propagating to the primary. But double writes would write
-         * the key back to destination cluster anyway. We are good.
-         *
-         * 2) Key inactive after double writes: This indicates a problem
-         * elsewhere. This is a base guarantee voldemort should offer.
-         *
-         */
         public void run() {
             String storeName = this.storeInstance.getStoreDefinition().getName();
             long entriesForkLifted = 0;
@@ -455,8 +469,6 @@ public Object call() throws Exception {
             }
             srcAdminClient.close();
             dstStreamingClient.getAdminClient().close();
-            // TODO cleanly shut down the threadpool
-            System.exit(0);
         }
     }
 
@@ -570,5 +582,7 @@ public static void main(String[] args) throws Exception {
                                                                   partitions,
                                                                   options.has("global-resolution"));
         forkLiftTool.run();
+        // TODO cleanly shut down the hanging threadpool
+        System.exit(0);
     }
 }
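Before the new test files, a sketch of driving the tool programmatically, mirroring the constructor call the unit test below makes. The bootstrap URLs are placeholders, and the meanings given for the three numeric arguments (streaming throttle in puts/sec, partition parallelism, progress interval) are inferred from this patch rather than confirmed against the constructor's declaration.

import com.google.common.collect.Lists;

import voldemort.utils.ClusterForkLiftTool;

public class ForkLiftUsageSketch {

    public static void main(String[] args) {
        ClusterForkLiftTool forkLiftTool = new ClusterForkLiftTool("tcp://src-node:6666", // source bootstrap URL (placeholder)
                                                                   "tcp://dst-node:6666", // destination bootstrap URL (placeholder)
                                                                   10000, // max puts per second (streaming.platform.throttle.qps)
                                                                   1,     // partition parallelism (inferred)
                                                                   1000,  // progress reporting interval (inferred)
                                                                   Lists.newArrayList("test"), // stores to forklift
                                                                   null,  // partitions (the unit test passes null)
                                                                   false); // global resolution off => resolve at the primary
        // per the class comment above, a zoned -> non-zoned migration needs one
        // run per destination cluster, each with that cluster's bootstrap URL
        forkLiftTool.run();
    }
}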
diff --git a/test/common/voldemort/config/two-stores-replicated.xml b/test/common/voldemort/config/two-stores-replicated.xml
new file mode 100644
index 0000000000..3e15423fc0
--- /dev/null
+++ b/test/common/voldemort/config/two-stores-replicated.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0"?>
+<stores>
+    <store>
+        <name>test</name>
+        <persistence>bdb</persistence>
+        <routing>client</routing>
+        <replication-factor>2</replication-factor>
+        <preferred-reads>1</preferred-reads>
+        <required-reads>1</required-reads>
+        <preferred-writes>1</preferred-writes>
+        <required-writes>1</required-writes>
+        <key-serializer>
+            <type>string</type>
+            <schema-info>UTF-8</schema-info>
+        </key-serializer>
+        <value-serializer>
+            <type>string</type>
+            <schema-info>UTF-8</schema-info>
+        </value-serializer>
+    </store>
+    <store>
+        <name>best</name>
+        <persistence>bdb</persistence>
+        <routing>client</routing>
+        <replication-factor>3</replication-factor>
+        <preferred-reads>2</preferred-reads>
+        <required-reads>2</required-reads>
+        <preferred-writes>2</preferred-writes>
+        <required-writes>2</required-writes>
+        <key-serializer>
+            <type>string</type>
+            <schema-info>UTF-8</schema-info>
+        </key-serializer>
+        <value-serializer>
+            <type>string</type>
+            <schema-info>UTF-8</schema-info>
+        </value-serializer>
+    </store>
+</stores>
diff --git a/test/unit/voldemort/utils/ClusterForkLiftToolTest.java b/test/unit/voldemort/utils/ClusterForkLiftToolTest.java
new file mode 100644
index 0000000000..be59902376
--- /dev/null
+++ b/test/unit/voldemort/utils/ClusterForkLiftToolTest.java
@@ -0,0 +1,302 @@
+package voldemort.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import voldemort.ServerTestUtils;
+import voldemort.client.ClientConfig;
+import voldemort.client.SocketStoreClientFactory;
+import voldemort.client.StoreClient;
+import voldemort.client.StoreClientFactory;
+import voldemort.client.protocol.admin.AdminClient;
+import voldemort.client.protocol.admin.AdminClientConfig;
+import voldemort.cluster.Cluster;
+import voldemort.cluster.Node;
+import voldemort.server.VoldemortServer;
+import voldemort.store.StoreDefinition;
+import voldemort.store.StoreUtils;
+import voldemort.store.routed.NodeValue;
+import voldemort.store.socket.SocketStoreFactory;
+import voldemort.store.socket.clientrequest.ClientRequestExecutorPool;
+import voldemort.versioning.ClockEntry;
+import voldemort.versioning.VectorClock;
+import voldemort.versioning.Versioned;
+import voldemort.xml.StoreDefinitionsMapper;
+
+import com.google.common.collect.Lists;
+
+public class ClusterForkLiftToolTest {
+
+    final static String STORES_XML = "test/common/voldemort/config/two-stores-replicated.xml";
+    final static String PRIMARY_RESOLVING_STORE_NAME = "test";
+    final static String GLOBALLY_RESOLVING_STORE_NAME = "best";
+
+    private String srcBootStrapUrl;
+    private String dstBootStrapUrl;
+    private Cluster srcCluster;
+    private Cluster dstCluster;
+    private VoldemortServer[] srcServers;
+    private VoldemortServer[] dstServers;
+
+    private StoreDefinition primaryResolvingStoreDef;
+    private StoreDefinition globallyResolvingStoreDef;
+
+    private HashMap<String, String> kvPairs;
+    private String firstKey;
+    private String lastKey;
+    private String conflictKey;
+
+    private AdminClient srcAdminClient;
+
+    private StoreClientFactory srcfactory;
+    private StoreClientFactory dstfactory;
+    private StoreClient<String, String> srcPrimaryResolvingStoreClient;
+    private StoreClient<String, String> dstPrimaryResolvingStoreClient;
+    private StoreClient<String, String> srcGloballyResolvingStoreClient;
+    private StoreClient<String, String> dstGloballyResolvingStoreClient;
+
+    @Before
+    public void setUpClusters() {
+        // set up a 4-node source cluster and a 3-node destination cluster
+        final SocketStoreFactory socketStoreFactory = new ClientRequestExecutorPool(2,
+                                                                                    10000,
+                                                                                    100000,
+                                                                                    32 * 1024);
+
+        try {
+            int srcPartitionMap[][] = { { 0 }, { 1 }, { 2 }, { 3 } };
+            srcServers = new VoldemortServer[4];
+            srcCluster = ServerTestUtils.startVoldemortCluster(4,
+                                                               srcServers,
+                                                               srcPartitionMap,
+                                                               socketStoreFactory,
+                                                               true,
+                                                               null,
+                                                               STORES_XML,
+                                                               new Properties());
+            Node node = srcCluster.getNodeById(0);
+            srcBootStrapUrl = "tcp://" + node.getHost() + ":" + node.getSocketPort();
+
+            int dstPartitionMap[][] = { { 0 }, { 1 }, { 2 } };
+            dstServers = new VoldemortServer[3];
+            dstCluster = ServerTestUtils.startVoldemortCluster(3,
+                                                               dstServers,
+                                                               dstPartitionMap,
+                                                               socketStoreFactory,
+                                                               true,
+                                                               null,
+                                                               STORES_XML,
+                                                               new Properties());
+            node = dstCluster.getNodeById(0);
+            dstBootStrapUrl = "tcp://" + node.getHost() + ":" + node.getSocketPort();
+
+            kvPairs = ServerTestUtils.createRandomKeyValueString(100);
+            int keyCount = 0;
+            for(String key: kvPairs.keySet()) {
+                if(keyCount == 0)
+                    firstKey = key;
+                if(keyCount == kvPairs.size() - 1)
+                    lastKey = key;
+                if(keyCount == kvPairs.size() / 2)
+                    conflictKey = key;
+                keyCount++;
+            }
+
+            srcAdminClient = new AdminClient(srcCluster,
+                                             new AdminClientConfig(),
+                                             new ClientConfig());
+
+            List<StoreDefinition> storeDefs = new StoreDefinitionsMapper().readStoreList(new File(STORES_XML));
+
+            primaryResolvingStoreDef = StoreUtils.getStoreDef(storeDefs,
+                                                              PRIMARY_RESOLVING_STORE_NAME);
+            globallyResolvingStoreDef = StoreUtils.getStoreDef(storeDefs,
+                                                               GLOBALLY_RESOLVING_STORE_NAME);
+
+            srcfactory = new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(srcBootStrapUrl)
+                                                                        .setSelectors(1)
+                                                                        .setRoutingTimeout(1000,
+                                                                                           java.util.concurrent.TimeUnit.MILLISECONDS)
+                                                                        .setSocketTimeout(1000,
+                                                                                          java.util.concurrent.TimeUnit.MILLISECONDS)
+                                                                        .setConnectionTimeout(1000,
+                                                                                              java.util.concurrent.TimeUnit.MILLISECONDS)
+                                                                        .setMaxConnectionsPerNode(1));
+            srcPrimaryResolvingStoreClient = srcfactory.getStoreClient(PRIMARY_RESOLVING_STORE_NAME);
+            srcGloballyResolvingStoreClient = srcfactory.getStoreClient(GLOBALLY_RESOLVING_STORE_NAME);
+
+            dstfactory = new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(dstBootStrapUrl)
+                                                                        .setSelectors(1)
+                                                                        .setRoutingTimeout(1000,
+                                                                                           java.util.concurrent.TimeUnit.MILLISECONDS)
+                                                                        .setSocketTimeout(1000,
+                                                                                          java.util.concurrent.TimeUnit.MILLISECONDS)
+                                                                        .setConnectionTimeout(1000,
+                                                                                              java.util.concurrent.TimeUnit.MILLISECONDS)
+                                                                        .setMaxConnectionsPerNode(1));
+            dstPrimaryResolvingStoreClient = dstfactory.getStoreClient(PRIMARY_RESOLVING_STORE_NAME);
+            dstGloballyResolvingStoreClient = dstfactory.getStoreClient(GLOBALLY_RESOLVING_STORE_NAME);
+
+        } catch(IOException e) {
+            e.printStackTrace();
+            fail("Unexpected exception");
+        }
+    }
+
+    @Test
+    public void testPrimaryResolvingForkLift() throws Exception {
+
+        StoreInstance srcStoreInstance = new StoreInstance(srcCluster, primaryResolvingStoreDef);
+
+        // populate data on the source cluster
+        for(Map.Entry<String, String> entry: kvPairs.entrySet()) {
+            srcPrimaryResolvingStoreClient.put(entry.getKey(), entry.getValue());
+        }
+
+        // generate a conflict on the master partition
+        int masterNode = srcStoreInstance.getNodeIdForPartitionId(srcStoreInstance.getMasterPartitionId(conflictKey.getBytes("UTF-8")));
+        VectorClock losingClock = new VectorClock(Lists.newArrayList(new ClockEntry((short) 0, 5)),
+                                                  System.currentTimeMillis());
+        VectorClock winningClock = new VectorClock(Lists.newArrayList(new ClockEntry((short) 1, 5)),
+                                                   losingClock.getTimestamp() + 1);
+        srcAdminClient.storeOps.putNodeKeyValue(PRIMARY_RESOLVING_STORE_NAME,
+                                                new NodeValue<ByteArray, byte[]>(masterNode,
+                                                                                 new ByteArray(conflictKey.getBytes("UTF-8")),
+                                                                                 new Versioned<byte[]>("losing value".getBytes("UTF-8"),
+                                                                                                       losingClock)));
+        srcAdminClient.storeOps.putNodeKeyValue(PRIMARY_RESOLVING_STORE_NAME,
+                                                new NodeValue<ByteArray, byte[]>(masterNode,
+                                                                                 new ByteArray(conflictKey.getBytes("UTF-8")),
+                                                                                 new Versioned<byte[]>("winning value".getBytes("UTF-8"),
+                                                                                                       winningClock)));
+
+        // do a write to the destination cluster
+        dstPrimaryResolvingStoreClient.put(firstKey, "before forklift");
+
+        // perform the forklift
+        ClusterForkLiftTool forkLiftTool = new ClusterForkLiftTool(srcBootStrapUrl,
+                                                                   dstBootStrapUrl,
+                                                                   10000,
+                                                                   1,
+                                                                   1000,
+                                                                   Lists.newArrayList(PRIMARY_RESOLVING_STORE_NAME),
+                                                                   null,
+                                                                   false);
+        forkLiftTool.run();
+
+        // do a write to the destination cluster
+        dstPrimaryResolvingStoreClient.put(lastKey, "after forklift");
+
+        // verify data on the destination is as expected
+        for(Map.Entry<String, String> entry: kvPairs.entrySet()) {
+            if(entry.getKey().equals(firstKey)) {
+                assertEquals("Online write overwritten",
+                             "before forklift",
+                             dstPrimaryResolvingStoreClient.get(firstKey).getValue());
+            } else if(entry.getKey().equals(lastKey)) {
+                assertEquals("Online write overwritten",
+                             "after forklift",
+                             dstPrimaryResolvingStoreClient.get(lastKey).getValue());
+            } else if(entry.getKey().equals(conflictKey)) {
+                assertEquals("Conflict resolution incorrect",
+                             "winning value",
+                             dstPrimaryResolvingStoreClient.get(conflictKey).getValue());
+            } else {
+                assertEquals("fork lift data missing",
+                             entry.getValue(),
+                             dstPrimaryResolvingStoreClient.get(entry.getKey()).getValue());
+            }
+        }
+    }
+
+    @Test
+    public void testGloballyResolvingForkLift() throws Exception {
+
+        StoreInstance srcStoreInstance = new StoreInstance(srcCluster, globallyResolvingStoreDef);
+
+        // populate data on the source cluster
+        for(Map.Entry<String, String> entry: kvPairs.entrySet()) {
+            srcGloballyResolvingStoreClient.put(entry.getKey(), entry.getValue());
+        }
+
+        // generate a conflict on the primary and a secondary replica
+        List<Integer> nodeList = srcStoreInstance.getReplicationNodeList(srcStoreInstance.getMasterPartitionId(conflictKey.getBytes("UTF-8")));
+        VectorClock losingClock = new VectorClock(Lists.newArrayList(new ClockEntry((short) 0, 5)),
+                                                  System.currentTimeMillis());
+        VectorClock winningClock = new VectorClock(Lists.newArrayList(new ClockEntry((short) 1, 5)),
+                                                   losingClock.getTimestamp() + 1);
+        srcAdminClient.storeOps.putNodeKeyValue(GLOBALLY_RESOLVING_STORE_NAME,
+                                                new NodeValue<ByteArray, byte[]>(nodeList.get(0),
+                                                                                 new ByteArray(conflictKey.getBytes("UTF-8")),
+                                                                                 new Versioned<byte[]>("losing value".getBytes("UTF-8"),
+                                                                                                       losingClock)));
+        srcAdminClient.storeOps.putNodeKeyValue(GLOBALLY_RESOLVING_STORE_NAME,
+                                                new NodeValue<ByteArray, byte[]>(nodeList.get(1),
+                                                                                 new ByteArray(conflictKey.getBytes("UTF-8")),
+                                                                                 new Versioned<byte[]>("winning value".getBytes("UTF-8"),
+                                                                                                       winningClock)));
+
+        // do a write to the destination cluster
+        dstGloballyResolvingStoreClient.put(firstKey, "before forklift");
+
+        // perform the forklift with global resolution enabled
+        ClusterForkLiftTool forkLiftTool = new ClusterForkLiftTool(srcBootStrapUrl,
+                                                                   dstBootStrapUrl,
+                                                                   10000,
+                                                                   1,
+                                                                   1000,
+                                                                   Lists.newArrayList(GLOBALLY_RESOLVING_STORE_NAME),
+                                                                   null,
+                                                                   true);
+        forkLiftTool.run();
+
+        // do a write to the destination cluster
+        dstGloballyResolvingStoreClient.put(lastKey, "after forklift");
+
+        // verify data on the destination is as expected
+        for(Map.Entry<String, String> entry: kvPairs.entrySet()) {
+            if(entry.getKey().equals(firstKey)) {
+                assertEquals("Online write overwritten",
+                             "before forklift",
+                             dstGloballyResolvingStoreClient.get(firstKey).getValue());
+            } else if(entry.getKey().equals(lastKey)) {
+                assertEquals("Online write overwritten",
+                             "after forklift",
+                             dstGloballyResolvingStoreClient.get(lastKey).getValue());
+            } else if(entry.getKey().equals(conflictKey)) {
+                assertEquals("Conflict resolution incorrect",
+                             "winning value",
+                             dstGloballyResolvingStoreClient.get(conflictKey).getValue());
+            } else {
+                assertEquals("fork lift data missing",
+                             entry.getValue(),
+                             dstGloballyResolvingStoreClient.get(entry.getKey()).getValue());
+            }
+        }
+    }
+
+    @After
+    public void tearDownClusters() {
+
+        srcAdminClient.close();
+
+        srcfactory.close();
+        dstfactory.close();
+
+        for(VoldemortServer server: srcServers)
+            server.stop();
+        for(VoldemortServer server: dstServers)
+            server.stop();
+    }
+}
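Finally, a quick way to sanity-check the new two-stores-replicated.xml outside JUnit, assuming only the mapper and helper the test itself uses; the expected values in the comments follow the store definitions added above.

import java.io.File;
import java.util.List;

import voldemort.store.StoreDefinition;
import voldemort.store.StoreUtils;
import voldemort.xml.StoreDefinitionsMapper;

public class StoresXmlSanityCheck {

    public static void main(String[] args) throws Exception {
        List<StoreDefinition> defs = new StoreDefinitionsMapper().readStoreList(new File("test/common/voldemort/config/two-stores-replicated.xml"));

        StoreDefinition test = StoreUtils.getStoreDef(defs, "test");
        StoreDefinition best = StoreUtils.getStoreDef(defs, "best");

        // "test" resolves conflicts at the primary: N=2, R=W=1
        System.out.println(test.getReplicationFactor()); // 2
        // "best" reads and writes with quorums, suitable for global resolution: N=3, R=W=2
        System.out.println(best.getReplicationFactor()); // 3
    }
}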