add lane enforcement for joinish queries (#9563)
* add lane enforcement for joinish queries

* oops

* style

* review stuffs
clintropolis committed Mar 30, 2020
1 parent c0195a1 commit fa5da66
Showing 15 changed files with 348 additions and 148 deletions.
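For context: this change routes queries that the broker answers locally (the "joinish" path through LocalQuerySegmentWalker, e.g. queries against lookup or inline datasources) through the QueryScheduler, so laning limits now apply to them just like fanned-out cluster queries. A broker configuration that would actually enforce lanes (property names as in Druid's query prioritization and laning docs; the values here are illustrative only) might look like:

    # broker runtime.properties — illustrative values
    druid.query.scheduler.numThreads=40
    druid.query.scheduler.laning.strategy=hilo
    druid.query.scheduler.laning.maxLowPercent=20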
(changed file, name not captured)
@@ -104,11 +104,8 @@
 import org.apache.druid.query.topn.TopNResultValue;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexSegment;
-import org.apache.druid.server.QueryScheduler;
+import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.coordination.ServerType;
-import org.apache.druid.server.initialization.ServerConfig;
-import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
-import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
 import org.apache.druid.timeline.SegmentId;
@@ -343,12 +340,7 @@ public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> getToolChest
         new DruidHttpClientConfig(),
         processingConfig,
         forkJoinPool,
-        new QueryScheduler(
-            0,
-            ManualQueryPrioritizationStrategy.INSTANCE,
-            NoQueryLaningStrategy.INSTANCE,
-            new ServerConfig()
-        )
+        QueryStackTests.DEFAULT_NOOP_SCHEDULER
     );
   }
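Aside: QueryStackTests.DEFAULT_NOOP_SCHEDULER replaces the inline construction removed above. Judging from those removed lines, the shared constant is presumably equivalent to something like:

    // A scheduler that enforces nothing: a non-positive thread count (0 here)
    // presumably disables total-capacity enforcement, prioritization is a
    // pass-through, and no lanes are ever assigned. Construction mirrors the
    // removed inline code.
    public static final QueryScheduler DEFAULT_NOOP_SCHEDULER = new QueryScheduler(
        0,
        ManualQueryPrioritizationStrategy.INSTANCE,
        NoQueryLaningStrategy.INSTANCE,
        new ServerConfig()
    );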
7 changes: 7 additions & 0 deletions extensions-contrib/moving-average-query/pom.xml
@@ -105,6 +105,13 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.druid</groupId>
+      <artifactId>druid-server</artifactId>
+      <version>${project.parent.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
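The added druid-server test-jar dependency is presumably what gives this module's tests access to QueryStackTests, which the test change that follows uses in place of a hand-built QueryScheduler.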
(changed file, name not captured)
@@ -65,10 +65,8 @@
 import org.apache.druid.query.timeseries.TimeseriesQuery;
 import org.apache.druid.query.timeseries.TimeseriesResultValue;
 import org.apache.druid.server.ClientQuerySegmentWalker;
-import org.apache.druid.server.QueryScheduler;
+import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.initialization.ServerConfig;
-import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
-import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.apache.druid.timeline.TimelineLookup;
 import org.hamcrest.core.IsInstanceOf;
@@ -365,12 +363,7 @@ public String getFormatString()
         }
       },
       ForkJoinPool.commonPool(),
-      new QueryScheduler(
-          0,
-          ManualQueryPrioritizationStrategy.INSTANCE,
-          NoQueryLaningStrategy.INSTANCE,
-          new ServerConfig()
-      )
+      QueryStackTests.DEFAULT_NOOP_SCHEDULER
   );

 ClientQuerySegmentWalker walker = new ClientQuerySegmentWalker(
server/src/main/java/org/apache/druid/client/SegmentServerSelector.java (path inferred from the package declaration)
@@ -19,22 +19,46 @@

 package org.apache.druid.client;

+import com.google.common.base.Preconditions;
 import org.apache.druid.client.selector.ServerSelector;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.query.SegmentDescriptor;

+import javax.annotation.Nullable;
+
 /**
  * Given a {@link SegmentDescriptor}, get a {@link ServerSelector} to use to pick a {@link DruidServer} to query.
  *
- * Used by {@link CachingClusteredClient} on the broker to fan out queries to historical and realtime data
+ * Used by {@link CachingClusteredClient} on the broker to fan out queries to historical and realtime data. Used
+ * by {@link org.apache.druid.server.LocalQuerySegmentWalker} on the broker for on broker queries
  */
 public class SegmentServerSelector extends Pair<ServerSelector, SegmentDescriptor>
 {
+  /**
+   * This is for a segment hosted on a remote server, where {@link ServerSelector} may be used to pick
+   * a {@link DruidServer} to query.
+   */
   public SegmentServerSelector(ServerSelector server, SegmentDescriptor segment)
   {
     super(server, segment);
+    Preconditions.checkNotNull(server, "ServerSelector must not be null");
+    Preconditions.checkNotNull(segment, "SegmentDescriptor must not be null");
   }

+  /**
+   * This is for a segment hosted locally
+   */
+  public SegmentServerSelector(SegmentDescriptor segment)
+  {
+    super(null, segment);
+    Preconditions.checkNotNull(segment, "SegmentDescriptor must not be null");
+  }
+
+  /**
+   * This may be null if {@link SegmentDescriptor} is locally available, but will definitely not be null for segments
+   * which must be queried remotely (e.g. {@link CachingClusteredClient})
+   */
+  @Nullable
   public ServerSelector getServer()
   {
     return lhs;
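The two constructors split remote from local segments. A minimal sketch of how a caller might choose between them (hypothetical helper and variable names; types are those imported in the hunk above):

    // Illustrative only: route a segment depending on where it lives.
    static SegmentServerSelector forSegment(
        @Nullable ServerSelector serverSelector,
        SegmentDescriptor descriptor
    )
    {
      if (serverSelector == null) {
        // Locally queryable segment: the single-arg constructor leaves the server
        // null, so callers must null-check getServer() before routing remotely.
        return new SegmentServerSelector(descriptor);
      }
      // Remote segment: a DruidServer can be picked via the selector.
      return new SegmentServerSelector(serverSelector, descriptor);
    }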
server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java (path inferred from the package declaration)
@@ -20,13 +20,15 @@
 package org.apache.druid.server;

 import com.google.inject.Inject;
+import org.apache.druid.client.SegmentServerSelector;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.query.FluentQueryRunnerBuilder;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.QueryPlus;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
@@ -39,6 +41,8 @@
 import org.apache.druid.segment.join.Joinables;
 import org.joda.time.Interval;

+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.StreamSupport;
@@ -57,19 +61,22 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker
   private final QueryRunnerFactoryConglomerate conglomerate;
   private final SegmentWrangler segmentWrangler;
   private final JoinableFactory joinableFactory;
+  private final QueryScheduler scheduler;
   private final ServiceEmitter emitter;

   @Inject
   public LocalQuerySegmentWalker(
       QueryRunnerFactoryConglomerate conglomerate,
       SegmentWrangler segmentWrangler,
       JoinableFactory joinableFactory,
+      QueryScheduler scheduler,
       ServiceEmitter emitter
   )
   {
     this.conglomerate = conglomerate;
     this.segmentWrangler = segmentWrangler;
     this.joinableFactory = joinableFactory;
+    this.scheduler = scheduler;
     this.emitter = emitter;
   }

@@ -82,21 +89,23 @@ public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final
       throw new IAE("Cannot query dataSource locally: %s", analysis.getDataSource());
     }

-    final AtomicLong cpuAccumulator = new AtomicLong(0L);
-    final QueryRunnerFactory<T, Query<T>> queryRunnerFactory = conglomerate.findFactory(query);
     final Iterable<Segment> segments = segmentWrangler.getSegmentsForIntervals(analysis.getBaseDataSource(), intervals);
+    final Query<T> prioritizedAndLaned = prioritizeAndLaneQuery(query, segments);
+
+    final AtomicLong cpuAccumulator = new AtomicLong(0L);
     final Function<Segment, Segment> segmentMapFn = Joinables.createSegmentMapFn(
         analysis.getPreJoinableClauses(),
         joinableFactory,
         cpuAccumulator,
-        QueryContexts.getEnableJoinFilterPushDown(query),
-        QueryContexts.getEnableJoinFilterRewrite(query),
-        QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(query),
-        QueryContexts.getJoinFilterRewriteMaxSize(query),
-        query.getFilter() == null ? null : query.getFilter().toFilter(),
-        query.getVirtualColumns()
+        QueryContexts.getEnableJoinFilterPushDown(prioritizedAndLaned),
+        QueryContexts.getEnableJoinFilterRewrite(prioritizedAndLaned),
+        QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(prioritizedAndLaned),
+        QueryContexts.getJoinFilterRewriteMaxSize(prioritizedAndLaned),
+        prioritizedAndLaned.getFilter() == null ? null : prioritizedAndLaned.getFilter().toFilter(),
+        prioritizedAndLaned.getVirtualColumns()
     );

+    final QueryRunnerFactory<T, Query<T>> queryRunnerFactory = conglomerate.findFactory(prioritizedAndLaned);
     final QueryRunner<T> baseRunner = queryRunnerFactory.mergeRunners(
         Execs.directExecutor(),
         () -> StreamSupport.stream(segments.spliterator(), false)
@@ -107,17 +116,25 @@ public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final
     // Note: Not calling 'postProcess'; it isn't official/documented functionality so we'll only support it where
     // it is already supported.
     return new FluentQueryRunnerBuilder<>(queryRunnerFactory.getToolchest())
-        .create(baseRunner)
+        .create(scheduler.wrapQueryRunner(baseRunner))
         .applyPreMergeDecoration()
         .mergeResults()
         .applyPostMergeDecoration()
         .emitCPUTimeMetric(emitter, cpuAccumulator);
   }

   @Override
   public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
   {
     // SegmentWranglers only work based on intervals and cannot run with specific segments.
     throw new ISE("Cannot run with specific segments");
   }
+
+  private <T> Query<T> prioritizeAndLaneQuery(Query<T> query, Iterable<Segment> segments)
+  {
+    Set<SegmentServerSelector> segmentServerSelectors = new HashSet<>();
+    for (Segment s : segments) {
+      segmentServerSelectors.add(new SegmentServerSelector(s.getId().toDescriptor()));
+    }
+    return scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(query), segmentServerSelectors);
+  }
 }
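The net effect: because the walker now tags queries via prioritizeAndLaneQuery and wraps the runner via wrapQueryRunner, a scheduler configured with real limits can throttle or reject on-broker join queries like any other. A sketch of such a scheduler (constructor shape as in the test code removed elsewhere in this diff; that HiLoQueryLaningStrategy takes a maxLowPercent argument is an assumption):

    // Up to 40 concurrent queries overall; the "low" lane is capped at 20% of that.
    QueryScheduler scheduler = new QueryScheduler(
        40,
        ManualQueryPrioritizationStrategy.INSTANCE,
        new HiLoQueryLaningStrategy(20),
        new ServerConfig()
    );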
13 changes: 13 additions & 0 deletions server/src/main/java/org/apache/druid/server/QueryScheduler.java
@@ -30,10 +30,12 @@
 import it.unimi.dsi.fastutil.objects.Object2IntMap;
 import org.apache.druid.client.SegmentServerSelector;
 import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.guava.LazySequence;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryWatcher;
 import org.apache.druid.server.initialization.ServerConfig;

@@ -140,6 +142,17 @@ public <T> Sequence<T> run(Query<?> query, Sequence<T> resultSequence)
     return resultSequence.withBaggage(() -> finishLanes(bulkheads));
   }

+  /**
+   * Returns a {@link QueryRunner} that will call {@link QueryScheduler#run} when {@link QueryRunner#run} is called.
+   */
+  public <T> QueryRunner<T> wrapQueryRunner(QueryRunner<T> baseRunner)
+  {
+    return (queryPlus, responseContext) ->
+        QueryScheduler.this.run(
+            queryPlus.getQuery(), new LazySequence<>(() -> baseRunner.run(queryPlus, responseContext))
+        );
+  }
+
   /**
    * Forcibly cancel all futures that have been registered to a specific query id
    */
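A note on the laziness here: scheduler.run (and with it prioritization and lane accounting) executes when the returned runner's run is invoked, while the LazySequence defers baseRunner.run until the result sequence is actually consumed. A hypothetical caller (names invented; mirrors the call site added in LocalQuerySegmentWalker above):

    // Illustrative helper: apply lane policy around an existing runner.
    static <T> Sequence<T> runWithLanes(
        QueryScheduler scheduler,
        QueryRunner<T> baseRunner,
        QueryPlus<T> queryPlus,
        ResponseContext responseContext
    )
    {
      QueryRunner<T> laned = scheduler.wrapQueryRunner(baseRunner);
      // scheduler.run() fires here; baseRunner.run() is deferred by the
      // LazySequence until the returned sequence is iterated.
      return laned.run(queryPlus, responseContext);
    }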
(changed file, name not captured)
@@ -47,11 +47,8 @@
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.context.ResponseContext;
 import org.apache.druid.query.planning.DataSourceAnalysis;
-import org.apache.druid.server.QueryScheduler;
+import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.coordination.ServerType;
-import org.apache.druid.server.initialization.ServerConfig;
-import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
-import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.TimelineLookup;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
@@ -335,7 +332,7 @@ public int getMergePoolParallelism()
         }
       },
       ForkJoinPool.commonPool(),
-      new QueryScheduler(0, ManualQueryPrioritizationStrategy.INSTANCE, NoQueryLaningStrategy.INSTANCE, new ServerConfig())
+      QueryStackTests.DEFAULT_NOOP_SCHEDULER
   );
 }
(diff for the remaining changed files not loaded in this capture)
