Skip to content
Permalink
Browse files
fixes #67 improves group commit performance test
  • Loading branch information
keith-turner committed Sep 25, 2020
1 parent 83a60e9 commit b0867a79ab3f380ddbbc641e0f447b512643aa45
Showing 2 changed files with 68 additions and 140 deletions.
@@ -17,7 +17,6 @@

package org.apache.accumulo.testing.performance.tests;

import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
@@ -37,8 +36,13 @@
import org.apache.accumulo.testing.performance.Report;
import org.apache.accumulo.testing.performance.SystemConfiguration;

import com.google.common.base.Preconditions;

public class GroupCommitPT implements PerformanceTest {

private static final int NUM_MUTATIONS = 2048 * 1024;
private static final int NUM_FLUSHES = 1024;

static Mutation createRandomMutation(Random rand) {
byte row[] = new byte[16];

@@ -60,37 +64,34 @@ static Mutation createRandomMutation(Random rand) {

static class WriteTask implements Runnable {

private int numToWrite;
private int numToBatch;
private int batchSize;
private BatchWriter bw;
private volatile long time = -1;
private volatile int written = 0;

WriteTask(BatchWriter bw, int numToWrite, int numToBatch) throws Exception {
WriteTask(BatchWriter bw, int numMutations) throws Exception {
Preconditions.checkArgument(numMutations >= NUM_FLUSHES);
Preconditions.checkArgument(numMutations % NUM_FLUSHES == 0);
this.bw = bw;
this.numToWrite = numToWrite;
this.numToBatch = numToBatch;
this.batchSize = numMutations / NUM_FLUSHES;
}

@Override
public void run() {
Random rand = new Random();

int written = 0;

try {
long t1 = System.currentTimeMillis();
for (int i = 0; i < numToWrite; i++) {
for (int i = 0; i < NUM_FLUSHES; i++) {
Mutation mut = createRandomMutation(rand);

for (int j = 0; j < numToBatch; j++) {
for (int j = 0; j < batchSize; j++) {
bw.addMutation(mut);
written++;
}

bw.flush();
}

// bw.flush();

long t2 = System.currentTimeMillis();
this.time = t2 - t1;
} catch (Exception e) {
e.printStackTrace();
} finally {
@@ -101,10 +102,12 @@ public void run() {
}

}

this.written = written;
}

long getTime() {
return time;
public int getWritten() {
return written;
}
}

@@ -121,43 +124,50 @@ public SystemConfiguration getSystemConfig() {

@Override
public Report runTest(Environment env) throws Exception {
int numThreads = 128;

Report.Builder report = Report.builder();
report.id("mutslam");
report.description("Runs multiple threads to test performance of a group commit. "
+ " This tests threads with client side group commit, using a single batch writer");

double batchValues;

for (int i = 0; i < 6; i++) {
// This test threads w/ group commit on the client side, using a single batch writer.
// Each thread flushes after each mutation.
batchValues = runBatch(env, numThreads, 1);

report.info("threadsTime" + i, new Double(new DecimalFormat("#0.00").format(batchValues)),
"Time it took the task to run in milliseconds");
report.info("threads" + i, i, "Number of threads");
report.info("batch" + i, 1, "Number of batches");
report.description("Data written to Accumulo is appended to a write ahead log before its made "
+ "available for scan. There is a single write ahead log per tablet server. Data from "
+ "multiple concurrent clients is batched together and appended to the write ahead log, "
+ "this is called group commit. If group commit is not working properly, then performance"
+ " of concurrent writes could suffer. This performance test measures group commit. In "
+ "an Accumulo client, when the batch writer is flushed this forces an append to the write"
+ " ahead log. The batch writer flush call does not return until the append is complete. "
+ "This test writes the same amount of data using different numbers of threads to check if"
+ " group commit is working properly. When the test is using one thread it will write "
+ "2048K total mutations calling flush on the batchwriter 1024 times. When"
+ " the test is running two threads, each thread will write 1024K mutations calling "
+ "flush 1024 times. The pattern is that as the number of threads increases"
+ ", the amount of data written per thread decreases proportionally. However the number of"
+ " flushes done by threads is constant. If group commit is working well, then the overall"
+ " write rate should not be significantly less as the number of threads increases.");

report.parameter("num_mutations", NUM_MUTATIONS,
"The total number of mutations each test will write. Each thread in a test will write num_mutations/num_threads_in_test mutations.");
report.parameter("num_flushes", NUM_FLUSHES,
"The number of times each thread will flush its batch writer. The flushes are spread evenly between mutations.");

// number of threads to run for each test
int tests[] = new int[] {1, 2, 4, 8, 16, 32, 64};

// run warm up test
for (int numThreads : tests) {
runTest(report, env, numThreads, true);
}

for (int i = 0; i < 6; i++) {
// This tests a single thread writing a different batch sizes of mutations,
// flushing after each batch. Group commit should approach these times for the same number
// mutations.

batchValues = runBatch(env, 1, numThreads);

report.info("batchTime" + i, new Double(new DecimalFormat("#0.00").format(batchValues)),
"Time it took the task to run in milliseconds");
report.info("threads" + i, i, "Number of threads");
report.info("batch" + i, 1, "Number of batches");
// run real test
for (int numThreads : tests) {
runTest(report, env, numThreads, false);
}

return report.build();
}

private double runBatch(Environment env, int numThreads, int numToBatch) throws Exception {
private void runTest(Report.Builder report, Environment env, int numThreads, boolean warmup)
throws Exception {

Preconditions.checkArgument(NUM_MUTATIONS % numThreads == 0);

String tableName = "mutslam";
env.getClient().tableOperations().create(tableName);
@@ -168,21 +178,22 @@ private double runBatch(Environment env, int numThreads, int numToBatch) throws
entry.getValue();
}

// number of batches each thread should write
int numToWrite = 100;
int mutationsPerThread = NUM_MUTATIONS / numThreads;

ArrayList<WriteTask> wasks = new ArrayList<WriteTask>();
ArrayList<Thread> threads = new ArrayList<Thread>();

for (int i = 0; i < numThreads; i++) {
WriteTask wask = new WriteTask(env.getClient().createBatchWriter(tableName,
new BatchWriterConfig().setMaxWriteThreads(1)), numToWrite, numToBatch);
new BatchWriterConfig().setMaxWriteThreads(1)), mutationsPerThread);

wasks.add(wask);
Thread thread = new Thread(wask);
threads.add(thread);
}

long t1 = System.currentTimeMillis();

for (Thread thread : threads) {
thread.start();
}
@@ -191,19 +202,19 @@ private double runBatch(Environment env, int numThreads, int numToBatch) throws
thread.join();
}

long sum = 0;
for (WriteTask writeTask : wasks) {
sum += writeTask.getTime();
}
long t2 = System.currentTimeMillis();

// System.out.printf(
// "\ttime: %8.2f #threads: %3d #batch: %2d #mutations: %4d rate: %6.2f mutations/ms\n",
// sum / (double) wasks.size(), numThreads, numToBatch, totalNumMutations,
// totalNumMutations / (sum / (double) wasks.size()));
// ensure all thread wrote the expected number of mutations
Preconditions.checkState(wasks.stream().mapToInt(WriteTask::getWritten).sum() == NUM_MUTATIONS);

env.getClient().tableOperations().delete(tableName);

return sum / (double) wasks.size(); // time
if (warmup) {
report.info("warmup_rate_" + numThreads, NUM_MUTATIONS, t2 - t1, "The warmup rate at which "
+ numThreads + " threads wrote data. The rate is mutations per second.");
} else {
report.result("rate_" + numThreads, NUM_MUTATIONS, t2 - t1, "The rate at which " + numThreads
+ " threads wrote data. The rate is mutations per second.");
}
}

}

This file was deleted.

0 comments on commit b0867a7

Please sign in to comment.