Skip to content

Commit

Permalink
Fixed hashmap issues in AdminClient raised during code review. Added …
Browse files Browse the repository at this point in the history
…'--parse-only' option to ConsistencyFix.

src/java/voldemort/client/protocol/admin/AdminClient.java
- Added hashCode & equals methods to AdminClient.Nodestore
- cleaned up getSocketStore to not leak concurrently created socket stores.

src/java/voldemort/utils/ConsistencyFix(CLI).java
- added parse only flag which limits that actions of the fixer to bootstrapping and parsing the input file.
  • Loading branch information
jayjwylie committed Mar 20, 2013
1 parent 213003a commit 0c6893c
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 24 deletions.
49 changes: 34 additions & 15 deletions src/java/voldemort/client/protocol/admin/AdminClient.java
Expand Up @@ -1902,15 +1902,28 @@ private class AdminStoreClient {

private class NodeStore {

@SuppressWarnings("unused")
final public int nodeId;
@SuppressWarnings("unused")
final public Integer nodeId;
final public String storeName;

NodeStore(int nodeId, String storeName) {
this.nodeId = nodeId;
this.nodeId = new Integer(nodeId);
this.storeName = storeName;
}

@Override
public boolean equals(Object obj) {
if(this == obj)
return true;
if(!(obj instanceof NodeStore))
return false;
NodeStore other = (NodeStore) obj;
return nodeId.equals(other.nodeId) && storeName.equals(other.storeName);
}

@Override
public int hashCode() {
return nodeId.hashCode() + storeName.hashCode();
}
}

final private ClientConfig clientConfig;
Expand All @@ -1933,27 +1946,33 @@ private class NodeStore {
public SocketStore getSocketStore(int nodeId, String storeName) {
NodeStore nodeStore = new NodeStore(nodeId, storeName);

if(!nodeStoreSocketCache.containsKey(nodeStore)) {
SocketStore socketStore = null;

SocketStore socketStore = nodeStoreSocketCache.get(nodeStore);
if(socketStore == null) {
Node node = getAdminClientCluster().getNodeById(nodeId);

SocketStore newSocketStore = null;
try {
// Can clientConfig.getRequestFormatType() default to
// TODO: Can clientConfig.getRequestFormatType() default to
// something?
socketStore = clientPool.create(storeName,
node.getHost(),
node.getSocketPort(),
clientConfig.getRequestFormatType(),
RequestRoutingType.IGNORE_CHECKS);
newSocketStore = clientPool.create(storeName,
node.getHost(),
node.getSocketPort(),
clientConfig.getRequestFormatType(),
RequestRoutingType.IGNORE_CHECKS);
} catch(Exception e) {
clientPool.close();
throw new VoldemortException(e);
}

nodeStoreSocketCache.putIfAbsent(nodeStore, socketStore);
socketStore = nodeStoreSocketCache.putIfAbsent(nodeStore, newSocketStore);
if(socketStore == null) {
socketStore = newSocketStore;
} else {
newSocketStore.close();
}
}

return nodeStoreSocketCache.get(nodeStore);
return socketStore;
}

public void stop() {
Expand Down
27 changes: 19 additions & 8 deletions src/java/voldemort/utils/ConsistencyFix.java
Expand Up @@ -60,12 +60,14 @@ public class ConsistencyFix {
private final long perServerIOPSLimit;
private final ConcurrentMap<Integer, EventThrottler> putThrottlers;
private final boolean dryRun;
private final boolean parseOnly;

ConsistencyFix(String url,
String storeName,
long progressBar,
long perServerIOPSLimit,
boolean dryRun) {
boolean dryRun,
boolean parseOnly) {
this.storeName = storeName;
logger.info("Connecting to bootstrap server: " + url);
this.adminClient = new AdminClient(url, new AdminClientConfig(), 0);
Expand All @@ -85,6 +87,7 @@ public class ConsistencyFix {
this.perServerIOPSLimit = perServerIOPSLimit;
this.putThrottlers = new ConcurrentHashMap<Integer, EventThrottler>();
this.dryRun = dryRun;
this.parseOnly = parseOnly;
}

public String getStoreName() {
Expand All @@ -107,6 +110,10 @@ public boolean isDryRun() {
return dryRun;
}

public boolean isParseOnly() {
return parseOnly;
}

/**
* Throttle put (repair) activity per server.
*
Expand Down Expand Up @@ -295,9 +302,11 @@ public void run() {
counter++;
logger.debug("BadKeyReader read line: key (" + key + ") and counter ("
+ counter + ")");
consistencyFixWorkers.submit(new ConsistencyFixWorker(key,
consistencyFix,
badKeyQOut));
if(!consistencyFix.isParseOnly()) {
consistencyFixWorkers.submit(new ConsistencyFixWorker(key,
consistencyFix,
badKeyQOut));
}
}
}
} catch(IOException ioe) {
Expand Down Expand Up @@ -411,10 +420,12 @@ public void run() {
values.add(new Versioned<byte[]>(value, vectorClock));
}
QueryKeyResult queryKeyResult = new QueryKeyResult(keyByteArray, values);
consistencyFixWorkers.submit(new ConsistencyFixWorker(key,
consistencyFix,
badKeyQOut,
queryKeyResult));
if(!consistencyFix.isParseOnly()) {
consistencyFixWorkers.submit(new ConsistencyFixWorker(key,
consistencyFix,
badKeyQOut,
queryKeyResult));
}
}
}
} catch(Exception e) {
Expand Down
10 changes: 9 additions & 1 deletion src/java/voldemort/utils/ConsistencyFixCLI.java
Expand Up @@ -70,6 +70,7 @@ private static class Options {
public long progressBar = defaultProgressBar;
public long perServerIOPSLimit = defaultPerServerIOPSLimit;
public boolean dryRun = false;
public boolean parseOnly = false;
}

/**
Expand Down Expand Up @@ -100,6 +101,9 @@ private static ConsistencyFixCLI.Options parseArgs(String[] args) {
"Indicates format of bad-key-file-in is of 'orphan' key-values.");
parser.accepts("dry-run",
"Indicates to go through all of the read actions until the point of issuing repair puts. Then, do a 'no-op'.");
parser.accepts("parse-only",
"Indicates to only parse the input file. Does not perform any key queries or repair puts. "
+ "Does bootstrap though so bootstrapUrl and storeName must be specified.");
parser.accepts("bad-key-file-out",
"Name of bad-key-file-out. "
+ "Keys that are not mae consistent are output to this file.")
Expand Down Expand Up @@ -167,6 +171,9 @@ private static ConsistencyFixCLI.Options parseArgs(String[] args) {
if(optionSet.has("dry-run")) {
options.dryRun = true;
}
if(optionSet.has("parse-only")) {
options.parseOnly = true;
}

return options;
}
Expand All @@ -178,7 +185,8 @@ public static void main(String[] args) throws Exception {
options.storeName,
options.progressBar,
options.perServerIOPSLimit,
options.dryRun);
options.dryRun,
options.parseOnly);

String summary = consistencyFix.execute(options.parallelism,
options.badKeyFileIn,
Expand Down

0 comments on commit 0c6893c

Please sign in to comment.