From ca5b5fbb00210e1c9f9a702b3985ba8213f5bfab Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 30 Mar 2020 11:58:16 -0700 Subject: [PATCH] add lane enforcement for joinish queries (#9563) * add lane enforcement for joinish queries * oops * style * review stuffs --- .../CachingClusteredClientBenchmark.java | 12 +- .../moving-average-query/pom.xml | 7 + .../movingaverage/MovingAverageQueryTest.java | 11 +- .../druid/client/SegmentServerSelector.java | 26 ++- .../druid/server/LocalQuerySegmentWalker.java | 37 +++- .../apache/druid/server/QueryScheduler.java | 13 ++ ...chingClusteredClientFunctionalityTest.java | 7 +- .../server/ClientQuerySegmentWalkerTest.java | 65 ++++++- .../server/ObservableQueryScheduler.java | 180 ++++++++++++++++++ .../druid/server/QueryResourceTest.java | 7 +- .../druid/server/QuerySchedulerTest.java | 93 --------- .../apache/druid/server/QueryStackTests.java | 20 +- .../server/TestClusterQuerySegmentWalker.java | 6 +- .../druid/sql/calcite/util/CalciteTests.java | 5 +- .../SpecificSegmentsQuerySegmentWalker.java | 7 +- 15 files changed, 348 insertions(+), 148 deletions(-) create mode 100644 server/src/test/java/org/apache/druid/server/ObservableQueryScheduler.java diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java index febebbfcab7d..94b560e8f4df 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java @@ -104,11 +104,8 @@ import org.apache.druid.query.topn.TopNResultValue; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexSegment; -import org.apache.druid.server.QueryScheduler; +import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.coordination.ServerType; -import org.apache.druid.server.initialization.ServerConfig; -import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy; -import org.apache.druid.server.scheduling.NoQueryLaningStrategy; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment.PruneSpecsHolder; import org.apache.druid.timeline.SegmentId; @@ -343,12 +340,7 @@ public > QueryToolChest getToolChest new DruidHttpClientConfig(), processingConfig, forkJoinPool, - new QueryScheduler( - 0, - ManualQueryPrioritizationStrategy.INSTANCE, - NoQueryLaningStrategy.INSTANCE, - new ServerConfig() - ) + QueryStackTests.DEFAULT_NOOP_SCHEDULER ); } diff --git a/extensions-contrib/moving-average-query/pom.xml b/extensions-contrib/moving-average-query/pom.xml index c5ad9eebf6ca..6c77b6051182 100644 --- a/extensions-contrib/moving-average-query/pom.xml +++ b/extensions-contrib/moving-average-query/pom.xml @@ -105,6 +105,13 @@ test-jar test + + org.apache.druid + druid-server + ${project.parent.version} + test-jar + test + junit junit diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java index 8d76692e9c85..971f8ed8b264 100644 --- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java +++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java @@ -65,10 +65,8 @@ import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.query.timeseries.TimeseriesResultValue; import org.apache.druid.server.ClientQuerySegmentWalker; -import org.apache.druid.server.QueryScheduler; +import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.initialization.ServerConfig; -import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy; -import org.apache.druid.server.scheduling.NoQueryLaningStrategy; import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.timeline.TimelineLookup; import org.hamcrest.core.IsInstanceOf; @@ -365,12 +363,7 @@ public String getFormatString() } }, ForkJoinPool.commonPool(), - new QueryScheduler( - 0, - ManualQueryPrioritizationStrategy.INSTANCE, - NoQueryLaningStrategy.INSTANCE, - new ServerConfig() - ) + QueryStackTests.DEFAULT_NOOP_SCHEDULER ); ClientQuerySegmentWalker walker = new ClientQuerySegmentWalker( diff --git a/server/src/main/java/org/apache/druid/client/SegmentServerSelector.java b/server/src/main/java/org/apache/druid/client/SegmentServerSelector.java index 007b7a254591..5f5de0e78136 100644 --- a/server/src/main/java/org/apache/druid/client/SegmentServerSelector.java +++ b/server/src/main/java/org/apache/druid/client/SegmentServerSelector.java @@ -19,22 +19,46 @@ package org.apache.druid.client; +import com.google.common.base.Preconditions; import org.apache.druid.client.selector.ServerSelector; import org.apache.druid.java.util.common.Pair; import org.apache.druid.query.SegmentDescriptor; +import javax.annotation.Nullable; + /** * Given a {@link SegmentDescriptor}, get a {@link ServerSelector} to use to pick a {@link DruidServer} to query. * - * Used by {@link CachingClusteredClient} on the broker to fan out queries to historical and realtime data + * Used by {@link CachingClusteredClient} on the broker to fan out queries to historical and realtime data. Used + * by {@link org.apache.druid.server.LocalQuerySegmentWalker} on the broker for on broker queries */ public class SegmentServerSelector extends Pair { + /** + * This is for a segment hosted on a remote server, where {@link ServerSelector} may be used to pick + * a {@link DruidServer} to query. + */ public SegmentServerSelector(ServerSelector server, SegmentDescriptor segment) { super(server, segment); + Preconditions.checkNotNull(server, "ServerSelector must not be null"); + Preconditions.checkNotNull(segment, "SegmentDescriptor must not be null"); + } + + /** + * This is for a segment hosted locally + */ + public SegmentServerSelector(SegmentDescriptor segment) + { + super(null, segment); + Preconditions.checkNotNull(segment, "SegmentDescriptor must not be null"); } + /** + * This may be null if {@link SegmentDescriptor} is locally available, but will definitely not be null for segments + * which must be queried remotely (e.g. {@link CachingClusteredClient}) + */ + @Nullable public ServerSelector getServer() { return lhs; diff --git a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java index 0a01b8c4bb0c..d7f39adaa4ba 100644 --- a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java @@ -20,6 +20,7 @@ package org.apache.druid.server; import com.google.inject.Inject; +import org.apache.druid.client.SegmentServerSelector; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.concurrent.Execs; @@ -27,6 +28,7 @@ import org.apache.druid.query.FluentQueryRunnerBuilder; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryRunnerFactoryConglomerate; @@ -39,6 +41,8 @@ import org.apache.druid.segment.join.Joinables; import org.joda.time.Interval; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.StreamSupport; @@ -57,6 +61,7 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker private final QueryRunnerFactoryConglomerate conglomerate; private final SegmentWrangler segmentWrangler; private final JoinableFactory joinableFactory; + private final QueryScheduler scheduler; private final ServiceEmitter emitter; @Inject @@ -64,12 +69,14 @@ public LocalQuerySegmentWalker( QueryRunnerFactoryConglomerate conglomerate, SegmentWrangler segmentWrangler, JoinableFactory joinableFactory, + QueryScheduler scheduler, ServiceEmitter emitter ) { this.conglomerate = conglomerate; this.segmentWrangler = segmentWrangler; this.joinableFactory = joinableFactory; + this.scheduler = scheduler; this.emitter = emitter; } @@ -82,21 +89,23 @@ public QueryRunner getQueryRunnerForIntervals(final Query query, final throw new IAE("Cannot query dataSource locally: %s", analysis.getDataSource()); } - final AtomicLong cpuAccumulator = new AtomicLong(0L); - final QueryRunnerFactory> queryRunnerFactory = conglomerate.findFactory(query); final Iterable segments = segmentWrangler.getSegmentsForIntervals(analysis.getBaseDataSource(), intervals); + final Query prioritizedAndLaned = prioritizeAndLaneQuery(query, segments); + + final AtomicLong cpuAccumulator = new AtomicLong(0L); final Function segmentMapFn = Joinables.createSegmentMapFn( analysis.getPreJoinableClauses(), joinableFactory, cpuAccumulator, - QueryContexts.getEnableJoinFilterPushDown(query), - QueryContexts.getEnableJoinFilterRewrite(query), - QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(query), - QueryContexts.getJoinFilterRewriteMaxSize(query), - query.getFilter() == null ? null : query.getFilter().toFilter(), - query.getVirtualColumns() + QueryContexts.getEnableJoinFilterPushDown(prioritizedAndLaned), + QueryContexts.getEnableJoinFilterRewrite(prioritizedAndLaned), + QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(prioritizedAndLaned), + QueryContexts.getJoinFilterRewriteMaxSize(prioritizedAndLaned), + prioritizedAndLaned.getFilter() == null ? null : prioritizedAndLaned.getFilter().toFilter(), + prioritizedAndLaned.getVirtualColumns() ); + final QueryRunnerFactory> queryRunnerFactory = conglomerate.findFactory(prioritizedAndLaned); final QueryRunner baseRunner = queryRunnerFactory.mergeRunners( Execs.directExecutor(), () -> StreamSupport.stream(segments.spliterator(), false) @@ -107,17 +116,25 @@ public QueryRunner getQueryRunnerForIntervals(final Query query, final // Note: Not calling 'postProcess'; it isn't official/documented functionality so we'll only support it where // it is already supported. return new FluentQueryRunnerBuilder<>(queryRunnerFactory.getToolchest()) - .create(baseRunner) + .create(scheduler.wrapQueryRunner(baseRunner)) .applyPreMergeDecoration() .mergeResults() .applyPostMergeDecoration() .emitCPUTimeMetric(emitter, cpuAccumulator); } - @Override public QueryRunner getQueryRunnerForSegments(final Query query, final Iterable specs) { // SegmentWranglers only work based on intervals and cannot run with specific segments. throw new ISE("Cannot run with specific segments"); } + + private Query prioritizeAndLaneQuery(Query query, Iterable segments) + { + Set segmentServerSelectors = new HashSet<>(); + for (Segment s : segments) { + segmentServerSelectors.add(new SegmentServerSelector(s.getId().toDescriptor())); + } + return scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(query), segmentServerSelectors); + } } diff --git a/server/src/main/java/org/apache/druid/server/QueryScheduler.java b/server/src/main/java/org/apache/druid/server/QueryScheduler.java index 86c9ec96b878..f50b50c80390 100644 --- a/server/src/main/java/org/apache/druid/server/QueryScheduler.java +++ b/server/src/main/java/org/apache/druid/server/QueryScheduler.java @@ -30,10 +30,12 @@ import it.unimi.dsi.fastutil.objects.Object2IntMap; import org.apache.druid.client.SegmentServerSelector; import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.guava.LazySequence; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryWatcher; import org.apache.druid.server.initialization.ServerConfig; @@ -140,6 +142,17 @@ public Sequence run(Query query, Sequence resultSequence) return resultSequence.withBaggage(() -> finishLanes(bulkheads)); } + /** + * Returns a {@link QueryRunner} that will call {@link QueryScheduler#run} when {@link QueryRunner#run} is called. + */ + public QueryRunner wrapQueryRunner(QueryRunner baseRunner) + { + return (queryPlus, responseContext) -> + QueryScheduler.this.run( + queryPlus.getQuery(), new LazySequence<>(() -> baseRunner.run(queryPlus, responseContext)) + ); + } + /** * Forcibly cancel all futures that have been registered to a specific query id */ diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java index 779fd5478d0c..5303989b80cc 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java @@ -47,11 +47,8 @@ import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.planning.DataSourceAnalysis; -import org.apache.druid.server.QueryScheduler; +import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.coordination.ServerType; -import org.apache.druid.server.initialization.ServerConfig; -import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy; -import org.apache.druid.server.scheduling.NoQueryLaningStrategy; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.TimelineLookup; import org.apache.druid.timeline.VersionedIntervalTimeline; @@ -335,7 +332,7 @@ public int getMergePoolParallelism() } }, ForkJoinPool.commonPool(), - new QueryScheduler(0, ManualQueryPrioritizationStrategy.INSTANCE, NoQueryLaningStrategy.INSTANCE, new ServerConfig()) + QueryStackTests.DEFAULT_NOOP_SCHEDULER ); } diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java index 2215dcb162ae..c98c7768decd 100644 --- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java +++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java @@ -70,6 +70,8 @@ import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.server.initialization.ServerConfig; +import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy; +import org.apache.druid.server.scheduling.NoQueryLaningStrategy; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.NumberedShardSpec; @@ -151,12 +153,20 @@ public class ClientQuerySegmentWalkerTest // version VERSION, and shard spec SHARD_SPEC. private ClientQuerySegmentWalker walker; + private ObservableQueryScheduler scheduler; + @Before public void setUp() { closer = Closer.create(); conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(closer); - initWalker(ImmutableMap.of()); + scheduler = new ObservableQueryScheduler( + 8, + ManualQueryPrioritizationStrategy.INSTANCE, + NoQueryLaningStrategy.INSTANCE, + new ServerConfig() + ); + initWalker(ImmutableMap.of(), scheduler); } @After @@ -182,6 +192,11 @@ public void testTimeseriesOnTable() ImmutableList.of(ExpectedQuery.cluster(query)), ImmutableList.of(new Object[]{INTERVAL.getStartMillis(), 10L}) ); + + Assert.assertEquals(1, scheduler.getTotalRun().get()); + Assert.assertEquals(1, scheduler.getTotalPrioritizedAndLaned().get()); + Assert.assertEquals(1, scheduler.getTotalAcquired().get()); + Assert.assertEquals(1, scheduler.getTotalReleased().get()); } @Test @@ -200,6 +215,11 @@ public void testTimeseriesOnInline() ImmutableList.of(ExpectedQuery.local(query)), ImmutableList.of(new Object[]{INTERVAL.getStartMillis(), 10L}) ); + + Assert.assertEquals(1, scheduler.getTotalRun().get()); + Assert.assertEquals(1, scheduler.getTotalPrioritizedAndLaned().get()); + Assert.assertEquals(1, scheduler.getTotalAcquired().get()); + Assert.assertEquals(1, scheduler.getTotalReleased().get()); } @Test @@ -236,6 +256,13 @@ public void testTimeseriesOnGroupByOnTable() ), ImmutableList.of(new Object[]{Intervals.ETERNITY.getStartMillis(), 3L}) ); + + // note: this should really be 1, but in the interim queries that are composed of multiple queries count each + // invocation of either the cluster or local walker in ClientQuerySegmentWalker + Assert.assertEquals(2, scheduler.getTotalRun().get()); + Assert.assertEquals(2, scheduler.getTotalPrioritizedAndLaned().get()); + Assert.assertEquals(2, scheduler.getTotalAcquired().get()); + Assert.assertEquals(2, scheduler.getTotalReleased().get()); } @Test @@ -263,6 +290,11 @@ public void testGroupByOnGroupByOnTable() ImmutableList.of(ExpectedQuery.cluster(subquery)), ImmutableList.of(new Object[]{3L}) ); + + Assert.assertEquals(1, scheduler.getTotalRun().get()); + Assert.assertEquals(1, scheduler.getTotalPrioritizedAndLaned().get()); + Assert.assertEquals(1, scheduler.getTotalAcquired().get()); + Assert.assertEquals(1, scheduler.getTotalReleased().get()); } @Test @@ -299,6 +331,13 @@ public void testGroupByOnUnionOfTwoTables() new Object[]{"z", 1L} ) ); + + // note: this should really be 1, but in the interim queries that are composed of multiple queries count each + // invocation of either the cluster or local walker in ClientQuerySegmentWalker + Assert.assertEquals(2, scheduler.getTotalRun().get()); + Assert.assertEquals(2, scheduler.getTotalPrioritizedAndLaned().get()); + Assert.assertEquals(2, scheduler.getTotalAcquired().get()); + Assert.assertEquals(2, scheduler.getTotalReleased().get()); } @Test @@ -351,6 +390,13 @@ public void testJoinOnGroupByOnTable() ), ImmutableList.of(new Object[]{"y", "y", 1L}) ); + + // note: this should really be 1, but in the interim queries that are composed of multiple queries count each + // invocation of either the cluster or local walker in ClientQuerySegmentWalker + Assert.assertEquals(2, scheduler.getTotalRun().get()); + Assert.assertEquals(2, scheduler.getTotalPrioritizedAndLaned().get()); + Assert.assertEquals(2, scheduler.getTotalAcquired().get()); + Assert.assertEquals(2, scheduler.getTotalReleased().get()); } @Test @@ -408,9 +454,17 @@ public void testTimeseriesOnGroupByOnTableErrorTooManyRows() } /** - * Initialize (or reinitialize) our {@link #walker} and {@link #closer}. + * Initialize (or reinitialize) our {@link #walker} and {@link #closer} with default scheduler. */ private void initWalker(final Map serverProperties) + { + initWalker(serverProperties, QueryStackTests.DEFAULT_NOOP_SCHEDULER); + } + + /** + * Initialize (or reinitialize) our {@link #walker} and {@link #closer}. + */ + private void initWalker(final Map serverProperties, QueryScheduler schedulerForTest) { final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); final ServerConfig serverConfig = jsonMapper.convertValue(serverProperties, ServerConfig.class); @@ -472,7 +526,7 @@ BAR, makeTimeline(BAR, BAR_INLINE) ), joinableFactory, conglomerate, - null /* QueryScheduler */ + schedulerForTest ), ClusterOrLocal.CLUSTER ), @@ -480,8 +534,9 @@ BAR, makeTimeline(BAR, BAR_INLINE) QueryStackTests.createLocalQuerySegmentWalker( conglomerate, segmentWrangler, - joinableFactory - ), + joinableFactory, + schedulerForTest + ), ClusterOrLocal.LOCAL ), conglomerate, diff --git a/server/src/test/java/org/apache/druid/server/ObservableQueryScheduler.java b/server/src/test/java/org/apache/druid/server/ObservableQueryScheduler.java new file mode 100644 index 000000000000..638f5f28dc9f --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/ObservableQueryScheduler.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server; + +import io.github.resilience4j.bulkhead.Bulkhead; +import org.apache.druid.client.SegmentServerSelector; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryPlus; +import org.apache.druid.server.initialization.ServerConfig; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +/** + * {@link QueryScheduler} for testing, with counters on its internal functions so its operation can be observed + * and verified by tests + */ +public class ObservableQueryScheduler extends QueryScheduler +{ + private final AtomicLong totalAcquired; + private final AtomicLong totalReleased; + private final AtomicLong laneAcquired; + private final AtomicLong laneNotAcquired; + private final AtomicLong laneReleased; + private final AtomicLong totalPrioritizedAndLaned; + private final AtomicLong totalRun; + + public ObservableQueryScheduler( + int totalNumThreads, + QueryPrioritizationStrategy prioritizationStrategy, + QueryLaningStrategy laningStrategy, + ServerConfig serverConfig + ) + { + super(totalNumThreads, prioritizationStrategy, laningStrategy, serverConfig); + + totalAcquired = new AtomicLong(); + totalReleased = new AtomicLong(); + laneAcquired = new AtomicLong(); + laneNotAcquired = new AtomicLong(); + laneReleased = new AtomicLong(); + totalPrioritizedAndLaned = new AtomicLong(); + totalRun = new AtomicLong(); + } + + @Override + public Sequence run( + Query query, + Sequence resultSequence + ) + { + return super.run(query, resultSequence).withBaggage(totalRun::incrementAndGet); + } + + @Override + public Query prioritizeAndLaneQuery( + QueryPlus queryPlus, + Set segments + ) + { + totalPrioritizedAndLaned.incrementAndGet(); + return super.prioritizeAndLaneQuery(queryPlus, segments); + } + + @Override + List acquireLanes(Query query) + { + List bulkheads = super.acquireLanes(query); + if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) { + totalAcquired.incrementAndGet(); + } + if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) { + laneAcquired.incrementAndGet(); + } + + return bulkheads; + } + + @Override + void releaseLanes(List bulkheads) + { + super.releaseLanes(bulkheads); + if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) { + totalReleased.incrementAndGet(); + } + if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) { + laneReleased.incrementAndGet(); + if (bulkheads.size() == 1) { + laneNotAcquired.incrementAndGet(); + } + } + } + + @Override + void finishLanes(List bulkheads) + { + super.finishLanes(bulkheads); + if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) { + totalReleased.incrementAndGet(); + } + if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) { + laneReleased.incrementAndGet(); + } + } + + /** + * Number of times that 'total' query count semaphore was acquired + */ + public AtomicLong getTotalAcquired() + { + return totalAcquired; + } + + /** + * Number of times that 'total' query count semaphore was released + */ + public AtomicLong getTotalReleased() + { + return totalReleased; + } + + /** + * Number of times that the query count semaphore of any lane was acquired + */ + public AtomicLong getLaneAcquired() + { + return laneAcquired; + } + + /** + * Number of times that the query count semaphore of any lane was acquired but the 'total' semaphore was NOT acquired + */ + public AtomicLong getLaneNotAcquired() + { + return laneNotAcquired; + } + + /** + * Number of times that the query count semaphore of any lane was released + */ + public AtomicLong getLaneReleased() + { + return laneReleased; + } + + /** + * Number of times that {@link QueryScheduler#prioritizeAndLaneQuery} was called + */ + public AtomicLong getTotalPrioritizedAndLaned() + { + return totalPrioritizedAndLaned; + } + + /** + * Number of times that {@link QueryScheduler#run} was called + */ + public AtomicLong getTotalRun() + { + return totalRun; + } +} diff --git a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java index d17a3726be0c..5bda4338738c 100644 --- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java @@ -174,12 +174,7 @@ public void setup() EasyMock.expect(testServletRequest.getHeader("Accept")).andReturn(MediaType.APPLICATION_JSON).anyTimes(); EasyMock.expect(testServletRequest.getHeader(QueryResource.HEADER_IF_NONE_MATCH)).andReturn(null).anyTimes(); EasyMock.expect(testServletRequest.getRemoteAddr()).andReturn("localhost").anyTimes(); - queryScheduler = new QueryScheduler( - 8, - ManualQueryPrioritizationStrategy.INSTANCE, - NoQueryLaningStrategy.INSTANCE, - new ServerConfig() - ); + queryScheduler = QueryStackTests.DEFAULT_NOOP_SCHEDULER; testRequestLogger = new TestRequestLogger(); queryResource = new QueryResource( new QueryLifecycleFactory( diff --git a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java index 177be8214b1d..485275b07bf2 100644 --- a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java +++ b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java @@ -30,7 +30,6 @@ import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.ProvisionException; -import io.github.resilience4j.bulkhead.Bulkhead; import org.apache.druid.client.SegmentServerSelector; import org.apache.druid.guice.GuiceInjectors; import org.apache.druid.guice.JsonConfigProvider; @@ -72,7 +71,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicLong; public class QuerySchedulerTest { @@ -708,95 +706,4 @@ private Injector createInjector() ); return injector; } - - private static class ObservableQueryScheduler extends QueryScheduler - { - private final AtomicLong totalAcquired; - private final AtomicLong totalReleased; - private final AtomicLong laneAcquired; - private final AtomicLong laneNotAcquired; - private final AtomicLong laneReleased; - - public ObservableQueryScheduler( - int totalNumThreads, - QueryPrioritizationStrategy prioritizationStrategy, - QueryLaningStrategy laningStrategy, - ServerConfig serverConfig - ) - { - super(totalNumThreads, prioritizationStrategy, laningStrategy, serverConfig); - - totalAcquired = new AtomicLong(); - totalReleased = new AtomicLong(); - laneAcquired = new AtomicLong(); - laneNotAcquired = new AtomicLong(); - laneReleased = new AtomicLong(); - } - - @Override - List acquireLanes(Query query) - { - List bulkheads = super.acquireLanes(query); - if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) { - totalAcquired.incrementAndGet(); - } - if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) { - laneAcquired.incrementAndGet(); - } - - return bulkheads; - } - - @Override - void releaseLanes(List bulkheads) - { - super.releaseLanes(bulkheads); - if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) { - totalReleased.incrementAndGet(); - } - if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) { - laneReleased.incrementAndGet(); - if (bulkheads.size() == 1) { - laneNotAcquired.incrementAndGet(); - } - } - } - - @Override - void finishLanes(List bulkheads) - { - super.finishLanes(bulkheads); - if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) { - totalReleased.incrementAndGet(); - } - if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) { - laneReleased.incrementAndGet(); - } - } - - public AtomicLong getTotalAcquired() - { - return totalAcquired; - } - - public AtomicLong getTotalReleased() - { - return totalReleased; - } - - public AtomicLong getLaneAcquired() - { - return laneAcquired; - } - - public AtomicLong getLaneNotAcquired() - { - return laneNotAcquired; - } - - public AtomicLong getLaneReleased() - { - return laneReleased; - } - } } diff --git a/server/src/test/java/org/apache/druid/server/QueryStackTests.java b/server/src/test/java/org/apache/druid/server/QueryStackTests.java index a07e14ed5fa2..1370ed73ebac 100644 --- a/server/src/test/java/org/apache/druid/server/QueryStackTests.java +++ b/server/src/test/java/org/apache/druid/server/QueryStackTests.java @@ -64,6 +64,8 @@ import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.metrics.NoopServiceEmitter; +import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy; +import org.apache.druid.server.scheduling.NoQueryLaningStrategy; import org.apache.druid.timeline.VersionedIntervalTimeline; import javax.annotation.Nullable; @@ -75,6 +77,12 @@ */ public class QueryStackTests { + public static final QueryScheduler DEFAULT_NOOP_SCHEDULER = new QueryScheduler( + 0, + ManualQueryPrioritizationStrategy.INSTANCE, + NoQueryLaningStrategy.INSTANCE, + new ServerConfig() + ); private static final ServiceEmitter EMITTER = new NoopServiceEmitter(); private static final int COMPUTE_BUFFER_SIZE = 10 * 1024 * 1024; @@ -148,10 +156,17 @@ public static TestClusterQuerySegmentWalker createClusterQuerySegmentWalker( public static LocalQuerySegmentWalker createLocalQuerySegmentWalker( final QueryRunnerFactoryConglomerate conglomerate, final SegmentWrangler segmentWrangler, - final JoinableFactory joinableFactory + final JoinableFactory joinableFactory, + final QueryScheduler scheduler ) { - return new LocalQuerySegmentWalker(conglomerate, segmentWrangler, joinableFactory, EMITTER); + return new LocalQuerySegmentWalker( + conglomerate, + segmentWrangler, + joinableFactory, + scheduler, + EMITTER + ); } /** @@ -255,4 +270,5 @@ public int getNumMergeBuffers() return conglomerate; } + } diff --git a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java index 7fef5a9d71c2..cc3a406cfe78 100644 --- a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java +++ b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java @@ -165,11 +165,13 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final // Wrap baseRunner in a runner that rewrites the QuerySegmentSpec to mention the specific segments. // This mimics what CachingClusteredClient on the Broker does, and is required for certain queries (like Scan) - // to function properly. + // to function properly. SegmentServerSelector does not currently mimic CachingClusteredClient, it is using + // the LocalQuerySegmentWalker constructor instead since this walker is not mimic remote DruidServer objects + // to actually serve the queries return (theQuery, responseContext) -> { if (scheduler != null) { Set segments = new HashSet<>(); - specs.forEach(spec -> segments.add(new SegmentServerSelector(null, spec))); + specs.forEach(spec -> segments.add(new SegmentServerSelector(spec))); return scheduler.run( scheduler.prioritizeAndLaneQuery(theQuery, segments), new LazySequence<>( diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index 5d446c54a83e..5ae13902c7e1 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -77,6 +77,7 @@ import org.apache.druid.server.DruidNode; import org.apache.druid.server.QueryLifecycleFactory; import org.apache.druid.server.QueryScheduler; +import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; import org.apache.druid.server.log.NoopRequestLogger; import org.apache.druid.server.security.Access; @@ -572,13 +573,13 @@ public static SpecificSegmentsQuerySegmentWalker createMockWalker( final File tmpDir ) { - return createMockWalker(conglomerate, tmpDir, null); + return createMockWalker(conglomerate, tmpDir, QueryStackTests.DEFAULT_NOOP_SCHEDULER); } public static SpecificSegmentsQuerySegmentWalker createMockWalker( final QueryRunnerFactoryConglomerate conglomerate, final File tmpDir, - @Nullable final QueryScheduler scheduler + final QueryScheduler scheduler ) { final QueryableIndex index1 = IndexBuilder diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java index 50134ffcb353..1da0fbabf9c9 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java @@ -85,7 +85,7 @@ public SpecificSegmentsQuerySegmentWalker( final QueryRunnerFactoryConglomerate conglomerate, final LookupExtractorFactoryContainerProvider lookupProvider, @Nullable final JoinableFactory joinableFactory, - @Nullable final QueryScheduler scheduler + final QueryScheduler scheduler ) { final JoinableFactory joinableFactoryToUse; @@ -116,7 +116,8 @@ public SpecificSegmentsQuerySegmentWalker( .put(LookupDataSource.class, new LookupSegmentWrangler(lookupProvider)) .build() ), - joinableFactoryToUse + joinableFactoryToUse, + scheduler ), conglomerate, new ServerConfig() @@ -146,7 +147,7 @@ public Optional get(String lookupName) } }, null, - null + QueryStackTests.DEFAULT_NOOP_SCHEDULER ); }