From 187f9b6961eb6f54d4d9570a545b3a74cb4bee7e Mon Sep 17 00:00:00 2001 From: Eno Thereska Date: Sun, 5 Mar 2017 14:57:45 +0000 Subject: [PATCH 1/4] Temporarily use only 2 relevant tests for branch builder --- .../benchmarks/streams/streams_simple_benchmark_test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py index c9f970e1fd81f..fd9ce76f78c57 100644 --- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py +++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py @@ -35,7 +35,8 @@ def __init__(self, test_context): @cluster(num_nodes=9) - @matrix(test=["produce", "consume", "count", "processstream", "processstreamwithsink", "processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin"], scale=[1, 2, 3]) + #@matrix(test=["produce", "consume", "count", "processstream", "processstreamwithsink", "processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin"], scale=[1, 2, 3]) + @matrix(test=["consume", "processstream"], scale=[1, 2, 3]) def test_simple_benchmark(self, test, scale): """ Run simple Kafka Streams benchmark From 87f06a893e799e81a9bffce57c203d331d6cb8b6 Mon Sep 17 00:00:00 2001 From: Eno Thereska Date: Sun, 5 Mar 2017 14:58:52 +0000 Subject: [PATCH 2/4] Undo previous --- .../benchmarks/streams/streams_simple_benchmark_test.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py index fd9ce76f78c57..c9f970e1fd81f 100644 --- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py +++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py @@ -35,8 +35,7 @@ def __init__(self, test_context): @cluster(num_nodes=9) - #@matrix(test=["produce", "consume", "count", "processstream", "processstreamwithsink", "processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin"], scale=[1, 2, 3]) - @matrix(test=["consume", "processstream"], scale=[1, 2, 3]) + @matrix(test=["produce", "consume", "count", "processstream", "processstreamwithsink", "processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin"], scale=[1, 2, 3]) def test_simple_benchmark(self, test, scale): """ Run simple Kafka Streams benchmark From 204536105b0960a712aab00b24bbb9c3135dcaa1 Mon Sep 17 00:00:00 2001 From: Eno Thereska Date: Thu, 9 Mar 2017 19:35:24 +0000 Subject: [PATCH 3/4] Increase parallelism as recommended in tuning guide --- .../org/apache/kafka/streams/state/internals/RocksDBStore.java | 1 + 1 file changed, 1 insertion(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 823ad47ee5463..45cc6f92c7d36 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -124,6 +124,7 @@ public void openDB(ProcessorContext context) { options.setCreateIfMissing(true); options.setErrorIfExists(false); options.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL); + options.setIncreaseParallelism(Runtime.getRuntime().availableProcessors()); wOptions = new WriteOptions(); wOptions.setDisableWAL(true); From 3db0471c084a08c628aef50252a769adb9d9c50c Mon Sep 17 00:00:00 2001 From: Eno Thereska Date: Fri, 10 Mar 2017 10:31:14 +0000 Subject: [PATCH 4/4] Added comment --- .../org/apache/kafka/streams/state/internals/RocksDBStore.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 45cc6f92c7d36..932ddd29fa0b8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -124,6 +124,9 @@ public void openDB(ProcessorContext context) { options.setCreateIfMissing(true); options.setErrorIfExists(false); options.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL); + // this is the recommended way to increase parallelism in RocksDb + // note that the current implementation increases the number of compaction threads + // but not flush threads. options.setIncreaseParallelism(Runtime.getRuntime().availableProcessors()); wOptions = new WriteOptions();