From 10732107c8220d5e2befdf9c7cf9b290d7fc9201 Mon Sep 17 00:00:00 2001 From: Zhuobin Zheng Date: Fri, 17 Dec 2021 16:34:48 +0800 Subject: [PATCH] EC benchmark add options. buffer size and random read --- .../hdfs/ErasureCodeBenchmarkThroughput.java | 88 +++++++++++++------ 1 file changed, 62 insertions(+), 26 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/ErasureCodeBenchmarkThroughput.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/ErasureCodeBenchmarkThroughput.java index b2ef1b4ec3065..a15be4df64916 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/ErasureCodeBenchmarkThroughput.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/ErasureCodeBenchmarkThroughput.java @@ -42,6 +42,7 @@ import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; /** @@ -69,7 +70,8 @@ public class ErasureCodeBenchmarkThroughput extends Configured implements Tool { - private static final int BUFFER_SIZE_MB = 128; + private static final int BUFFER_SIZE_MB = Integer.valueOf(System.getProperty( + "test.benchmark.buffer.mb", "128")); private static final String DFS_TMP_DIR = System.getProperty( "test.benchmark.data", "/tmp/benchmark/data"); public static final String REP_DIR = DFS_TMP_DIR + "/replica"; @@ -101,6 +103,10 @@ enum OpType { READ, WRITE, GEN, CLEAN; } + enum ReadType { + STATEFUL, POSITIONAL, RANDOM; + } + public static String getFilePath(int dataSizeMB, boolean isEc) { String parent = isEc ? EC_DIR : REP_DIR; String file = isEc ? EC_FILE_BASE : REP_FILE_BASE; @@ -113,19 +119,20 @@ private static void printUsage(String msg) { } System.err.println("Usage: ErasureCodeBenchmarkThroughput " + " " + - " [num clients] [stf|pos]\n" + - "Stateful and positional option is only available for read."); + " [num clients] [stf|pos|rdm]\n" + + "Stateful and positional and random option is only available for read." + ); System.exit(1); } private List doBenchmark(boolean isRead, int dataSizeMB, - int numClients, boolean isEc, boolean statefulRead, boolean isGen) + int numClients, boolean isEc, ReadType readType, boolean isGen) throws Exception { CompletionService cs = new ExecutorCompletionService( Executors.newFixedThreadPool(numClients)); for (int i = 0; i < numClients; i++) { cs.submit(isRead ? - new ReadCallable(dataSizeMB, isEc, i, statefulRead) : + new ReadCallable(dataSizeMB, isEc, i, readType) : new WriteCallable(dataSizeMB, isEc, i, isGen)); } List results = new ArrayList<>(numClients); @@ -146,21 +153,21 @@ private DecimalFormat getDecimalFormat() { } private void benchmark(OpType type, int dataSizeMB, - int numClients, boolean isEc, boolean statefulRead) throws Exception { + int numClients, boolean isEc, ReadType readType) throws Exception { List sizes = null; StopWatch sw = new StopWatch().start(); switch (type) { case READ: sizes = doBenchmark(true, dataSizeMB, numClients, isEc, - statefulRead, false); + readType, false); break; case WRITE: sizes = doBenchmark( - false, dataSizeMB, numClients, isEc, statefulRead, false); + false, dataSizeMB, numClients, isEc, readType, false); break; case GEN: sizes = doBenchmark(false, dataSizeMB, numClients, isEc, - statefulRead, true); + readType, true); } long elapsedSec = sw.now(TimeUnit.SECONDS); double totalDataSizeMB = 0; @@ -204,7 +211,7 @@ public int run(String[] args) throws Exception { int dataSizeMB = 0; boolean isEc = true; int numClients = 1; - boolean statefulRead = true; + ReadType readType = ReadType.STATEFUL; if (args.length >= 3) { if (args[0].equals("read")) { type = OpType.READ; @@ -243,8 +250,11 @@ public int run(String[] args) throws Exception { } } if (args.length >= 5 && type == OpType.READ) { - statefulRead = args[4].equals("stf"); - if (!statefulRead && !args[4].equals("pos")) { + switch (args[4]) { + case "stf": readType = ReadType.STATEFUL; break; + case "pos": readType = ReadType.POSITIONAL; break; + case "rdm": readType = ReadType.RANDOM; break; + default: printUsage("Unknown read mode: " + args[4]); } } @@ -256,7 +266,7 @@ public int run(String[] args) throws Exception { if (type == OpType.READ && isEc) { setReadThreadPoolSize(numClients); } - benchmark(type, dataSizeMB, numClients, isEc, statefulRead); + benchmark(type, dataSizeMB, numClients, isEc, readType); } return 0; } @@ -345,12 +355,12 @@ public Long call() throws Exception { } private class ReadCallable extends CallableBase { - private final boolean statefulRead; + private final ReadType readType; public ReadCallable(int dataSizeMB, boolean isEc, int id, - boolean statefulRead) throws IOException { + ReadType readType) throws IOException { super(dataSizeMB, isEc, id); - this.statefulRead = statefulRead; + this.readType = readType; } private long doStateful(FSDataInputStream inputStream) throws IOException { @@ -383,17 +393,36 @@ private long doPositional(FSDataInputStream inputStream) return count; } + private long doRandom(FSDataInputStream inputStream) throws IOException { + ThreadLocalRandom random = ThreadLocalRandom.current(); + long dataSize = dataSizeMB * 1024 * 1024L; + long count = 0; + long bytesRead; + byte buf[] = new byte[BUFFER_SIZE_MB * 1024 * 1024]; + while (true) { + bytesRead = inputStream.read(random.nextLong(dataSize), buf, 0, buf.length); + count += bytesRead; + if (count >= dataSize) { + break; + } + } + return count; + + } + private long readFile(Path path) throws IOException { try (FSDataInputStream inputStream = fs.open(path)) { StopWatch sw = new StopWatch().start(); - System.out.println((statefulRead ? "Stateful reading " : - "Positional reading ") + path); - long totalRead = statefulRead ? doStateful(inputStream) : - doPositional(inputStream); - System.out.println( - (statefulRead ? "Finished stateful read " : - "Finished positional read ") + path + ". Time taken: " + - sw.now(TimeUnit.SECONDS) + " s."); + + System.out.println(readType + " reading " + path); + long totalRead = 0; + switch (readType) { + case STATEFUL: totalRead = doStateful(inputStream); break; + case POSITIONAL: totalRead = doPositional(inputStream); break; + case RANDOM: totalRead = doRandom(inputStream); break; + } + System.out.println("Finished " + readType + " read " + path + + ". Time taken: " + sw.now(TimeUnit.SECONDS) + " s."); return totalRead; } } @@ -408,8 +437,15 @@ public Long call() throws Exception { } long bytesRead = readFile(path); long dataSize = dataSizeMB * 1024 * 1024L; - Preconditions.checkArgument(bytesRead == dataSize, - "Specified data size: " + dataSize + ", actually read " + bytesRead); + if (ReadType.RANDOM.equals(readType)) { + Preconditions.checkArgument(bytesRead >= dataSize, + "Expect read more than size: " + dataSize + + ", actually read " + bytesRead); + } else { + Preconditions.checkArgument(bytesRead == dataSize, + "Specified data size: " + dataSize + ", actually read " + + bytesRead); + } return bytesRead; } }