From be4dbc00921474334371266ff18856dc9ec84bee Mon Sep 17 00:00:00 2001 From: Chinmay Soman Date: Tue, 30 Apr 2013 17:29:12 -0700 Subject: [PATCH] Fixing the faulty read-repair logic (removed duplicates, removed unnecessary repairs). Added a test case to verify this in the presence of concurrent versions --- .../voldemort/store/routed/ReadRepairer.java | 121 +++++++++++------- .../routed/action/AbstractReadRepair.java | 2 +- .../store/routed/ReadRepairerTest.java | 10 ++ 3 files changed, 84 insertions(+), 49 deletions(-) diff --git a/src/java/voldemort/store/routed/ReadRepairer.java b/src/java/voldemort/store/routed/ReadRepairer.java index 32b5156c32..a5be02a2ef 100644 --- a/src/java/voldemort/store/routed/ReadRepairer.java +++ b/src/java/voldemort/store/routed/ReadRepairer.java @@ -18,11 +18,14 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.log4j.Logger; + import voldemort.annotations.concurrency.Threadsafe; import voldemort.versioning.Occurred; import voldemort.versioning.Version; @@ -45,6 +48,8 @@ @Threadsafe public class ReadRepairer { + private final Logger logger = Logger.getLogger(getClass()); + /** * Compute the repair set from the given values and nodes * @@ -77,85 +82,105 @@ private List> singleKeyGetRepairs(List> nodeValu if(size <= 1) return Collections.emptyList(); - // A list of obsolete nodes that need to be repaired - Set obsolete = new HashSet(3); + // 1. Create a multi-map of nodes to their existing Versions + Multimap> nodeVersionsMap = HashMultimap.create(); + for(NodeValue nodeValue: nodeValues) { + nodeVersionsMap.put(nodeValue.getNodeId(), nodeValue); + } + + // 2. Create a map of the final set of versions (for this key) + Map> mostCurrentVersionsMap = new HashMap>(); - // A Map of Version=>NodeValues that contains the current best estimate - // of the set of current versions - // and the nodes containing them - Multimap> concurrents = HashMultimap.create(); - concurrents.put(nodeValues.get(0).getVersion(), nodeValues.get(0)); + // Initialize with the first element from the input + mostCurrentVersionsMap.put(nodeValues.get(0).getVersion(), nodeValues.get(0)); // check each value against the current set of most current versions for(int i = 1; i < nodeValues.size(); i++) { NodeValue curr = nodeValues.get(i); boolean concurrentToAll = true; - Set versions = new HashSet(concurrents.keySet()); - for(Version concurrentVersion: versions) { - // if we already have the version, just add the nodevalue for - // future updating and move on - if(curr.getVersion().equals(concurrentVersion)) { - concurrents.put(curr.getVersion(), curr); + /* + * Make a copy for the traversal. This is because the original map + * can be modified during this traversal + */ + Set knownGoodVersions = new HashSet(mostCurrentVersionsMap.keySet()); + + for(Version currentGoodversion: knownGoodVersions) { + + // If the version already exists, do nothing + if(curr.getVersion().equals(currentGoodversion)) { + concurrentToAll = false; + if(logger.isDebugEnabled()) { + logger.debug("Version already exists in the most current set: " + curr); + } break; } // Check the ordering of the current value - Occurred occurred = curr.getVersion().compare(concurrentVersion); + Occurred occurred = curr.getVersion().compare(currentGoodversion); if(occurred == Occurred.BEFORE) { - // This value is obsolete! Stop checking against other - // values... - obsolete.add(curr.getNodeId()); + // This value is obsolete! Break from the loop + if(logger.isDebugEnabled()) { + logger.debug("Version is obsolete : " + curr); + } concurrentToAll = false; break; } else if(occurred == Occurred.AFTER) { // This concurrent value is obsolete and the current value // should replace it - for(NodeValue v: concurrents.get(concurrentVersion)) - obsolete.add(v.getNodeId()); - concurrents.removeAll(concurrentVersion); + mostCurrentVersionsMap.remove(currentGoodversion); concurrentToAll = false; - concurrents.put(curr.getVersion(), curr); + mostCurrentVersionsMap.put(curr.getVersion(), curr); + if(logger.isDebugEnabled()) { + logger.debug("Updating the current best - adding : " + curr); + } } } // if the value is concurrent to all existing versions then add it // to the concurrent set - if(concurrentToAll) - concurrents.put(curr.getVersion(), curr); + if(concurrentToAll) { + mostCurrentVersionsMap.put(curr.getVersion(), curr); + if(logger.isDebugEnabled()) { + logger.debug("Value is concurrent to all ! : " + curr); + } + } } - // Construct the list of repairs + // 3. Compare 1 and 2 and create the repair list List> repairs = new ArrayList>(3); - for(Integer id: obsolete) { - // repair all obsolete nodes - for(Version v: concurrents.keySet()) { - NodeValue concurrent = concurrents.get(v).iterator().next(); - NodeValue repair = new NodeValue(id, - concurrent.getKey(), - concurrent.getVersioned()); - repairs.add(repair); + for(int nodeId: nodeVersionsMap.keySet()) { + Set finalVersions = new HashSet(mostCurrentVersionsMap.keySet()); + if(logger.isDebugEnabled()) { + logger.debug("Set of final versions = " + finalVersions); } - } - if(concurrents.size() > 1) { - // if there are more then one concurrent versions on different - // nodes, - // we should repair so all have the same set of values - Set> existing = new HashSet>(repairs); - for(NodeValue entry1: concurrents.values()) { - for(NodeValue entry2: concurrents.values()) { - if(!entry1.getVersion().equals(entry2.getVersion())) { - NodeValue repair = new NodeValue(entry1.getNodeId(), - entry2.getKey(), - entry2.getVersioned()); - if(!existing.contains(repair)) - repairs.add(repair); - } + // Calculate the set difference between final Versions and + // the versions currently existing for nodeId + Set currentNodeVersions = new HashSet(); + for(NodeValue nodeValue: nodeVersionsMap.get(nodeId)) { + currentNodeVersions.add(nodeValue.getVersion()); + } + finalVersions.removeAll(currentNodeVersions); + + if(logger.isDebugEnabled()) { + logger.debug("Remaining versions to be repaired for this node after the set difference = " + + finalVersions); + } + + // Repair nodeId with the remaining Versioned values + for(Version remainingVersion: finalVersions) { + NodeValue repair = new NodeValue(nodeId, + mostCurrentVersionsMap.get(remainingVersion) + .getKey(), + mostCurrentVersionsMap.get(remainingVersion) + .getVersioned()); + if(logger.isDebugEnabled()) { + logger.debug("Node value marked to be repaired : " + repair); } + repairs.add(repair); } } return repairs; } - } diff --git a/src/java/voldemort/store/routed/action/AbstractReadRepair.java b/src/java/voldemort/store/routed/action/AbstractReadRepair.java index ac2dac7829..f41d0a00f5 100644 --- a/src/java/voldemort/store/routed/action/AbstractReadRepair.java +++ b/src/java/voldemort/store/routed/action/AbstractReadRepair.java @@ -78,7 +78,7 @@ public void execute(Pipeline pipeline) { long startTimeNs = -1; - if(logger.isTraceEnabled()) + if(logger.isDebugEnabled()) startTimeNs = System.nanoTime(); if(nodeValues.size() > 1 && preferred > 1) { diff --git a/test/unit/voldemort/store/routed/ReadRepairerTest.java b/test/unit/voldemort/store/routed/ReadRepairerTest.java index 343d277a83..b1f48ce155 100644 --- a/test/unit/voldemort/store/routed/ReadRepairerTest.java +++ b/test/unit/voldemort/store/routed/ReadRepairerTest.java @@ -292,6 +292,16 @@ public void testLotsOfVersions() throws Exception { getValue(6, 1, new int[] { 3, 3 }))); } + @Test + public void testConcurrentVersionsDoNotResultInRepairs() throws Exception { + List> emptyExpectedList = new ArrayList>(); + assertVariationsEqual(emptyExpectedList, + asList(getValue(1, 1, new int[] { 1, 1, 2, 2, 2, 2, 3, 3 }), + getValue(1, 1, new int[] { 1, 1, 1, 1, 2, 2, 3, 3 }), + getValue(2, 1, new int[] { 1, 1, 2, 2, 2, 2, 3, 3 }), + getValue(2, 1, new int[] { 1, 1, 1, 1, 2, 2, 3, 3 }))); + } + /** * Test the equality with a few variations on ordering *