Skip to content

Commit

Permalink
HDFS-12180. Ozone: Corona: Add stats and progress bar to corona. Cont…
Browse files Browse the repository at this point in the history
…ributed by Nandakumar.
  • Loading branch information
anuengineer authored and omalley committed Apr 26, 2018
1 parent a4b3160 commit fcd4537
Show file tree
Hide file tree
Showing 2 changed files with 199 additions and 14 deletions.
Expand Up @@ -55,7 +55,11 @@
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

/**
Expand Down
Expand Up @@ -77,6 +77,7 @@ public final class Corona extends Configured implements Tool {
private static final String HELP = "help";
private static final String MODE = "mode";
private static final String SOURCE = "source";
private static final String NUM_OF_THREADS = "numOfThreads";
private static final String NUM_OF_VOLUMES = "numOfVolumes";
private static final String NUM_OF_BUCKETS = "numOfBuckets";
private static final String NUM_OF_KEYS = "numOfKeys";
Expand All @@ -85,19 +86,21 @@ public final class Corona extends Configured implements Tool {
private static final String SOURCE_DEFAULT =
"https://commoncrawl.s3.amazonaws.com/" +
"crawl-data/CC-MAIN-2017-17/warc.paths.gz";
private static final String NUM_OF_THREADS_DEFAULT = "10";
private static final String NUM_OF_VOLUMES_DEFAULT = "10";
private static final String NUM_OF_BUCKETS_DEFAULT = "1000";
private static final String NUM_OF_KEYS_DEFAULT = "500000";

private static final int NUM_OF_THREADS_DEFAULT = 10;

private static final Logger LOG =
LoggerFactory.getLogger(Corona.class);

private boolean printUsage = false;
private boolean completed = false;
private boolean exception = false;

private String mode;
private String source;
private String numOfThreads;
private String numOfVolumes;
private String numOfBuckets;
private String numOfKeys;
Expand All @@ -107,18 +110,29 @@ public final class Corona extends Configured implements Tool {

private long startTime;

private AtomicLong volumeCreationTime;
private AtomicLong bucketCreationTime;
private AtomicLong keyCreationTime;
private AtomicLong keyWriteTime;

private AtomicLong totalBytesWritten;

private AtomicInteger numberOfVolumesCreated;
private AtomicInteger numberOfBucketsCreated;
private AtomicLong numberOfKeysAdded;

private Corona(Configuration conf) throws IOException {
startTime = System.nanoTime();
volumeCreationTime = new AtomicLong();
bucketCreationTime = new AtomicLong();
keyCreationTime = new AtomicLong();
keyWriteTime = new AtomicLong();
totalBytesWritten = new AtomicLong();
numberOfVolumesCreated = new AtomicInteger();
numberOfBucketsCreated = new AtomicInteger();
numberOfKeysAdded = new AtomicLong();
OzoneClientFactory.setConfiguration(conf);
ozoneClient = OzoneClientFactory.getRpcClient();
processor = Executors.newFixedThreadPool(NUM_OF_THREADS_DEFAULT);
}

@Override
Expand All @@ -130,22 +144,29 @@ public int run(String[] args) throws Exception {
usage();
System.exit(0);
}
LOG.info("Number of Threads: " + numOfThreads);
processor = Executors.newFixedThreadPool(Integer.parseInt(numOfThreads));
addShutdownHook();
if(mode.equals("online")) {
LOG.info("Mode: online");
throw new UnsupportedOperationException("Not yet implemented.");
} else {
LOG.info("Mode: offline");
LOG.info("Number of Volumes: {}.", numOfBuckets);
LOG.info("Number of Volumes: {}.", numOfVolumes);
LOG.info("Number of Buckets per Volume: {}.", numOfBuckets);
LOG.info("Number of Keys per Bucket: {}.", numOfKeys);
for(int i = 0; i < Integer.parseInt(numOfVolumes); i++) {
String volume = "vol-" + i + "-" +
RandomStringUtils.randomNumeric(5);
processor.submit(new OfflineProcessor(volume));
}
Thread progressbar = getProgressBarThread();
LOG.info("Starting progress bar Thread.");
progressbar.start();
processor.shutdown();
processor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
completed = true;
progressbar.join();
return 0;
}
}
Expand All @@ -168,6 +189,12 @@ private Options getOzonePetaGenOptions() {
"commoncrawl warc file to be used when the mode is online.");
Option optSource = OptionBuilder.create(SOURCE);

OptionBuilder.withArgName("value");
OptionBuilder.hasArg();
OptionBuilder.withDescription("number of threads to be launched " +
"for the run");
Option optNumOfThreads = OptionBuilder.create(NUM_OF_THREADS);

OptionBuilder.withArgName("value");
OptionBuilder.hasArg();
OptionBuilder.withDescription("specifies number of Volumes to be " +
Expand All @@ -189,6 +216,7 @@ private Options getOzonePetaGenOptions() {
options.addOption(optHelp);
options.addOption(optMode);
options.addOption(optSource);
options.addOption(optNumOfThreads);
options.addOption(optNumOfVolumes);
options.addOption(optNumOfBuckets);
options.addOption(optNumOfKeys);
Expand All @@ -204,6 +232,9 @@ private void parseOzonePetaGenOptions(CommandLine cmdLine) {
source = cmdLine.hasOption(SOURCE) ?
cmdLine.getOptionValue(SOURCE) : SOURCE_DEFAULT;

numOfThreads = cmdLine.hasOption(NUM_OF_THREADS) ?
cmdLine.getOptionValue(NUM_OF_THREADS) : NUM_OF_THREADS_DEFAULT;

numOfVolumes = cmdLine.hasOption(NUM_OF_VOLUMES) ?
cmdLine.getOptionValue(NUM_OF_VOLUMES) : NUM_OF_VOLUMES_DEFAULT;

Expand All @@ -216,6 +247,8 @@ private void parseOzonePetaGenOptions(CommandLine cmdLine) {

private void usage() {
System.out.println("Options supported are:");
System.out.println("-numOfThreads <value> "
+ "number of threads to be launched for the run.");
System.out.println("-mode [online | offline] "
+ "specifies the mode in which Corona should run.");
System.out.println("-source <url> "
Expand Down Expand Up @@ -245,7 +278,9 @@ private class OfflineProcessor implements Runnable {
this.totalKeys = Integer.parseInt(numOfKeys);
this.volume = volume;
LOG.trace("Creating volume: {}", volume);
long start = System.nanoTime();
ozoneClient.createVolume(this.volume);
volumeCreationTime.getAndAdd(System.nanoTime() - start);
numberOfVolumesCreated.getAndIncrement();
}

Expand All @@ -256,7 +291,9 @@ public void run() {
RandomStringUtils.randomNumeric(5);
try {
LOG.trace("Creating bucket: {} in volume: {}", bucket, volume);
long start = System.nanoTime();
ozoneClient.createBucket(volume, bucket);
bucketCreationTime.getAndAdd(System.nanoTime() - start);
numberOfBucketsCreated.getAndIncrement();
for (int k = 0; k < totalKeys; k++) {
String key = "key-" + k + "-" +
Expand All @@ -265,17 +302,24 @@ public void run() {
try {
LOG.trace("Adding key: {} in bucket: {} of volume: {}",
key, bucket, volume);
long keyCreateStart = System.nanoTime();
OzoneOutputStream os = ozoneClient.createKey(
volume, bucket, key, value.length);
keyCreationTime.getAndAdd(System.nanoTime() - keyCreateStart);
long keyWriteStart = System.nanoTime();
os.write(value);
os.close();
keyWriteTime.getAndAdd(System.nanoTime() - keyWriteStart);
totalBytesWritten.getAndAdd(value.length);
numberOfKeysAdded.getAndIncrement();
} catch (Exception e) {
exception = true;
LOG.error("Exception while adding key: {} in bucket: {}" +
" of volume: {}.", key, bucket, volume, e);
}
}
} catch (Exception e) {
exception = true;
LOG.error("Exception while creating bucket: {}" +
" in volume: {}.", bucket, volume, e);
}
Expand All @@ -287,11 +331,82 @@ public void run() {
* Adds ShutdownHook to print statistics.
*/
private void addShutdownHook() {
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
printStats(System.out);
Runtime.getRuntime().addShutdownHook(
new Thread(() -> printStats(System.out)));
}

private Thread getProgressBarThread() {
long maxValue = Integer.parseInt(numOfVolumes) *
Integer.parseInt(numOfBuckets) *
Integer.parseInt(numOfKeys);
Thread progressBarThread = new Thread(
new ProgressBar(System.out, maxValue));
progressBarThread.setName("ProgressBar");
return progressBarThread;
}

private class ProgressBar implements Runnable {

private final long refreshInterval = 1000L;

private PrintStream stream;
private long maxValue;

ProgressBar(PrintStream stream, long maxValue) {
this.stream = stream;
this.maxValue = maxValue;
}

@Override
public void run() {
try {
stream.println();
long keys;
while((keys = numberOfKeysAdded.get()) < maxValue) {
print(keys);
if(completed) {
break;
}
Thread.sleep(refreshInterval);
}
if(exception) {
stream.println();
stream.println("Incomplete termination, " +
"check log for exception.");
} else {
print(maxValue);
}
stream.println();
} catch (InterruptedException e) {
}
}

/**
* Given current value prints the progress bar.
*
* @param currentValue
*/
private void print(long currentValue) {
stream.print('\r');
double percent = 100.0 * currentValue / maxValue;
StringBuilder sb = new StringBuilder();
sb.append(" " + String.format("%.2f", percent) + "% |");

for (int i = 0; i <= percent; i++) {
sb.append('█');
}
for (int j = 0; j < 100 - percent; j++) {
sb.append(' ');
}
});
sb.append("| ");
sb.append(currentValue + "/" + maxValue);
long timeInSec = TimeUnit.SECONDS.convert(
System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
String timeToPrint = String.format("%d:%02d:%02d", timeInSec / 3600,
(timeInSec % 3600) / 60, timeInSec % 60);
sb.append(" Time: " + timeToPrint);
stream.print(sb);
}
}

/**
Expand All @@ -300,15 +415,81 @@ public void run() {
* @param out PrintStream
*/
private void printStats(PrintStream out) {
long timeInSec = TimeUnit.SECONDS.convert(
System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
String timeToPrint = timeInSec < 60 ? timeInSec + " seconds" :
TimeUnit.MINUTES.convert(timeInSec, TimeUnit.SECONDS) + " minuites";
int threadCount = Integer.parseInt(numOfThreads);

long endTime = System.nanoTime() - startTime;
String execTime = String.format("%02d:%02d:%02d",
TimeUnit.NANOSECONDS.toHours(endTime),
TimeUnit.NANOSECONDS.toMinutes(endTime) -
TimeUnit.HOURS.toMinutes(
TimeUnit.NANOSECONDS.toHours(endTime)),
TimeUnit.NANOSECONDS.toSeconds(endTime) -
TimeUnit.MINUTES.toSeconds(
TimeUnit.NANOSECONDS.toMinutes(endTime)));

long volumeTime = volumeCreationTime.longValue();
String prettyVolumeTime = String.format("%02d:%02d:%02d:%02d",
TimeUnit.NANOSECONDS.toHours(volumeTime),
TimeUnit.NANOSECONDS.toMinutes(volumeTime) -
TimeUnit.HOURS.toMinutes(
TimeUnit.NANOSECONDS.toHours(volumeTime)),
TimeUnit.NANOSECONDS.toSeconds(volumeTime) -
TimeUnit.MINUTES.toSeconds(
TimeUnit.NANOSECONDS.toMinutes(volumeTime)),
TimeUnit.NANOSECONDS.toMillis(volumeTime) -
TimeUnit.SECONDS.toMillis(
TimeUnit.NANOSECONDS.toSeconds(volumeTime)));

long bucketTime = bucketCreationTime.longValue() / threadCount;
String prettyBucketTime = String.format("%02d:%02d:%02d:%02d",
TimeUnit.NANOSECONDS.toHours(bucketTime),
TimeUnit.NANOSECONDS.toMinutes(bucketTime) -
TimeUnit.HOURS.toMinutes(
TimeUnit.NANOSECONDS.toHours(bucketTime)),
TimeUnit.NANOSECONDS.toSeconds(bucketTime) -
TimeUnit.MINUTES.toSeconds(
TimeUnit.NANOSECONDS.toMinutes(bucketTime)),
TimeUnit.NANOSECONDS.toMillis(bucketTime) -
TimeUnit.SECONDS.toMillis(
TimeUnit.NANOSECONDS.toSeconds(bucketTime)));

long totalKeyCreationTime = keyCreationTime.longValue() / threadCount;
String prettyKeyCreationTime = String.format("%02d:%02d:%02d:%02d",
TimeUnit.NANOSECONDS.toHours(totalKeyCreationTime),
TimeUnit.NANOSECONDS.toMinutes(totalKeyCreationTime) -
TimeUnit.HOURS.toMinutes(
TimeUnit.NANOSECONDS.toHours(totalKeyCreationTime)),
TimeUnit.NANOSECONDS.toSeconds(totalKeyCreationTime) -
TimeUnit.MINUTES.toSeconds(
TimeUnit.NANOSECONDS.toMinutes(totalKeyCreationTime)),
TimeUnit.NANOSECONDS.toMillis(totalKeyCreationTime) -
TimeUnit.SECONDS.toMillis(
TimeUnit.NANOSECONDS.toSeconds(totalKeyCreationTime)));

long totalKeyWriteTime = keyWriteTime.longValue() / threadCount;
String prettyKeyWriteTime = String.format("%02d:%02d:%02d:%02d",
TimeUnit.NANOSECONDS.toHours(totalKeyWriteTime),
TimeUnit.NANOSECONDS.toMinutes(totalKeyWriteTime) -
TimeUnit.HOURS.toMinutes(
TimeUnit.NANOSECONDS.toHours(totalKeyWriteTime)),
TimeUnit.NANOSECONDS.toSeconds(totalKeyWriteTime) -
TimeUnit.MINUTES.toSeconds(
TimeUnit.NANOSECONDS.toMinutes(totalKeyWriteTime)),
TimeUnit.NANOSECONDS.toMillis(totalKeyWriteTime) -
TimeUnit.SECONDS.toMillis(
TimeUnit.NANOSECONDS.toSeconds(totalKeyWriteTime)));

out.println();
out.println("***************************************************");
out.println("Number of Volumes created: " + numberOfVolumesCreated);
out.println("Number of Buckets created: " + numberOfBucketsCreated);
out.println("Number of Keys added: " + numberOfKeysAdded);
out.println("Execution time: " + timeToPrint);
out.println("Time spent in volume creation: " + prettyVolumeTime);
out.println("Time spent in bucket creation: " + prettyBucketTime);
out.println("Time spent in key creation: " + prettyKeyCreationTime);
out.println("Time spent in writing keys: " + prettyKeyWriteTime);
out.println("Total bytes written: " + totalBytesWritten);
out.println("Total Execution time: " + execTime);
out.println("***************************************************");
}

Expand Down

0 comments on commit fcd4537

Please sign in to comment.