Skip to content

Commit

Permalink
Fixing the faulty read-repair logic (removed duplicates, removed unnecessary repairs).
Browse files Browse the repository at this point in the history
Added a test case to verify this in the presence of concurrent versions
  • Loading branch information
Chinmay Soman committed May 1, 2013
1 parent 18e3346 commit be4dbc0
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 49 deletions.
121 changes: 73 additions & 48 deletions src/java/voldemort/store/routed/ReadRepairer.java
Expand Up @@ -18,11 +18,14 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.log4j.Logger;

import voldemort.annotations.concurrency.Threadsafe;
import voldemort.versioning.Occurred;
import voldemort.versioning.Version;
Expand All @@ -45,6 +48,8 @@
@Threadsafe
public class ReadRepairer<K, V> {

private final Logger logger = Logger.getLogger(getClass());

/**
* Compute the repair set from the given values and nodes
*
Expand Down Expand Up @@ -77,85 +82,105 @@ private List<NodeValue<K, V>> singleKeyGetRepairs(List<NodeValue<K, V>> nodeValu
if(size <= 1)
return Collections.emptyList();

// A list of obsolete nodes that need to be repaired
Set<Integer> obsolete = new HashSet<Integer>(3);
// 1. Create a multi-map of nodes to their existing Versions
Multimap<Integer, NodeValue<K, V>> nodeVersionsMap = HashMultimap.create();
for(NodeValue<K, V> nodeValue: nodeValues) {
nodeVersionsMap.put(nodeValue.getNodeId(), nodeValue);
}

// 2. Create a map of the final set of versions (for this key)
Map<Version, NodeValue<K, V>> mostCurrentVersionsMap = new HashMap<Version, NodeValue<K, V>>();

// A Map of Version=>NodeValues that contains the current best estimate
// of the set of current versions
// and the nodes containing them
Multimap<Version, NodeValue<K, V>> concurrents = HashMultimap.create();
concurrents.put(nodeValues.get(0).getVersion(), nodeValues.get(0));
// Initialize with the first element from the input
mostCurrentVersionsMap.put(nodeValues.get(0).getVersion(), nodeValues.get(0));

// check each value against the current set of most current versions
for(int i = 1; i < nodeValues.size(); i++) {
NodeValue<K, V> curr = nodeValues.get(i);
boolean concurrentToAll = true;
Set<Version> versions = new HashSet<Version>(concurrents.keySet());
for(Version concurrentVersion: versions) {

// if we already have the version, just add the nodevalue for
// future updating and move on
if(curr.getVersion().equals(concurrentVersion)) {
concurrents.put(curr.getVersion(), curr);
/*
* Make a copy for the traversal. This is because the original map
* can be modified during this traversal
*/
Set<Version> knownGoodVersions = new HashSet<Version>(mostCurrentVersionsMap.keySet());

for(Version currentGoodversion: knownGoodVersions) {

// If the version already exists, do nothing
if(curr.getVersion().equals(currentGoodversion)) {
concurrentToAll = false;
if(logger.isDebugEnabled()) {
logger.debug("Version already exists in the most current set: " + curr);
}
break;
}

// Check the ordering of the current value
Occurred occurred = curr.getVersion().compare(concurrentVersion);
Occurred occurred = curr.getVersion().compare(currentGoodversion);
if(occurred == Occurred.BEFORE) {
// This value is obsolete! Stop checking against other
// values...
obsolete.add(curr.getNodeId());
// This value is obsolete! Break from the loop
if(logger.isDebugEnabled()) {
logger.debug("Version is obsolete : " + curr);
}
concurrentToAll = false;
break;
} else if(occurred == Occurred.AFTER) {
// This concurrent value is obsolete and the current value
// should replace it
for(NodeValue<K, V> v: concurrents.get(concurrentVersion))
obsolete.add(v.getNodeId());
concurrents.removeAll(concurrentVersion);
mostCurrentVersionsMap.remove(currentGoodversion);
concurrentToAll = false;
concurrents.put(curr.getVersion(), curr);
mostCurrentVersionsMap.put(curr.getVersion(), curr);
if(logger.isDebugEnabled()) {
logger.debug("Updating the current best - adding : " + curr);
}
}
}
// if the value is concurrent to all existing versions then add it
// to the concurrent set
if(concurrentToAll)
concurrents.put(curr.getVersion(), curr);
if(concurrentToAll) {
mostCurrentVersionsMap.put(curr.getVersion(), curr);
if(logger.isDebugEnabled()) {
logger.debug("Value is concurrent to all ! : " + curr);
}
}
}

// Construct the list of repairs
// 3. Compare 1 and 2 and create the repair list
List<NodeValue<K, V>> repairs = new ArrayList<NodeValue<K, V>>(3);
for(Integer id: obsolete) {
// repair all obsolete nodes
for(Version v: concurrents.keySet()) {
NodeValue<K, V> concurrent = concurrents.get(v).iterator().next();
NodeValue<K, V> repair = new NodeValue<K, V>(id,
concurrent.getKey(),
concurrent.getVersioned());
repairs.add(repair);
for(int nodeId: nodeVersionsMap.keySet()) {
Set<Version> finalVersions = new HashSet<Version>(mostCurrentVersionsMap.keySet());
if(logger.isDebugEnabled()) {
logger.debug("Set of final versions = " + finalVersions);
}
}

if(concurrents.size() > 1) {
// if there are more then one concurrent versions on different
// nodes,
// we should repair so all have the same set of values
Set<NodeValue<K, V>> existing = new HashSet<NodeValue<K, V>>(repairs);
for(NodeValue<K, V> entry1: concurrents.values()) {
for(NodeValue<K, V> entry2: concurrents.values()) {
if(!entry1.getVersion().equals(entry2.getVersion())) {
NodeValue<K, V> repair = new NodeValue<K, V>(entry1.getNodeId(),
entry2.getKey(),
entry2.getVersioned());
if(!existing.contains(repair))
repairs.add(repair);
}
// Calculate the set difference between final Versions and
// the versions currently existing for nodeId
Set<Version> currentNodeVersions = new HashSet<Version>();
for(NodeValue<K, V> nodeValue: nodeVersionsMap.get(nodeId)) {
currentNodeVersions.add(nodeValue.getVersion());
}
finalVersions.removeAll(currentNodeVersions);

if(logger.isDebugEnabled()) {
logger.debug("Remaining versions to be repaired for this node after the set difference = "
+ finalVersions);
}

// Repair nodeId with the remaining Versioned values
for(Version remainingVersion: finalVersions) {
NodeValue<K, V> repair = new NodeValue<K, V>(nodeId,
mostCurrentVersionsMap.get(remainingVersion)
.getKey(),
mostCurrentVersionsMap.get(remainingVersion)
.getVersioned());
if(logger.isDebugEnabled()) {
logger.debug("Node value marked to be repaired : " + repair);
}
repairs.add(repair);
}
}

return repairs;
}

}
Expand Up @@ -78,7 +78,7 @@ public void execute(Pipeline pipeline) {

long startTimeNs = -1;

if(logger.isTraceEnabled())
if(logger.isDebugEnabled())
startTimeNs = System.nanoTime();

if(nodeValues.size() > 1 && preferred > 1) {
Expand Down
10 changes: 10 additions & 0 deletions test/unit/voldemort/store/routed/ReadRepairerTest.java
Expand Up @@ -292,6 +292,16 @@ public void testLotsOfVersions() throws Exception {
getValue(6, 1, new int[] { 3, 3 })));
}

/**
 * Checks that the repair computation yields an empty repair list when the
 * same two versions are supplied once with a first argument of 1 and once
 * with a first argument of 2.
 *
 * NOTE(review): presumably the first argument of getValue is the node id,
 * the second is the stored value and the int array encodes the vector
 * clock, so both nodes already hold the same pair of concurrent versions
 * - confirm against getValue.
 */
@Test
public void testConcurrentVersionsDoNotResultInRepairs() throws Exception {
// Expected outcome: nothing to repair.
List<NodeValue<String, Integer>> emptyExpectedList = new ArrayList<NodeValue<String, Integer>>();
assertVariationsEqual(emptyExpectedList,
// Two entries created with first argument 1, differing only in the int array
asList(getValue(1, 1, new int[] { 1, 1, 2, 2, 2, 2, 3, 3 }),
getValue(1, 1, new int[] { 1, 1, 1, 1, 2, 2, 3, 3 }),
// The same two int arrays again, this time with first argument 2
getValue(2, 1, new int[] { 1, 1, 2, 2, 2, 2, 3, 3 }),
getValue(2, 1, new int[] { 1, 1, 1, 1, 2, 2, 3, 3 })));
}

/**
* Test the equality with a few variations on ordering
*
Expand Down

0 comments on commit be4dbc0

Please sign in to comment.