Skip to content

Commit

Permalink
Addressed review feedback and TODOs for KeyVersionSamplerCLI (and ren…
Browse files Browse the repository at this point in the history
…amed it to KeyVersionFetcherCLI).

- mostly usability changes about command line options...
- one copyright fix
  • Loading branch information
jayjwylie committed Mar 20, 2013
1 parent f5e8f5a commit 92b80c0
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 25 deletions.
2 changes: 1 addition & 1 deletion src/java/voldemort/client/protocol/pb/ProtoUtils.java
@@ -1,5 +1,5 @@
/*
* Copyright 2008-2009 LinkedIn, Inc
* Copyright 2008-2013 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
Expand Down
Expand Up @@ -31,6 +31,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import joptsimple.OptionException;
import joptsimple.OptionParser;
Expand All @@ -46,20 +48,19 @@
import voldemort.store.StoreDefinition;
import voldemort.versioning.Versioned;

// TODO: Rename KeyValueFetcher

/**
* The KeyVersionSamplerCLI is a rudimentary tool that outputs a sampling of
* The KeyVersionFetcherCLI is a rudimentary tool that outputs a sampling of
* existing keys from a cluster. For each store in the cluster, a distinct file
* of keys to sample is expected. And, for each of these, a distint file of
* key-versions is generated.
*
*/
public class KeyVersionSamplerCLI {
public class KeyVersionFetcherCLI {

private static Logger logger = Logger.getLogger(KeyVersionSamplerCLI.class);
private static Logger logger = Logger.getLogger(KeyVersionFetcherCLI.class);

private final static int KEY_PARALLELISM = 4;
private final static int DEFAULT_KEY_PARALLELISM = 4;
private final static int DEFAULT_PROGRESS_PERIOD_OPS = 1000;

private final AdminClient adminClient;
private final Cluster cluster;
Expand All @@ -69,9 +70,18 @@ public class KeyVersionSamplerCLI {
private final String inDir;
private final String outDir;

private final ExecutorService kvSamplerService;
private final ExecutorService kvFetcherService;
private final int progressPeriodOps;

private final long startTimeMs;
private static AtomicInteger fetches = new AtomicInteger(0);

public KeyVersionSamplerCLI(String url, String inDir, String outDir, int keyParallelism) {
public KeyVersionFetcherCLI(String url,
String inDir,
String outDir,
List<String> storeNames,
int keyParallelism,
int progressPeriodOps) {
if(logger.isInfoEnabled()) {
logger.info("Connecting to bootstrap server: " + url);
}
Expand All @@ -80,30 +90,69 @@ public KeyVersionSamplerCLI(String url, String inDir, String outDir, int keyPara
this.storeDefinitions = adminClient.metadataMgmtOps.getRemoteStoreDefList(0).getValue();
this.storeNameToKeyStringsMap = new HashMap<String, StringBuilder>();
for(StoreDefinition storeDefinition: storeDefinitions) {
this.storeNameToKeyStringsMap.put(storeDefinition.getName(), new StringBuilder());
String storeName = storeDefinition.getName();
if(storeNames != null) {
if(!storeNames.contains(storeName)) {
logger.debug("Will not sample store "
+ storeName
+ " since it is not in list of storeNames provided on command line.");
continue;
}
}
this.storeNameToKeyStringsMap.put(storeName, new StringBuilder());
}

if(storeNames != null) {
List<String> badStoreNames = new LinkedList<String>();
for(String storeName: storeNames) {
if(!this.storeNameToKeyStringsMap.keySet().contains(storeName)) {
badStoreNames.add(storeName);
}
}
if(badStoreNames.size() > 0) {
Utils.croak("Some storeNames provided on the command line were not found on this cluster: "
+ badStoreNames);
}
}

this.inDir = inDir;
this.outDir = outDir;

this.kvSamplerService = Executors.newFixedThreadPool(keyParallelism);
this.kvFetcherService = Executors.newFixedThreadPool(keyParallelism);

this.progressPeriodOps = progressPeriodOps;
this.startTimeMs = System.currentTimeMillis();
}

public boolean sampleStores() {
for(StoreDefinition storeDefinition: storeDefinitions) {
if(!sampleStore(storeDefinition)) {
return false;
if(storeNameToKeyStringsMap.keySet().contains(storeDefinition.getName())) {
if(!sampleStore(storeDefinition)) {
return false;
}
}
}
return true;
}

public class KeyVersionSampler implements Callable<String> {
public void updateFetchProgress() {
int curFetches = fetches.incrementAndGet();

if(0 == curFetches % progressPeriodOps) {
if(logger.isInfoEnabled()) {
long durationS = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()
- startTimeMs);
logger.info("Fetched " + curFetches + " in " + durationS + " seconds.");
}
}
}

public class KeyVersionFetcher implements Callable<String> {

private final StoreInstance storeInstance;
private final byte[] key;

KeyVersionSampler(StoreInstance storeInstance, byte[] key) {
KeyVersionFetcher(StoreInstance storeInstance, byte[] key) {
this.storeInstance = storeInstance;
this.key = key;
}
Expand All @@ -127,6 +176,7 @@ public String call() throws Exception {
sb.append("\n");
replicationOffset++;
}
updateFetchProgress();
return sb.toString();
}
}
Expand Down Expand Up @@ -159,8 +209,8 @@ public boolean sampleStore(StoreDefinition storeDefinition) {
for(String keyLine = keyReader.readLine(); keyLine != null; keyLine = keyReader.readLine()) {
byte[] keyInBytes = ByteUtils.fromHexString(keyLine.trim());

KeyVersionSampler kvSampler = new KeyVersionSampler(storeInstance, keyInBytes);
Future<String> future = kvSamplerService.submit(kvSampler);
KeyVersionFetcher kvFetcher = new KeyVersionFetcher(storeInstance, keyInBytes);
Future<String> future = kvFetcherService.submit(kvFetcher);
futureKVs.add(future);
}

Expand Down Expand Up @@ -215,7 +265,7 @@ public void stop() {
if(adminClient != null) {
adminClient.close();
}
kvSamplerService.shutdown();
kvFetcherService.shutdown();
}

/**
Expand All @@ -231,7 +281,7 @@ private static OptionParser getParser() {
.describedAs("bootstrap-url")
.ofType(String.class);
parser.accepts("in-dir",
"[REQUIRED] Directory in which to find the input key files (named \"{storeName}.kvs\", generated by KeySamplerCLI.")
"[REQUIRED] Directory in which to find the input key files (named \"{storeName}.kvs\", generated by KeyFetcherCLI.")
.withRequiredArg()
.describedAs("inputDirectory")
.ofType(String.class);
Expand All @@ -240,12 +290,24 @@ private static OptionParser getParser() {
.withRequiredArg()
.describedAs("outputDirectory")
.ofType(String.class);
parser.accepts("store-names",
"Store names to sample. Comma delimited list or singleton. [Default: ALL]")
.withRequiredArg()
.describedAs("storeNames")
.withValuesSeparatedBy(',')
.ofType(String.class);
parser.accepts("parallelism",
"Number of key-versions to sample in parallel. [Default: " + KEY_PARALLELISM
+ " ]")
"Number of key-versions to sample in parallel. [Default: "
+ DEFAULT_KEY_PARALLELISM + " ]")
.withRequiredArg()
.describedAs("storeParallelism")
.ofType(Integer.class);
parser.accepts("progress-period-ops",
"Number of operations between progress info is displayed. [Default: "
+ DEFAULT_PROGRESS_PERIOD_OPS + " ]")
.withRequiredArg()
.describedAs("progressPeriodOps")
.ofType(Integer.class);
return parser;
}

Expand All @@ -254,15 +316,17 @@ private static OptionParser getParser() {
*/
private static void printUsage() {
StringBuilder help = new StringBuilder();
help.append("KeySamplerCLI Tool\n");
help.append("KeyFetcherCLI Tool\n");
help.append(" Find one key from each store-partition. Output keys per store.\n");
help.append("Options:\n");
help.append(" Required:\n");
help.append(" --url <bootstrap-url>\n");
help.append(" --in-dir <inputDirectory>\n");
help.append(" --out-dir <outputDirectory>\n");
help.append(" Optional:\n");
help.append(" --store-names <storeName>[,<storeName>...]\n");
help.append(" --parallelism <keyParallelism>\n");
help.append(" --progress-period-ops <progressPeriodOps>\n");
help.append(" --help\n");
System.out.print(help.toString());
}
Expand Down Expand Up @@ -309,16 +373,30 @@ public static void main(String[] args) throws Exception {
String outDir = (String) options.valueOf("out-dir");
Utils.mkdirs(new File(outDir));

Integer keyParallelism = KEY_PARALLELISM;
List<String> storeNames = null;
if(options.hasArgument("store-names")) {
@SuppressWarnings("unchecked")
List<String> list = (List<String>) options.valuesOf("store-names");
storeNames = list;
}

Integer keyParallelism = DEFAULT_KEY_PARALLELISM;
if(options.hasArgument("parallelism")) {
keyParallelism = (Integer) options.valueOf("parallelism");
}

Integer progressPeriodOps = DEFAULT_PROGRESS_PERIOD_OPS;
if(options.hasArgument("progress-period-ops")) {
progressPeriodOps = (Integer) options.valueOf("progress-period-ops");
}

try {
KeyVersionSamplerCLI sampler = new KeyVersionSamplerCLI(url,
KeyVersionFetcherCLI sampler = new KeyVersionFetcherCLI(url,
inDir,
outDir,
keyParallelism);
storeNames,
keyParallelism,
progressPeriodOps);

try {
if(!sampler.sampleStores()) {
Expand Down

0 comments on commit 92b80c0

Please sign in to comment.