diff --git a/test/common/voldemort/store/SleepyForceFailStore.java b/test/common/voldemort/store/SleepyForceFailStore.java deleted file mode 100644 index 81e9955bbf..0000000000 --- a/test/common/voldemort/store/SleepyForceFailStore.java +++ /dev/null @@ -1,60 +0,0 @@ -package voldemort.store; - -import java.util.List; -import java.util.Map; - -import voldemort.VoldemortException; -import voldemort.versioning.Version; -import voldemort.versioning.Versioned; - -public class SleepyForceFailStore extends ForceFailStore { - - private long sleepTimeMs; - - public SleepyForceFailStore(Store innerStore, VoldemortException e, long sleepTimeInMs) { - super(innerStore, e); - this.sleepTimeMs = sleepTimeInMs; - } - - @Override - public boolean delete(K key, Version version) throws VoldemortException { - try { - Thread.sleep(sleepTimeMs); - return super.delete(key, version); - } catch(InterruptedException e) { - throw new VoldemortException(e); - } - } - - @Override - public List> get(K key, T transforms) throws VoldemortException { - try { - Thread.sleep(sleepTimeMs); - return super.get(key, transforms); - } catch(InterruptedException e) { - throw new VoldemortException(e); - } - } - - @Override - public Map>> getAll(Iterable keys, Map transforms) - throws VoldemortException { - try { - Thread.sleep(sleepTimeMs); - return super.getAll(keys, transforms); - } catch(InterruptedException e) { - throw new VoldemortException(e); - } - } - - @Override - public void put(K key, Versioned value, T transforms) throws VoldemortException { - try { - Thread.sleep(sleepTimeMs); - super.put(key, value, transforms); - } catch(InterruptedException e) { - throw new VoldemortException(e); - } - } - -} diff --git a/test/unit/voldemort/store/routed/HintedHandoffFailureTest.java b/test/unit/voldemort/store/routed/HintedHandoffFailureTest.java index bbeb4927e7..bc3458e533 100644 --- a/test/unit/voldemort/store/routed/HintedHandoffFailureTest.java +++ b/test/unit/voldemort/store/routed/HintedHandoffFailureTest.java @@ -17,7 +17,7 @@ package voldemort.store.routed; import static org.junit.Assert.fail; -import static voldemort.VoldemortTestConstants.getTwoNodeCluster; +import static voldemort.VoldemortTestConstants.getThreeNodeCluster; import java.util.Date; import java.util.List; @@ -29,8 +29,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.apache.log4j.Logger; import org.junit.After; -import org.junit.Before; import org.junit.Test; import voldemort.ServerTestUtils; @@ -46,13 +46,15 @@ import voldemort.cluster.failuredetector.FailureDetectorUtils; import voldemort.cluster.failuredetector.MutableStoreVerifier; import voldemort.cluster.failuredetector.ThresholdFailureDetector; +import voldemort.routing.RoutingStrategy; import voldemort.routing.RoutingStrategyFactory; import voldemort.routing.RoutingStrategyType; import voldemort.serialization.SerializerDefinition; import voldemort.server.StoreRepository; import voldemort.server.scheduler.slop.StreamingSlopPusherJob; import voldemort.server.storage.ScanPermitWrapper; -import voldemort.store.SleepyForceFailStore; +import voldemort.store.ForceFailStore; +import voldemort.store.SleepyStore; import voldemort.store.StorageEngine; import voldemort.store.Store; import voldemort.store.StoreDefinition; @@ -90,23 +92,24 @@ */ public class HintedHandoffFailureTest { - private final static String STORE_NAME = "test"; private final static String SLOP_STORE_NAME = "slop"; - private final static int REPLICATION_FACTOR = 2; - private final static int P_READS = 1; - private final static int R_READS = 1; - private final static int P_WRITES = 2; - private final static int R_WRITES = 1; + private static int REPLICATION_FACTOR = 2; + private static int P_READS = 1; + private static int R_READS = 1; + private static int P_WRITES = 1; + private static int R_WRITES = 1; private static final int NUM_THREADS = 3; - private static final int NUM_NODES_TOTAL = 2; - private static final int FAILED_NODE_ID = 0; + private static final int NUM_NODES_TOTAL = 3; + private static int FAILED_NODE_ID = 0; + private final String STORE_NAME = "test"; private Cluster cluster; private FailureDetector failureDetector; private StoreDefinition storeDef; private ExecutorService routedStoreThreadPool; private RoutedStoreFactory routedStoreFactory; private RoutedStore store; + private RoutingStrategy strategy; private final static long routingTimeoutInMs = 1000; private final static long sleepBeforeFailingInMs = 2000; @@ -116,6 +119,8 @@ public class HintedHandoffFailureTest { private final Map> slopStores = new ConcurrentHashMap>(); private final List slopPusherJobs = Lists.newLinkedList(); + private final Logger logger = Logger.getLogger(getClass()); + private StoreDefinition getStoreDef(String storeName, int replicationFactor, int preads, @@ -154,24 +159,23 @@ private void setFailureDetector(Map> s } /** - * Setup a cluster with 2 nodes, with the following characteristics: + * Setup a cluster with 3 nodes, with the following characteristics: * - * - Node 0: Sleepy force failing store (will throw an exception after a - * delay) + * - 1st replica node: Sleepy force failing store (will throw an exception + * after a delay) * - * - Node 1: Standard In-memory store (wrapped by Logging store) + * - Pseudo master and other replicas: Standard In-memory store (wrapped by + * Logging store) * * - In memory slop stores * - * - A custom Put pipeline with a delay between parallel puts and doing the - * handoff + * @param key The ByteArray representation of the key * * @throws Exception */ - @Before - public void setUp() throws Exception { + public void customSetup(ByteArray key) throws Exception { - cluster = getTwoNodeCluster(); + cluster = getThreeNodeCluster(); storeDef = getStoreDef(STORE_NAME, REPLICATION_FACTOR, P_READS, @@ -180,28 +184,37 @@ public void setUp() throws Exception { R_WRITES, RoutingStrategyType.CONSISTENT_STRATEGY); - VoldemortException e = new UnreachableStoreException("Node down"); + strategy = new RoutingStrategyFactory().updateRoutingStrategy(storeDef, cluster); InMemoryStorageEngine inMemoryStorageEngine = new InMemoryStorageEngine(STORE_NAME); LoggingStore loggingStore = new LoggingStore(inMemoryStorageEngine); - // Set node 1 as a regular store - subStores.put(1, loggingStore); - - // Set node 0 as the force failing store - SleepyForceFailStore failureStore = new SleepyForceFailStore(loggingStore, - e, - sleepBeforeFailingInMs); + VoldemortException e = new UnreachableStoreException("Node down"); + ForceFailStore failureStore = new ForceFailStore(loggingStore, + e); + SleepyStore sleepyFailureStore = new SleepyStore(sleepBeforeFailingInMs, + failureStore); failureStore.setFail(true); - subStores.put(0, failureStore); + // Get the first replica node for the given key + // This will act as the sleepy failing node + Node failingNode = strategy.routeRequest(key.get()).get(1); + FAILED_NODE_ID = failingNode.getId(); + + subStores.clear(); + for(int i = 0; i < NUM_NODES_TOTAL; i++) { + if(i == FAILED_NODE_ID) { + subStores.put(i, sleepyFailureStore); + } else { + subStores.put(i, loggingStore); + } + } setFailureDetector(subStores); routedStoreThreadPool = Executors.newFixedThreadPool(NUM_THREADS); routedStoreFactory = new RoutedStoreFactory(true, routedStoreThreadPool, new TimeoutConfig(routingTimeoutInMs, false)); - new RoutingStrategyFactory().updateRoutingStrategy(storeDef, cluster); Map nonblockingSlopStores = Maps.newHashMap(); for(Node node: cluster.getNodes()) { @@ -209,8 +222,9 @@ public void setUp() throws Exception { StoreRepository storeRepo = new StoreRepository(); storeRepo.addLocalStore(subStores.get(nodeId)); - for(int i = 0; i < NUM_NODES_TOTAL; i++) + for(int i = 0; i < NUM_NODES_TOTAL; i++) { storeRepo.addNodeStore(i, subStores.get(i)); + } SlopStorageEngine slopStorageEngine = new SlopStorageEngine(new InMemoryStorageEngine(SLOP_STORE_NAME), cluster); @@ -274,12 +288,12 @@ private Set makeSlopKeys(Set failedKeys) { for(ByteArray failedKey: failedKeys) { byte[] opCode = new byte[] { Slop.Operation.PUT.getOpCode() }; byte[] spacer = new byte[] { (byte) 0 }; - byte[] storeName = ByteUtils.getBytes(STORE_NAME, "UTF-8"); + byte[] storeNameBytes = ByteUtils.getBytes(STORE_NAME, "UTF-8"); byte[] nodeIdBytes = new byte[ByteUtils.SIZE_OF_INT]; ByteUtils.writeInt(nodeIdBytes, FAILED_NODE_ID, 0); ByteArray slopKey = new ByteArray(ByteUtils.cat(opCode, spacer, - storeName, + storeNameBytes, spacer, nodeIdBytes, spacer, @@ -294,16 +308,30 @@ private Set makeSlopKeys(Set failedKeys) { * after PerformParallelPut has finished processing the responses and before * the hinted handoff actually begins, a slop is still registered for the * same. + * + * This is for the 2-1-1 configuration. */ @Test - public void testSlopOnDelayedFailingAsyncPut() { + public void testSlopOnDelayedFailingAsyncPut_2_1_1() { - // The following key will be routed to node 1 (pseudo master). We've set - // node 0 to be the sleepy failing node String key = "a"; String val = "xyz"; Versioned versionedVal = new Versioned(val.getBytes()); ByteArray keyByteArray = new ByteArray(key.getBytes()); + + // Set the correct replication config + REPLICATION_FACTOR = 2; + P_READS = 1; + R_READS = 1; + P_WRITES = 1; + R_WRITES = 1; + + try { + customSetup(keyByteArray); + } catch(Exception e) { + fail("Error in setup."); + } + this.store.put(keyByteArray, versionedVal, null); // Check the slop stores @@ -317,12 +345,67 @@ public void testSlopOnDelayedFailingAsyncPut() { for(Map.Entry>> entry: res.entrySet()) { Slop slop = entry.getValue().get(0).getValue(); registeredSlops.add(slop); - System.out.println(slop); + logger.info(slop); } } if(registeredSlops.size() == 0) { fail("Should have seen some slops. But could not find any."); + } else if(registeredSlops.size() != 1) { + fail("Number of slops registered != 1"); + } + } + + /** + * Test to ensure that when an asynchronous put completes (with a failure) + * after PerformParallelPut has finished processing the responses and before + * the hinted handoff actually begins, a slop is still registered for the + * same. + * + * This is for the 3-2-2 configuration. + */ + @Test + public void testSlopOnDelayedFailingAsyncPut_3_2_2() { + + String key = "a"; + String val = "xyz"; + Versioned versionedVal = new Versioned(val.getBytes()); + ByteArray keyByteArray = new ByteArray(key.getBytes()); + + // Set the correct replication config + REPLICATION_FACTOR = 3; + P_READS = 2; + R_READS = 2; + P_WRITES = 2; + R_WRITES = 2; + + try { + customSetup(keyByteArray); + } catch(Exception e) { + fail("Error in setup."); + } + + this.store.put(keyByteArray, versionedVal, null); + + // Check the slop stores + Set failedKeys = Sets.newHashSet(); + failedKeys.add(keyByteArray); + Set slopKeys = makeSlopKeys(failedKeys); + + Set registeredSlops = Sets.newHashSet(); + for(Store slopStore: slopStores.values()) { + Map>> res = slopStore.getAll(slopKeys, null); + for(Map.Entry>> entry: res.entrySet()) { + Slop slop = entry.getValue().get(0).getValue(); + registeredSlops.add(slop); + logger.info(slop); + } + } + + if(registeredSlops.size() == 0) { + fail("Should have seen some slops. But could not find any."); + } else if(registeredSlops.size() != 1) { + fail("Number of slops registered != 1"); } } @@ -330,17 +413,93 @@ public void testSlopOnDelayedFailingAsyncPut() { * Test to ensure that when an asynchronous put completes (with a failure) * after the pipeline completes, a slop is still registered (via a serial * hint). + * + * This is for the 2-1-1 configuration */ @Test - public void testSlopViaSerialHint() { + public void testSlopViaSerialHint_2_1_1() { - // The following key will be routed to node 1 (pseudo master). We've set - // node 0 to be the sleepy failing node String key = "a"; String val = "xyz"; Versioned versionedVal = new Versioned(val.getBytes()); ByteArray keyByteArray = new ByteArray(key.getBytes()); + // Set the correct replication config + REPLICATION_FACTOR = 2; + P_READS = 1; + R_READS = 1; + P_WRITES = 1; + R_WRITES = 1; + + try { + customSetup(keyByteArray); + } catch(Exception e) { + fail("Error in setup."); + } + + // We remove the delay in the pipeline so that the pipeline will finish + // before the failing async put returns. At this point it should do a + // serial hint. + delayBeforeHintedHandoff = 0; + + this.store.put(keyByteArray, versionedVal, null); + + // Give enough time for the serial hint to work. + try { + logger.info("Sleeping for 5 seconds to wait for the serial hint to finish"); + Thread.sleep(5000); + } catch(Exception e) {} + + // Check the slop stores + Set failedKeys = Sets.newHashSet(); + failedKeys.add(keyByteArray); + Set slopKeys = makeSlopKeys(failedKeys); + + Set registeredSlops = Sets.newHashSet(); + for(Store slopStore: slopStores.values()) { + Map>> res = slopStore.getAll(slopKeys, null); + for(Map.Entry>> entry: res.entrySet()) { + Slop slop = entry.getValue().get(0).getValue(); + registeredSlops.add(slop); + logger.info(slop); + } + } + + if(registeredSlops.size() == 0) { + fail("Should have seen some slops. But could not find any."); + } else if(registeredSlops.size() != 1) { + fail("Number of slops registered != 1"); + } + } + + /** + * Test to ensure that when an asynchronous put completes (with a failure) + * after the pipeline completes, a slop is still registered (via a serial + * hint). + * + * This is for the 3-2-2 configuration + */ + @Test + public void testSlopViaSerialHint_3_2_2() { + + String key = "a"; + String val = "xyz"; + Versioned versionedVal = new Versioned(val.getBytes()); + ByteArray keyByteArray = new ByteArray(key.getBytes()); + + // Set the correct replication config + REPLICATION_FACTOR = 3; + P_READS = 2; + R_READS = 2; + P_WRITES = 2; + R_WRITES = 2; + + try { + customSetup(keyByteArray); + } catch(Exception e) { + fail("Error in setup."); + } + // We remove the delay in the pipeline so that the pipeline will finish // before the failing async put returns. At this point it should do a // serial hint. @@ -350,7 +509,7 @@ public void testSlopViaSerialHint() { // Give enough time for the serial hint to work. try { - System.out.println("Sleeping for 5 seconds to wait for the serial hint to finish"); + logger.info("Sleeping for 5 seconds to wait for the serial hint to finish"); Thread.sleep(5000); } catch(Exception e) {} @@ -365,12 +524,14 @@ public void testSlopViaSerialHint() { for(Map.Entry>> entry: res.entrySet()) { Slop slop = entry.getValue().get(0).getValue(); registeredSlops.add(slop); - System.out.println(slop); + logger.info(slop); } } if(registeredSlops.size() == 0) { fail("Should have seen some slops. But could not find any."); + } else if(registeredSlops.size() != 1) { + fail("Number of slops registered != 1"); } } @@ -390,10 +551,10 @@ protected DelayAction(PutPipelineData pipelineData, Event completeEvent, long sl @Override public void execute(Pipeline pipeline) { try { - System.out.println("Delayed pipeline action now sleeping for : " + sleepTimeInMs); + logger.info("Delayed pipeline action now sleeping for : " + sleepTimeInMs); Thread.sleep(sleepTimeInMs); - System.out.println("Now moving on to doing actual hinted handoff. Current time = " - + new Date(System.currentTimeMillis())); + logger.info("Now moving on to doing actual hinted handoff. Current time = " + + new Date(System.currentTimeMillis())); } catch(Exception e) {} pipeline.addEvent(completeEvent); }