Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Added unit tests for ConsistencyFix, ConsistencyFixWorker, and QueryKeyResult.

Many other fixes and cleanup:

src/java/voldemort/utils/ConsistencyFix.java
- tweak many variable names
- add close method to stop adminClient
- broke out BadKey to wrap a key with its string representation, so that failed fixes of bad keys can be dumped in full to a file and retried (without any additional effort)
- marked 'parseVersion' as deprecated since, if we do this again, we should dump bytes not strings
- track obsolete version exceptions and various statuses in Stats

src/java/voldemort/utils/ConsistencyFixCLI.java
- clean up of arguments, variable names, etc.
- cleanly close down fixer...

src/java/voldemort/utils/ConsistencyFixWorker.java
- more logger.trace output
- minor cleanup

test/common/voldemort/TestUtils.java
- added getVersioned() helper method

test/common/voldemort/config/stores.xml
- added consistency-fix store

test/unit/voldemort/store/routed/ReadRepairerTest.java
- marked all tests as @Test

test/unit/voldemort/utils/ConsistencyCheckTest.java
- update copyright notice
  • Loading branch information
jayjwylie committed Mar 20, 2013
1 parent b0a000d commit 78e9417
Show file tree
Hide file tree
Showing 11 changed files with 887 additions and 128 deletions.
257 changes: 178 additions & 79 deletions src/java/voldemort/utils/ConsistencyFix.java

Large diffs are not rendered by default.

39 changes: 21 additions & 18 deletions src/java/voldemort/utils/ConsistencyFixCLI.java
Expand Up @@ -34,7 +34,7 @@ public static void printUsage() {
sb.append("Optional arguments: \n");
sb.append("\t--orphan-format\n");
sb.append("\t--dry-run\n");
sb.append("\t--progress-bar <progressBarPeriod>\n");
sb.append("\t--progress-period-ops <progressPeriodOps>\n");
sb.append("\t--parallelism <parallelism>\n");
sb.append("\t--per-server-iops-limit <perServerIOPSLimit>\n");
sb.append("\n");
Expand All @@ -57,18 +57,17 @@ public static void printUsage(String errMessage, OptionParser parser) {
private static class Options {

public final static int defaultParallelism = 8;
// TODO: change name to progressPeriodMs (or S, as case may be)
public final static long defaultProgressBar = 1000;
public final static long defaultPerServerIOPSLimit = 100;
public final static long defaultProgressPeriodOps = 1000;
public final static long defaultPerServerQPSLimit = 100;

public String url = null;
public String storeName = null;
public String badKeyFileIn = null;
public boolean badKeyFileInOrphanFormat = false;
public String badKeyFileOut = null;
public int parallelism = defaultParallelism;
public long progressBar = defaultProgressBar;
public long perServerIOPSLimit = defaultPerServerIOPSLimit;
public long progressPeriodOps = defaultProgressPeriodOps;
public long perServerQPSLimit = defaultPerServerQPSLimit;
public boolean dryRun = false;
public boolean parseOnly = false;
}
Expand Down Expand Up @@ -115,15 +114,17 @@ private static ConsistencyFixCLI.Options parseArgs(String[] args) {
.withRequiredArg()
.describedAs("parallelism [Default value: " + Options.defaultParallelism + "]")
.ofType(Integer.class);
parser.accepts("progress-bar", "Number of operations between 'info' progress messages. ")
parser.accepts("progress-period-ops",
"Number of operations between 'info' progress messages. ")
.withRequiredArg()
.describedAs("progressBar [Default value: " + Options.defaultProgressBar + "]")
.describedAs("period (in operations) between outputting progress [Default value: "
+ Options.defaultProgressPeriodOps + "]")
.ofType(Long.class);
parser.accepts("per-server-iops-limit",
"Number of operations that the consistency fixer will issue into any individual server in one second. ")
parser.accepts("per-server-qps-limit",
"Number of operations that the consistency fixer will issue to any individual server in one second. ")
.withRequiredArg()
.describedAs("perServerIOPSLimit [Default value: "
+ Options.defaultPerServerIOPSLimit + "]")
.describedAs("perServerQPSLimit [Default value: " + Options.defaultPerServerQPSLimit
+ "]")
.ofType(Long.class);

OptionSet optionSet = parser.parse(args);
Expand Down Expand Up @@ -162,11 +163,11 @@ private static ConsistencyFixCLI.Options parseArgs(String[] args) {
if(optionSet.has("parallelism")) {
options.parallelism = (Integer) optionSet.valueOf("parallelism");
}
if(optionSet.has("progress-bar")) {
options.progressBar = (Long) optionSet.valueOf("progress-bar");
if(optionSet.has("progress-period-ops")) {
options.progressPeriodOps = (Long) optionSet.valueOf("progress-period-ops");
}
if(optionSet.has("per-server-iops-limit")) {
options.perServerIOPSLimit = (Long) optionSet.valueOf("per-server-iops-limit");
if(optionSet.has("per-server-qps-limit")) {
options.perServerQPSLimit = (Long) optionSet.valueOf("per-server-qps-limit");
}
if(optionSet.has("dry-run")) {
options.dryRun = true;
Expand All @@ -183,8 +184,8 @@ public static void main(String[] args) throws Exception {

ConsistencyFix consistencyFix = new ConsistencyFix(options.url,
options.storeName,
options.progressBar,
options.perServerIOPSLimit,
options.progressPeriodOps,
options.perServerQPSLimit,
options.dryRun,
options.parseOnly);

Expand All @@ -193,6 +194,8 @@ public static void main(String[] args) throws Exception {
options.badKeyFileInOrphanFormat,
options.badKeyFileOut);

consistencyFix.close();

System.out.println(summary);
}
}
84 changes: 56 additions & 28 deletions src/java/voldemort/utils/ConsistencyFixWorker.java
Expand Up @@ -28,7 +28,8 @@
import voldemort.client.protocol.admin.QueryKeyResult;
import voldemort.store.routed.NodeValue;
import voldemort.store.routed.ReadRepairer;
import voldemort.utils.ConsistencyFix.BadKeyResult;
import voldemort.utils.ConsistencyFix.BadKey;
import voldemort.utils.ConsistencyFix.BadKeyStatus;
import voldemort.utils.ConsistencyFix.Status;
import voldemort.versioning.ObsoleteVersionException;
import voldemort.versioning.VectorClock;
Expand All @@ -41,9 +42,9 @@ class ConsistencyFixWorker implements Runnable {
private static final Logger logger = Logger.getLogger(ConsistencyFixWorker.class);
private static final int fakeNodeID = Integer.MIN_VALUE;

private final String keyInHexFormat;
private final BadKey badKey;
private final ConsistencyFix consistencyFix;
private final BlockingQueue<BadKeyResult> badKeyQOut;
private final BlockingQueue<BadKeyStatus> badKeyQOut;
private final QueryKeyResult orphanedValues;

/**
Expand All @@ -53,10 +54,10 @@ class ConsistencyFixWorker implements Runnable {
* @param consistencyFix
* @param badKeyQOut
*/
ConsistencyFixWorker(String keyInHexFormat,
ConsistencyFixWorker(BadKey badKey,
ConsistencyFix consistencyFix,
BlockingQueue<BadKeyResult> badKeyQOut) {
this(keyInHexFormat, consistencyFix, badKeyQOut, null);
BlockingQueue<BadKeyStatus> badKeyQOut) {
this(badKey, consistencyFix, badKeyQOut, null);
}

/**
Expand All @@ -69,11 +70,11 @@ class ConsistencyFixWorker implements Runnable {
* @param badKeyQOut
* @param orphanedValues Set to null if no orphaned values to be included.
*/
ConsistencyFixWorker(String keyInHexFormat,
ConsistencyFixWorker(BadKey badKey,
ConsistencyFix consistencyFix,
BlockingQueue<BadKeyResult> badKeyQOut,
BlockingQueue<BadKeyStatus> badKeyQOut,
QueryKeyResult orphanedValues) {
this.keyInHexFormat = keyInHexFormat;
this.badKey = badKey;
this.consistencyFix = consistencyFix;
this.badKeyQOut = badKeyQOut;
this.orphanedValues = orphanedValues;
Expand All @@ -85,29 +86,28 @@ private String myName() {

@Override
public void run() {
logger.trace("About to process key " + keyInHexFormat + " (" + myName() + ")");
Status status = doConsistencyFix(keyInHexFormat);
logger.trace("Finished processing key " + keyInHexFormat + " (" + myName() + ")");
logger.trace("About to process key " + badKey + " (" + myName() + ")");
Status status = doConsistencyFix(badKey);
logger.trace("Finished processing key " + badKey + " (" + myName() + ")");
consistencyFix.getStats().incrementFixCount();

if(status != Status.SUCCESS) {
try {
badKeyQOut.put(consistencyFix.new BadKeyResult(keyInHexFormat, status));
badKeyQOut.put(new BadKeyStatus(badKey, status));
} catch(InterruptedException ie) {
logger.warn("Worker thread " + myName() + " interrupted.");
}
consistencyFix.getStats().incrementFailures();
consistencyFix.getStats().incrementFailures(status);
}
}

public Status doConsistencyFix(String keyInHexFormat) {

public Status doConsistencyFix(BadKey badKey) {
// Initialization.
byte[] keyInBytes;
List<Integer> nodeIdList = null;
int masterPartitionId = -1;
try {
keyInBytes = ByteUtils.fromHexString(keyInHexFormat);
keyInBytes = ByteUtils.fromHexString(badKey.getKeyInHexFormat());
masterPartitionId = consistencyFix.getStoreInstance().getMasterPartitionId(keyInBytes);
nodeIdList = consistencyFix.getStoreInstance()
.getReplicationNodeList(masterPartitionId);
Expand All @@ -123,19 +123,27 @@ public Status doConsistencyFix(String keyInHexFormat) {
// Do the reads
Map<Integer, QueryKeyResult> nodeIdToKeyValues = doReads(nodeIdList,
keyInBytes,
keyInHexFormat);
badKey.getKeyInHexFormat());

// Process read replies (i.e., nodeIdToKeyValues)
ProcessReadRepliesResult result = processReadReplies(nodeIdList,
keyAsByteArray,
keyInHexFormat,
badKey.getKeyInHexFormat(),
nodeIdToKeyValues);
if(result.status != Status.SUCCESS) {
return result.status;
}

// Resolve conflicts indicated in nodeValues
List<NodeValue<ByteArray, byte[]>> toReadRepair = resolveReadConflicts(result.nodeValues);
if(logger.isTraceEnabled()) {
if(toReadRepair.size() == 0) {
logger.trace("Nothing to repair");
}
for(NodeValue<ByteArray, byte[]> nodeValue: toReadRepair) {
logger.trace(nodeValue.getNodeId() + " --- " + nodeValue.getKey().toString());
}
}

// Do the repairs
Status status = doRepairPut(toReadRepair);
Expand Down Expand Up @@ -248,6 +256,7 @@ private ProcessReadRepliesResult processReadReplies(final List<Integer> nodeIdLi
logger.info("Aborting fixKey because exceptions were encountered when fetching key-values.");
return new ProcessReadRepliesResult(Status.FETCH_EXCEPTION);
}

if(logger.isDebugEnabled()) {
for(NodeValue<ByteArray, byte[]> nkv: nodeValues) {
logger.debug("\tRead NodeKeyValue : " + ByteUtils.toHexString(nkv.getKey().get())
Expand All @@ -267,6 +276,17 @@ private ProcessReadRepliesResult processReadReplies(final List<Integer> nodeIdLi
*/
private List<NodeValue<ByteArray, byte[]>> resolveReadConflicts(final List<NodeValue<ByteArray, byte[]>> nodeValues) {

if(logger.isTraceEnabled()) {
logger.trace("NodeValues passed into resolveReadConflicts.");
if(nodeValues.size() == 0) {
logger.trace("Empty nodeValues passed to resolveReadConflicts");
}
for(NodeValue<ByteArray, byte[]> nodeValue: nodeValues) {
logger.trace("\t" + nodeValue.getNodeId() + " - " + nodeValue.getKey().toString()
+ " - " + nodeValue.getVersion().toString());
}
}

// If orphaned values exist, add them to fake nodes to be processed by
// "getRepairs"
int currentFakeNodeId = fakeNodeID;
Expand All @@ -279,21 +299,26 @@ private List<NodeValue<ByteArray, byte[]>> resolveReadConflicts(final List<NodeV
}
}

// Some cut-paste-and-modify (CPAM) coding from
// Some cut-paste-and-modify coding from
// store/routed/action/AbstractReadRepair.java and
// store/routed/ThreadPoolRoutedStore.java
ReadRepairer<ByteArray, byte[]> readRepairer = new ReadRepairer<ByteArray, byte[]>();
if(logger.isDebugEnabled()) {
for(NodeValue<ByteArray, byte[]> nodeKeyValue: readRepairer.getRepairs(nodeValues)) {
logger.debug("\tNodeKeyValue result from readRepairer.getRepairs : "
List<NodeValue<ByteArray, byte[]>> nodeKeyValues = readRepairer.getRepairs(nodeValues);

if(logger.isTraceEnabled()) {
if(nodeKeyValues.size() == 0) {
logger.trace("\treadRepairer returned an empty list.");
}
for(NodeValue<ByteArray, byte[]> nodeKeyValue: nodeKeyValues) {
logger.trace("\tNodeKeyValue result from readRepairer.getRepairs : "
+ ByteUtils.toHexString(nodeKeyValue.getKey().get())
+ " on node with id " + nodeKeyValue.getNodeId() + " for version "
+ nodeKeyValue.getVersion());
}
}

List<NodeValue<ByteArray, byte[]>> toReadRepair = Lists.newArrayList();
for(NodeValue<ByteArray, byte[]> v: readRepairer.getRepairs(nodeValues)) {
for(NodeValue<ByteArray, byte[]> v: nodeKeyValues) {
if(v.getNodeId() > currentFakeNodeId) {
// Only copy repairs intended for real nodes.
Versioned<byte[]> versioned = Versioned.value(v.getVersioned().getValue(),
Expand All @@ -311,9 +336,12 @@ private List<NodeValue<ByteArray, byte[]>> resolveReadConflicts(final List<NodeV

}

if(logger.isDebugEnabled()) {
if(logger.isTraceEnabled()) {
if(toReadRepair.size() == 0) {
logger.trace("\ttoReadRepair is empty.");
}
for(NodeValue<ByteArray, byte[]> nodeKeyValue: toReadRepair) {
logger.debug("\tRepair key " + ByteUtils.toHexString(nodeKeyValue.getKey().get())
logger.trace("\tRepair key " + ByteUtils.toHexString(nodeKeyValue.getKey().get())
+ " on node with id " + nodeKeyValue.getNodeId() + " for version "
+ nodeKeyValue.getVersion());

Expand Down Expand Up @@ -342,8 +370,8 @@ public Status doRepairPut(final List<NodeValue<ByteArray, byte[]>> toReadRepair)
nodeKeyValue);
consistencyFix.getStats().incrementPutCount();
} catch(ObsoleteVersionException ove) {
// TODO: Add OVE catches to some statistics?
// NOOP. Treat OVE as success.
// Treat OVE as success.
consistencyFix.getStats().incrementObsoleteVersionExceptions();
} catch(VoldemortException ve) {
allRepairsSuccessful = false;
logger.debug("Repair of key " + nodeKeyValue.getKey() + "on node with id "
Expand Down
12 changes: 11 additions & 1 deletion test/common/voldemort/TestUtils.java
@@ -1,5 +1,5 @@
/*
* Copyright 2008-2009 LinkedIn, Inc
* Copyright 2008-2013 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
Expand Down Expand Up @@ -70,6 +70,16 @@ public static VectorClock getClock(int... nodes) {
return clock;
}

/**
 * Wraps a byte array in a Versioned whose version is the vector clock
 * built by getClock for the supplied nodes.
 *
 * @param value The raw bytes to wrap
 * @param nodes Passed unchanged to getClock; see that method for an
 *        explanation of this argument
 * @return A new Versioned pairing value with the constructed clock
 */
public static Versioned<byte[]> getVersioned(byte[] value, int... nodes) {
    VectorClock clock = getClock(nodes);
    return new Versioned<byte[]>(value, clock);
}

/**
* Record events for the given sequence of nodes
*
Expand Down
19 changes: 19 additions & 0 deletions test/common/voldemort/config/stores.xml
Expand Up @@ -251,4 +251,23 @@
</value-serializer>
<retention-days>1</retention-days>
</store>
<!--
  In-memory, client-routed store with string (UTF-8) keys and values.
  Each key is replicated 4 times; a read needs 1 replica and a write needs 2.
  NOTE(review): presumably the fixture for the ConsistencyFix unit tests,
  judging by the name; confirm against the tests that load this file.
-->
<store>
<name>consistency-fix</name>
<persistence>memory</persistence>
<routing>client</routing>
<replication-factor>4</replication-factor>
<preferred-reads>1</preferred-reads>
<required-reads>1</required-reads>
<preferred-writes>2</preferred-writes>
<required-writes>2</required-writes>
<key-serializer>
<type>string</type>
<schema-info>UTF-8</schema-info>
</key-serializer>
<value-serializer>
<type>string</type>
<schema-info>UTF-8</schema-info>
</value-serializer>
<retention-days>1</retention-days>
</store>
</stores>
1 change: 0 additions & 1 deletion test/unit/voldemort/client/AdminFetchTest.java
Expand Up @@ -13,7 +13,6 @@
* License for the specific language governing permissions and limitations under
* the License.
*/

package voldemort.client;

import static junit.framework.Assert.assertEquals;
Expand Down

0 comments on commit 78e9417

Please sign in to comment.