parallel broker merges on fork join pool (#8578)
* sketch of broker parallel merges done in small batches on fork join pool

* fix non-terminating sequences, auto compute parallelism

* adjust benches

* adjust benchmarks

* now hella more faster, fixed dumb

* fix

* remove comments

* log.info for debug

* javadoc

* safer block for sequence to yielder conversion

* refactor LifecycleForkJoinPool into LifecycleForkJoinPoolProvider which wraps a ForkJoinPool

* smooth yield rate adjustment, more logs to help tune

* cleanup, less logs

* error handling, bug fixes, on by default, more parallel, more tests

* remove unused var

* comments

* timeboundary mergeFn

* simplify, more javadoc

* formatting

* pushdown config

* use nanos consistently, move logs back to debug level, bit more javadoc

* static terminal result batch

* javadoc for nullability of createMergeFn

* cleanup

* oops

* fix race, add docs

* spelling, remove todo, add unhandled exception log

* cleanup, revert unintended change

* another unintended change

* review stuff

* add ParallelMergeCombiningSequenceBenchmark, fixes

* hyper-threading is the enemy

* fix initial start delay, lol

* parallelism computer now balances partition sizes to partition counts using sqrt of sequence count instead of sequence count by 2

* fix those important style issues with the benchmarks code

* lazy sequence creation for benchmarks

* more benchmark comments

* stable sequence generation time

* update defaults to use 100ms target time, 4096 batch size, 16384 initial yield, also update user docs

* add jmh thread based benchmarks, cleanup some stuff

* oops

* style

* add spread to jmh thread benchmark start range, more comments to benchmarks parameters and purpose

* retool benchmark to allow modeling more typical heterogeneous heavy workloads

* spelling

* fix

* refactor benchmarks

* formatting

* docs

* add maxThreadStartDelay parameter to threaded benchmark

* why does catch need to be on its own line but else doesn't
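
The partition-balancing heuristic mentioned in the bullets above (square root of the sequence count instead of sequence count divided by 2) can be sketched roughly as follows. This is an editor's illustration under assumptions — the method name, signature, and the cap at the pool's parallelism are made up — not the commit's actual ParallelMergeCombiningSequence code.

```java
// Rough sketch (assumed, not the commit's actual code) of the parallelism heuristic
// described above: size the merge parallelism by the square root of the number of
// input sequences rather than numSequences / 2, capped at the pool's parallelism.
static int computeParallelism(int numInputSequences, int poolParallelism)
{
  int bySqrt = (int) Math.ceil(Math.sqrt(numInputSequences));
  return Math.max(1, Math.min(bySqrt, poolParallelism));
}
```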
clintropolis authored and jihoonson committed Nov 7, 2019
1 parent a9aa416 commit 7aafcf8
Showing 27 changed files with 2,885 additions and 51 deletions.
6 changes: 6 additions & 0 deletions benchmarks/pom.xml
@@ -81,6 +81,12 @@
<artifactId>druid-histogram</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-core</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
CachingClusteredClientBenchmark.java
@@ -47,7 +47,6 @@
import org.apache.druid.collections.DefaultBlockingPool;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.collections.StupidPool;
import org.apache.druid.data.input.Row;
import org.apache.druid.guice.http.DruidHttpClientConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.concurrent.Execs;
@@ -66,6 +65,7 @@
import org.apache.druid.query.FluentQueryRunnerBuilder;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryConfig;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
@@ -84,6 +84,7 @@
import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV1;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
@@ -134,35 +135,40 @@
import java.util.Map.Entry;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

@State(Scope.Benchmark)
@Fork(value = 1, jvmArgsAppend = "-XX:+UseG1GC")
@Warmup(iterations = 15)
@Measurement(iterations = 30)
@Warmup(iterations = 3)
@Measurement(iterations = 5)
public class CachingClusteredClientBenchmark
{
private static final Logger LOG = new Logger(CachingClusteredClientBenchmark.class);
private static final int PROCESSING_BUFFER_SIZE = 10 * 1024 * 1024; // ~10MB
private static final String DATA_SOURCE = "ds";

public static final ObjectMapper JSON_MAPPER;
@Param({"8"})
@Param({"8", "24"})
private int numServers;

@Param({"4", "2", "1"})
private int numProcessingThreads;

@Param({"0", "1", "4"})
private int parallelism;

@Param({"75000"})
private int rowsPerSegment;

@Param({"all"})
@Param({"all", "minute"})
private String queryGranularity;

private QueryToolChestWarehouse toolChestWarehouse;
private QueryRunnerFactoryConglomerate conglomerate;
private CachingClusteredClient cachingClusteredClient;
private ExecutorService processingPool;
private ForkJoinPool forkJoinPool;

private boolean parallelCombine;

private Query query;

@@ -173,6 +179,8 @@ public class CachingClusteredClientBenchmark
Collections.singletonList(basicSchema.getDataInterval())
);

private final int numProcessingThreads = 4;

static {
JSON_MAPPER = new DefaultObjectMapper();
JSON_MAPPER.setInjectableValues(
@@ -188,6 +196,8 @@ public void setup()
{
final String schemaName = "basic";

parallelCombine = parallelism > 0;

BenchmarkSchemaInfo schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schemaName);

Map<DataSegment, QueryableIndex> queryableIndexes = new HashMap<>(numServers);
@@ -232,6 +242,12 @@ public int getNumThreads()
{
return numProcessingThreads;
}

@Override
public boolean useParallelMergePool()
{
return true;
}
};

conglomerate = new DefaultQueryRunnerFactoryConglomerate(
@@ -298,14 +314,22 @@ public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> getToolChest
}

processingPool = Execs.multiThreaded(processingConfig.getNumThreads(), "caching-clustered-client-benchmark");
forkJoinPool = new ForkJoinPool(
(int) Math.ceil(Runtime.getRuntime().availableProcessors() * 0.75),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null,
true
);
cachingClusteredClient = new CachingClusteredClient(
toolChestWarehouse,
serverView,
MapCache.create(0),
JSON_MAPPER,
new ForegroundCachePopulator(JSON_MAPPER, new CachePopulatorStats(), 0),
new CacheConfig(),
new DruidHttpClientConfig()
new DruidHttpClientConfig(),
processingConfig,
forkJoinPool
);
}

@@ -360,6 +384,7 @@ public void tearDown() throws IOException
{
closer.close();
processingPool.shutdown();
forkJoinPool.shutdownNow();
}

@Benchmark
@@ -372,6 +397,12 @@ public void timeseriesQuery(Blackhole blackhole)
.intervals(basicSchemaIntervalSpec)
.aggregators(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential"))
.granularity(Granularity.fromString(queryGranularity))
.context(
ImmutableMap.of(
QueryContexts.BROKER_PARALLEL_MERGE_KEY, parallelCombine,
QueryContexts.BROKER_PARALLELISM, parallelism
)
)
.build();

final List<Result<TimeseriesResultValue>> results = runQuery();
@@ -389,11 +420,17 @@ public void topNQuery(Blackhole blackhole)
query = new TopNQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(basicSchemaIntervalSpec)
.dimension(new DefaultDimensionSpec("dimUniform", null))
.dimension(new DefaultDimensionSpec("dimZipf", null))
.aggregators(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential"))
.granularity(Granularity.fromString(queryGranularity))
.metric("sumLongSequential")
.threshold(10_000) // we are primarily measuring 'broker' merge time, so collect a significant number of results
.context(
ImmutableMap.of(
QueryContexts.BROKER_PARALLEL_MERGE_KEY, parallelCombine,
QueryContexts.BROKER_PARALLELISM, parallelism
)
)
.build();

final List<Result<TopNResultValue>> results = runQuery();
@@ -413,16 +450,22 @@ public void groupByQuery(Blackhole blackhole)
.setDataSource(DATA_SOURCE)
.setQuerySegmentSpec(basicSchemaIntervalSpec)
.setDimensions(
new DefaultDimensionSpec("dimUniform", null),
new DefaultDimensionSpec("dimZipf", null)
new DefaultDimensionSpec("dimZipf", null),
new DefaultDimensionSpec("dimSequential", null)
)
.setAggregatorSpecs(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential"))
.setGranularity(Granularity.fromString(queryGranularity))
.setContext(
ImmutableMap.of(
QueryContexts.BROKER_PARALLEL_MERGE_KEY, parallelCombine,
QueryContexts.BROKER_PARALLELISM, parallelism
)
)
.build();

final List<Row> results = runQuery();
final List<ResultRow> results = runQuery();

for (Row result : results) {
for (ResultRow result : results) {
blackhole.consume(result);
}
}
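
As a usage note (editor's sketch, not part of the commit): the context keys exercised by the benchmark above can be set on any query sent to the broker. The snippet below assumes the standard Druids timeseries query builder with illustrative interval and values; only QueryContexts.BROKER_PARALLEL_MERGE_KEY and QueryContexts.BROKER_PARALLELISM are taken from the diff itself.

```java
// Editor's sketch: opting a single query in to the broker's parallel merge via the
// query context, mirroring the benchmark code above. Values here are illustrative.
// Assumed imports: org.apache.druid.query.Druids, org.apache.druid.query.QueryContexts,
// org.apache.druid.query.aggregation.LongSumAggregatorFactory,
// org.apache.druid.query.timeseries.TimeseriesQuery,
// org.apache.druid.java.util.common.granularity.Granularities,
// com.google.common.collect.ImmutableMap
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
    .dataSource("ds")
    .intervals("2000-01-01/2020-01-01")                    // illustrative interval
    .aggregators(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential"))
    .granularity(Granularities.ALL)
    .context(
        ImmutableMap.of(
            QueryContexts.BROKER_PARALLEL_MERGE_KEY, true, // enable parallel merge on the broker
            QueryContexts.BROKER_PARALLELISM, 4            // request up to 4 parallel merge tasks
        )
    )
    .build();
```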
(diff truncated; the remaining changed files are not shown)