parallel broker merges on fork join pool #8578

Merged
Changes from all commits (61 commits)
f72501a
sketch of broker parallel merges done in small batches on fork join pool
clintropolis Sep 3, 2019
7320b93
fix non-terminating sequences, auto compute parallelism
clintropolis Sep 7, 2019
fb62358
adjust benches
clintropolis Sep 12, 2019
1054d31
adjust benchmarks
clintropolis Sep 16, 2019
2764b9c
now hella more faster, fixed dumb
clintropolis Sep 21, 2019
2667e7f
fix
clintropolis Sep 21, 2019
94a7ef7
remove comments
clintropolis Sep 21, 2019
754deec
log.info for debug
clintropolis Sep 21, 2019
43f8338
javadoc
clintropolis Sep 24, 2019
7e25be7
safer block for sequence to yielder conversion
clintropolis Sep 25, 2019
761c680
refactor LifecycleForkJoinPool into LifecycleForkJoinPoolProvider whi…
clintropolis Sep 26, 2019
e8cc5a8
smooth yield rate adjustment, more logs to help tune
clintropolis Sep 27, 2019
bcc90a4
cleanup, less logs
clintropolis Sep 27, 2019
41f09f9
error handling, bug fixes, on by default, more parallel, more tests
clintropolis Oct 1, 2019
6c2e115
remove unused var
clintropolis Oct 1, 2019
e3cecdc
comments
clintropolis Oct 2, 2019
b01ddba
timeboundary mergeFn
clintropolis Oct 2, 2019
3b6f715
Merge remote-tracking branch 'upstream/master' into broker-parallel-m…
clintropolis Oct 2, 2019
78f1da8
simplify, more javadoc
clintropolis Oct 2, 2019
8ed8494
formatting
clintropolis Oct 2, 2019
4301c0f
pushdown config
clintropolis Oct 7, 2019
1eca264
Merge remote-tracking branch 'upstream/master' into broker-parallel-m…
clintropolis Oct 7, 2019
15d7229
Merge remote-tracking branch 'upstream/master' into broker-parallel-m…
clintropolis Oct 10, 2019
5e54357
use nanos consistently, move logs back to debug level, bit more javadoc
clintropolis Oct 10, 2019
8c02cc5
static terminal result batch
clintropolis Oct 10, 2019
9336289
javadoc for nullability of createMergeFn
clintropolis Oct 10, 2019
23b7a37
cleanup
clintropolis Oct 10, 2019
ef80b53
oops
clintropolis Oct 10, 2019
7097f22
fix race, add docs
clintropolis Oct 11, 2019
bd0710c
spelling, remove todo, add unhandled exception log
clintropolis Oct 11, 2019
024d890
cleanup, revert unintended change
clintropolis Oct 11, 2019
9cbe4d0
another unintended change
clintropolis Oct 11, 2019
206566e
review stuff
clintropolis Oct 12, 2019
a179dca
add ParallelMergeCombiningSequenceBenchmark, fixes
clintropolis Oct 19, 2019
0f6cffc
hyper-threading is the enemy
clintropolis Oct 19, 2019
0e69cd2
fix initial start delay, lol
clintropolis Oct 19, 2019
c722c16
parallelism computer now balances partition sizes to partition counts…
clintropolis Oct 22, 2019
a9d4213
fix those important style issues with the benchmarks code
clintropolis Oct 22, 2019
d6137b1
lazy sequence creation for benchmarks
clintropolis Oct 23, 2019
f5cd469
more benchmark comments
clintropolis Oct 23, 2019
4bb3796
stable sequence generation time
clintropolis Oct 23, 2019
0b26c50
update defaults to use 100ms target time, 4096 batch size, 16384 init…
clintropolis Oct 23, 2019
eb3024b
Merge remote-tracking branch 'upstream/master' into broker-parallel-m…
clintropolis Oct 23, 2019
231d67c
Merge remote-tracking branch 'upstream/master' into broker-parallel-m…
clintropolis Oct 23, 2019
9aa3d0b
add jmh thread based benchmarks, cleanup some stuff
clintropolis Oct 28, 2019
eddda37
Merge remote-tracking branch 'upstream/master' into broker-parallel-m…
clintropolis Oct 28, 2019
9fd16c2
oops
clintropolis Oct 28, 2019
7531859
style
clintropolis Oct 29, 2019
18b3449
Merge remote-tracking branch 'upstream/master' into broker-parallel-m…
clintropolis Oct 30, 2019
8697c2a
add spread to jmh thread benchmark start range, more comments to benc…
clintropolis Oct 30, 2019
f8127f3
retool benchmark to allow modeling more typical heterogenous heavy wo…
clintropolis Nov 1, 2019
0b1ba14
spelling
clintropolis Nov 1, 2019
e5e2940
fix
clintropolis Nov 1, 2019
6b3baa6
refactor benchmarks
clintropolis Nov 1, 2019
2f6b06d
Merge remote-tracking branch 'upstream/master' into broker-parallel-m…
clintropolis Nov 4, 2019
7b8ebe9
formatting
clintropolis Nov 4, 2019
dfc0560
docs
clintropolis Nov 4, 2019
9c9988e
Merge remote-tracking branch 'upstream/master' into broker-parallel-m…
clintropolis Nov 4, 2019
1b2a81c
add maxThreadStartDelay parameter to threaded benchmark
clintropolis Nov 4, 2019
51c1e3b
why does catch need to be on its own line but else doesnt
clintropolis Nov 5, 2019
668c858
Merge remote-tracking branch 'upstream/master' into broker-parallel-m…
clintropolis Nov 6, 2019
6 changes: 6 additions & 0 deletions benchmarks/pom.xml
@@ -81,6 +81,12 @@
<artifactId>druid-histogram</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-core</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
CachingClusteredClientBenchmark.java
@@ -47,7 +47,6 @@
import org.apache.druid.collections.DefaultBlockingPool;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.collections.StupidPool;
import org.apache.druid.data.input.Row;
import org.apache.druid.guice.http.DruidHttpClientConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.concurrent.Execs;
@@ -66,6 +65,7 @@
import org.apache.druid.query.FluentQueryRunnerBuilder;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryConfig;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
@@ -84,6 +84,7 @@
import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV1;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
@@ -134,35 +135,40 @@
import java.util.Map.Entry;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

@State(Scope.Benchmark)
@Fork(value = 1, jvmArgsAppend = "-XX:+UseG1GC")
@Warmup(iterations = 15)
@Measurement(iterations = 30)
@Warmup(iterations = 3)
@Measurement(iterations = 5)
public class CachingClusteredClientBenchmark
{
private static final Logger LOG = new Logger(CachingClusteredClientBenchmark.class);
private static final int PROCESSING_BUFFER_SIZE = 10 * 1024 * 1024; // ~10MB
private static final String DATA_SOURCE = "ds";

public static final ObjectMapper JSON_MAPPER;
@Param({"8"})
@Param({"8", "24"})
private int numServers;

@Param({"4", "2", "1"})
private int numProcessingThreads;

@Param({"0", "1", "4"})
private int parallelism;

@Param({"75000"})
private int rowsPerSegment;

@Param({"all"})
@Param({"all", "minute"})
private String queryGranularity;

private QueryToolChestWarehouse toolChestWarehouse;
private QueryRunnerFactoryConglomerate conglomerate;
private CachingClusteredClient cachingClusteredClient;
private ExecutorService processingPool;
private ForkJoinPool forkJoinPool;

private boolean parallelCombine;

private Query query;

@@ -173,6 +179,8 @@ public class CachingClusteredClientBenchmark
Collections.singletonList(basicSchema.getDataInterval())
);

private final int numProcessingThreads = 4;

static {
JSON_MAPPER = new DefaultObjectMapper();
JSON_MAPPER.setInjectableValues(
@@ -188,6 +196,8 @@ public void setup()
{
final String schemaName = "basic";

parallelCombine = parallelism > 0;

BenchmarkSchemaInfo schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schemaName);

Map<DataSegment, QueryableIndex> queryableIndexes = new HashMap<>(numServers);
@@ -231,6 +241,12 @@ public int getNumThreads()
{
return numProcessingThreads;
}

@Override
public boolean useParallelMergePool()
{
return true;
}
};

conglomerate = new DefaultQueryRunnerFactoryConglomerate(
@@ -297,14 +313,22 @@ public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> getToolChest
}

processingPool = Execs.multiThreaded(processingConfig.getNumThreads(), "caching-clustered-client-benchmark");
forkJoinPool = new ForkJoinPool(
(int) Math.ceil(Runtime.getRuntime().availableProcessors() * 0.75),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null,
true
);
cachingClusteredClient = new CachingClusteredClient(
toolChestWarehouse,
serverView,
MapCache.create(0),
JSON_MAPPER,
new ForegroundCachePopulator(JSON_MAPPER, new CachePopulatorStats(), 0),
new CacheConfig(),
new DruidHttpClientConfig()
new DruidHttpClientConfig(),
processingConfig,
forkJoinPool
);
}

@@ -359,6 +383,7 @@ public void tearDown() throws IOException
{
closer.close();
processingPool.shutdown();
forkJoinPool.shutdownNow();
}

@Benchmark
@@ -371,6 +396,12 @@ public void timeseriesQuery(Blackhole blackhole)
.intervals(basicSchemaIntervalSpec)
.aggregators(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential"))
.granularity(Granularity.fromString(queryGranularity))
.context(
ImmutableMap.of(
QueryContexts.BROKER_PARALLEL_MERGE_KEY, parallelCombine,
QueryContexts.BROKER_PARALLELISM, parallelism
)
)
.build();

final List<Result<TimeseriesResultValue>> results = runQuery();
@@ -388,11 +419,17 @@ public void topNQuery(Blackhole blackhole)
query = new TopNQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(basicSchemaIntervalSpec)
.dimension(new DefaultDimensionSpec("dimUniform", null))
.dimension(new DefaultDimensionSpec("dimZipf", null))
.aggregators(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential"))
.granularity(Granularity.fromString(queryGranularity))
.metric("sumLongSequential")
.threshold(10_000) // we are primarily measuring 'broker' merge time, so collect a significant number of results
.context(
ImmutableMap.of(
QueryContexts.BROKER_PARALLEL_MERGE_KEY, parallelCombine,
QueryContexts.BROKER_PARALLELISM, parallelism
)
)
.build();

final List<Result<TopNResultValue>> results = runQuery();
@@ -412,16 +449,22 @@ public void groupByQuery(Blackhole blackhole)
.setDataSource(DATA_SOURCE)
.setQuerySegmentSpec(basicSchemaIntervalSpec)
.setDimensions(
new DefaultDimensionSpec("dimUniform", null),
new DefaultDimensionSpec("dimZipf", null)
new DefaultDimensionSpec("dimZipf", null),
new DefaultDimensionSpec("dimSequential", null)
)
.setAggregatorSpecs(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential"))
.setGranularity(Granularity.fromString(queryGranularity))
.setContext(
ImmutableMap.of(
QueryContexts.BROKER_PARALLEL_MERGE_KEY, parallelCombine,
QueryContexts.BROKER_PARALLELISM, parallelism
)
)
.build();

final List<Row> results = runQuery();
final List<ResultRow> results = runQuery();

for (Row result : results) {
for (ResultRow result : results) {
blackhole.consume(result);
}
}
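The benchmark's `setup()` constructs its own fork join pool sized at roughly 75% of available cores, in async (FIFO) mode. As a minimal standalone sketch of that sizing logic (the class name `MergePoolSketch` is hypothetical, not part of the PR):

```java
import java.util.concurrent.ForkJoinPool;

public class MergePoolSketch
{
  // Size the merge pool at ~75% of available cores, rounded up,
  // mirroring the pool construction in the benchmark's setup().
  static ForkJoinPool makeMergePool()
  {
    int parallelism = (int) Math.ceil(Runtime.getRuntime().availableProcessors() * 0.75);
    return new ForkJoinPool(
        parallelism,
        ForkJoinPool.defaultForkJoinWorkerThreadFactory,
        null, // no custom UncaughtExceptionHandler
        true  // asyncMode = true: FIFO scheduling for event-style merge tasks
    );
  }

  public static void main(String[] args)
  {
    ForkJoinPool pool = makeMergePool();
    // ceil guarantees a parallelism of at least 1, even on a single-core machine
    System.out.println(pool.getParallelism() >= 1);
    pool.shutdownNow();
  }
}
```

Rounding up rather than down keeps the pool usable on small machines while still leaving roughly a quarter of the cores free for other work on larger ones.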