Add join-related DataSource types, and analysis functionality.
Builds on apache#9111 and implements the datasource analysis mentioned in apache#8728. Still can't
handle join datasources, but we're a step closer.

Join-related DataSource types:

1) Add "join", "lookup", and "inline" datasources.
2) Add "getChildren" and "withChildren" methods to DataSource, which will be used
   in the future for query rewriting (e.g. inlining of subqueries).
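
As a rough sketch, the tree-traversal part of the interface looks something like
the following (the method names are from this patch; the exact signatures shown
here are assumptions, not copied from the diff):

    import java.util.List;

    public interface DataSource
    {
      // Direct child datasources; empty for leaf types such as "table",
      // "lookup", and "inline".
      List<DataSource> getChildren();

      // Returns a copy of this datasource with its children replaced, in
      // the same order; the hook that future subquery inlining can use.
      DataSource withChildren(List<DataSource> children);
    }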

DataSource analysis functionality:

1) Add DataSourceAnalysis class, which breaks down datasources into three components:
   outer queries, a base datasource (left-most of the highest-level left-leaning join
   tree), and other joined-in leaf datasources (the right-hand branches of the
   left-leaning join tree). A usage sketch follows this list.
2) Add "isConcrete", "isGlobal", and "isCacheable" methods to DataSource in order to
   support analysis.
3) Use the DataSourceAnalysis methods throughout the query handling stack, replacing
   various ad-hoc approaches. Most of the interesting changes are in
   ClientQuerySegmentWalker (brokers), ServerManager (historicals), and
   SinkQuerySegmentWalker (indexing tasks).
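
For illustration, here is a minimal sketch of using the analysis to find the table
a query should be routed to. DataSourceAnalysis.forDataSource and
getBaseTableDataSource both appear in the diff below; the helper wrapping them is
hypothetical:

    import java.util.Optional;

    import org.apache.druid.query.Query;
    import org.apache.druid.query.planning.DataSourceAnalysis;

    class RoutingExample
    {
      // Hypothetical helper: returns the base table name, if the query's
      // datasource tree bottoms out in a single concrete table.
      static Optional<String> baseTableName(final Query<?> query)
      {
        final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());

        // Empty when the base datasource is not a plain table (e.g. inline).
        return analysis.getBaseTableDataSource().map(table -> table.getName());
      }
    }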

Other notes:

1) Changed TimelineServerView to return an Optional timeline, which I thought made
   the analysis changes cleaner to implement.
2) Renamed DataSource#getNames to DataSource#getTableNames, which I think is clearer.
   Also, made it a Set, so implementations don't need to worry about duplicates.
3) Added QueryToolChest#canPerformSubquery, which is now used by query entry points to
   determine whether it is safe to pass a subquery dataSource to the query toolchest.
   Fixes an issue introduced in apache#5471 where subqueries under non-groupBy-typed
   queries were silently ignored, since neither the query entry point nor the toolchest
   did anything special with them. A sketch of the new check follows this list.
4) The addition of "isCacheable" should work around apache#8713, since UnionDataSource now
   returns false for cacheability.
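
Sketch of the new subquery check (canPerformSubquery is the method this patch adds;
the guard method wrapping it is illustrative, not the actual entry-point code):

    import org.apache.druid.java.util.common.ISE;
    import org.apache.druid.query.Query;
    import org.apache.druid.query.QueryDataSource;
    import org.apache.druid.query.QueryToolChest;

    class SubqueryGuardExample
    {
      // Hypothetical guard: reject a subquery datasource that the toolchest
      // cannot execute, instead of silently ignoring it (the apache#5471
      // regression described above).
      static <T> void checkSubquery(final Query<T> query, final QueryToolChest<T, Query<T>> toolChest)
      {
        if (query.getDataSource() instanceof QueryDataSource) {
          final Query<?> subQuery = ((QueryDataSource) query.getDataSource()).getQuery();

          if (!toolChest.canPerformSubquery(subQuery)) {
            throw new ISE("Cannot handle subquery: %s", subQuery.getDataSource());
          }
        }
      }
    }
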
gianm committed Jan 21, 2020
1 parent d21054f commit 5b6406e
Showing 72 changed files with 3,435 additions and 505 deletions.
@@ -25,7 +25,6 @@
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
@@ -58,7 +57,6 @@
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.BySegmentQueryRunner;
-import org.apache.druid.query.DataSource;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.Druids;
@@ -89,6 +87,7 @@
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV1;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
+import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
@@ -126,14 +125,14 @@
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

-import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
@@ -217,8 +216,17 @@ public void setup()
.size(0)
.build();
final SegmentGenerator segmentGenerator = closer.register(new SegmentGenerator());
LOG.info("Starting benchmark setup using cacheDir[%s], rows[%,d].", segmentGenerator.getCacheDir(), rowsPerSegment);
final QueryableIndex index = segmentGenerator.generate(dataSegment, schemaInfo, Granularities.NONE, rowsPerSegment);
+    LOG.info(
+        "Starting benchmark setup using cacheDir[%s], rows[%,d].",
+        segmentGenerator.getCacheDir(),
+        rowsPerSegment
+    );
+    final QueryableIndex index = segmentGenerator.generate(
+        dataSegment,
+        schemaInfo,
+        Granularities.NONE,
+        rowsPerSegment
+    );
queryableIndexes.put(dataSegment, index);
}

@@ -518,12 +526,10 @@ void addSegmentToServer(DruidServer server, DataSegment segment)
.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(selector));
}

-  @Nullable
   @Override
-  public TimelineLookup<String, ServerSelector> getTimeline(DataSource dataSource)
+  public Optional<? extends TimelineLookup<String, ServerSelector>> getTimeline(DataSourceAnalysis analysis)
   {
-    final String table = Iterables.getOnlyElement(dataSource.getNames());
-    return timelines.get(table);
+    return Optional.ofNullable(timelines.get(analysis.getBaseTableDataSource().get().getName()));
}

@Override
@@ -563,7 +569,11 @@ private class SimpleQueryRunner implements QueryRunner<Object>
private final QueryRunnerFactoryConglomerate conglomerate;
private final QueryableIndexSegment segment;

-    public SimpleQueryRunner(QueryRunnerFactoryConglomerate conglomerate, SegmentId segmentId, QueryableIndex queryableIndex)
+    public SimpleQueryRunner(
+        QueryRunnerFactoryConglomerate conglomerate,
+        SegmentId segmentId,
+        QueryableIndex queryableIndex
+    )
{
this.conglomerate = conglomerate;
this.segment = new QueryableIndexSegment(queryableIndex, segmentId);
@@ -23,9 +23,11 @@
import com.google.common.collect.ImmutableSortedSet;
import com.google.inject.Inject;
import org.apache.druid.client.TimelineServerView;
+import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.Query;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.topn.TopNQuery;
@@ -54,24 +56,24 @@ public class DataSourceOptimizer
private ConcurrentHashMap<String, AtomicLong> hitCount = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, AtomicLong> costTime = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, ConcurrentHashMap<Set<String>, AtomicLong>> missFields = new ConcurrentHashMap<>();

@Inject
  public DataSourceOptimizer(TimelineServerView serverView)
{
this.serverView = serverView;
}

/**
* Do main work about materialized view selection: transform user query to one or more sub-queries.
   *
   * In the sub-query, the dataSource is the derivative of dataSource in user query, and sum of all sub-queries'
* intervals equals the interval in user query
   *
* Derived dataSource with smallest average data size per segment granularity have highest priority to replace the
* datasource in user query
   *
* @param query only TopNQuery/TimeseriesQuery/GroupByQuery can be optimized
   * @return a list of queries with specified derived dataSources and intervals
*/
public List<Query> optimize(Query query)
{
@@ -86,7 +88,7 @@ public List<Query> optimize(Query query)
// get all derivatives for datasource in query. The derivatives set is sorted by average size of
// per segment granularity.
Set<DerivativeDataSource> derivatives = DerivativeDataSourceManager.getDerivatives(datasourceName);

if (derivatives.isEmpty()) {
return Collections.singletonList(query);
}
@@ -96,10 +98,10 @@
hitCount.putIfAbsent(datasourceName, new AtomicLong(0));
costTime.putIfAbsent(datasourceName, new AtomicLong(0));
totalCount.get(datasourceName).incrementAndGet();

// get all fields which the query required
Set<String> requiredFields = MaterializedViewUtils.getRequiredFields(query);

Set<DerivativeDataSource> derivativesWithRequiredFields = new HashSet<>();
for (DerivativeDataSource derivativeDataSource : derivatives) {
derivativesHitCount.putIfAbsent(derivativeDataSource.getName(), new AtomicLong(0));
@@ -115,14 +117,15 @@
costTime.get(datasourceName).addAndGet(System.currentTimeMillis() - start);
return Collections.singletonList(query);
}

List<Query> queries = new ArrayList<>();
List<Interval> remainingQueryIntervals = (List<Interval>) query.getIntervals();

for (DerivativeDataSource derivativeDataSource : ImmutableSortedSet.copyOf(derivativesWithRequiredFields)) {
final List<Interval> derivativeIntervals = remainingQueryIntervals.stream()
.flatMap(interval -> serverView
-              .getTimeline((new TableDataSource(derivativeDataSource.getName())))
+              .getTimeline(DataSourceAnalysis.forDataSource(new TableDataSource(derivativeDataSource.getName())))
+              .orElseThrow(() -> new ISE("No timeline for dataSource: %s", derivativeDataSource.getName()))
.lookup(interval)
.stream()
.map(TimelineObjectHolder::getInterval)
@@ -133,7 +136,7 @@
if (derivativeIntervals.isEmpty()) {
continue;
}

remainingQueryIntervals = MaterializedViewUtils.minus(remainingQueryIntervals, derivativeIntervals);
queries.add(
query.withDataSource(new TableDataSource(derivativeDataSource.getName()))
@@ -158,13 +161,13 @@ public List<DataSourceOptimizerStats> getAndResetStats()
hitCount.get(datasourceName).incrementAndGet();
costTime.get(datasourceName).addAndGet(System.currentTimeMillis() - start);
return queries;
      }
finally {
lock.readLock().unlock();
}
}

  public List<DataSourceOptimizerStats> getAndResetStats()
{
ImmutableMap<String, AtomicLong> derivativesHitCountSnapshot;
ImmutableMap<String, AtomicLong> totalCountSnapshot;
@@ -183,7 +186,7 @@ public List<DataSourceOptimizerStats> getAndResetStats()
hitCount.clear();
costTime.clear();
missFields.clear();
      }
finally {
lock.writeLock().unlock();
}
@@ -49,7 +49,6 @@
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.query.DataSource;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
@@ -62,6 +61,7 @@
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.movingaverage.test.TestConfig;
+import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.server.ClientQuerySegmentWalker;
@@ -84,6 +84,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;

@@ -305,9 +306,9 @@ public void testQuery() throws IOException
new TimelineServerView()
{
@Override
-      public TimelineLookup<String, ServerSelector> getTimeline(DataSource dataSource)
+      public Optional<? extends TimelineLookup<String, ServerSelector>> getTimeline(DataSourceAnalysis analysis)
       {
-        return null;
+        return Optional.empty();
}

@Override
@@ -22,7 +22,6 @@
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@@ -51,6 +50,7 @@
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.SetAndVerifyContextQueryRunner;
import org.apache.druid.server.initialization.ServerConfig;
@@ -328,11 +328,13 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<Seg
private <T> QueryRunner<T> getQueryRunnerImpl(Query<T> query)
{
QueryRunner<T> queryRunner = null;
-    final String queryDataSource = Iterables.getOnlyElement(query.getDataSource().getNames());

if (runningItem != null) {
+      final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
final Task task = runningItem.getTask();
-      if (task.getDataSource().equals(queryDataSource)) {
+
+      if (analysis.getBaseTableDataSource().isPresent()
+          && task.getDataSource().equals(analysis.getBaseTableDataSource().get().getName())) {
final QueryRunner<T> taskQueryRunner = task.getQueryRunner(query);

if (taskQueryRunner != null) {
@@ -379,7 +381,7 @@ public String getTaskType()
{
return task.getType();
}

@Override
public String getDataSource()
{
Expand Down
processing/src/main/java/org/apache/druid/query/BaseQuery.java (20 changes: 7 additions & 13 deletions)
@@ -28,6 +28,7 @@
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
@@ -117,17 +118,11 @@ public QueryRunner<T> getRunner(QuerySegmentWalker walker)
}

@VisibleForTesting
-  public static QuerySegmentSpec getQuerySegmentSpecForLookUp(BaseQuery query)
+  public static QuerySegmentSpec getQuerySegmentSpecForLookUp(BaseQuery<?> query)
   {
-    if (query.getDataSource() instanceof QueryDataSource) {
-      QueryDataSource ds = (QueryDataSource) query.getDataSource();
-      Query subquery = ds.getQuery();
-      if (subquery instanceof BaseQuery) {
-        return getQuerySegmentSpecForLookUp((BaseQuery) subquery);
-      }
-      throw new IllegalStateException("Invalid subquery type " + subquery.getClass());
-    }
-    return query.getQuerySegmentSpec();
+    return DataSourceAnalysis.forDataSource(query.getDataSource())
+                             .getBaseQuerySegmentSpec()
+                             .orElse(query.getQuerySegmentSpec());
}

@Override
@@ -270,14 +265,13 @@ public boolean equals(Object o)
Objects.equals(dataSource, baseQuery.dataSource) &&
Objects.equals(context, baseQuery.context) &&
Objects.equals(querySegmentSpec, baseQuery.querySegmentSpec) &&
-           Objects.equals(duration, baseQuery.duration) &&
+           Objects.equals(getDuration(), baseQuery.getDuration()) &&
Objects.equals(granularity, baseQuery.granularity);
}

@Override
public int hashCode()
{

-    return Objects.hash(dataSource, descending, context, querySegmentSpec, duration, granularity);
+    return Objects.hash(dataSource, descending, context, querySegmentSpec, getDuration(), granularity);
}
}
