Skip to content

Commit

Permalink
fix partitioned bulk insert benchmark
Browse files Browse the repository at this point in the history
uniqueness of primary keys was not guaranteed, random number of partitions created, not really comparable
  • Loading branch information
msbt committed Apr 20, 2015
1 parent 9595e40 commit 92c0fc7
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
public class BulkRetryCoordinator {

private static final ESLogger LOGGER = Loggers.getLogger(BulkRetryCoordinator.class);
private static final int DELAY_INCREMENT = 1;

private final ReadWriteLock retryLock;
private final AtomicInteger currentDelay;
Expand Down Expand Up @@ -79,7 +80,7 @@ public void retry(final ShardUpsertRequest request,
final RetryBulkActionListener<ShardUpsertRequest, ShardUpsertResponse> retryBulkActionListener = new RetryBulkActionListener<>(retryListener, request);
if (repeatingRetry) {
try {
Thread.sleep(currentDelay.getAndAdd(10));
Thread.sleep(currentDelay.getAndAdd(DELAY_INCREMENT));
} catch (InterruptedException e) {
Thread.interrupted();
}
Expand All @@ -100,7 +101,7 @@ public void run() {
LOGGER.trace("retry thread [{}] executing", Thread.currentThread().getName());
delegate.execute(request, retryBulkActionListener);
}
}, currentDelay.getAndAdd(10), TimeUnit.MILLISECONDS);
}, currentDelay.getAndAdd(DELAY_INCREMENT), TimeUnit.MILLISECONDS);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
import com.carrotsearch.junitbenchmarks.annotation.AxisRange;
import com.carrotsearch.junitbenchmarks.annotation.BenchmarkHistoryChart;
import com.carrotsearch.junitbenchmarks.annotation.BenchmarkMethodChart;
import io.crate.TimestampFormat;
import io.crate.action.sql.SQLBulkAction;
import io.crate.action.sql.SQLBulkRequest;
import io.crate.action.sql.SQLBulkResponse;
import org.apache.commons.lang3.RandomStringUtils;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.junit.AfterClass;
Expand All @@ -38,6 +40,10 @@
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

@AxisRange(min = 0)
@BenchmarkHistoryChart(filePrefix="benchmark-partitioned-bulk-insert-history")
@BenchmarkMethodChart(filePrefix = "benchmark-partitioned-bulk-insert")
Expand All @@ -54,6 +60,9 @@ public class PartitionedBulkInsertBenchmark extends BenchmarkBase {
public static final int BENCHMARK_ROUNDS = 3;
public static final int ROWS = 5000;

private static final String[] partitions = "ABCDEFGHIJKLMNOPQRSTUVWXYZ".split("");
private int partitionIndex = 0;
private static final long TS = TimestampFormat.parseTimestampString("2015-01-01");
public static final String SINGLE_INSERT_SQL_STMT = "insert into motiondata (d, device_id, ts, ax) values (?,?,?,?)";


Expand Down Expand Up @@ -100,16 +109,29 @@ private SQLBulkRequest getBulkArgsRequest() {

private Object[] getRandomObject() {
return new Object[]{
partitions[(partitionIndex++) % partitions.length],
RandomStringUtils.randomAlphabetic(1),
RandomStringUtils.randomAlphabetic(1),
"2015-01-01",
TS + partitionIndex,
5.0};
}

@Test
@BenchmarkOptions(benchmarkRounds = BENCHMARK_ROUNDS, warmupRounds = 1)
public void testBulkInsertWithBulkArgs() throws Exception {
getClient(false).execute(SQLBulkAction.INSTANCE, getBulkArgsRequest()).actionGet();
long inserted = 0;
long errors = 0;

SQLBulkResponse bulkResponse = getClient(false).execute(SQLBulkAction.INSTANCE, getBulkArgsRequest()).actionGet();
for (SQLBulkResponse.Result result : bulkResponse.results()) {
assertThat(result.errorMessage(), is(nullValue()));
if (result.rowCount() < 0) {
errors++;
} else {
inserted += result.rowCount();
}
}
assertThat(errors, is(0L));
assertThat(inserted, is(5000L));
}

}

0 comments on commit 92c0fc7

Please sign in to comment.