Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDFS-16390 Enhance ErasureCodeBenchmarkThroughput for support random read and make buffer size customizable #3818

Open
wants to merge 2 commits into
base: trunk
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -42,6 +42,7 @@
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -69,7 +70,8 @@
public class ErasureCodeBenchmarkThroughput
extends Configured implements Tool {

private static final int BUFFER_SIZE_MB = 128;
private static final int BUFFER_SIZE_MB = Integer.valueOf(System.getProperty(
"test.benchmark.buffer.mb", "128"));
private static final String DFS_TMP_DIR = System.getProperty(
"test.benchmark.data", "/tmp/benchmark/data");
public static final String REP_DIR = DFS_TMP_DIR + "/replica";
Expand Down Expand Up @@ -101,6 +103,10 @@ enum OpType {
READ, WRITE, GEN, CLEAN;
}

enum ReadType {
STATEFUL, POSITIONAL, RANDOM;
}

public static String getFilePath(int dataSizeMB, boolean isEc) {
String parent = isEc ? EC_DIR : REP_DIR;
String file = isEc ? EC_FILE_BASE : REP_FILE_BASE;
Expand All @@ -113,19 +119,20 @@ private static void printUsage(String msg) {
}
System.err.println("Usage: ErasureCodeBenchmarkThroughput " +
"<read|write|gen|clean> <size in MB> " +
"<ec|rep> [num clients] [stf|pos]\n" +
"Stateful and positional option is only available for read.");
"<ec|rep> [num clients] [stf|pos|rdm]\n" +
"Stateful and positional and random option is only available for read."
);
System.exit(1);
}

private List<Long> doBenchmark(boolean isRead, int dataSizeMB,
int numClients, boolean isEc, boolean statefulRead, boolean isGen)
int numClients, boolean isEc, ReadType readType, boolean isGen)
throws Exception {
CompletionService<Long> cs = new ExecutorCompletionService<Long>(
Executors.newFixedThreadPool(numClients));
for (int i = 0; i < numClients; i++) {
cs.submit(isRead ?
new ReadCallable(dataSizeMB, isEc, i, statefulRead) :
new ReadCallable(dataSizeMB, isEc, i, readType) :
new WriteCallable(dataSizeMB, isEc, i, isGen));
}
List<Long> results = new ArrayList<>(numClients);
Expand All @@ -146,21 +153,21 @@ private DecimalFormat getDecimalFormat() {
}

private void benchmark(OpType type, int dataSizeMB,
int numClients, boolean isEc, boolean statefulRead) throws Exception {
int numClients, boolean isEc, ReadType readType) throws Exception {
List<Long> sizes = null;
StopWatch sw = new StopWatch().start();
switch (type) {
case READ:
sizes = doBenchmark(true, dataSizeMB, numClients, isEc,
statefulRead, false);
readType, false);
break;
case WRITE:
sizes = doBenchmark(
false, dataSizeMB, numClients, isEc, statefulRead, false);
false, dataSizeMB, numClients, isEc, readType, false);
break;
case GEN:
sizes = doBenchmark(false, dataSizeMB, numClients, isEc,
statefulRead, true);
readType, true);
}
long elapsedSec = sw.now(TimeUnit.SECONDS);
double totalDataSizeMB = 0;
Expand Down Expand Up @@ -204,7 +211,7 @@ public int run(String[] args) throws Exception {
int dataSizeMB = 0;
boolean isEc = true;
int numClients = 1;
boolean statefulRead = true;
ReadType readType = ReadType.STATEFUL;
if (args.length >= 3) {
if (args[0].equals("read")) {
type = OpType.READ;
Expand Down Expand Up @@ -243,8 +250,11 @@ public int run(String[] args) throws Exception {
}
}
if (args.length >= 5 && type == OpType.READ) {
statefulRead = args[4].equals("stf");
if (!statefulRead && !args[4].equals("pos")) {
switch (args[4]) {
case "stf": readType = ReadType.STATEFUL; break;
case "pos": readType = ReadType.POSITIONAL; break;
case "rdm": readType = ReadType.RANDOM; break;
default:
printUsage("Unknown read mode: " + args[4]);
}
}
Expand All @@ -256,7 +266,7 @@ public int run(String[] args) throws Exception {
if (type == OpType.READ && isEc) {
setReadThreadPoolSize(numClients);
}
benchmark(type, dataSizeMB, numClients, isEc, statefulRead);
benchmark(type, dataSizeMB, numClients, isEc, readType);
}
return 0;
}
Expand Down Expand Up @@ -345,12 +355,12 @@ public Long call() throws Exception {
}

private class ReadCallable extends CallableBase {
private final boolean statefulRead;
private final ReadType readType;

public ReadCallable(int dataSizeMB, boolean isEc, int id,
boolean statefulRead) throws IOException {
ReadType readType) throws IOException {
super(dataSizeMB, isEc, id);
this.statefulRead = statefulRead;
this.readType = readType;
}

private long doStateful(FSDataInputStream inputStream) throws IOException {
Expand Down Expand Up @@ -383,17 +393,36 @@ private long doPositional(FSDataInputStream inputStream)
return count;
}

private long doRandom(FSDataInputStream inputStream) throws IOException {
ThreadLocalRandom random = ThreadLocalRandom.current();
long dataSize = dataSizeMB * 1024 * 1024L;
long count = 0;
long bytesRead;
byte buf[] = new byte[BUFFER_SIZE_MB * 1024 * 1024];
while (true) {
bytesRead = inputStream.read(random.nextLong(dataSize), buf, 0, buf.length);
count += bytesRead;
if (count >= dataSize) {
break;
}
}
return count;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall looks good to me. Please remove the blank line and fix checkstyle warn.

}

private long readFile(Path path) throws IOException {
try (FSDataInputStream inputStream = fs.open(path)) {
StopWatch sw = new StopWatch().start();
System.out.println((statefulRead ? "Stateful reading " :
"Positional reading ") + path);
long totalRead = statefulRead ? doStateful(inputStream) :
doPositional(inputStream);
System.out.println(
(statefulRead ? "Finished stateful read " :
"Finished positional read ") + path + ". Time taken: " +
sw.now(TimeUnit.SECONDS) + " s.");

System.out.println(readType + " reading " + path);
long totalRead = 0;
switch (readType) {
case STATEFUL: totalRead = doStateful(inputStream); break;
case POSITIONAL: totalRead = doPositional(inputStream); break;
case RANDOM: totalRead = doRandom(inputStream); break;
}
System.out.println("Finished " + readType + " read " + path
+ ". Time taken: " + sw.now(TimeUnit.SECONDS) + " s.");
return totalRead;
}
}
Expand All @@ -408,8 +437,15 @@ public Long call() throws Exception {
}
long bytesRead = readFile(path);
long dataSize = dataSizeMB * 1024 * 1024L;
Preconditions.checkArgument(bytesRead == dataSize,
"Specified data size: " + dataSize + ", actually read " + bytesRead);
if (ReadType.RANDOM.equals(readType)) {
Preconditions.checkArgument(bytesRead >= dataSize,
"Expect read more than size: " + dataSize +
", actually read " + bytesRead);
} else {
Preconditions.checkArgument(bytesRead == dataSize,
"Specified data size: " + dataSize + ", actually read "
+ bytesRead);
}
return bytesRead;
}
}
Expand Down