Skip to content

Commit

Permalink
Added basic code for repairing orphaned key,values.
Browse files Browse the repository at this point in the history
src/java/voldemort/utils/ConsistencyFix.java
- added BadKeyOrphanReader extends BadKeyReader to consume different
  input file

src/java/voldemort/utils/ConsistencyFixCLI.java

- added "orphan-format" flag to indicate that the 'bad-key-file-in' is
  of orphaned key/values.

src/java/voldemort/utils/ConsistencyFixWorker.java
- added constructor to take QueryKeyResult of orphaned keys
- modified resolveReadConflicts to add orphaned key/values to
  imaginary nodes for the sake of determine the value/version to be
  repaired
  • Loading branch information
jayjwylie committed Mar 20, 2013
1 parent 8cb8efa commit f573b53
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 16 deletions.
137 changes: 126 additions & 11 deletions src/java/voldemort/utils/ConsistencyFix.java
Expand Up @@ -22,6 +22,7 @@
import java.io.FileWriter;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
Expand All @@ -38,8 +39,11 @@

import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.protocol.admin.AdminClientConfig;
import voldemort.client.protocol.admin.QueryKeyResult;
import voldemort.cluster.Cluster;
import voldemort.store.StoreDefinition;
import voldemort.versioning.ClockEntry;
import voldemort.versioning.VectorClock;
import voldemort.versioning.Versioned;

public class ConsistencyFix {
Expand Down Expand Up @@ -123,7 +127,10 @@ public String toString() {
}
}

public String execute(int parallelism, String badKeyFileIn, String badKeyFileOut) {
public String execute(int parallelism,
String badKeyFileIn,
boolean orphanFormat,
String badKeyFileOut) {
ExecutorService badKeyReaderService;
ExecutorService badKeyWriterService;
ExecutorService consistencyFixWorkers;
Expand Down Expand Up @@ -226,14 +233,14 @@ public Status getResult() {

public class BadKeyReader implements Runnable {

private final CountDownLatch latch;
private final String badKeyFileIn;
protected final CountDownLatch latch;
protected final String badKeyFileIn;

private final ConsistencyFix consistencyFix;
private final ExecutorService consistencyFixWorkers;
private final BlockingQueue<BadKeyResult> badKeyQOut;
protected final ConsistencyFix consistencyFix;
protected final ExecutorService consistencyFixWorkers;
protected final BlockingQueue<BadKeyResult> badKeyQOut;

private BufferedReader fileReader;
protected BufferedReader fileReader;

BadKeyReader(CountDownLatch latch,
String badKeyFileIn,
Expand All @@ -258,12 +265,12 @@ public class BadKeyReader implements Runnable {
public void run() {
try {
int counter = 0;
for(String line = fileReader.readLine(); line != null; line = fileReader.readLine()) {
if(!line.isEmpty()) {
for(String key = fileReader.readLine(); key != null; key = fileReader.readLine()) {
if(!key.isEmpty()) {
counter++;
logger.debug("BadKeyReader read line: key (" + line + ") and counter ("
logger.debug("BadKeyReader read line: key (" + key + ") and counter ("
+ counter + ")");
consistencyFixWorkers.submit(new ConsistencyFixWorker(line,
consistencyFixWorkers.submit(new ConsistencyFixWorker(key,
consistencyFix,
badKeyQOut));
}
Expand All @@ -282,6 +289,114 @@ public void run() {
}
}

public class BadKeyOrphanReader extends BadKeyReader {

BadKeyOrphanReader(CountDownLatch latch,
String badKeyFileIn,
ConsistencyFix consistencyFix,
ExecutorService consistencyFixWorkers,
BlockingQueue<BadKeyResult> badKeyQOut) {
super(latch, badKeyFileIn, consistencyFix, consistencyFixWorkers, badKeyQOut);
}

// TODO: if we ever do an orphan fix again, we should
// serialize/deserialize VectorClock to/from bytes. Indeed, any object
// that can be persisted and offers a toString, should probably offer
// some to/from options for serde.
/**
* Parses a "version" string of the following format:
*
*
* 'version(2:25, 25:2, 29:156) ts:1355451322089'
*
* and converts this parsed value back into a VectorClock type.
*
* @param versionString
* @return
* @throws IOException
*/
private VectorClock parseVersion(String versionString) throws IOException {
List<ClockEntry> versions = new ArrayList<ClockEntry>();
long timestamp = 0;

// TODO: confirm regex works...
String parsed[] = versionString.split(") ts:");
if(parsed.length != 2) {
throw new IOException("Could not parse vector clock: " + versionString);
}
timestamp = Long.parseLong(parsed[1]);
// "version("
// 01234567
// => 8 is the magic offset to elide "version("
String clockEntryList = parsed[0].substring(8);
String parsedClockEntryList[] = clockEntryList.split(", ");
for(int i = 0; i < parsedClockEntryList.length; ++i) {
String parsedClockEntry[] = parsedClockEntryList[i].split(":");
if(parsedClockEntry.length != 2) {
throw new IOException("Could not parse ClockEntry: " + parsedClockEntryList[i]);
}
short nodeId = Short.parseShort(parsedClockEntry[0]);
long version = Long.parseLong(parsedClockEntry[1]);
versions.add(new ClockEntry(nodeId, version));
}

return new VectorClock(versions, timestamp);
}

@Override
public void run() {
try {
int counter = 0;
for(String keyNumVals = fileReader.readLine(); keyNumVals != null; keyNumVals = fileReader.readLine()) {
if(!keyNumVals.isEmpty()) {
counter++;
String parsed[] = keyNumVals.split(",");
if(parsed.length != 2) {
throw new IOException("KeyNumVal line did not parse into two elements: "
+ keyNumVals);
}
String key = parsed[0];
ByteArray keyByteArray = new ByteArray(ByteUtils.fromHexString(key));
int numVals = Integer.parseInt(parsed[1]);
logger.debug("BadKeyReader read line: key (" + key + ") and counter ("
+ counter + ") and numVals is (" + numVals + ")");

List<Versioned<byte[]>> values = new ArrayList<Versioned<byte[]>>();
for(int i = 0; i < numVals; ++i) {
String valueVersion = fileReader.readLine();
if(valueVersion.isEmpty()) {
throw new IOException("ValueVersion line was empty!");
}
parsed = valueVersion.split(",", 2);
if(parsed.length != 2) {
throw new IOException("ValueVersion line did not parse into two elements: "
+ valueVersion);
}
byte[] value = ByteUtils.fromHexString(parsed[0]);
VectorClock vectorClock = parseVersion(parsed[1]);

values.add(new Versioned<byte[]>(value, vectorClock));
}
QueryKeyResult queryKeyResult = new QueryKeyResult(keyByteArray, values);
consistencyFixWorkers.submit(new ConsistencyFixWorker(keyNumVals,
consistencyFix,
badKeyQOut,
queryKeyResult));
}
}
} catch(Exception e) {
logger.warn("Exception reading badKeyFile " + badKeyFileIn + " : " + e.getMessage());
} finally {
latch.countDown();
try {
fileReader.close();
} catch(IOException ioe) {
logger.warn("IOException during fileReader.close in BadKeyReader thread.");
}
}
}
}

public class BadKeyWriter implements Runnable {

private final String badKeyFileOut;
Expand Down
9 changes: 8 additions & 1 deletion src/java/voldemort/utils/ConsistencyFixCLI.java
Expand Up @@ -44,6 +44,7 @@ private static class Options {
public String url = null;
public String storeName = null;
public String badKeyFileIn = null;
public boolean badKeyFileInOrphanFormat = false;
public String badKeyFileOut = null;
public int parallelism = defaultParallelism;
public long progressBar = defaultProgressBar;
Expand Down Expand Up @@ -73,6 +74,8 @@ private static ConsistencyFixCLI.Options parseArgs(String[] args) {
.describedAs("Name of bad-key-file-in. " + "Each key must be in hexadecimal format. "
+ "Each key must be on a separate line in the file. ")
.ofType(String.class);
parser.accepts("orphan-format",
"Indicates format of bad-key-file-in is of 'orphan' key-values.");
parser.accepts("bad-key-file-out")
.withRequiredArg()
.describedAs("Name of bad-key-file-out. "
Expand Down Expand Up @@ -122,8 +125,11 @@ private static ConsistencyFixCLI.Options parseArgs(String[] args) {

options.url = (String) optionSet.valueOf("url");
options.storeName = (String) optionSet.valueOf("store");
options.badKeyFileOut = (String) optionSet.valueOf("bad-key-file-out");
options.badKeyFileIn = (String) optionSet.valueOf("bad-key-file-in");
options.badKeyFileOut = (String) optionSet.valueOf("bad-key-file-out");
if(optionSet.has("orphan-format")) {
options.badKeyFileInOrphanFormat = true;
}
if(optionSet.has("parallelism")) {
options.parallelism = (Integer) optionSet.valueOf("parallelism");
}
Expand All @@ -147,6 +153,7 @@ public static void main(String[] args) throws Exception {

String summary = consistencyFix.execute(options.parallelism,
options.badKeyFileIn,
options.badKeyFileInOrphanFormat,
options.badKeyFileOut);

System.out.println(summary);
Expand Down
52 changes: 48 additions & 4 deletions src/java/voldemort/utils/ConsistencyFixWorker.java
Expand Up @@ -39,17 +39,44 @@
class ConsistencyFixWorker implements Runnable {

private static final Logger logger = Logger.getLogger(ConsistencyFixWorker.class);
private static final int fakeNodeID = Integer.MIN_VALUE;

private final String keyInHexFormat;
private final ConsistencyFix consistencyFix;
private final BlockingQueue<BadKeyResult> badKeyQOut;
private final QueryKeyResult orphanedValues;

/**
* Normal use case constructor.
*
* @param keyInHexFormat
* @param consistencyFix
* @param badKeyQOut
*/
ConsistencyFixWorker(String keyInHexFormat,
ConsistencyFix consistencyFix,
BlockingQueue<BadKeyResult> badKeyQOut) {
this(keyInHexFormat, consistencyFix, badKeyQOut, null);
}

/**
* Constructor for "orphaned values" use case. I.e., there are values for
* the specific key that exist somewhere and may need to be written to the
* nodes which actually host the key.
*
* @param keyInHexFormat
* @param consistencyFix
* @param badKeyQOut
* @param orphanedValues Set to null if no orphaned values to be included.
*/
ConsistencyFixWorker(String keyInHexFormat,
ConsistencyFix consistencyFix,
BlockingQueue<BadKeyResult> badKeyQOut,
QueryKeyResult orphanedValues) {
this.keyInHexFormat = keyInHexFormat;
this.consistencyFix = consistencyFix;
this.badKeyQOut = badKeyQOut;
this.orphanedValues = orphanedValues;
}

private String myName() {
Expand Down Expand Up @@ -233,15 +260,32 @@ private ProcessReadRepliesResult processReadReplies(final List<Integer> nodeIdLi
*/
private List<NodeValue<ByteArray, byte[]>> resolveReadConflicts(final List<NodeValue<ByteArray, byte[]>> nodeValues) {

// Some cut-paste-and-modify coding from
// If orphaned values exist, add them to fake nodes to be processed by
// "getRepairs"
int currentFakeNodeId = fakeNodeID;
if(this.orphanedValues != null) {
for(Versioned<byte[]> value: this.orphanedValues.getValues()) {
nodeValues.add(new NodeValue<ByteArray, byte[]>(currentFakeNodeId,
this.orphanedValues.getKey(),
value));
currentFakeNodeId++;
}
}

// Some cut-paste-and-modify (CPAM) coding from
// store/routed/action/AbstractReadRepair.java and
// store/routed/ThreadPoolRoutedStore.java
ReadRepairer<ByteArray, byte[]> readRepairer = new ReadRepairer<ByteArray, byte[]>();
List<NodeValue<ByteArray, byte[]>> toReadRepair = Lists.newArrayList();
for(NodeValue<ByteArray, byte[]> v: readRepairer.getRepairs(nodeValues)) {
Versioned<byte[]> versioned = Versioned.value(v.getVersioned().getValue(),
((VectorClock) v.getVersion()).clone());
toReadRepair.add(new NodeValue<ByteArray, byte[]>(v.getNodeId(), v.getKey(), versioned));
if(v.getNodeId() > currentFakeNodeId) {
// Only copy repairs intended for real nodes.
Versioned<byte[]> versioned = Versioned.value(v.getVersioned().getValue(),
((VectorClock) v.getVersion()).clone());
toReadRepair.add(new NodeValue<ByteArray, byte[]>(v.getNodeId(),
v.getKey(),
versioned));
}
}

if(logger.isDebugEnabled()) {
Expand Down

0 comments on commit f573b53

Please sign in to comment.