From f573b53fd7760d7a519e9e1762c123e2f8868fc5 Mon Sep 17 00:00:00 2001 From: Jay J Wylie Date: Sun, 10 Feb 2013 14:40:28 -0800 Subject: [PATCH] Added basic code for repairing orphaned key,values. src/java/voldemort/utils/ConsistencyFix.java - added BadKeyOrphanReader extends BadKeyReader to consume different input file src/java/voldemort/utils/ConsistencyFixCLI.java - added "orphan-format" flag to indicate that the 'bad-key-file-in' is of orphaned key/values. src/java/voldemort/utils/ConsistencyFixWorker.java - added constructor to take QueryKeyResult of orphaned keys - modified resolveReadConflicts to add orphaned key/values to imaginary nodes for the sake of determine the value/version to be repaired --- src/java/voldemort/utils/ConsistencyFix.java | 137 ++++++++++++++++-- .../voldemort/utils/ConsistencyFixCLI.java | 9 +- .../voldemort/utils/ConsistencyFixWorker.java | 52 ++++++- 3 files changed, 182 insertions(+), 16 deletions(-) diff --git a/src/java/voldemort/utils/ConsistencyFix.java b/src/java/voldemort/utils/ConsistencyFix.java index 61d2cf1ce7..0a3bb580a3 100644 --- a/src/java/voldemort/utils/ConsistencyFix.java +++ b/src/java/voldemort/utils/ConsistencyFix.java @@ -22,6 +22,7 @@ import java.io.FileWriter; import java.io.IOException; import java.text.DecimalFormat; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -38,8 +39,11 @@ import voldemort.client.protocol.admin.AdminClient; import voldemort.client.protocol.admin.AdminClientConfig; +import voldemort.client.protocol.admin.QueryKeyResult; import voldemort.cluster.Cluster; import voldemort.store.StoreDefinition; +import voldemort.versioning.ClockEntry; +import voldemort.versioning.VectorClock; import voldemort.versioning.Versioned; public class ConsistencyFix { @@ -123,7 +127,10 @@ public String toString() { } } - public String execute(int parallelism, String badKeyFileIn, String badKeyFileOut) { + public String execute(int parallelism, + String badKeyFileIn, + boolean orphanFormat, + String badKeyFileOut) { ExecutorService badKeyReaderService; ExecutorService badKeyWriterService; ExecutorService consistencyFixWorkers; @@ -226,14 +233,14 @@ public Status getResult() { public class BadKeyReader implements Runnable { - private final CountDownLatch latch; - private final String badKeyFileIn; + protected final CountDownLatch latch; + protected final String badKeyFileIn; - private final ConsistencyFix consistencyFix; - private final ExecutorService consistencyFixWorkers; - private final BlockingQueue badKeyQOut; + protected final ConsistencyFix consistencyFix; + protected final ExecutorService consistencyFixWorkers; + protected final BlockingQueue badKeyQOut; - private BufferedReader fileReader; + protected BufferedReader fileReader; BadKeyReader(CountDownLatch latch, String badKeyFileIn, @@ -258,12 +265,12 @@ public class BadKeyReader implements Runnable { public void run() { try { int counter = 0; - for(String line = fileReader.readLine(); line != null; line = fileReader.readLine()) { - if(!line.isEmpty()) { + for(String key = fileReader.readLine(); key != null; key = fileReader.readLine()) { + if(!key.isEmpty()) { counter++; - logger.debug("BadKeyReader read line: key (" + line + ") and counter (" + logger.debug("BadKeyReader read line: key (" + key + ") and counter (" + counter + ")"); - consistencyFixWorkers.submit(new ConsistencyFixWorker(line, + consistencyFixWorkers.submit(new ConsistencyFixWorker(key, consistencyFix, badKeyQOut)); } @@ -282,6 +289,114 @@ public void run() { } } + public class BadKeyOrphanReader extends BadKeyReader { + + BadKeyOrphanReader(CountDownLatch latch, + String badKeyFileIn, + ConsistencyFix consistencyFix, + ExecutorService consistencyFixWorkers, + BlockingQueue badKeyQOut) { + super(latch, badKeyFileIn, consistencyFix, consistencyFixWorkers, badKeyQOut); + } + + // TODO: if we ever do an orphan fix again, we should + // serialize/deserialize VectorClock to/from bytes. Indeed, any object + // that can be persisted and offers a toString, should probably offer + // some to/from options for serde. + /** + * Parses a "version" string of the following format: + * + * + * 'version(2:25, 25:2, 29:156) ts:1355451322089' + * + * and converts this parsed value back into a VectorClock type. + * + * @param versionString + * @return + * @throws IOException + */ + private VectorClock parseVersion(String versionString) throws IOException { + List versions = new ArrayList(); + long timestamp = 0; + + // TODO: confirm regex works... + String parsed[] = versionString.split(") ts:"); + if(parsed.length != 2) { + throw new IOException("Could not parse vector clock: " + versionString); + } + timestamp = Long.parseLong(parsed[1]); + // "version(" + // 01234567 + // => 8 is the magic offset to elide "version(" + String clockEntryList = parsed[0].substring(8); + String parsedClockEntryList[] = clockEntryList.split(", "); + for(int i = 0; i < parsedClockEntryList.length; ++i) { + String parsedClockEntry[] = parsedClockEntryList[i].split(":"); + if(parsedClockEntry.length != 2) { + throw new IOException("Could not parse ClockEntry: " + parsedClockEntryList[i]); + } + short nodeId = Short.parseShort(parsedClockEntry[0]); + long version = Long.parseLong(parsedClockEntry[1]); + versions.add(new ClockEntry(nodeId, version)); + } + + return new VectorClock(versions, timestamp); + } + + @Override + public void run() { + try { + int counter = 0; + for(String keyNumVals = fileReader.readLine(); keyNumVals != null; keyNumVals = fileReader.readLine()) { + if(!keyNumVals.isEmpty()) { + counter++; + String parsed[] = keyNumVals.split(","); + if(parsed.length != 2) { + throw new IOException("KeyNumVal line did not parse into two elements: " + + keyNumVals); + } + String key = parsed[0]; + ByteArray keyByteArray = new ByteArray(ByteUtils.fromHexString(key)); + int numVals = Integer.parseInt(parsed[1]); + logger.debug("BadKeyReader read line: key (" + key + ") and counter (" + + counter + ") and numVals is (" + numVals + ")"); + + List> values = new ArrayList>(); + for(int i = 0; i < numVals; ++i) { + String valueVersion = fileReader.readLine(); + if(valueVersion.isEmpty()) { + throw new IOException("ValueVersion line was empty!"); + } + parsed = valueVersion.split(",", 2); + if(parsed.length != 2) { + throw new IOException("ValueVersion line did not parse into two elements: " + + valueVersion); + } + byte[] value = ByteUtils.fromHexString(parsed[0]); + VectorClock vectorClock = parseVersion(parsed[1]); + + values.add(new Versioned(value, vectorClock)); + } + QueryKeyResult queryKeyResult = new QueryKeyResult(keyByteArray, values); + consistencyFixWorkers.submit(new ConsistencyFixWorker(keyNumVals, + consistencyFix, + badKeyQOut, + queryKeyResult)); + } + } + } catch(Exception e) { + logger.warn("Exception reading badKeyFile " + badKeyFileIn + " : " + e.getMessage()); + } finally { + latch.countDown(); + try { + fileReader.close(); + } catch(IOException ioe) { + logger.warn("IOException during fileReader.close in BadKeyReader thread."); + } + } + } + } + public class BadKeyWriter implements Runnable { private final String badKeyFileOut; diff --git a/src/java/voldemort/utils/ConsistencyFixCLI.java b/src/java/voldemort/utils/ConsistencyFixCLI.java index 2901a18e9e..dacb1185e4 100644 --- a/src/java/voldemort/utils/ConsistencyFixCLI.java +++ b/src/java/voldemort/utils/ConsistencyFixCLI.java @@ -44,6 +44,7 @@ private static class Options { public String url = null; public String storeName = null; public String badKeyFileIn = null; + public boolean badKeyFileInOrphanFormat = false; public String badKeyFileOut = null; public int parallelism = defaultParallelism; public long progressBar = defaultProgressBar; @@ -73,6 +74,8 @@ private static ConsistencyFixCLI.Options parseArgs(String[] args) { .describedAs("Name of bad-key-file-in. " + "Each key must be in hexadecimal format. " + "Each key must be on a separate line in the file. ") .ofType(String.class); + parser.accepts("orphan-format", + "Indicates format of bad-key-file-in is of 'orphan' key-values."); parser.accepts("bad-key-file-out") .withRequiredArg() .describedAs("Name of bad-key-file-out. " @@ -122,8 +125,11 @@ private static ConsistencyFixCLI.Options parseArgs(String[] args) { options.url = (String) optionSet.valueOf("url"); options.storeName = (String) optionSet.valueOf("store"); - options.badKeyFileOut = (String) optionSet.valueOf("bad-key-file-out"); options.badKeyFileIn = (String) optionSet.valueOf("bad-key-file-in"); + options.badKeyFileOut = (String) optionSet.valueOf("bad-key-file-out"); + if(optionSet.has("orphan-format")) { + options.badKeyFileInOrphanFormat = true; + } if(optionSet.has("parallelism")) { options.parallelism = (Integer) optionSet.valueOf("parallelism"); } @@ -147,6 +153,7 @@ public static void main(String[] args) throws Exception { String summary = consistencyFix.execute(options.parallelism, options.badKeyFileIn, + options.badKeyFileInOrphanFormat, options.badKeyFileOut); System.out.println(summary); diff --git a/src/java/voldemort/utils/ConsistencyFixWorker.java b/src/java/voldemort/utils/ConsistencyFixWorker.java index 23427746f1..2fcffd2722 100644 --- a/src/java/voldemort/utils/ConsistencyFixWorker.java +++ b/src/java/voldemort/utils/ConsistencyFixWorker.java @@ -39,17 +39,44 @@ class ConsistencyFixWorker implements Runnable { private static final Logger logger = Logger.getLogger(ConsistencyFixWorker.class); + private static final int fakeNodeID = Integer.MIN_VALUE; private final String keyInHexFormat; private final ConsistencyFix consistencyFix; private final BlockingQueue badKeyQOut; + private final QueryKeyResult orphanedValues; + /** + * Normal use case constructor. + * + * @param keyInHexFormat + * @param consistencyFix + * @param badKeyQOut + */ ConsistencyFixWorker(String keyInHexFormat, ConsistencyFix consistencyFix, BlockingQueue badKeyQOut) { + this(keyInHexFormat, consistencyFix, badKeyQOut, null); + } + + /** + * Constructor for "orphaned values" use case. I.e., there are values for + * the specific key that exist somewhere and may need to be written to the + * nodes which actually host the key. + * + * @param keyInHexFormat + * @param consistencyFix + * @param badKeyQOut + * @param orphanedValues Set to null if no orphaned values to be included. + */ + ConsistencyFixWorker(String keyInHexFormat, + ConsistencyFix consistencyFix, + BlockingQueue badKeyQOut, + QueryKeyResult orphanedValues) { this.keyInHexFormat = keyInHexFormat; this.consistencyFix = consistencyFix; this.badKeyQOut = badKeyQOut; + this.orphanedValues = orphanedValues; } private String myName() { @@ -233,15 +260,32 @@ private ProcessReadRepliesResult processReadReplies(final List nodeIdLi */ private List> resolveReadConflicts(final List> nodeValues) { - // Some cut-paste-and-modify coding from + // If orphaned values exist, add them to fake nodes to be processed by + // "getRepairs" + int currentFakeNodeId = fakeNodeID; + if(this.orphanedValues != null) { + for(Versioned value: this.orphanedValues.getValues()) { + nodeValues.add(new NodeValue(currentFakeNodeId, + this.orphanedValues.getKey(), + value)); + currentFakeNodeId++; + } + } + + // Some cut-paste-and-modify (CPAM) coding from // store/routed/action/AbstractReadRepair.java and // store/routed/ThreadPoolRoutedStore.java ReadRepairer readRepairer = new ReadRepairer(); List> toReadRepair = Lists.newArrayList(); for(NodeValue v: readRepairer.getRepairs(nodeValues)) { - Versioned versioned = Versioned.value(v.getVersioned().getValue(), - ((VectorClock) v.getVersion()).clone()); - toReadRepair.add(new NodeValue(v.getNodeId(), v.getKey(), versioned)); + if(v.getNodeId() > currentFakeNodeId) { + // Only copy repairs intended for real nodes. + Versioned versioned = Versioned.value(v.getVersioned().getValue(), + ((VectorClock) v.getVersion()).clone()); + toReadRepair.add(new NodeValue(v.getNodeId(), + v.getKey(), + versioned)); + } } if(logger.isDebugEnabled()) {