diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml index 2c6e9ebda0f3..4c3502d53bd1 100644 --- a/benchmarks/pom.xml +++ b/benchmarks/pom.xml @@ -81,6 +81,12 @@ druid-histogram ${project.parent.version} + + org.apache.druid + druid-core + ${project.parent.version} + test-jar + org.apache.druid druid-processing diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java index e8212cefa2cd..80aa49d0b732 100644 --- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java +++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java @@ -47,7 +47,6 @@ import org.apache.druid.collections.DefaultBlockingPool; import org.apache.druid.collections.NonBlockingPool; import org.apache.druid.collections.StupidPool; -import org.apache.druid.data.input.Row; import org.apache.druid.guice.http.DruidHttpClientConfig; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.concurrent.Execs; @@ -66,6 +65,7 @@ import org.apache.druid.query.FluentQueryRunnerBuilder; import org.apache.druid.query.Query; import org.apache.druid.query.QueryConfig; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; @@ -84,6 +84,7 @@ import org.apache.druid.query.groupby.GroupByQueryQueryToolChest; import org.apache.druid.query.groupby.GroupByQueryRunnerFactory; import org.apache.druid.query.groupby.GroupByQueryRunnerTest; +import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.groupby.strategy.GroupByStrategySelector; import org.apache.druid.query.groupby.strategy.GroupByStrategyV1; import org.apache.druid.query.groupby.strategy.GroupByStrategyV2; @@ -134,12 +135,13 @@ import java.util.Map.Entry; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; @State(Scope.Benchmark) @Fork(value = 1, jvmArgsAppend = "-XX:+UseG1GC") -@Warmup(iterations = 15) -@Measurement(iterations = 30) +@Warmup(iterations = 3) +@Measurement(iterations = 5) public class CachingClusteredClientBenchmark { private static final Logger LOG = new Logger(CachingClusteredClientBenchmark.class); @@ -147,22 +149,26 @@ public class CachingClusteredClientBenchmark private static final String DATA_SOURCE = "ds"; public static final ObjectMapper JSON_MAPPER; - @Param({"8"}) + @Param({"8", "24"}) private int numServers; - @Param({"4", "2", "1"}) - private int numProcessingThreads; + + @Param({"0", "1", "4"}) + private int parallelism; @Param({"75000"}) private int rowsPerSegment; - @Param({"all"}) + @Param({"all", "minute"}) private String queryGranularity; private QueryToolChestWarehouse toolChestWarehouse; private QueryRunnerFactoryConglomerate conglomerate; private CachingClusteredClient cachingClusteredClient; private ExecutorService processingPool; + private ForkJoinPool forkJoinPool; + + private boolean parallelCombine; private Query query; @@ -173,6 +179,8 @@ public class CachingClusteredClientBenchmark Collections.singletonList(basicSchema.getDataInterval()) ); + private final int numProcessingThreads = 4; + static { JSON_MAPPER = new DefaultObjectMapper(); JSON_MAPPER.setInjectableValues( @@ -188,6 +196,8 @@ public void setup() { final String 
schemaName = "basic"; + parallelCombine = parallelism > 0; + BenchmarkSchemaInfo schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schemaName); Map queryableIndexes = new HashMap<>(numServers); @@ -231,6 +241,12 @@ public int getNumThreads() { return numProcessingThreads; } + + @Override + public boolean useParallelMergePool() + { + return true; + } }; conglomerate = new DefaultQueryRunnerFactoryConglomerate( @@ -297,6 +313,12 @@ public > QueryToolChest getToolChest } processingPool = Execs.multiThreaded(processingConfig.getNumThreads(), "caching-clustered-client-benchmark"); + forkJoinPool = new ForkJoinPool( + (int) Math.ceil(Runtime.getRuntime().availableProcessors() * 0.75), + ForkJoinPool.defaultForkJoinWorkerThreadFactory, + null, + true + ); cachingClusteredClient = new CachingClusteredClient( toolChestWarehouse, serverView, @@ -304,7 +326,9 @@ public > QueryToolChest getToolChest JSON_MAPPER, new ForegroundCachePopulator(JSON_MAPPER, new CachePopulatorStats(), 0), new CacheConfig(), - new DruidHttpClientConfig() + new DruidHttpClientConfig(), + processingConfig, + forkJoinPool ); } @@ -359,6 +383,7 @@ public void tearDown() throws IOException { closer.close(); processingPool.shutdown(); + forkJoinPool.shutdownNow(); } @Benchmark @@ -371,6 +396,12 @@ public void timeseriesQuery(Blackhole blackhole) .intervals(basicSchemaIntervalSpec) .aggregators(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential")) .granularity(Granularity.fromString(queryGranularity)) + .context( + ImmutableMap.of( + QueryContexts.BROKER_PARALLEL_MERGE_KEY, parallelCombine, + QueryContexts.BROKER_PARALLELISM, parallelism + ) + ) .build(); final List> results = runQuery(); @@ -388,11 +419,17 @@ public void topNQuery(Blackhole blackhole) query = new TopNQueryBuilder() .dataSource(DATA_SOURCE) .intervals(basicSchemaIntervalSpec) - .dimension(new DefaultDimensionSpec("dimUniform", null)) + .dimension(new DefaultDimensionSpec("dimZipf", null)) .aggregators(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential")) .granularity(Granularity.fromString(queryGranularity)) .metric("sumLongSequential") .threshold(10_000) // we are primarily measuring 'broker' merge time, so collect a significant number of results + .context( + ImmutableMap.of( + QueryContexts.BROKER_PARALLEL_MERGE_KEY, parallelCombine, + QueryContexts.BROKER_PARALLELISM, parallelism + ) + ) .build(); final List> results = runQuery(); @@ -412,16 +449,22 @@ public void groupByQuery(Blackhole blackhole) .setDataSource(DATA_SOURCE) .setQuerySegmentSpec(basicSchemaIntervalSpec) .setDimensions( - new DefaultDimensionSpec("dimUniform", null), - new DefaultDimensionSpec("dimZipf", null) + new DefaultDimensionSpec("dimZipf", null), + new DefaultDimensionSpec("dimSequential", null) ) .setAggregatorSpecs(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential")) .setGranularity(Granularity.fromString(queryGranularity)) + .setContext( + ImmutableMap.of( + QueryContexts.BROKER_PARALLEL_MERGE_KEY, parallelCombine, + QueryContexts.BROKER_PARALLELISM, parallelism + ) + ) .build(); - final List results = runQuery(); + final List results = runQuery(); - for (Row result : results) { + for (ResultRow result : results) { blackhole.consume(result); } } diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/sequences/BaseParallelMergeCombiningSequenceBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/sequences/BaseParallelMergeCombiningSequenceBenchmark.java new file mode 100644 index 000000000000..14f240295752 --- 
/dev/null +++ b/benchmarks/src/main/java/org/apache/druid/benchmark/sequences/BaseParallelMergeCombiningSequenceBenchmark.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.benchmark.sequences; + +import org.apache.druid.common.guava.CombiningSequence; +import org.apache.druid.java.util.common.guava.MergeSequence; +import org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequence; +import org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequenceTest; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.common.guava.Yielders; +import org.apache.druid.java.util.common.logger.Logger; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.infra.Blackhole; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; +import java.util.function.Supplier; + +@State(Scope.Benchmark) +public class BaseParallelMergeCombiningSequenceBenchmark +{ + private static final Logger log = new Logger(ParallelMergeCombiningSequenceBenchmark.class); + // default merge FJP size + static final ForkJoinPool MERGE_POOL = new ForkJoinPool( + (int) Math.ceil(Runtime.getRuntime().availableProcessors() * 0.75), + ForkJoinPool.defaultForkJoinWorkerThreadFactory, + (t, e) -> log.error(e, "Unhandled exception in thread [%s]", t), + true + ); + + // note: parameters are broken down one per line to allow easily commenting out lines to mix and match which + // benchmarks to run + // also note: don't really run this like it is unless you have days to spare + @Param({ + "8", + "16", + "32", + "64" + }) + int numSequences; + + /** + * Strategy encodes the type of sequence and configuration parameters for that sequence. + * + * Strategies of the form: 'parallelism-{parallelism}-{targetTime}ms-{batchSize}-{yieldAfter}' + * encode the parameters for a {@link ParallelMergeCombiningSequence}. 
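+ * For example, 'parallelism-4-10ms-256-1024' is parsed by {@link #setupOutputSequence()} into a
+ * {@link ParallelMergeCombiningSequence} with a parallelism hint of 4, a 10ms target task time, a batch size of 256,
+ * and an initial yieldAfter of 1024.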
+ * + * A strategy of: 'combiningMergeSequence-same-thread' (or an unrecognized value) will use a + * {@link CombiningSequence} that wraps a {@link MergeSequence} + */ + @Param({ + "combiningMergeSequence-same-thread", + "parallelism-1-10ms-256-1024", + "parallelism-4-10ms-256-1024", + "parallelism-8-10ms-256-1024", + "parallelism-16-10ms-256-1024", + "parallelism-1-100ms-1024-4096", + "parallelism-4-100ms-1024-4096", + "parallelism-8-100ms-1024-4096", + "parallelism-16-100ms-1024-4096", + "parallelism-1-100ms-4096-16384", + "parallelism-4-100ms-4096-16384", + "parallelism-8-100ms-4096-16384", + "parallelism-16-100ms-4096-16384" + }) + String strategy; + + private int parallelism; + private int targetTaskTimeMillis; + private int batchSize; + private int yieldAfter; + + private Function>, Sequence> outputSequenceFactory; + + @Setup(Level.Trial) + public void setup() + { + setupOutputSequence(); + } + + void setupOutputSequence() + { + String[] strategySplit = strategy.split("-"); + if ("parallelism".equals(strategySplit[0])) { + // "parallelism-{parallelism}-{targetTime}ms-{batchSize}-{yieldAfter}" + parallelism = Integer.parseInt(strategySplit[1]); + targetTaskTimeMillis = Integer.parseInt(strategySplit[2].substring(0, strategySplit[2].length() - 2)); + batchSize = Integer.parseInt(strategySplit[3]); + yieldAfter = Integer.parseInt(strategySplit[4]); + outputSequenceFactory = this::createParallelSequence; + } else { + outputSequenceFactory = this::createCombiningMergeSequence; + } + } + + + Sequence createParallelSequence( + List> inputSequences + ) + { + return new ParallelMergeCombiningSequence<>( + MERGE_POOL, + inputSequences, + ParallelMergeCombiningSequenceTest.INT_PAIR_ORDERING, + ParallelMergeCombiningSequenceTest.INT_PAIR_MERGE_FN, + false, + 0, + 0, + parallelism, + yieldAfter, + batchSize, + targetTaskTimeMillis + ); + } + + Sequence createCombiningMergeSequence( + List> inputSequences + ) + { + return CombiningSequence.create( + new MergeSequence<>( + ParallelMergeCombiningSequenceTest.INT_PAIR_ORDERING, + Sequences.simple(inputSequences) + ), + ParallelMergeCombiningSequenceTest.INT_PAIR_ORDERING, + ParallelMergeCombiningSequenceTest.INT_PAIR_MERGE_FN + ); + } + + void consumeSequence(Blackhole blackhole, Supplier> supplier) + { + try { + Yielder yielder = + Yielders.each(outputSequenceFactory.apply(createInputSequences(supplier))); + + ParallelMergeCombiningSequenceTest.IntPair prev; + while (!yielder.isDone()) { + prev = yielder.get(); + blackhole.consume(prev); + yielder = yielder.next(yielder.get()); + } + } + catch (Exception anyException) { + log.error(anyException, "benchmark failed"); + throw new RuntimeException(anyException); + } + } + + List> createInputSequences( + Supplier> supplier + ) + { + List> inputSequences = new ArrayList<>(numSequences); + for (int j = 0; j < numSequences; j++) { + inputSequences.add(supplier.get()); + } + return inputSequences; + } + + Sequence generateSmallSequence() + { + return ParallelMergeCombiningSequenceTest.blockingSequence( + ThreadLocalRandom.current().nextInt(500, 10000), + 50, + 200, + -1, + 0, + true + ); + } + + Sequence generateModeratelyLargeSequence() + { + return ParallelMergeCombiningSequenceTest.blockingSequence( + ThreadLocalRandom.current().nextInt(50_000, 75_000), + 1000, + 2500, + -1, + 0, + true + ); + } + + Sequence generateLargeSequence() + { + final int numRows = ThreadLocalRandom.current().nextInt(1_500_000, 10_000_000); + final int frequency = numRows / 5; + return 
ParallelMergeCombiningSequenceTest.blockingSequence( + numRows, + 5000, + 10000, + frequency, + 10, + true + ); + } +} diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/MergeSequenceBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/sequences/MergeSequenceBenchmark.java similarity index 98% rename from benchmarks/src/main/java/org/apache/druid/benchmark/MergeSequenceBenchmark.java rename to benchmarks/src/main/java/org/apache/druid/benchmark/sequences/MergeSequenceBenchmark.java index 150bddcb8ef9..995163077e50 100644 --- a/benchmarks/src/main/java/org/apache/druid/benchmark/MergeSequenceBenchmark.java +++ b/benchmarks/src/main/java/org/apache/druid/benchmark/sequences/MergeSequenceBenchmark.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.druid.benchmark; +package org.apache.druid.benchmark.sequences; import com.google.common.collect.Ordering; import com.google.common.primitives.Ints; diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/sequences/ParallelMergeCombiningSequenceBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/sequences/ParallelMergeCombiningSequenceBenchmark.java new file mode 100644 index 000000000000..be6b2a6291a6 --- /dev/null +++ b/benchmarks/src/main/java/org/apache/druid/benchmark/sequences/ParallelMergeCombiningSequenceBenchmark.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.benchmark.sequences; + +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequenceTest; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.logger.Logger; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.results.format.ResultFormatType; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Supplier; + + +@State(Scope.Benchmark) +@Fork(value = 1, jvmArgsAppend = "-XX:+UseG1GC") +@Warmup(iterations = 5) +@Measurement(iterations = 25) +public class ParallelMergeCombiningSequenceBenchmark extends BaseParallelMergeCombiningSequenceBenchmark +{ + private static final Logger log = new Logger(ParallelMergeCombiningSequenceBenchmark.class); + // this should be as large as the largest value of concurrentSequenceConsumers + private static final ExecutorService CONSUMER_POOL = Execs.multiThreaded(64, "mock-http-thread"); + + /** + * Number of threads to run on {@link #CONSUMER_POOL}, each running {@link #consumeSequence} + */ + @Param({ + "1", + "2", + "4", + "8", + "16", + "32", + "64" + }) + private int concurrentSequenceConsumers; + + /** + * Delay in milliseconds between starting each of the {@link #concurrentSequenceConsumers} threads + */ + @Param({ + "0", + "10", + "100", + "500", + "1000" + }) + private int concurrentConsumerDelayMillis; + + /** + * This encodes the type of input sequences and parameters that control their behavior. + * 'non-blocking-sequence-{numRows}' uses {@link ParallelMergeCombiningSequenceTest#nonBlockingSequence} to create, + * as you might expect, an input sequence that is lazily generated and will not block while being consumed. + * + * 'initially-blocking-sequence-{numRows}-{startDelayStart}-{startDelayEnd}ms' uses + * {@link ParallelMergeCombiningSequenceTest#blockingSequence} to create a lazily generated input sequence that will + * initially block for a random time within the range specified in the parameter, and will not perform any additional + * blocking during further processing. + * + * 'blocking-sequence-{numRows}-{startDelayStart}-{startDelayEnd}ms-{numberOfTimesToBlock}-{frequencyDelay}ms' uses + * {@link ParallelMergeCombiningSequenceTest#blockingSequence} to create a lazily generated input sequence that will + * initially block for a random time within the range specified in the parameter, and additionally will randomly block + * up to the encoded number of occurrences, each time for up to the encoded delay.
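+ * For example, 'blocking-sequence-75000-10-500ms-10-1ms' is parsed by {@link #setupInputSequenceGenerator()} into
+ * input sequences of 75,000 rows that initially block for a random 10-500ms, then block up to 10 additional times
+ * (once every 75000 / 10 = 7,500 rows) for up to 1ms each.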
+ * + * 'typical-distribution-sequence' will randomly produce a 'class' of input sequences at the following rates: + * - 80% probability of a small result set which has a short initial delay on the order of tens to hundreds of millis + * and input row counts of up to a few thousand + * - 20% probability of a moderately large result set which has an initial delay in the range of a few seconds + * and input sequence row counts in the 50k-75k range + * This input sequence type is only useful when testing a large number of concurrent threads. + * + * note: beware when using the blocking sequences for a direct comparison between strategies. At minimum they are + * useful for gauging behavior when sequences block, but because the sequences are not stable between strategies or + * sequence counts, much less between iterations of the same strategy, only compensating by running a large number + * of iterations could potentially make them more directly comparable. + */ + @Param({ + "non-blocking-sequence-1000", + "non-blocking-sequence-75000", + "non-blocking-sequence-10000000", + "initially-blocking-sequence-1000-100-500ms", + "initially-blocking-sequence-75000-100-500ms", + "initially-blocking-sequence-10000000-100-500ms", + "initially-blocking-sequence-1000-4000-5000ms", + "initially-blocking-sequence-75000-4000-5000ms", + "initially-blocking-sequence-10000000-4000-5000ms", + "blocking-sequence-1000-10-500ms-10-1ms", + "blocking-sequence-75000-10-500ms-10-1ms", + "blocking-sequence-10000000-10-500ms-10-1ms", + "typical-distribution-sequence" + }) + String inputSequenceType; + + private Supplier<Sequence<ParallelMergeCombiningSequenceTest.IntPair>> homogeneousInputSequenceFactory; + private Function<Double, Sequence<ParallelMergeCombiningSequenceTest.IntPair>> heterogeneousInputSequenceFactory; + + @Setup(Level.Trial) + public void setupInputSequenceGenerator() + { + String[] inputSequenceTypeSplit = inputSequenceType.split("-"); + if ("initially".equals(inputSequenceTypeSplit[0])) { + // e.g. "initially-blocking-sequence-{numRows}-{startDelayStart}-{startDelayEnd}ms" + final int numRows = Integer.parseInt(inputSequenceTypeSplit[3]); + final int startDelayStartMillis = Integer.parseInt(inputSequenceTypeSplit[4]); + final int startDelayEndMillis = Integer.parseInt( + inputSequenceTypeSplit[5].substring(0, inputSequenceTypeSplit[5].length() - 2) + ); + homogeneousInputSequenceFactory = () -> + ParallelMergeCombiningSequenceTest.blockingSequence( + numRows, + startDelayStartMillis, + startDelayEndMillis, + -1, + 0, + true + ); + } else if ("blocking".equals(inputSequenceTypeSplit[0])) { + // e.g. "blocking-sequence-{numRows}-{startDelayStart}-{startDelayEnd}ms-{numberOfTimesToBlock}-{frequencyDelay}ms" + final int numRows = Integer.parseInt(inputSequenceTypeSplit[2]); + final int startDelayStartMillis = Integer.parseInt(inputSequenceTypeSplit[3]); + final int startDelayEndMillis = Integer.parseInt( + inputSequenceTypeSplit[4].substring(0, inputSequenceTypeSplit[4].length() - 2) + ); + final int numberOfTimesToBlock = Integer.parseInt(inputSequenceTypeSplit[5]); + final int maxIterationDelayMillis = Integer.parseInt( + inputSequenceTypeSplit[6].substring(0, inputSequenceTypeSplit[6].length() - 2) + ); + final int frequency = numRows / numberOfTimesToBlock; + homogeneousInputSequenceFactory = () -> + ParallelMergeCombiningSequenceTest.blockingSequence( + numRows, + startDelayStartMillis, + startDelayEndMillis, + frequency, + maxIterationDelayMillis, + true + ); + } else if ("non".equals(inputSequenceTypeSplit[0])) { + // e.g.
"non-blocking-sequence-{numRows}" + final int numRows = Integer.parseInt(inputSequenceTypeSplit[3]); + homogeneousInputSequenceFactory = () -> + ParallelMergeCombiningSequenceTest.nonBlockingSequence(numRows, true); + } else { // "typical distribution" input sequence + // approximately 80% of threads will merge/combine small result sets between 500-10k results per input sequence + // blocking for 50-200 ms before initial results are yielded + // approximately 20% of threads will merge/combine moderate sized result sets between 50k-75k per input + // sequence, blocking for 1000-2500 ms before initial results are yielded + heterogeneousInputSequenceFactory = (d) -> { + if (d < 0.80) { // small queries + return generateSmallSequence(); + } else { // moderately large queries + return generateModeratelyLargeSequence(); + } + }; + } + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public void exec(Blackhole blackhole) throws Exception + { + List futures = createConsumers(blackhole, concurrentSequenceConsumers, concurrentConsumerDelayMillis); + + for (int i = 0; i < concurrentSequenceConsumers; i++) { + blackhole.consume(futures.get(i).get()); + } + blackhole.consume(futures); + } + + private List createConsumers(Blackhole blackhole, int consumers, int delayMillis) throws Exception + { + List futures = new ArrayList<>(consumers); + for (int i = 0; i < consumers; i++) { + if (delayMillis > 0) { + Thread.sleep(delayMillis); + } + if (heterogeneousInputSequenceFactory != null) { + double d = ThreadLocalRandom.current().nextDouble(0.0, 1.0); + futures.add( + CONSUMER_POOL.submit(() -> consumeSequence(blackhole, () -> heterogeneousInputSequenceFactory.apply(d))) + ); + } else { + futures.add(CONSUMER_POOL.submit(() -> consumeSequence(blackhole, homogeneousInputSequenceFactory))); + } + } + + return futures; + } + + public static void main(String[] args) throws RunnerException + { + Options opt = new OptionsBuilder() + .include(ParallelMergeCombiningSequenceBenchmark.class.getSimpleName()) + .forks(1) + .syncIterations(true) + .resultFormat(ResultFormatType.CSV) + .result("parallel-merge-combining-sequence.csv") + .build(); + + new Runner(opt).run(); + } +} diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/sequences/ParallelMergeCombiningSequenceThreadedBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/sequences/ParallelMergeCombiningSequenceThreadedBenchmark.java new file mode 100644 index 000000000000..1b932cae2c90 --- /dev/null +++ b/benchmarks/src/main/java/org/apache/druid/benchmark/sequences/ParallelMergeCombiningSequenceThreadedBenchmark.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.benchmark.sequences; + +import org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequenceTest; +import org.apache.druid.java.util.common.guava.Sequence; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Group; +import org.openjdk.jmh.annotations.GroupThreads; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +@State(Scope.Benchmark) +@Fork(value = 1, jvmArgsAppend = "-XX:+UseG1GC") +@Warmup(iterations = 5) +@Measurement(iterations = 25) +public class ParallelMergeCombiningSequenceThreadedBenchmark extends BaseParallelMergeCombiningSequenceBenchmark +{ + @Param({ + "0", + "100", + "500" + }) + int maxThreadStartDelay; + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + @Group("consumers") + @GroupThreads(4) + public void consumeSmall(Blackhole blackhole) + { + consumeSequence(blackhole, this::generateSmallSequence); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + @Group("consumers") + @GroupThreads(1) + public void consumeModeratelyLarge(Blackhole blackhole) + { + consumeSequence(blackhole, this::generateModeratelyLargeSequence); + } + + @Override + void consumeSequence(Blackhole blackhole, Supplier> supplier) + { + int delay = maxThreadStartDelay > 0 ? ThreadLocalRandom.current().nextInt(0, maxThreadStartDelay) : 0; + if (delay > 0) { + try { + Thread.sleep(delay); + } + catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + super.consumeSequence(blackhole, supplier); + } +} diff --git a/core/src/main/java/org/apache/druid/java/util/common/RE.java b/core/src/main/java/org/apache/druid/java/util/common/RE.java index f7a52a65c0b1..8a6fa4073330 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/RE.java +++ b/core/src/main/java/org/apache/druid/java/util/common/RE.java @@ -32,4 +32,9 @@ public RE(Throwable cause, String formatText, Object... arguments) { super(StringUtils.nonStrictFormat(formatText, arguments), cause); } + + public RE(Throwable cause) + { + super(cause == null ? null : cause.getMessage(), cause); + } } diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java b/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java new file mode 100644 index 000000000000..41a82c219a1c --- /dev/null +++ b/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java @@ -0,0 +1,1113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.common.guava; + +import com.google.common.collect.Lists; +import com.google.common.collect.Ordering; +import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.utils.JvmUtils; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.RecursiveAction; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BinaryOperator; + +/** + * Artisanal, locally-sourced, hand-crafted, gluten and GMO free, bespoke, free-range, organic, small-batch parallel + * merge combining sequence. + * + * See proposal: https://github.com/apache/incubator-druid/issues/8577 + * + * Functionally equivalent to wrapping {@link org.apache.druid.common.guava.CombiningSequence} around a + * {@link MergeSequence}, but done in parallel on a {@link ForkJoinPool} running in 'async' mode. 
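+ *
+ * A rough usage sketch; the names and argument values here are illustrative, not recommendations (see the
+ * constructor below for the parameter order):
+ * <pre>{@code
+ * Sequence<T> merged = new ParallelMergeCombiningSequence<>(
+ *     mergePool,        // ForkJoinPool, ideally running in 'async' mode
+ *     inputSequences,   // List<Sequence<T>>, each already sorted by the ordering below
+ *     ordering,         // Ordering<T> shared by all input sequences
+ *     combineFn,        // BinaryOperator<T> combining rows the ordering considers equal
+ *     false,            // hasTimeout
+ *     0,                // timeoutMillis, ignored when hasTimeout is false
+ *     0,                // queryPriority, not currently used
+ *     4,                // parallelism hint for layer 1 tasks
+ *     DEFAULT_TASK_INITIAL_YIELD_NUM_ROWS,
+ *     DEFAULT_TASK_SMALL_BATCH_NUM_ROWS,
+ *     DEFAULT_TASK_TARGET_RUN_TIME_MILLIS
+ * );
+ * }</pre>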
+ */ +public class ParallelMergeCombiningSequence extends YieldingSequenceBase +{ + private static final Logger LOG = new Logger(ParallelMergeCombiningSequence.class); + + // these values were chosen carefully via feedback from benchmarks, + // see PR https://github.com/apache/incubator-druid/pull/8578 for details + public static final int DEFAULT_TASK_TARGET_RUN_TIME_MILLIS = 100; + public static final int DEFAULT_TASK_INITIAL_YIELD_NUM_ROWS = 16384; + public static final int DEFAULT_TASK_SMALL_BATCH_NUM_ROWS = 4096; + + private final ForkJoinPool workerPool; + private final List> baseSequences; + private final Ordering orderingFn; + private final BinaryOperator combineFn; + private final int queueSize; + private final boolean hasTimeout; + private final long timeoutAtNanos; + private final int queryPriority; // not currently used :( + private final int yieldAfter; + private final int batchSize; + private final int parallelism; + private final long targetTimeNanos; + private final CancellationGizmo cancellationGizmo; + + public ParallelMergeCombiningSequence( + ForkJoinPool workerPool, + List> baseSequences, + Ordering orderingFn, + BinaryOperator combineFn, + boolean hasTimeout, + long timeoutMillis, + int queryPriority, + int parallelism, + int yieldAfter, + int batchSize, + int targetTimeMillis + ) + { + this.workerPool = workerPool; + this.baseSequences = baseSequences; + this.orderingFn = orderingFn; + this.combineFn = combineFn; + this.hasTimeout = hasTimeout; + this.timeoutAtNanos = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutMillis, TimeUnit.MILLISECONDS); + this.queryPriority = queryPriority; + this.parallelism = parallelism; + this.yieldAfter = yieldAfter; + this.batchSize = batchSize; + this.targetTimeNanos = TimeUnit.NANOSECONDS.convert(targetTimeMillis, TimeUnit.MILLISECONDS); + this.queueSize = 4 * (yieldAfter / batchSize); + this.cancellationGizmo = new CancellationGizmo(); + } + + @Override + public Yielder toYielder(OutType initValue, YieldingAccumulator accumulator) + { + if (baseSequences.isEmpty()) { + return Sequences.empty().toYielder(initValue, accumulator); + } + + final BlockingQueue> outputQueue = new ArrayBlockingQueue<>(queueSize); + MergeCombinePartitioningAction finalMergeAction = new MergeCombinePartitioningAction<>( + baseSequences, + orderingFn, + combineFn, + outputQueue, + queueSize, + parallelism, + yieldAfter, + batchSize, + targetTimeNanos, + hasTimeout, + timeoutAtNanos, + cancellationGizmo + ); + workerPool.execute(finalMergeAction); + Sequence finalOutSequence = makeOutputSequenceForQueue(outputQueue, hasTimeout, timeoutAtNanos, cancellationGizmo); + return finalOutSequence.toYielder(initValue, accumulator); + } + + /** + * Create an output {@link Sequence} that wraps the output {@link BlockingQueue} of a + * {@link MergeCombinePartitioningAction} + */ + static Sequence makeOutputSequenceForQueue( + BlockingQueue> queue, + boolean hasTimeout, + long timeoutAtNanos, + CancellationGizmo cancellationGizmo + ) + { + return new BaseSequence<>( + new BaseSequence.IteratorMaker>() + { + @Override + public Iterator make() + { + return new Iterator() + { + private ResultBatch currentBatch; + + @Override + public boolean hasNext() + { + final long thisTimeoutNanos = timeoutAtNanos - System.nanoTime(); + if (hasTimeout && thisTimeoutNanos < 0) { + throw new RE(new TimeoutException("Sequence iterator timed out")); + } + + if (currentBatch != null && !currentBatch.isTerminalResult() && !currentBatch.isDrained()) { + return true; + } + try { + if 
(currentBatch == null || currentBatch.isDrained()) { + if (hasTimeout) { + currentBatch = queue.poll(thisTimeoutNanos, TimeUnit.NANOSECONDS); + } else { + currentBatch = queue.take(); + } + } + if (currentBatch == null) { + throw new RE(new TimeoutException("Sequence iterator timed out waiting for data")); + } + + if (cancellationGizmo.isCancelled()) { + throw cancellationGizmo.getRuntimeException(); + } + + if (currentBatch.isTerminalResult()) { + return false; + } + return true; + } + catch (InterruptedException e) { + throw new RE(e); + } + } + + @Override + public T next() + { + if (cancellationGizmo.isCancelled()) { + throw cancellationGizmo.getRuntimeException(); + } + + if (currentBatch == null || currentBatch.isDrained() || currentBatch.isTerminalResult()) { + throw new NoSuchElementException(); + } + return currentBatch.next(); + } + }; + } + + @Override + public void cleanup(Iterator<T> iterFromMake) + { + // nothing to cleanup + } + } + ); + } + + /** + * This {@link RecursiveAction} is the initial task of the parallel merge-combine process. Capacity and input sequence + * count permitting, it will partition the input set of {@link Sequence} to do a 2 layer parallel merge. + * + * For the first layer, the partitions of input sequences are each wrapped in {@link YielderBatchedResultsCursor}, and + * for each partition a {@link PrepareMergeCombineInputsAction} will be executed to wait for each of the yielders to + * yield {@link ResultBatch}. After the cursors all have an initial set of results, the + * {@link PrepareMergeCombineInputsAction} will execute a {@link MergeCombineAction} + * to perform the actual work of merging sequences and combining results. The merged and combined output of each + * partition will itself be put into {@link ResultBatch} and pushed to a {@link BlockingQueue} with a + * {@link ForkJoinPool} {@link QueuePusher}. + * + * The second layer will execute a single {@link PrepareMergeCombineInputsAction} to wait for the {@link ResultBatch} + * from each partition to be available in their 'output' {@link BlockingQueue}, each of which is wrapped in a + * {@link BlockingQueueuBatchedResultsCursor}. Like the first layer, after the {@link PrepareMergeCombineInputsAction} + * is complete and some {@link ResultBatch} are ready to merge from each partition, it will execute a + * {@link MergeCombineAction} to do a final merge combine of all the parallel computed results, again pushing + * {@link ResultBatch} into a {@link BlockingQueue} with a {@link QueuePusher}.
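+ *
+ * As a worked example, assuming an otherwise idle pool with ample parallelism: merging 64 input sequences computes
+ * floor(sqrt(64)) = 8 layer 1 tasks in {@link #computeNumTasks()}, each merge-combining a partition of 64 / 8 = 8
+ * sequences, while a single layer 2 task merge-combines the 8 intermediate output queues.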
+ */ + private static class MergeCombinePartitioningAction<T> extends RecursiveAction + { + private final List<Sequence<T>> sequences; + private final Ordering<T> orderingFn; + private final BinaryOperator<T> combineFn; + private final BlockingQueue<ResultBatch<T>> out; + private final int queueSize; + private final int parallelism; + private final int yieldAfter; + private final int batchSize; + private final long targetTimeNanos; + private final boolean hasTimeout; + private final long timeoutAt; + private final CancellationGizmo cancellationGizmo; + + private MergeCombinePartitioningAction( + List<Sequence<T>> sequences, + Ordering<T> orderingFn, + BinaryOperator<T> combineFn, + BlockingQueue<ResultBatch<T>> out, + int queueSize, + int parallelism, + int yieldAfter, + int batchSize, + long targetTimeNanos, + boolean hasTimeout, + long timeoutAt, + CancellationGizmo cancellationGizmo + ) + { + this.sequences = sequences; + this.combineFn = combineFn; + this.orderingFn = orderingFn; + this.out = out; + this.queueSize = queueSize; + this.parallelism = parallelism; + this.yieldAfter = yieldAfter; + this.batchSize = batchSize; + this.targetTimeNanos = targetTimeNanos; + this.hasTimeout = hasTimeout; + this.timeoutAt = timeoutAt; + this.cancellationGizmo = cancellationGizmo; + } + + @Override + protected void compute() + { + try { + final int parallelTaskCount = computeNumTasks(); + + // if we have a small number of sequences to merge, or computed parallelism is too low, do not run in parallel, + // just serially perform the merge-combine with a single task + if (parallelTaskCount < 2) { + LOG.debug( + "Input sequence count (%s) or available parallel merge task count (%s) too small to perform parallel" + + " merge-combine, performing serially with a single merge-combine task", + sequences.size(), + parallelTaskCount + ); + + QueuePusher<ResultBatch<T>> resultsPusher = new QueuePusher<>(out, hasTimeout, timeoutAt); + + List<BatchedResultsCursor<T>> sequenceCursors = new ArrayList<>(sequences.size()); + for (Sequence<T> s : sequences) { + sequenceCursors.add(new YielderBatchedResultsCursor<>(new SequenceBatcher<>(s, batchSize), orderingFn)); + } + PrepareMergeCombineInputsAction<T> blockForInputsAction = new PrepareMergeCombineInputsAction<>( + sequenceCursors, + resultsPusher, + orderingFn, + combineFn, + yieldAfter, + batchSize, + targetTimeNanos, + cancellationGizmo + ); + getPool().execute(blockForInputsAction); + } else { + // 2 layer parallel merge done in fjp + LOG.debug("Spawning %s parallel merge-combine tasks for %s sequences", parallelTaskCount, sequences.size()); + spawnParallelTasks(parallelTaskCount); + } + } + catch (Exception ex) { + cancellationGizmo.cancel(ex); + out.offer(ResultBatch.TERMINAL); + } + } + + private void spawnParallelTasks(int parallelMergeTasks) + { + List<RecursiveAction> tasks = new ArrayList<>(); + List<BlockingQueue<ResultBatch<T>>> intermediaryOutputs = new ArrayList<>(parallelMergeTasks); + + List<List<Sequence<T>>> partitions = + Lists.partition(sequences, sequences.size() / parallelMergeTasks); + + for (List<Sequence<T>> partition : partitions) { + BlockingQueue<ResultBatch<T>> outputQueue = new ArrayBlockingQueue<>(queueSize); + intermediaryOutputs.add(outputQueue); + QueuePusher<ResultBatch<T>> pusher = new QueuePusher<>(outputQueue, hasTimeout, timeoutAt); + + List<BatchedResultsCursor<T>> partitionCursors = new ArrayList<>(sequences.size()); + for (Sequence<T> s : partition) { + partitionCursors.add(new YielderBatchedResultsCursor<>(new SequenceBatcher<>(s, batchSize), orderingFn)); + } + PrepareMergeCombineInputsAction<T> blockForInputsAction = new PrepareMergeCombineInputsAction<>( + partitionCursors, + pusher, + orderingFn, + combineFn, + yieldAfter, + batchSize, + targetTimeNanos, + cancellationGizmo + ); + tasks.add(blockForInputsAction); + } + + for (RecursiveAction task : tasks) { + getPool().execute(task); + } + + QueuePusher<ResultBatch<T>> outputPusher = new QueuePusher<>(out, hasTimeout, timeoutAt); + List<BatchedResultsCursor<T>> intermediaryOutputsCursors = new ArrayList<>(intermediaryOutputs.size()); + for (BlockingQueue<ResultBatch<T>> queue : intermediaryOutputs) { + intermediaryOutputsCursors.add( + new BlockingQueueuBatchedResultsCursor<>(queue, orderingFn, hasTimeout, timeoutAt) + ); + } + PrepareMergeCombineInputsAction<T> finalMergeAction = new PrepareMergeCombineInputsAction<>( + intermediaryOutputsCursors, + outputPusher, + orderingFn, + combineFn, + yieldAfter, + batchSize, + targetTimeNanos, + cancellationGizmo + ); + + getPool().execute(finalMergeAction); + } + + /** + * Computes maximum number of layer 1 parallel merging tasks given available processors and an estimate of current + * {@link ForkJoinPool} utilization. A return value of 1 or less indicates that a serial merge will be done on + * the pool instead. + */ + private int computeNumTasks() + { + final int runningThreadCount = getPool().getRunningThreadCount(); + final int submissionCount = getPool().getQueuedSubmissionCount(); + + // max is smaller of either: + // - parallelism passed into sequence (number of physical cores by default) + // - pool parallelism (number of physical cores * 1.5 by default) + final int maxParallelism = Math.min(parallelism, getPool().getParallelism()); + + // we consider 'utilization' to be the number of running threads + submitted tasks that have not yet started + // running, minus 1 for the task that is running this calculation (as it will be replaced with the parallel tasks) + final int utilizationEstimate = runningThreadCount + submissionCount - 1; + + // 'computed parallelism' is the remainder of the 'max parallelism' less current 'utilization estimate' + final int computedParallelismForUtilization = maxParallelism - utilizationEstimate; + + // try to balance partition size with partition count so we don't end up with a layer 2 'final merge' task that has + // significantly more work to do than the layer 1 'parallel' tasks. + final int computedParallelismForSequences = (int) Math.floor(Math.sqrt(sequences.size())); + + // compute total number of layer 1 'parallel' tasks; for the utilization parallelism, subtract 1 as the final merge + // task will take the remaining slot + final int computedOptimalParallelism = Math.min( + computedParallelismForSequences, + computedParallelismForUtilization - 1 + ); + + final int computedNumParallelTasks = Math.max(computedOptimalParallelism, 1); + + LOG.debug("Computed parallel tasks: [%s]; ForkJoinPool details - sequence parallelism: [%s] " + "active threads: [%s] running threads: [%s] queued submissions: [%s] queued tasks: [%s] " + "pool parallelism: [%s] pool size: [%s] steal count: [%s]", + computedNumParallelTasks, + parallelism, + getPool().getActiveThreadCount(), + runningThreadCount, + submissionCount, + getPool().getQueuedTaskCount(), + getPool().getParallelism(), + getPool().getPoolSize(), + getPool().getStealCount() + ); + + return computedNumParallelTasks; + } + } + + + /** + * This {@link RecursiveAction} is the work-horse of the {@link ParallelMergeCombiningSequence}; it merge-combines + * a set of {@link BatchedResultsCursor} and produces output to a {@link BlockingQueue} with the help of a + * {@link QueuePusher}.
This is essentially a composite of logic taken from {@link MergeSequence} and + * {@link org.apache.druid.common.guava.CombiningSequence}, where the {@link Ordering} is used to both set the sort + * order for a {@link PriorityQueue}, and as a comparison to determine if 'same' ordered results need to be combined + * with a supplied {@link BinaryOperator} combining function. + * + * This task takes a {@link #yieldAfter} parameter which controls how many input result rows will be processed before + * this task completes and executes a new task to continue where it left off. This value is initially set by the + * {@link MergeCombinePartitioningAction} to a default value, but after that this process is timed to try and compute + * an 'optimal' number of rows to yield to achieve a task runtime of ~10ms, on the assumption that the time to process + * n results will be approximately the same. {@link #recursionDepth} is used to track how many times a task has + * continued executing, and utilized to compute a cumulative moving average of task run time per amount yielded in + * order to 'smooth' out the continual adjustment. + */ + private static class MergeCombineAction extends RecursiveAction + { + private final PriorityQueue> pQueue; + private final Ordering orderingFn; + private final BinaryOperator combineFn; + private final QueuePusher> outputQueue; + private final T initialValue; + private final int yieldAfter; + private final int batchSize; + private final long targetTimeNanos; + private final int recursionDepth; + private final CancellationGizmo cancellationGizmo; + + private MergeCombineAction( + PriorityQueue> pQueue, + QueuePusher> outputQueue, + Ordering orderingFn, + BinaryOperator combineFn, + T initialValue, + int yieldAfter, + int batchSize, + long targetTimeNanos, + int recursionDepth, + CancellationGizmo cancellationGizmo + ) + { + this.pQueue = pQueue; + this.orderingFn = orderingFn; + this.combineFn = combineFn; + this.outputQueue = outputQueue; + this.initialValue = initialValue; + this.yieldAfter = yieldAfter; + this.batchSize = batchSize; + this.targetTimeNanos = targetTimeNanos; + this.recursionDepth = recursionDepth; + this.cancellationGizmo = cancellationGizmo; + } + + @Override + protected void compute() + { + try { + long start = System.nanoTime(); + long startCpuNanos = JvmUtils.safeGetThreadCpuTime(); + + int counter = 0; + int batchCounter = 0; + ResultBatch outputBatch = new ResultBatch<>(batchSize); + + T currentCombinedValue = initialValue; + while (counter++ < yieldAfter && !pQueue.isEmpty()) { + BatchedResultsCursor cursor = pQueue.poll(); + + // push the queue along + if (!cursor.isDone()) { + T nextValueToAccumulate = cursor.get(); + + cursor.advance(); + if (!cursor.isDone()) { + pQueue.offer(cursor); + } else { + cursor.close(); + } + + // if current value is null, combine null with next value + if (currentCombinedValue == null) { + currentCombinedValue = combineFn.apply(null, nextValueToAccumulate); + continue; + } + + // if current value is "same" as next value, combine them + if (orderingFn.compare(currentCombinedValue, nextValueToAccumulate) == 0) { + currentCombinedValue = combineFn.apply(currentCombinedValue, nextValueToAccumulate); + continue; + } + + // else, push accumulated value to the queue, accumulate again with next value as initial + outputBatch.add(currentCombinedValue); + batchCounter++; + if (batchCounter >= batchSize) { + outputQueue.offer(outputBatch); + outputBatch = new ResultBatch<>(batchSize); + batchCounter = 0; + } + + // next value 
is now current value + currentCombinedValue = combineFn.apply(null, nextValueToAccumulate); + } else { + cursor.close(); + } + } + + if (!pQueue.isEmpty() && !cancellationGizmo.isCancelled()) { + // if there is still work to be done, execute a new task with the current accumulated value to continue + // combining where we left off + if (!outputBatch.isDrained()) { + outputQueue.offer(outputBatch); + } + + // measure the time it took to process 'yieldAfter' elements in order to project a next 'yieldAfter' value + // which we want to target a 10ms task run time. smooth this value with a cumulative moving average in order + // to prevent normal jitter in processing time from skewing the next yield value too far in any direction + final long elapsedNanos = System.nanoTime() - start; + final long elapsedCpuNanos = JvmUtils.safeGetThreadCpuTime() - startCpuNanos; + final double nextYieldAfter = Math.max((double) targetTimeNanos * ((double) yieldAfter / elapsedCpuNanos), 1.0); + final double cumulativeMovingAverage = + (nextYieldAfter + (recursionDepth * yieldAfter)) / (recursionDepth + 1); + final int adjustedNextYieldAfter = (int) Math.ceil(cumulativeMovingAverage); + + LOG.debug( + "task recursion %s yielded %s results ran for %s millis (%s nanos), %s cpu nanos, next task yielding every %s operations", + recursionDepth, + yieldAfter, + TimeUnit.MILLISECONDS.convert(elapsedNanos, TimeUnit.NANOSECONDS), + elapsedNanos, + elapsedCpuNanos, + adjustedNextYieldAfter + ); + getPool().execute(new MergeCombineAction<>( + pQueue, + outputQueue, + orderingFn, + combineFn, + currentCombinedValue, + adjustedNextYieldAfter, + batchSize, + targetTimeNanos, + recursionDepth + 1, + cancellationGizmo + )); + } else if (cancellationGizmo.isCancelled()) { + // if we got the cancellation signal, go ahead and write terminal value into output queue to help gracefully + // allow downstream stuff to stop + LOG.debug("cancelled after %s tasks", recursionDepth); + outputQueue.offer(ResultBatch.TERMINAL); + } else { + // if priority queue is empty, push the final accumulated value into the output batch and push it out + outputBatch.add(currentCombinedValue); + outputQueue.offer(outputBatch); + // ... and the terminal value to indicate the blocking queue holding the values is complete + outputQueue.offer(ResultBatch.TERMINAL); + LOG.debug("merge combine complete after %s tasks", recursionDepth); + } + } + catch (Exception ex) { + cancellationGizmo.cancel(ex); + outputQueue.offer(ResultBatch.TERMINAL); + } + } + } + + + /** + * This {@link RecursiveAction}, given a set of uninitialized {@link BatchedResultsCursor}, will initialize each of + * them (which is a potentially managed blocking operation) so that each will produce a {@link ResultBatch} + * from the {@link Yielder} or {@link BlockingQueue} that backs the cursor. + * + * Once initialized with a {@link ResultBatch}, the cursors are inserted into a {@link PriorityQueue} and + * fed into a {@link MergeCombineAction} which will do the actual work of merging and combining the result batches. + * This happens as soon as all cursors are initialized, as long as there is at least 1 cursor that is not 'done' + * ({@link BatchedResultsCursor#isDone()}). + * + * This task may take longer than other tasks on the {@link ForkJoinPool}, but is doing little actual work, the + * majority of its time will be spent managed blocking until results are ready for each cursor, or will be incredibly + * short lived if all inputs are already available. 
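+ *
+ * The blocking in {@code cursor.initialize()} goes through {@link ForkJoinPool#managedBlock}, roughly following the
+ * standard JDK pattern (a sketch of the JDK API, not code from this class; {@code item} and {@code queue} are
+ * illustrative):
+ * <pre>{@code
+ * ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
+ *   @Override
+ *   public boolean block() throws InterruptedException
+ *   {
+ *     item = queue.take(); // park; the pool may start a compensation thread meanwhile
+ *     return true;
+ *   }
+ *
+ *   @Override
+ *   public boolean isReleasable()
+ *   {
+ *     return item != null; // skip blocking when a result is already available
+ *   }
+ * });
+ * }</pre>
+ * so waiting for slow input cursors does not starve the pool of workers for the actual merge tasks.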
+ */ + private static class PrepareMergeCombineInputsAction extends RecursiveAction + { + private final List> partition; + private final Ordering orderingFn; + private final BinaryOperator combineFn; + private final QueuePusher> outputQueue; + private final int yieldAfter; + private final int batchSize; + private final long targetTimeNanos; + private final CancellationGizmo cancellationGizmo; + + private PrepareMergeCombineInputsAction( + List> partition, + QueuePusher> outputQueue, + Ordering orderingFn, + BinaryOperator combineFn, + int yieldAfter, + int batchSize, + long targetTimeNanos, + CancellationGizmo cancellationGizmo + ) + { + this.partition = partition; + this.orderingFn = orderingFn; + this.combineFn = combineFn; + this.outputQueue = outputQueue; + this.yieldAfter = yieldAfter; + this.batchSize = batchSize; + this.targetTimeNanos = targetTimeNanos; + this.cancellationGizmo = cancellationGizmo; + } + + @Override + protected void compute() + { + try { + PriorityQueue> cursors = new PriorityQueue<>(partition.size()); + for (BatchedResultsCursor cursor : partition) { + // this is blocking + cursor.initialize(); + if (!cursor.isDone()) { + cursors.offer(cursor); + } + } + + if (cursors.size() > 0) { + getPool().execute(new MergeCombineAction( + cursors, + outputQueue, + orderingFn, + combineFn, + null, + yieldAfter, + batchSize, + targetTimeNanos, + 1, + cancellationGizmo + )); + } else { + outputQueue.offer(ResultBatch.TERMINAL); + } + } + catch (Exception ex) { + cancellationGizmo.cancel(ex); + outputQueue.offer(ResultBatch.TERMINAL); + } + } + } + + + /** + * {@link ForkJoinPool} friendly {@link BlockingQueue} feeder, adapted from 'QueueTaker' of Java documentation on + * {@link ForkJoinPool.ManagedBlocker}, + * https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.ManagedBlocker.html + */ + static class QueuePusher implements ForkJoinPool.ManagedBlocker + { + final boolean hasTimeout; + final long timeoutAtNanos; + final BlockingQueue queue; + volatile E item = null; + + QueuePusher(BlockingQueue q, boolean hasTimeout, long timeoutAtNanos) + { + this.queue = q; + this.hasTimeout = hasTimeout; + this.timeoutAtNanos = timeoutAtNanos; + } + + @Override + public boolean block() throws InterruptedException + { + boolean success = false; + if (item != null) { + if (hasTimeout) { + final long thisTimeoutNanos = timeoutAtNanos - System.nanoTime(); + if (thisTimeoutNanos < 0) { + throw new RE(new TimeoutException("QueuePusher timed out offering data")); + } + success = queue.offer(item, thisTimeoutNanos, TimeUnit.NANOSECONDS); + } else { + success = queue.offer(item); + } + if (success) { + item = null; + } + } + return success; + } + + @Override + public boolean isReleasable() + { + return item == null; + } + + public void offer(E item) + { + try { + this.item = item; + ForkJoinPool.managedBlock(this); + } + catch (InterruptedException e) { + throw new RuntimeException("Failed to offer result to output queue", e); + } + } + } + + + /** + * Holder object for an ordered batch of results from a sequence. Batching the results vastly reduces the amount of + * blocking that is needed to move results between stages of {@link MergeCombineAction} done in parallel, allowing + * the fork join tasks to focus on doing actual work instead of dealing with managed blocking. 
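+ *
+ * For example, {@link #fromSequence} with a batch size of 4096 turns a 10,000 row input sequence into batches of
+ * 4096, 4096, and finally 1808 rows, so values cross between stages 3 times instead of 10,000 times.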
+ */ + static class ResultBatch + { + static final ResultBatch TERMINAL = new ResultBatch(); + + @Nullable + private final Queue values; + + ResultBatch(int batchSize) + { + this.values = new ArrayDeque<>(batchSize); + } + + private ResultBatch() + { + this.values = null; + } + + public void add(E in) + { + assert values != null; + values.offer(in); + } + + public E get() + { + assert values != null; + return values.peek(); + } + + public E next() + { + assert values != null; + return values.poll(); + } + + boolean isDrained() + { + return values != null && values.isEmpty(); + } + + boolean isTerminalResult() + { + return values == null; + } + + /** + * Convert sequence to yielder that accumulates results into ordered 'batches' + */ + static Yielder> fromSequence(Sequence sequence, int batchSize) + { + return sequence.toYielder( + new ResultBatch<>(batchSize), + new YieldingAccumulator, E>() + { + int count = 0; + @Override + public ResultBatch accumulate(ResultBatch accumulated, E in) + { + accumulated.add(in); + count++; + if (count % batchSize == 0) { + yield(); + } + return accumulated; + } + } + ); + } + } + + + /** + * {@link ForkJoinPool} friendly conversion of {@link Sequence} to {@link Yielder< ResultBatch >} + */ + static class SequenceBatcher implements ForkJoinPool.ManagedBlocker + { + private final Sequence sequence; + private final int batchSize; + private volatile Yielder> batchYielder; + + SequenceBatcher(Sequence sequence, int batchSize) + { + this.sequence = sequence; + this.batchSize = batchSize; + } + + Yielder> getBatchYielder() + { + try { + ForkJoinPool.managedBlock(this); + return batchYielder; + } + catch (InterruptedException e) { + throw new RuntimeException("Failed to load initial batch of results", e); + } + } + + @Override + public boolean block() + { + batchYielder = ResultBatch.fromSequence(sequence, batchSize); + return true; + } + + @Override + public boolean isReleasable() + { + return batchYielder != null; + } + } + + + /** + * Provides a higher level cursor interface to provide individual results out {@link ResultBatch} provided by + * a {@link Yielder} or {@link BlockingQueue}. This is the mechanism that powers {@link MergeCombineAction}, where + * a set of {@link BatchedResultsCursor} are placed in a {@link PriorityQueue} to facilitate ordering to merge results + * from these cursors, and combine results with the same ordering using the combining function. 
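+ *
+ * Consumption in {@link MergeCombineAction} follows the classic k-way merge loop, roughly (sketch):
+ * <pre>{@code
+ * BatchedResultsCursor<T> cursor = pQueue.poll(); // cursor holding the smallest current value
+ * T value = cursor.get();
+ * cursor.advance();
+ * if (!cursor.isDone()) {
+ *   pQueue.offer(cursor); // re-queue so ordering is re-evaluated against its next value
+ * } else {
+ *   cursor.close();
+ * }
+ * }</pre>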
+ */ + abstract static class BatchedResultsCursor + implements ForkJoinPool.ManagedBlocker, Comparable> + { + final Ordering ordering; + volatile ResultBatch resultBatch; + + BatchedResultsCursor(Ordering ordering) + { + this.ordering = ordering; + } + + public abstract void initialize(); + + public abstract void advance(); + + public abstract boolean isDone(); + + void nextBatch() + { + try { + ForkJoinPool.managedBlock(this); + } + catch (InterruptedException e) { + throw new RuntimeException("Failed to load next batch of results", e); + } + } + + public void close() + { + // nothing to close for blocking queue, but yielders will need to clean up or they will leak resources + } + + public E get() + { + return resultBatch.get(); + } + + @Override + public int compareTo(BatchedResultsCursor o) + { + return ordering.compare(get(), o.get()); + } + + @Override + public boolean equals(Object o) + { + if (!(o instanceof BatchedResultsCursor)) { + return false; + } + return compareTo((BatchedResultsCursor) o) == 0; + } + + @Override + public int hashCode() + { + return Objects.hash(ordering); + } + } + + + /** + * {@link BatchedResultsCursor} that wraps a {@link Yielder} of {@link ResultBatch} to provide individual rows + * of the result batch. + */ + static class YielderBatchedResultsCursor extends BatchedResultsCursor + { + final SequenceBatcher batcher; + Yielder> yielder; + + YielderBatchedResultsCursor(SequenceBatcher batcher, Ordering ordering) + { + super(ordering); + this.batcher = batcher; + } + + @Override + public void initialize() + { + yielder = batcher.getBatchYielder(); + resultBatch = yielder.get(); + } + + @Override + public void advance() + { + if (!resultBatch.isDrained()) { + resultBatch.next(); + } + if (resultBatch.isDrained() && !yielder.isDone()) { + nextBatch(); + } + } + + @Override + public boolean isDone() + { + // yielder will never produce a 'terminal' result batch, so only check that we drain the final batch when the + // yielder is done + return resultBatch == null || (yielder.isDone() && resultBatch.isDrained()); + } + + @Override + public boolean block() + { + if (yielder.isDone()) { + return true; + } + if (resultBatch == null || resultBatch.isDrained()) { + resultBatch = new ResultBatch<>(batcher.batchSize); + final Yielder> nextYielder = yielder.next(resultBatch); + yielder = nextYielder; + } + return true; + } + + @Override + public boolean isReleasable() + { + return resultBatch != null && !resultBatch.isDrained(); + } + + @Override + public void close() + { + try { + yielder.close(); + } + catch (IOException e) { + throw new RuntimeException("Failed to close yielder", e); + } + } + } + + + /** + * {@link BatchedResultsCursor} that wraps a {@link BlockingQueue} of {@link ResultBatch} to provide individual + * rows from the result batch. 
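+ *
+ * The wrapped queue follows a simple protocol, sketched below with illustrative names ({@code queue},
+ * {@code batchOfRows}, and {@code handleRow} are stand-ins): producers offer {@link ResultBatch} instances
+ * followed by {@link ResultBatch#TERMINAL}, and consumers drain each batch fully before taking the next:
+ *
+ * <pre>{@code
+ * // producer side, e.g. a MergeCombineAction pushing through a QueuePusher:
+ * queue.offer(batchOfRows);
+ * queue.offer(ResultBatch.TERMINAL);   // always close the stream explicitly
+ *
+ * // consumer side:
+ * ResultBatch<E> batch = queue.take();
+ * while (!batch.isTerminalResult()) {
+ *   while (!batch.isDrained()) {
+ *     handleRow(batch.next());
+ *   }
+ *   batch = queue.take();
+ * }
+ * }</pre>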
+ */ + static class BlockingQueueuBatchedResultsCursor extends BatchedResultsCursor + { + final BlockingQueue> queue; + final boolean hasTimeout; + final long timeoutAtNanos; + + BlockingQueueuBatchedResultsCursor( + BlockingQueue> blockingQueue, + Ordering ordering, + boolean hasTimeout, + long timeoutAtNanos + ) + { + super(ordering); + this.queue = blockingQueue; + this.hasTimeout = hasTimeout; + this.timeoutAtNanos = timeoutAtNanos; + } + + @Override + public void initialize() + { + if (queue.isEmpty()) { + nextBatch(); + } else { + resultBatch = queue.poll(); + } + } + + @Override + public void advance() + { + if (!resultBatch.isDrained()) { + resultBatch.next(); + } + if (resultBatch.isDrained()) { + nextBatch(); + } + } + + @Override + public boolean isDone() + { + // blocking queue cursors will always finish the queue with a 'terminal' result batch to indicate that the queue + // is finished and no additional values are expected. + return resultBatch.isTerminalResult(); + } + + @Override + public boolean block() throws InterruptedException + { + if (resultBatch == null || resultBatch.isDrained()) { + if (hasTimeout) { + final long thisTimeoutNanos = timeoutAtNanos - System.nanoTime(); + if (thisTimeoutNanos < 0) { + throw new RE(new TimeoutException("BlockingQueue cursor timed out waiting for data")); + } + resultBatch = queue.poll(thisTimeoutNanos, TimeUnit.NANOSECONDS); + } else { + resultBatch = queue.take(); + } + } + return resultBatch != null; + } + + @Override + public boolean isReleasable() + { + // if result batch is 'terminal' or still has values, no need to block + if (resultBatch != null && (resultBatch.isTerminalResult() || !resultBatch.isDrained())) { + return true; + } + // if we can get a result immediately without blocking, also no need to block + resultBatch = queue.poll(); + return resultBatch != null; + } + } + + + /** + * Token to allow any {@link RecursiveAction} to signal the others and the output sequence that something bad happened + * and processing should cancel, such as a timeout or connection loss. + */ + static class CancellationGizmo + { + private final AtomicReference exception = new AtomicReference<>(null); + + void cancel(Exception ex) + { + exception.compareAndSet(null, ex); + } + + boolean isCancelled() + { + return exception.get() != null; + } + + RuntimeException getRuntimeException() + { + Exception ex = exception.get(); + if (ex instanceof RuntimeException) { + return (RuntimeException) ex; + } + return new RE(ex); + } + } +} diff --git a/core/src/test/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequenceTest.java b/core/src/test/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequenceTest.java new file mode 100644 index 000000000000..156447505c4b --- /dev/null +++ b/core/src/test/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequenceTest.java @@ -0,0 +1,800 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.common.guava; + +import com.google.common.collect.Ordering; +import org.apache.druid.common.guava.CombiningSequence; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.logger.Logger; +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.BinaryOperator; + +public class ParallelMergeCombiningSequenceTest +{ + private static final Logger LOG = new Logger(ParallelMergeCombiningSequenceTest.class); + + public static final Ordering INT_PAIR_ORDERING = Ordering.natural().onResultOf(p -> p.lhs); + public static final BinaryOperator INT_PAIR_MERGE_FN = (lhs, rhs) -> { + if (lhs == null) { + return rhs; + } + + if (rhs == null) { + return lhs; + } + + return new IntPair(lhs.lhs, lhs.rhs + rhs.rhs); + }; + + private ForkJoinPool pool; + + @Before + public void setup() + { + pool = new ForkJoinPool( + (int) Math.ceil(Runtime.getRuntime().availableProcessors() * 0.75), + ForkJoinPool.defaultForkJoinWorkerThreadFactory, + (t, e) -> LOG.error(e, "Unhandled exception in thread [%s]", t), + true + ); + } + + @After + public void teardown() + { + pool.shutdown(); + } + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testOrderedResultBatchFromSequence() throws IOException + { + Sequence rawSequence = nonBlockingSequence(5000); + ParallelMergeCombiningSequence.YielderBatchedResultsCursor cursor = + new ParallelMergeCombiningSequence.YielderBatchedResultsCursor<>( + new ParallelMergeCombiningSequence.SequenceBatcher<>(rawSequence, 128), + INT_PAIR_ORDERING + ); + cursor.initialize(); + Yielder rawYielder = Yielders.each(rawSequence); + + IntPair prev = null; + while (!rawYielder.isDone() && !cursor.isDone()) { + Assert.assertEquals(rawYielder.get(), cursor.get()); + Assert.assertNotEquals(cursor.get(), prev); + prev = cursor.get(); + rawYielder = rawYielder.next(rawYielder.get()); + cursor.advance(); + } + cursor.close(); + rawYielder.close(); + } + + @Test + public void testOrderedResultBatchFromSequenceBackToYielderOnSequence() throws IOException + { + final int batchSize = 128; + final int sequenceSize = 5_000; + Sequence rawSequence = nonBlockingSequence(sequenceSize); + ParallelMergeCombiningSequence.YielderBatchedResultsCursor cursor = + new ParallelMergeCombiningSequence.YielderBatchedResultsCursor<>( + new ParallelMergeCombiningSequence.SequenceBatcher<>(rawSequence, 128), + INT_PAIR_ORDERING + ); + + cursor.initialize(); + Yielder rawYielder = Yielders.each(rawSequence); + + ArrayBlockingQueue> outputQueue = + new ArrayBlockingQueue<>((int) 
Math.ceil(((double) sequenceSize / batchSize) + 2)); + + IntPair prev = null; + ParallelMergeCombiningSequence.ResultBatch currentBatch = + new ParallelMergeCombiningSequence.ResultBatch<>(batchSize); + int batchCounter = 0; + while (!rawYielder.isDone() && !cursor.isDone()) { + Assert.assertEquals(rawYielder.get(), cursor.get()); + Assert.assertNotEquals(cursor.get(), prev); + prev = cursor.get(); + currentBatch.add(prev); + batchCounter++; + if (batchCounter >= batchSize) { + outputQueue.offer(currentBatch); + currentBatch = new ParallelMergeCombiningSequence.ResultBatch<>(batchSize); + batchCounter = 0; + } + rawYielder = rawYielder.next(rawYielder.get()); + cursor.advance(); + } + if (!currentBatch.isDrained()) { + outputQueue.offer(currentBatch); + } + outputQueue.offer(ParallelMergeCombiningSequence.ResultBatch.TERMINAL); + + rawYielder.close(); + cursor.close(); + + rawYielder = Yielders.each(rawSequence); + + Sequence queueAsSequence = ParallelMergeCombiningSequence.makeOutputSequenceForQueue( + outputQueue, + true, + System.nanoTime() + TimeUnit.NANOSECONDS.convert(10_000, TimeUnit.MILLISECONDS), + new ParallelMergeCombiningSequence.CancellationGizmo() + ); + + Yielder queueYielder = Yielders.each(queueAsSequence); + + while (!rawYielder.isDone() && !queueYielder.isDone()) { + Assert.assertEquals(rawYielder.get(), queueYielder.get()); + Assert.assertNotEquals(queueYielder.get(), prev); + prev = queueYielder.get(); + rawYielder = rawYielder.next(rawYielder.get()); + queueYielder = queueYielder.next(queueYielder.get()); + } + + rawYielder.close(); + queueYielder.close(); + } + + @Test + public void testOrderedResultBatchFromSequenceToBlockingQueueCursor() throws IOException + { + final int batchSize = 128; + final int sequenceSize = 5_000; + Sequence rawSequence = nonBlockingSequence(sequenceSize); + ParallelMergeCombiningSequence.YielderBatchedResultsCursor cursor = + new ParallelMergeCombiningSequence.YielderBatchedResultsCursor<>( + new ParallelMergeCombiningSequence.SequenceBatcher<>(rawSequence, 128), + INT_PAIR_ORDERING + ); + + cursor.initialize(); + + Yielder rawYielder = Yielders.each(rawSequence); + + ArrayBlockingQueue> outputQueue = + new ArrayBlockingQueue<>((int) Math.ceil(((double) sequenceSize / batchSize) + 2)); + + IntPair prev = null; + ParallelMergeCombiningSequence.ResultBatch currentBatch = + new ParallelMergeCombiningSequence.ResultBatch<>(batchSize); + int batchCounter = 0; + while (!rawYielder.isDone() && !cursor.isDone()) { + Assert.assertEquals(rawYielder.get(), cursor.get()); + Assert.assertNotEquals(cursor.get(), prev); + prev = cursor.get(); + currentBatch.add(prev); + batchCounter++; + if (batchCounter >= batchSize) { + outputQueue.offer(currentBatch); + currentBatch = new ParallelMergeCombiningSequence.ResultBatch<>(batchSize); + batchCounter = 0; + } + rawYielder = rawYielder.next(rawYielder.get()); + cursor.advance(); + } + if (!currentBatch.isDrained()) { + outputQueue.offer(currentBatch); + } + outputQueue.offer(ParallelMergeCombiningSequence.ResultBatch.TERMINAL); + + rawYielder.close(); + cursor.close(); + + rawYielder = Yielders.each(rawSequence); + + ParallelMergeCombiningSequence.BlockingQueueuBatchedResultsCursor queueCursor = + new ParallelMergeCombiningSequence.BlockingQueueuBatchedResultsCursor<>( + outputQueue, + INT_PAIR_ORDERING, + false, + -1L + ); + queueCursor.initialize(); + prev = null; + while (!rawYielder.isDone() && !queueCursor.isDone()) { + Assert.assertEquals(rawYielder.get(), queueCursor.get()); + 
Assert.assertNotEquals(queueCursor.get(), prev); + prev = queueCursor.get(); + rawYielder = rawYielder.next(rawYielder.get()); + queueCursor.advance(); + } + rawYielder.close(); + queueCursor.close(); + } + + @Test + public void testNone() throws Exception + { + List> input = new ArrayList<>(); + assertResult(input); + } + + @Test + public void testEmpties() throws Exception + { + // below min threshold, so will merge serially + List> input = new ArrayList<>(); + input.add(Sequences.empty()); + input.add(Sequences.empty()); + assertResult(input); + + // above min sequence count threshold, so will merge in parallel (if enough cores) + input.add(Sequences.empty()); + input.add(Sequences.empty()); + input.add(Sequences.empty()); + assertResult(input); + } + + @Test + public void testEmptiesAndNonEmpty() throws Exception + { + // below min threshold, so will merge serially + List> input = new ArrayList<>(); + input.add(Sequences.empty()); + input.add(nonBlockingSequence(5)); + assertResult(input); + + input.clear(); + + // above min sequence count threshold, so will merge in parallel (if enough cores) + input.add(Sequences.empty()); + input.add(Sequences.empty()); + input.add(Sequences.empty()); + input.add(Sequences.empty()); + input.add(Sequences.empty()); + input.add(nonBlockingSequence(5)); + assertResult(input); + } + + @Test + public void testAllInSingleBatch() throws Exception + { + // below min threshold, so will merge serially + List> input = new ArrayList<>(); + input.add(nonBlockingSequence(5)); + input.add(nonBlockingSequence(6)); + assertResult(input, 10, 20); + + input.clear(); + + // above min sequence count threshold, so will merge in parallel (if enough cores) + input.add(nonBlockingSequence(5)); + input.add(nonBlockingSequence(6)); + input.add(nonBlockingSequence(5)); + input.add(nonBlockingSequence(8)); + input.add(nonBlockingSequence(4)); + input.add(nonBlockingSequence(6)); + assertResult(input, 10, 20); + } + + @Test + public void testAllInSingleYield() throws Exception + { + // below min threshold, so will merge serially + List> input = new ArrayList<>(); + input.add(nonBlockingSequence(5)); + input.add(nonBlockingSequence(6)); + assertResult(input, 4, 20); + + input.clear(); + + // above min sequence count threshold, so will merge in parallel (if enough cores) + input.add(nonBlockingSequence(5)); + input.add(nonBlockingSequence(6)); + input.add(nonBlockingSequence(5)); + input.add(nonBlockingSequence(8)); + input.add(nonBlockingSequence(4)); + input.add(nonBlockingSequence(6)); + assertResult(input, 4, 20); + } + + + @Test + public void testMultiBatchMultiYield() throws Exception + { + // below min threshold, so will merge serially + List> input = new ArrayList<>(); + input.add(nonBlockingSequence(15)); + input.add(nonBlockingSequence(26)); + + assertResult(input, 5, 10); + + // above min sequence count threshold, so will merge in parallel (if enough cores) + input.add(nonBlockingSequence(15)); + input.add(nonBlockingSequence(33)); + input.add(nonBlockingSequence(17)); + input.add(nonBlockingSequence(14)); + + assertResult(input, 5, 10); + } + + @Test + public void testMixedSingleAndMultiYield() throws Exception + { + // below min threshold, so will merge serially + List> input = new ArrayList<>(); + input.add(nonBlockingSequence(60)); + input.add(nonBlockingSequence(5)); + input.add(nonBlockingSequence(8)); + + assertResult(input, 5, 10); + + // above min sequence count threshold, so will merge in parallel (if enough cores) + input.add(nonBlockingSequence(1)); + 
input.add(nonBlockingSequence(8)); + input.add(nonBlockingSequence(32)); + + assertResult(input, 5, 10); + } + + @Test + public void testLongerSequencesJustForFun() throws Exception + { + + List> input = new ArrayList<>(); + input.add(nonBlockingSequence(10_000)); + input.add(nonBlockingSequence(9_001)); + + assertResult(input, 128, 1024); + + input.add(nonBlockingSequence(7_777)); + input.add(nonBlockingSequence(8_500)); + input.add(nonBlockingSequence(5_000)); + input.add(nonBlockingSequence(8_888)); + + assertResult(input, 128, 1024); + } + + @Test + public void testExceptionOnInputSequenceRead() throws Exception + { + List> input = new ArrayList<>(); + + input.add(explodingSequence(15)); + input.add(nonBlockingSequence(25)); + + + expectedException.expect(RuntimeException.class); + expectedException.expectMessage( + "exploded" + ); + assertException(input); + + input.add(nonBlockingSequence(5)); + input.add(nonBlockingSequence(25)); + input.add(explodingSequence(11)); + input.add(nonBlockingSequence(12)); + + assertException(input); + } + + @Test + public void testExceptionFirstResultFromSequence() throws Exception + { + List> input = new ArrayList<>(); + input.add(explodingSequence(0)); + input.add(nonBlockingSequence(2)); + input.add(nonBlockingSequence(2)); + input.add(nonBlockingSequence(2)); + + expectedException.expect(RuntimeException.class); + expectedException.expectMessage( + "exploded" + ); + assertException(input); + } + + @Test + public void testExceptionFirstResultFromMultipleSequence() throws Exception + { + List> input = new ArrayList<>(); + input.add(explodingSequence(0)); + input.add(explodingSequence(0)); + input.add(explodingSequence(0)); + input.add(nonBlockingSequence(2)); + input.add(nonBlockingSequence(2)); + input.add(nonBlockingSequence(2)); + + expectedException.expect(RuntimeException.class); + expectedException.expectMessage( + "exploded" + ); + assertException(input); + } + + @Test + public void testTimeoutExceptionDueToStalledInput() throws Exception + { + final int someSize = 2048; + List> input = new ArrayList<>(); + input.add(nonBlockingSequence(someSize)); + input.add(nonBlockingSequence(someSize)); + input.add(nonBlockingSequence(someSize)); + input.add(blockingSequence(someSize, 400, 500, 1, 500, true)); + expectedException.expect(RuntimeException.class); + expectedException.expectCause(Matchers.instanceOf(TimeoutException.class)); + expectedException.expectMessage("Sequence iterator timed out waiting for data"); + + assertException( + input, + ParallelMergeCombiningSequence.DEFAULT_TASK_SMALL_BATCH_NUM_ROWS, + ParallelMergeCombiningSequence.DEFAULT_TASK_INITIAL_YIELD_NUM_ROWS, + 1000L, + 0 + ); + } + + @Test + public void testTimeoutExceptionDueToStalledReader() throws Exception + { + final int someSize = 2048; + List> input = new ArrayList<>(); + input.add(nonBlockingSequence(someSize)); + input.add(nonBlockingSequence(someSize)); + input.add(nonBlockingSequence(someSize)); + input.add(nonBlockingSequence(someSize)); + + expectedException.expect(RuntimeException.class); + expectedException.expectCause(Matchers.instanceOf(TimeoutException.class)); + expectedException.expectMessage("Sequence iterator timed out"); + assertException(input, 8, 64, 1000, 500); + } + + private void assertResult(List> sequences) throws InterruptedException, IOException + { + assertResult( + sequences, + ParallelMergeCombiningSequence.DEFAULT_TASK_SMALL_BATCH_NUM_ROWS, + ParallelMergeCombiningSequence.DEFAULT_TASK_INITIAL_YIELD_NUM_ROWS + ); + } + + private void 
assertResult(List> sequences, int batchSize, int yieldAfter) + throws InterruptedException, IOException + { + final CombiningSequence combiningSequence = CombiningSequence.create( + new MergeSequence<>(INT_PAIR_ORDERING, Sequences.simple(sequences)), + INT_PAIR_ORDERING, + INT_PAIR_MERGE_FN + ); + + final ParallelMergeCombiningSequence parallelMergeCombineSequence = new ParallelMergeCombiningSequence<>( + pool, + sequences, + INT_PAIR_ORDERING, + INT_PAIR_MERGE_FN, + true, + 5000, + 0, + (int) Math.ceil(Runtime.getRuntime().availableProcessors() * 0.5), + yieldAfter, + batchSize, + ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS + ); + + Yielder combiningYielder = Yielders.each(combiningSequence); + Yielder parallelMergeCombineYielder = Yielders.each(parallelMergeCombineSequence); + + IntPair prev = null; + + while (!combiningYielder.isDone() && !parallelMergeCombineYielder.isDone()) { + Assert.assertEquals(combiningYielder.get(), parallelMergeCombineYielder.get()); + Assert.assertNotEquals(parallelMergeCombineYielder.get(), prev); + prev = parallelMergeCombineYielder.get(); + combiningYielder = combiningYielder.next(combiningYielder.get()); + parallelMergeCombineYielder = parallelMergeCombineYielder.next(parallelMergeCombineYielder.get()); + } + + Assert.assertTrue(combiningYielder.isDone()); + Assert.assertTrue(parallelMergeCombineYielder.isDone()); + while (pool.getRunningThreadCount() > 0) { + Thread.sleep(100); + } + Assert.assertEquals(0, pool.getRunningThreadCount()); + combiningYielder.close(); + parallelMergeCombineYielder.close(); + } + + private void assertException(List> sequences) throws Exception + { + assertException( + sequences, + ParallelMergeCombiningSequence.DEFAULT_TASK_SMALL_BATCH_NUM_ROWS, + ParallelMergeCombiningSequence.DEFAULT_TASK_INITIAL_YIELD_NUM_ROWS, + 5000L, + 0 + ); + } + + private void assertException( + List> sequences, + int batchSize, + int yieldAfter, + long timeout, + int readDelayMillis + ) + throws Exception + { + try { + final ParallelMergeCombiningSequence parallelMergeCombineSequence = new ParallelMergeCombiningSequence<>( + pool, + sequences, + INT_PAIR_ORDERING, + INT_PAIR_MERGE_FN, + true, + timeout, + 0, + (int) Math.ceil(Runtime.getRuntime().availableProcessors() * 0.5), + yieldAfter, + batchSize, + ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS + ); + + Yielder parallelMergeCombineYielder = Yielders.each(parallelMergeCombineSequence); + + IntPair prev = null; + + while (!parallelMergeCombineYielder.isDone()) { + Assert.assertNotEquals(parallelMergeCombineYielder.get(), prev); + prev = parallelMergeCombineYielder.get(); + if (readDelayMillis > 0 && ThreadLocalRandom.current().nextBoolean()) { + Thread.sleep(readDelayMillis); + } + parallelMergeCombineYielder = parallelMergeCombineYielder.next(parallelMergeCombineYielder.get()); + } + parallelMergeCombineYielder.close(); + } + catch (Exception ex) { + LOG.warn(ex, "exception:"); + throw ex; + } + } + + public static class IntPair extends Pair + { + private IntPair(Integer lhs, Integer rhs) + { + super(lhs, rhs); + } + } + + /** + * Generate an ordered, random valued, non-blocking sequence of {@link IntPair}, optionally lazy generated with + * the implication that every time a sequence is accumulated or yielded it produces different results, + * which sort of breaks the {@link Sequence} contract, and makes this method useless for tests in lazy mode, + * however it is useful for benchmarking, where having a sequence without having to materialize the entire 
thing + * up front on heap with a {@link List} backing is preferable. + */ + public static Sequence nonBlockingSequence(int size, boolean lazyGenerate) + { + List pairs = lazyGenerate ? null : generateOrderedPairs(size); + return new BaseSequence<>( + new BaseSequence.IteratorMaker>() + { + @Override + public Iterator make() + { + return new Iterator() + { + int mergeKey = 0; + int rowCounter = 0; + @Override + public boolean hasNext() + { + return rowCounter < size; + } + + @Override + public IntPair next() + { + if (lazyGenerate) { + rowCounter++; + mergeKey += incrementMergeKeyAmount(); + return makeIntPair(mergeKey); + } else { + return pairs.get(rowCounter++); + } + } + }; + } + + @Override + public void cleanup(Iterator iterFromMake) + { + // nothing to cleanup + } + } + ); + } + + /** + * Generate an ordered, random valued, blocking sequence of {@link IntPair}, optionally lazy generated. See + * {@link ParallelMergeCombiningSequenceTest#nonBlockingSequence(int)} for the implications of lazy generating a + * sequence, to summarize each time the sequence is accumulated or yielded it produces different results. + * + * This sequence simulates blocking using {@link Thread#sleep(long)}, with an initial millisecond delay range defined + * by {@param startDelayStartMillis} and {@param startDelayEndMillis} that defines how long to block before the first + * sequence value will be produced, and {@param maxIterationDelayMillis} that defines how long to block every + * {@param iterationDelayFrequency} rows. + */ + public static Sequence blockingSequence( + int size, + int startDelayStartMillis, + int startDelayEndMillis, + int iterationDelayFrequency, + int maxIterationDelayMillis, + boolean lazyGenerate + ) + { + final List pairs = lazyGenerate ? null : generateOrderedPairs(size); + final long startDelayMillis = ThreadLocalRandom.current().nextLong(startDelayStartMillis, startDelayEndMillis); + final long delayUntil = System.nanoTime() + TimeUnit.NANOSECONDS.convert(startDelayMillis, TimeUnit.MILLISECONDS); + return new BaseSequence<>( + new BaseSequence.IteratorMaker>() + { + @Override + public Iterator make() + { + return new Iterator() + { + int mergeKey = 0; + int rowCounter = 0; + @Override + public boolean hasNext() + { + return rowCounter < size; + } + + @Override + public IntPair next() + { + try { + final long currentNano = System.nanoTime(); + if (rowCounter == 0 && currentNano < delayUntil) { + final long sleepMillis = Math.max( + TimeUnit.MILLISECONDS.convert(delayUntil - currentNano, TimeUnit.NANOSECONDS), + 1 + ); + Thread.sleep(sleepMillis); + } else if (maxIterationDelayMillis > 0 + && rowCounter % iterationDelayFrequency == 0 + && ThreadLocalRandom.current().nextBoolean()) { + final int delayMillis = Math.max(ThreadLocalRandom.current().nextInt(maxIterationDelayMillis), 1); + Thread.sleep(delayMillis); + } + } + catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + if (lazyGenerate) { + rowCounter++; + mergeKey += incrementMergeKeyAmount(); + return makeIntPair(mergeKey); + } else { + return pairs.get(rowCounter++); + } + } + }; + } + + @Override + public void cleanup(Iterator iterFromMake) + { + // nothing to cleanup + } + } + ); + } + + /** + * Genenerate non-blocking sequence for tests, non-lazy so the sequence produces consistent results + */ + private static Sequence nonBlockingSequence(int size) + { + return nonBlockingSequence(size, false); + } + + /** + * Genenerate a sequence that explodes after {@param explodeAfter} rows + */ + private static 
Sequence explodingSequence(int explodeAfter) + { + final int explodeAt = explodeAfter + 1; + return new BaseSequence<>( + new BaseSequence.IteratorMaker>() + { + @Override + public Iterator make() + { + return new Iterator() + { + int mergeKey = 0; + int rowCounter = 0; + @Override + public boolean hasNext() + { + return rowCounter < explodeAt; + } + + @Override + public IntPair next() + { + if (rowCounter == explodeAfter) { + throw new RuntimeException("exploded"); + } + mergeKey += incrementMergeKeyAmount(); + rowCounter++; + return makeIntPair(mergeKey); + } + }; + } + + @Override + public void cleanup(Iterator iterFromMake) + { + // nothing to cleanup + } + } + ); + } + + private static List generateOrderedPairs(int length) + { + int rowCounter = 0; + int mergeKey = 0; + List generatedSequence = new ArrayList<>(length); + while (rowCounter < length) { + mergeKey += incrementMergeKeyAmount(); + generatedSequence.add(makeIntPair(mergeKey)); + rowCounter++; + } + return generatedSequence; + } + + private static int incrementMergeKeyAmount() + { + return ThreadLocalRandom.current().nextInt(1, 3); + } + + private static IntPair makeIntPair(int mergeKey) + { + return new IntPair(mergeKey, ThreadLocalRandom.current().nextInt(1, 100)); + } +} diff --git a/docs/configuration/index.md b/docs/configuration/index.md index f176f9b48b05..907df0073dc3 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1493,6 +1493,13 @@ The broker uses processing configs for nested groupBy queries. And, if you use g |`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)| |`druid.processing.fifo`|If the processing queue should treat tasks of equal priority in a FIFO manner|`false`| |`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`| +|`druid.processing.merge.useParallelMergePool`|Enable automatic parallel merging for Brokers on a dedicated async ForkJoinPool. If `false`, merges are done serially on the `HTTP` thread pool instead.|`true`| +|`druid.processing.merge.pool.parallelism`|Size of ForkJoinPool. Note that the default configuration assumes that the value returned by `Runtime.getRuntime().availableProcessors()` represents 2 hyper-threads per physical core, and multiplies this value by `0.75` in an attempt to size the pool at `1.5` times the number of _physical_ cores.|`Runtime.getRuntime().availableProcessors() * 0.75` (rounded up)| +|`druid.processing.merge.pool.defaultMaxQueryParallelism`|Default maximum number of parallel merge tasks per query. Note that the default configuration assumes that the value returned by `Runtime.getRuntime().availableProcessors()` represents 2 hyper-threads per physical core, and multiplies this value by `0.5` in an attempt to match the number of _physical_ cores.|`Runtime.getRuntime().availableProcessors() * 0.5` (rounded up)| +|`druid.processing.merge.pool.awaitShutdownMillis`|Time in milliseconds to wait for merge ForkJoinPool tasks to complete before forcibly stopping the pool on process shutdown.|`60_000`| +|`druid.processing.merge.task.targetRunTimeMillis`|Ideal run-time of each ForkJoinPool merge task, before forking off a new task to continue merging sequences.|`100`| +|`druid.processing.merge.task.initialYieldNumRows`|Number of rows to yield per ForkJoinPool merge task, before forking off a new task to continue merging sequences.|`16384`| +|`druid.processing.merge.task.smallBatchNumRows`|Size of result batches to operate on in ForkJoinPool merge tasks.|`4096`| The amount of direct memory needed by Druid is at least `druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`. You can diff --git a/docs/querying/query-context.md b/docs/querying/query-context.md index 178bd8fc46e3..573aec5a6716 100644 --- a/docs/querying/query-context.md +++ b/docs/querying/query-context.md @@ -41,6 +41,11 @@ The query context is used for various query configuration parameters. The follow |maxQueuedBytes | `druid.broker.http.maxQueuedBytes` | Maximum number of bytes queued per query before exerting backpressure on the channel to the data server. Similar to `maxScatterGatherBytes`, except unlike that configuration, this one will trigger backpressure rather than query failure. Zero means disabled.| |serializeDateTimeAsLong| `false` | If true, DateTime is serialized as long in the result returned by Broker and the data transportation between Broker and compute process| |serializeDateTimeAsLongInner| `false` | If true, DateTime is serialized as long in the data transportation between Broker and compute process| +|enableParallelMerge|`true`|Enable parallel result merging on the Broker. Note that `druid.processing.merge.useParallelMergePool` must be enabled on the Broker for this setting to take effect. See [Broker configuration](../configuration/index.html#broker) for more details.| +|parallelMergeParallelism|`druid.processing.merge.pool.parallelism`|Maximum number of parallel threads to use for parallel result merging on the Broker. See [Broker configuration](../configuration/index.html#broker) for more details.| +|parallelMergeInitialYieldRows|`druid.processing.merge.task.initialYieldNumRows`|Number of rows to yield per ForkJoinPool merge task for parallel result merging on the Broker, before forking off a new task to continue merging sequences. See [Broker configuration](../configuration/index.html#broker) for more details.| +|parallelMergeSmallBatchRows|`druid.processing.merge.task.smallBatchNumRows`|Size of result batches to operate on in ForkJoinPool merge tasks for parallel result merging on the Broker. See [Broker configuration](../configuration/index.html#broker) for more details.| + In addition, some query types offer context parameters specific to that query type.
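+
+As a concrete, illustrative example, a query that opts in to parallel merging and caps it at four concurrent merge tasks could carry a context like the following (the yield and batch values shown are simply the defaults supplied by `druid.processing.merge.task.initialYieldNumRows` and `druid.processing.merge.task.smallBatchNumRows`):
+
+```json
+"context" : {
+  "enableParallelMerge" : true,
+  "parallelMergeParallelism" : 4,
+  "parallelMergeInitialYieldRows" : 16384,
+  "parallelMergeSmallBatchRows" : 4096
+}
+```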
diff --git a/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/MaterializedViewQueryQueryToolChest.java b/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/MaterializedViewQueryQueryToolChest.java index 6ec31083ba40..ca02de7156df 100644 --- a/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/MaterializedViewQueryQueryToolChest.java +++ b/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/MaterializedViewQueryQueryToolChest.java @@ -32,6 +32,9 @@ import org.apache.druid.query.aggregation.MetricManipulationFn; import org.apache.druid.query.context.ResponseContext; +import java.util.Comparator; +import java.util.function.BinaryOperator; + public class MaterializedViewQueryQueryToolChest extends QueryToolChest { private final QueryToolChestWarehouse warehouse; @@ -58,6 +61,20 @@ public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) }; } + @Override + public BinaryOperator createMergeFn(Query query) + { + final Query realQuery = getRealQuery(query); + return warehouse.getToolChest(realQuery).createMergeFn(realQuery); + } + + @Override + public Comparator createResultComparator(Query query) + { + final Query realQuery = getRealQuery(query); + return warehouse.getToolChest(realQuery).createResultComparator(realQuery); + } + @Override public QueryMetrics makeMetrics(Query query) { diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java index 762f975b36a1..f9bb0f273158 100644 --- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java +++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java @@ -51,6 +51,7 @@ import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.DataSource; +import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.Query; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; @@ -84,6 +85,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; /** * Base class for implementing MovingAverageQuery tests @@ -349,7 +351,16 @@ public long getMaxQueuedBytes() { return 0L; } - } + }, + new DruidProcessingConfig() + { + @Override + public String getFormatString() + { + return null; + } + }, + ForkJoinPool.commonPool() ); ClientQuerySegmentWalker walker = new ClientQuerySegmentWalker( diff --git a/processing/src/main/java/org/apache/druid/guice/LifecycleForkJoinPoolProvider.java b/processing/src/main/java/org/apache/druid/guice/LifecycleForkJoinPoolProvider.java new file mode 100644 index 000000000000..afae29d6bcb4 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/guice/LifecycleForkJoinPoolProvider.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.guice; + + +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.common.logger.Logger; + +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; + +public class LifecycleForkJoinPoolProvider +{ + private static final Logger LOG = new Logger(LifecycleForkJoinPoolProvider.class); + private final long awaitShutdownMillis; + private final ForkJoinPool pool; + + public LifecycleForkJoinPoolProvider( + int parallelism, + ForkJoinPool.ForkJoinWorkerThreadFactory factory, + Thread.UncaughtExceptionHandler handler, + boolean asyncMode, + long awaitShutdownMillis + ) + { + this.pool = new ForkJoinPool(parallelism, factory, handler, asyncMode); + this.awaitShutdownMillis = awaitShutdownMillis; + } + + @LifecycleStop + public void stop() + { + LOG.info("Shutting down ForkJoinPool [%s]", this); + pool.shutdown(); + try { + if (!pool.awaitTermination(awaitShutdownMillis, TimeUnit.MILLISECONDS)) { + LOG.warn("Failed to complete all tasks in FJP [%s]", this); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("interrupted on shutdown", e); + } + } + + public ForkJoinPool getPool() + { + return pool; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java b/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java index c9afa1ab3c67..8e827ff510fa 100644 --- a/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java +++ b/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java @@ -20,6 +20,7 @@ package org.apache.druid.query; import org.apache.druid.java.util.common.concurrent.ExecutorServiceConfig; +import org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequence; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.segment.column.ColumnConfig; import org.apache.druid.utils.JvmUtils; @@ -34,6 +35,7 @@ public abstract class DruidProcessingConfig extends ExecutorServiceConfig implem public static final int DEFAULT_NUM_MERGE_BUFFERS = -1; public static final int DEFAULT_PROCESSING_BUFFER_SIZE_BYTES = -1; public static final int MAX_DEFAULT_PROCESSING_BUFFER_SIZE_BYTES = 1024 * 1024 * 1024; + public static final int DEFAULT_MERGE_POOL_AWAIT_SHUTDOWN_MILLIS = 60_000; private AtomicReference computedBufferSizeBytes = new AtomicReference<>(); @@ -144,4 +146,59 @@ public String getTmpDir() { return System.getProperty("java.io.tmpdir"); } + + @Config(value = "${base_path}.merge.useParallelMergePool") + public boolean useParallelMergePool() + { + return true; + } + + @Config(value = "${base_path}.merge.pool.parallelism") + public int getNumThreadsMergePoolConfigured() + { + return DEFAULT_NUM_THREADS; + } + + public int getMergePoolParallelism() + { + int numThreadsConfigured = getNumThreadsMergePoolConfigured(); + if (numThreadsConfigured != 
DEFAULT_NUM_THREADS) { + return numThreadsConfigured; + } else { + // assume 2 hyper-threads per core, so that this value is probably by default the number of physical cores * 1.5 + return (int) Math.ceil(JvmUtils.getRuntimeInfo().getAvailableProcessors() * 0.75); + } + } + + @Config(value = "${base_path}.merge.pool.awaitShutdownMillis") + public long getMergePoolAwaitShutdownMillis() + { + return DEFAULT_MERGE_POOL_AWAIT_SHUTDOWN_MILLIS; + } + + @Config(value = "${base_path}.merge.pool.defaultMaxQueryParallelism") + public int getMergePoolDefaultMaxQueryParallelism() + { + // assume 2 hyper-threads per core, so that this value is probably by default the number of physical cores + return (int) Math.max(JvmUtils.getRuntimeInfo().getAvailableProcessors() * 0.5, 1); + } + + @Config(value = "${base_path}.merge.task.targetRunTimeMillis") + public int getMergePoolTargetTaskRunTimeMillis() + { + return ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS; + } + + @Config(value = "${base_path}.merge.task.initialYieldNumRows") + public int getMergePoolTaskInitialYieldRows() + { + return ParallelMergeCombiningSequence.DEFAULT_TASK_INITIAL_YIELD_NUM_ROWS; + } + + @Config(value = "${base_path}.merge.task.smallBatchNumRows") + public int getMergePoolSmallBatchRows() + { + return ParallelMergeCombiningSequence.DEFAULT_TASK_SMALL_BATCH_NUM_ROWS; + } } + diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java b/processing/src/main/java/org/apache/druid/query/QueryContexts.java index ba869bdbbfa2..562336556750 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java +++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java @@ -39,6 +39,11 @@ public class QueryContexts public static final String MAX_SCATTER_GATHER_BYTES_KEY = "maxScatterGatherBytes"; public static final String MAX_QUEUED_BYTES_KEY = "maxQueuedBytes"; public static final String DEFAULT_TIMEOUT_KEY = "defaultTimeout"; + public static final String BROKER_PARALLEL_MERGE_KEY = "enableParallelMerge"; + public static final String BROKER_PARALLEL_MERGE_INITIAL_YIELD_ROWS_KEY = "parallelMergeInitialYieldRows"; + public static final String BROKER_PARALLEL_MERGE_SMALL_BATCH_ROWS_KEY = "parallelMergeSmallBatchRows"; + public static final String BROKER_PARALLELISM = "parallelMergeParallelism"; + @Deprecated public static final String CHUNK_PERIOD_KEY = "chunkPeriod"; public static final String VECTORIZE_KEY = "vectorize"; @@ -54,6 +59,7 @@ public class QueryContexts public static final int DEFAULT_UNCOVERED_INTERVALS_LIMIT = 0; public static final long DEFAULT_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5); public static final long NO_TIMEOUT = 0; + public static final boolean DEFAULT_ENABLE_PARALLEL_MERGE = true; @SuppressWarnings("unused") // Used by Jackson serialization public enum Vectorize @@ -195,6 +201,26 @@ public static int getPriority(Query query, int defaultValue) return parseInt(query, PRIORITY_KEY, defaultValue); } + public static boolean getEnableParallelMerges(Query query) + { + return parseBoolean(query, BROKER_PARALLEL_MERGE_KEY, DEFAULT_ENABLE_PARALLEL_MERGE); + } + + public static int getParallelMergeInitialYieldRows(Query query, int defaultValue) + { + return parseInt(query, BROKER_PARALLEL_MERGE_INITIAL_YIELD_ROWS_KEY, defaultValue); + } + + public static int getParallelMergeSmallBatchRows(Query query, int defaultValue) + { + return parseInt(query, BROKER_PARALLEL_MERGE_SMALL_BATCH_ROWS_KEY, defaultValue); + } + + public static int 
getParallelMergeParallelism(Query query, int defaultValue) + { + return parseInt(query, BROKER_PARALLELISM, defaultValue); + } + @Deprecated public static String getChunkPeriod(Query query) { diff --git a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java index 4c760b02cdff..f30a4c528df0 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java @@ -111,11 +111,17 @@ public QueryRunner mergeResults(QueryRunner runner) /** * Creates a merge function that is used to merge intermediate aggregates from historicals in broker. This merge * function is used in the default {@link ResultMergeQueryRunner} provided by - * {@link QueryToolChest#mergeResults(QueryRunner)} and can be used in additional future merge implementations + * {@link QueryToolChest#mergeResults(QueryRunner)} and also used in + * {@link org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequence} by 'CachingClusteredClient' if it + * does not return null. + * + * Returning null from this function means that a query does not support result merging, at + * least via the mechanisms that utilize this function. */ + @Nullable public BinaryOperator createMergeFn(Query query) { - throw new UOE("%s doesn't provide a merge function", query.getClass().getName()); + return null; } /** diff --git a/processing/src/main/java/org/apache/druid/query/ResultMergeQueryRunner.java b/processing/src/main/java/org/apache/druid/query/ResultMergeQueryRunner.java index 6361479d24ef..1a01f7e1cc14 100644 --- a/processing/src/main/java/org/apache/druid/query/ResultMergeQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/ResultMergeQueryRunner.java @@ -19,6 +19,7 @@ package org.apache.druid.query; +import com.google.common.base.Preconditions; import org.apache.druid.common.guava.CombiningSequence; import org.apache.druid.guice.annotations.PublicApi; import org.apache.druid.java.util.common.guava.Sequence; @@ -43,6 +44,8 @@ public ResultMergeQueryRunner( ) { super(baseRunner); + Preconditions.checkNotNull(comparatorGenerator); + Preconditions.checkNotNull(mergeFnGenerator); this.comparatorGenerator = comparatorGenerator; this.mergeFnGenerator = mergeFnGenerator; } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java index 6125dd0897b4..dca39eb56e20 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java @@ -113,7 +113,7 @@ Sequence mergeResults( @Nullable default BinaryOperator createMergeFn(Query query) { - throw new UOE("%s doesn't provide a merge function", this.getClass().getName()); + return null; } /** diff --git a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index 989abc2dec6f..1b9324530396 100644 --- a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -62,6 +62,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import 
java.util.HashMap; import java.util.List; import java.util.Map; @@ -130,21 +131,34 @@ public Sequence doRun( private Ordering makeOrdering(SegmentMetadataQuery query) { - if (query.isMerge()) { - // Merge everything always - return Comparators.alwaysEqual(); - } - - return query.getResultOrdering(); // No two elements should be equal, so it should never merge + return (Ordering) SegmentMetadataQueryQueryToolChest.this.createResultComparator(query); } private BinaryOperator createMergeFn(final SegmentMetadataQuery inQ) { - return (arg1, arg2) -> mergeAnalyses(arg1, arg2, inQ.isLenientAggregatorMerge()); + return SegmentMetadataQueryQueryToolChest.this.createMergeFn(inQ); } }; } + @Override + public BinaryOperator createMergeFn(Query query) + { + return (arg1, arg2) -> mergeAnalyses(arg1, arg2, ((SegmentMetadataQuery) query).isLenientAggregatorMerge()); + } + + @Override + public Comparator createResultComparator(Query query) + { + SegmentMetadataQuery segmentMetadataQuery = (SegmentMetadataQuery) query; + if (segmentMetadataQuery.isMerge()) { + // Merge everything always + return Comparators.alwaysEqual(); + } + + return segmentMetadataQuery.getResultOrdering(); // No two elements should be equal, so it should never merge + } + @Override public QueryMetrics> makeMetrics(SegmentMetadataQuery query) { diff --git a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index 6499b88b99cd..d6a402fcc83b 100644 --- a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -23,6 +23,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Functions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.inject.Inject; import org.apache.druid.java.util.common.DateTimes; @@ -43,7 +45,9 @@ import org.apache.druid.timeline.LogicalSegment; import java.nio.ByteBuffer; +import java.util.Comparator; import java.util.List; +import java.util.function.BinaryOperator; import java.util.stream.Collectors; /** @@ -112,6 +116,27 @@ protected Sequence> doRun( }; } + @Override + public BinaryOperator> createMergeFn(Query> query) + { + TimeBoundaryQuery boundQuery = (TimeBoundaryQuery) query; + return (result1, result2) -> { + final List> mergeList; + if (result1 == null) { + mergeList = result2 != null ? ImmutableList.of(result2) : null; + } else { + mergeList = result2 != null ? 
ImmutableList.of(result1, result2) : ImmutableList.of(result1); + } + return Iterables.getOnlyElement(boundQuery.mergeResults(mergeList)); + }; + } + + @Override + public Comparator> createResultComparator(Query> query) + { + return query.getResultOrdering(); + } + @Override public QueryMetrics> makeMetrics(TimeBoundaryQuery query) { diff --git a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryResultValue.java b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryResultValue.java index d5a1c447ad5b..ae7240c4156e 100644 --- a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryResultValue.java +++ b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryResultValue.java @@ -27,6 +27,7 @@ import javax.annotation.Nullable; import java.util.Map; +import java.util.Objects; /** */ @@ -80,11 +81,7 @@ public boolean equals(Object o) TimeBoundaryResultValue that = (TimeBoundaryResultValue) o; - if (value != null ? !value.equals(that.value) : that.value != null) { - return false; - } - - return true; + return Objects.equals(value, that.value); } @Override diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index 18a4a028b367..b91731a54cef 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -37,6 +37,7 @@ import org.apache.druid.client.selector.QueryableDruidServer; import org.apache.druid.client.selector.ServerSelector; import org.apache.druid.guice.annotations.Client; +import org.apache.druid.guice.annotations.Merging; import org.apache.druid.guice.annotations.Smile; import org.apache.druid.guice.http.DruidHttpClientConfig; import org.apache.druid.java.util.common.Intervals; @@ -45,11 +46,13 @@ import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.BaseSequence; import org.apache.druid.java.util.common.guava.LazySequence; +import org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequence; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.query.BySegmentResultValueClass; import org.apache.druid.query.CacheStrategy; +import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryPlus; @@ -87,6 +90,8 @@ import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.ForkJoinPool; +import java.util.function.BinaryOperator; import java.util.function.UnaryOperator; import java.util.stream.Collectors; @@ -102,6 +107,8 @@ public class CachingClusteredClient implements QuerySegmentWalker private final CachePopulator cachePopulator; private final CacheConfig cacheConfig; private final DruidHttpClientConfig httpClientConfig; + private final DruidProcessingConfig processingConfig; + private final ForkJoinPool pool; @Inject public CachingClusteredClient( @@ -111,7 +118,9 @@ public CachingClusteredClient( @Smile ObjectMapper objectMapper, CachePopulator cachePopulator, CacheConfig cacheConfig, - @Client DruidHttpClientConfig httpClientConfig + @Client DruidHttpClientConfig httpClientConfig, + DruidProcessingConfig processingConfig, + @Merging ForkJoinPool 
pool ) { this.warehouse = warehouse; @@ -121,6 +130,8 @@ public CachingClusteredClient( this.cachePopulator = cachePopulator; this.cacheConfig = cacheConfig; this.httpClientConfig = httpClientConfig; + this.processingConfig = processingConfig; + this.pool = pool; if (cacheConfig.isQueryCacheable(Query.GROUP_BY) && (cacheConfig.isUseCache() || cacheConfig.isPopulateCache())) { log.warn( @@ -286,10 +297,32 @@ Sequence run(final UnaryOperator> time List> sequencesByInterval = new ArrayList<>(alreadyCachedResults.size() + segmentsByServer.size()); addSequencesFromCache(sequencesByInterval, alreadyCachedResults); addSequencesFromServer(sequencesByInterval, segmentsByServer); + return merge(sequencesByInterval); + }); + } + + private Sequence merge(List> sequencesByInterval) + { + BinaryOperator mergeFn = toolChest.createMergeFn(query); + if (processingConfig.useParallelMergePool() && QueryContexts.getEnableParallelMerges(query) && mergeFn != null) { + return new ParallelMergeCombiningSequence<>( + pool, + sequencesByInterval, + query.getResultOrdering(), + mergeFn, + QueryContexts.hasTimeout(query), + QueryContexts.getTimeout(query), + QueryContexts.getPriority(query), + QueryContexts.getParallelMergeParallelism(query, processingConfig.getMergePoolDefaultMaxQueryParallelism()), + QueryContexts.getParallelMergeInitialYieldRows(query, processingConfig.getMergePoolTaskInitialYieldRows()), + QueryContexts.getParallelMergeSmallBatchRows(query, processingConfig.getMergePoolSmallBatchRows()), + processingConfig.getMergePoolTargetTaskRunTimeMillis() + ); + } else { return Sequences .simple(sequencesByInterval) .flatMerge(seq -> seq, query.getResultOrdering()); - }); + } } private Set computeSegmentsToQuery(TimelineLookup timeline) diff --git a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java index efb140112d26..59700160090a 100644 --- a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java +++ b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java @@ -53,6 +53,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ForkJoinPool; /** */ @@ -135,6 +136,26 @@ public BlockingPool getMergeBufferPool(DruidProcessingConfig config) ); } + @Provides + @ManageLifecycle + public LifecycleForkJoinPoolProvider getMergeProcessingPoolProvider(DruidProcessingConfig config) + { + return new LifecycleForkJoinPoolProvider( + config.getMergePoolParallelism(), + ForkJoinPool.defaultForkJoinWorkerThreadFactory, + (t, e) -> log.error(e, "Unhandled exception in thread [%s]", t), + true, + config.getMergePoolAwaitShutdownMillis() + ); + } + + @Provides + @Merging + public ForkJoinPool getMergeProcessingPool(LifecycleForkJoinPoolProvider poolProvider) + { + return poolProvider.getPool(); + } + private void verifyDirectMemory(DruidProcessingConfig config) { try { diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java index 84534d28e7d4..5fac93bbdc42 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java @@ -39,6 +39,7 @@ import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.io.Closer; import 
diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
index 84534d28e7d4..5fac93bbdc42 100644
--- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
@@ -39,6 +39,7 @@
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.query.DataSource;
+import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryPlus;
@@ -65,6 +66,7 @@
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
 
 /**
  */
@@ -301,13 +303,23 @@ public int getCacheBulkMergeLimit()
             return mergeLimit;
           }
         },
-        new DruidHttpClientConfig() {
+        new DruidHttpClientConfig()
+        {
           @Override
           public long getMaxQueuedBytes()
           {
             return 0L;
           }
-        }
+        },
+        new DruidProcessingConfig()
+        {
+          @Override
+          public String getFormatString()
+          {
+            return null;
+          }
+        },
+        ForkJoinPool.commonPool()
     );
   }
diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
index 242795084917..36c453541041 100644
--- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
@@ -69,6 +69,7 @@
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.query.BySegmentResultValueClass;
 import org.apache.druid.query.DataSource;
+import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.FinalizeResultsQueryRunner;
 import org.apache.druid.query.Query;
@@ -153,6 +154,7 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
 
 /**
  *
@@ -1380,16 +1382,13 @@ public void testTimeBoundaryCaching()
             .bound(TimeBoundaryQuery.MAX_TIME)
             .build(),
         Intervals.of("2011-01-01/2011-01-02"),
-        makeTimeBoundaryResult(DateTimes.of("2011-01-01"), null, DateTimes.of("2011-01-02")),
+        makeTimeBoundaryResult(DateTimes.of("2011-01-02"), null, DateTimes.of("2011-01-02")),
 
         Intervals.of("2011-01-01/2011-01-03"),
-        makeTimeBoundaryResult(DateTimes.of("2011-01-02"), null, DateTimes.of("2011-01-03")),
+        makeTimeBoundaryResult(DateTimes.of("2011-01-03"), null, DateTimes.of("2011-01-03")),
 
         Intervals.of("2011-01-01/2011-01-10"),
-        makeTimeBoundaryResult(DateTimes.of("2011-01-05"), null, DateTimes.of("2011-01-10")),
-
-        Intervals.of("2011-01-01/2011-01-10"),
-        makeTimeBoundaryResult(DateTimes.of("2011-01-05T01"), null, DateTimes.of("2011-01-10"))
+        makeTimeBoundaryResult(DateTimes.of("2011-01-10"), null, DateTimes.of("2011-01-10"))
     );
 
     testQueryCaching(
@@ -1594,19 +1593,19 @@ private Iterable<Result<TimeBoundaryResultValue>> makeTimeBoundaryResult(
     if (minTime != null && maxTime != null) {
       value = ImmutableMap.of(
           TimeBoundaryQuery.MIN_TIME,
-          minTime.toString(),
+          minTime,
           TimeBoundaryQuery.MAX_TIME,
-          maxTime.toString()
+          maxTime
       );
     } else if (maxTime != null) {
       value = ImmutableMap.of(
           TimeBoundaryQuery.MAX_TIME,
-          maxTime.toString()
+          maxTime
       );
     } else {
       value = ImmutableMap.of(
           TimeBoundaryQuery.MIN_TIME,
-          minTime.toString()
+          minTime
       );
     }
 
@@ -2493,7 +2492,16 @@ public long getMaxQueuedBytes()
           {
             return 0L;
           }
-        }
+        },
+        new DruidProcessingConfig()
+        {
+          @Override
+          public String getFormatString()
+          {
+            return null;
+          }
+        },
+        ForkJoinPool.commonPool()
     );
   }
@@ -2766,16 +2774,13 @@ public void testTimeBoundaryCachingWhenTimeIsInteger()
             .bound(TimeBoundaryQuery.MAX_TIME)
             .build(),
         Intervals.of("1970-01-01/2011-01-02"),
-        makeTimeBoundaryResult(DateTimes.of("1970-01-01"), null, DateTimes.of("1970-01-02")),
+        makeTimeBoundaryResult(DateTimes.of("1970-01-02"), null, DateTimes.of("1970-01-02")),
 
         Intervals.of("1970-01-01/2011-01-03"),
-        makeTimeBoundaryResult(DateTimes.of("1970-01-02"), null, DateTimes.of("1970-01-03")),
-
-        Intervals.of("1970-01-01/2011-01-10"),
-        makeTimeBoundaryResult(DateTimes.of("1970-01-05"), null, DateTimes.of("1970-01-10")),
+        makeTimeBoundaryResult(DateTimes.of("1970-01-03"), null, DateTimes.of("1970-01-03")),
 
         Intervals.of("1970-01-01/2011-01-10"),
-        makeTimeBoundaryResult(DateTimes.of("1970-01-05T01"), null, DateTimes.of("1970-01-10"))
+        makeTimeBoundaryResult(DateTimes.of("1970-01-10"), null, DateTimes.of("1970-01-10"))
     );
 
     testQueryCaching(
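
Both test classes satisfy the client's two new constructor parameters with the smallest possible stand-ins: an anonymous DruidProcessingConfig that overrides only the abstract getFormatString(), plus the JVM-wide ForkJoinPool.commonPool() instead of the lifecycle-managed @Merging pool. A rough sketch of that stubbing pattern, using a hypothetical stand-in class rather than the real DruidProcessingConfig:

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical abstract config standing in for DruidProcessingConfig.
abstract class ProcessingConfigStubExample
{
  public abstract String getFormatString();

  public boolean useParallelMergePool()
  {
    return true; // assumed default, mirroring the production path
  }

  public static void main(String[] args)
  {
    // Minimal inline stub, like the ones the tests pass to the client:
    ProcessingConfigStubExample config = new ProcessingConfigStubExample()
    {
      @Override
      public String getFormatString()
      {
        return null; // thread-name format is irrelevant in a test
      }
    };
    // Tests borrow the shared common pool rather than creating a managed one.
    ForkJoinPool pool = ForkJoinPool.commonPool();
    System.out.println(config.useParallelMergePool() + " / parallelism=" + pool.getParallelism());
  }
}
```

Using commonPool() keeps the tests free of lifecycle management, at the cost of sharing threads with anything else in the JVM, which is acceptable for unit tests.
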
diff --git a/website/.spelling b/website/.spelling
index f74eb0eb69e8..d8c7a09e5299 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -168,6 +168,7 @@ aggregator
 aggregators
 ambari
 analytics
+async
 authorizer
 authorizers
 autocomplete
@@ -1284,10 +1285,14 @@ druid.broker.cache.useCache
 druid.broker.cache.useResultLevelCache
 druid.historical.cache.populateCache
 druid.historical.cache.useCache
+enableParallelMerge
 floatSum
 maxQueuedBytes
 maxScatterGatherBytes
 minTopNThreshold
+parallelMergeInitialYieldRows
+parallelMergeParallelism
+parallelMergeSmallBatchRows
 populateCache
 populateResultLevelCache
 queryId
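
The .spelling additions record the user-facing names this feature introduces: the per-query context keys enableParallelMerge, parallelMergeParallelism, parallelMergeInitialYieldRows, and parallelMergeSmallBatchRows. A sketch of a context map a caller might attach to a query follows; the values are arbitrary illustrations and the comments state assumed semantics, not authoritative documentation:

```java
import com.google.common.collect.ImmutableMap;

import java.util.Map;

public class ParallelMergeContextExample
{
  public static void main(String[] args)
  {
    // Keys taken from the .spelling additions above; values chosen only
    // to show the shape of the context.
    Map<String, Object> context = ImmutableMap.of(
        "enableParallelMerge", true,            // set false to force the serial fallback
        "parallelMergeParallelism", 4,          // assumed: cap on fork-join tasks for this query
        "parallelMergeInitialYieldRows", 16384, // assumed: rows emitted before the first yield
        "parallelMergeSmallBatchRows", 4096     // assumed: batch size passed between merge tasks
    );
    System.out.println(context);
  }
}
```

When a key is absent, the broker falls back to the cluster-wide defaults exposed by DruidProcessingConfig (the getMergePool*() accessors used in merge() above).
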