Skip to content

Commit

Permalink
EC benchmark add options. buffer size and random read
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengzhuobinzzb committed Dec 20, 2021
1 parent 0714142 commit 1073210
Showing 1 changed file with 62 additions and 26 deletions.
Expand Up @@ -42,6 +42,7 @@
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -69,7 +70,8 @@
public class ErasureCodeBenchmarkThroughput
extends Configured implements Tool {

private static final int BUFFER_SIZE_MB = 128;
private static final int BUFFER_SIZE_MB = Integer.valueOf(System.getProperty(
"test.benchmark.buffer.mb", "128"));
private static final String DFS_TMP_DIR = System.getProperty(
"test.benchmark.data", "/tmp/benchmark/data");
public static final String REP_DIR = DFS_TMP_DIR + "/replica";
Expand Down Expand Up @@ -101,6 +103,10 @@ enum OpType {
READ, WRITE, GEN, CLEAN;
}

enum ReadType {
STATEFUL, POSITIONAL, RANDOM;
}

public static String getFilePath(int dataSizeMB, boolean isEc) {
String parent = isEc ? EC_DIR : REP_DIR;
String file = isEc ? EC_FILE_BASE : REP_FILE_BASE;
Expand All @@ -113,19 +119,20 @@ private static void printUsage(String msg) {
}
System.err.println("Usage: ErasureCodeBenchmarkThroughput " +
"<read|write|gen|clean> <size in MB> " +
"<ec|rep> [num clients] [stf|pos]\n" +
"Stateful and positional option is only available for read.");
"<ec|rep> [num clients] [stf|pos|rdm]\n" +
"Stateful and positional and random option is only available for read."
);
System.exit(1);
}

private List<Long> doBenchmark(boolean isRead, int dataSizeMB,
int numClients, boolean isEc, boolean statefulRead, boolean isGen)
int numClients, boolean isEc, ReadType readType, boolean isGen)
throws Exception {
CompletionService<Long> cs = new ExecutorCompletionService<Long>(
Executors.newFixedThreadPool(numClients));
for (int i = 0; i < numClients; i++) {
cs.submit(isRead ?
new ReadCallable(dataSizeMB, isEc, i, statefulRead) :
new ReadCallable(dataSizeMB, isEc, i, readType) :
new WriteCallable(dataSizeMB, isEc, i, isGen));
}
List<Long> results = new ArrayList<>(numClients);
Expand All @@ -146,21 +153,21 @@ private DecimalFormat getDecimalFormat() {
}

private void benchmark(OpType type, int dataSizeMB,
int numClients, boolean isEc, boolean statefulRead) throws Exception {
int numClients, boolean isEc, ReadType readType) throws Exception {
List<Long> sizes = null;
StopWatch sw = new StopWatch().start();
switch (type) {
case READ:
sizes = doBenchmark(true, dataSizeMB, numClients, isEc,
statefulRead, false);
readType, false);
break;
case WRITE:
sizes = doBenchmark(
false, dataSizeMB, numClients, isEc, statefulRead, false);
false, dataSizeMB, numClients, isEc, readType, false);
break;
case GEN:
sizes = doBenchmark(false, dataSizeMB, numClients, isEc,
statefulRead, true);
readType, true);
}
long elapsedSec = sw.now(TimeUnit.SECONDS);
double totalDataSizeMB = 0;
Expand Down Expand Up @@ -204,7 +211,7 @@ public int run(String[] args) throws Exception {
int dataSizeMB = 0;
boolean isEc = true;
int numClients = 1;
boolean statefulRead = true;
ReadType readType = ReadType.STATEFUL;
if (args.length >= 3) {
if (args[0].equals("read")) {
type = OpType.READ;
Expand Down Expand Up @@ -243,8 +250,11 @@ public int run(String[] args) throws Exception {
}
}
if (args.length >= 5 && type == OpType.READ) {
statefulRead = args[4].equals("stf");
if (!statefulRead && !args[4].equals("pos")) {
switch (args[4]) {
case "stf": readType = ReadType.STATEFUL; break;
case "pos": readType = ReadType.POSITIONAL; break;
case "rdm": readType = ReadType.RANDOM; break;
default:
printUsage("Unknown read mode: " + args[4]);
}
}
Expand All @@ -256,7 +266,7 @@ public int run(String[] args) throws Exception {
if (type == OpType.READ && isEc) {
setReadThreadPoolSize(numClients);
}
benchmark(type, dataSizeMB, numClients, isEc, statefulRead);
benchmark(type, dataSizeMB, numClients, isEc, readType);
}
return 0;
}
Expand Down Expand Up @@ -345,12 +355,12 @@ public Long call() throws Exception {
}

private class ReadCallable extends CallableBase {
private final boolean statefulRead;
private final ReadType readType;

public ReadCallable(int dataSizeMB, boolean isEc, int id,
boolean statefulRead) throws IOException {
ReadType readType) throws IOException {
super(dataSizeMB, isEc, id);
this.statefulRead = statefulRead;
this.readType = readType;
}

private long doStateful(FSDataInputStream inputStream) throws IOException {
Expand Down Expand Up @@ -383,17 +393,36 @@ private long doPositional(FSDataInputStream inputStream)
return count;
}

private long doRandom(FSDataInputStream inputStream) throws IOException {
ThreadLocalRandom random = ThreadLocalRandom.current();
long dataSize = dataSizeMB * 1024 * 1024L;
long count = 0;
long bytesRead;
byte buf[] = new byte[BUFFER_SIZE_MB * 1024 * 1024];
while (true) {
bytesRead = inputStream.read(random.nextLong(dataSize), buf, 0, buf.length);
count += bytesRead;
if (count >= dataSize) {
break;
}
}
return count;

}

private long readFile(Path path) throws IOException {
try (FSDataInputStream inputStream = fs.open(path)) {
StopWatch sw = new StopWatch().start();
System.out.println((statefulRead ? "Stateful reading " :
"Positional reading ") + path);
long totalRead = statefulRead ? doStateful(inputStream) :
doPositional(inputStream);
System.out.println(
(statefulRead ? "Finished stateful read " :
"Finished positional read ") + path + ". Time taken: " +
sw.now(TimeUnit.SECONDS) + " s.");

System.out.println(readType + " reading " + path);
long totalRead = 0;
switch (readType) {
case STATEFUL: totalRead = doStateful(inputStream); break;
case POSITIONAL: totalRead = doPositional(inputStream); break;
case RANDOM: totalRead = doRandom(inputStream); break;
}
System.out.println("Finished " + readType + " read " + path
+ ". Time taken: " + sw.now(TimeUnit.SECONDS) + " s.");
return totalRead;
}
}
Expand All @@ -408,8 +437,15 @@ public Long call() throws Exception {
}
long bytesRead = readFile(path);
long dataSize = dataSizeMB * 1024 * 1024L;
Preconditions.checkArgument(bytesRead == dataSize,
"Specified data size: " + dataSize + ", actually read " + bytesRead);
if (ReadType.RANDOM.equals(readType)) {
Preconditions.checkArgument(bytesRead >= dataSize,
"Expect read more than size: " + dataSize +
", actually read " + bytesRead);
} else {
Preconditions.checkArgument(bytesRead == dataSize,
"Specified data size: " + dataSize + ", actually read "
+ bytesRead);
}
return bytesRead;
}
}
Expand Down

0 comments on commit 1073210

Please sign in to comment.