Skip to content

Commit

Permalink
Add join-related DataSource types, and analysis functionality.
Browse files Browse the repository at this point in the history
Builds on apache#9111 and implements the datasource analysis mentioned in apache#8728. Still can't
handle join datasources, but we're a step closer.

Join-related DataSource types:

1) Add "join", "lookup", and "inline" datasources.
2) Add "getChildren" and "withChildren" methods to DataSource, which will be used
   in the future for query rewriting (e.g. inlining of subqueries).

DataSource analysis functionality:

1) Add DataSourceAnalysis class, which breaks down datasources into three components:
   outer queries, a base datasource (left-most of the highest level left-leaning join
   tree), and other joined-in leaf datasources (the right-hand branches of the
   left-leaning join tree).
2) Add "isConcrete", "isGlobal", and "isCacheable" methods to DataSource in order to
   support analysis.

Other notes:

1) Renamed DataSource#getNames to DataSource#getTableNames, which I think is clearer.
   Also, made it a Set, so implementations don't need to worry about duplicates.
2) The addition of "isCacheable" should work around apache#8713, since UnionDataSource now
   returns false for cacheability.
  • Loading branch information
gianm committed Jan 21, 2020
1 parent d21054f commit 2cda5c6
Show file tree
Hide file tree
Showing 44 changed files with 2,837 additions and 136 deletions.
Expand Up @@ -522,7 +522,7 @@ void addSegmentToServer(DruidServer server, DataSegment segment)
@Override
public TimelineLookup<String, ServerSelector> getTimeline(DataSource dataSource)
{
  // This test server only supports single-table datasources; getOnlyElement throws if the
  // datasource references zero or multiple tables, which is the desired fail-fast behavior here.
  final String table = Iterables.getOnlyElement(dataSource.getTableNames());
  return timelines.get(table);
}

Expand Down
Expand Up @@ -328,7 +328,7 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<Seg
private <T> QueryRunner<T> getQueryRunnerImpl(Query<T> query)
{
QueryRunner<T> queryRunner = null;
final String queryDataSource = Iterables.getOnlyElement(query.getDataSource().getNames());
final String queryDataSource = Iterables.getOnlyElement(query.getDataSource().getTableNames());

if (runningItem != null) {
final Task task = runningItem.getTask();
Expand Down
Expand Up @@ -270,14 +270,13 @@ public boolean equals(Object o)
Objects.equals(dataSource, baseQuery.dataSource) &&
Objects.equals(context, baseQuery.context) &&
Objects.equals(querySegmentSpec, baseQuery.querySegmentSpec) &&
Objects.equals(duration, baseQuery.duration) &&
Objects.equals(getDuration(), baseQuery.getDuration()) &&
Objects.equals(granularity, baseQuery.granularity);
}

@Override
public int hashCode()
{
  // Uses getDuration() rather than the raw field so the hash matches equals(), which also
  // compares via the accessor (the accessor may lazily compute the duration).
  return Objects.hash(dataSource, descending, context, querySegmentSpec, getDuration(), granularity);
}
}
62 changes: 53 additions & 9 deletions processing/src/main/java/org/apache/druid/query/DataSource.java
Expand Up @@ -23,17 +23,61 @@
import com.fasterxml.jackson.annotation.JsonTypeInfo;

import java.util.List;
import java.util.Set;

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
property = "type",
defaultImpl = LegacyDataSource.class)
/**
* Represents a source... of data... for a query. Analogous to the "FROM" clause in SQL.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = LegacyDataSource.class)
@JsonSubTypes({
@JsonSubTypes.Type(value = TableDataSource.class, name = "table"),
@JsonSubTypes.Type(value = QueryDataSource.class, name = "query"),
@JsonSubTypes.Type(value = UnionDataSource.class, name = "union")
})
@JsonSubTypes.Type(value = TableDataSource.class, name = "table"),
@JsonSubTypes.Type(value = QueryDataSource.class, name = "query"),
@JsonSubTypes.Type(value = UnionDataSource.class, name = "union"),
@JsonSubTypes.Type(value = JoinDataSource.class, name = "join"),
@JsonSubTypes.Type(value = LookupDataSource.class, name = "lookup"),
@JsonSubTypes.Type(value = InlineDataSource.class, name = "inline")
})
public interface DataSource
{
List<String> getNames();
/**
* Returns the names of all table datasources involved in this query. Does not include names for non-tables, like
* lookups or inline datasources.
*/
Set<String> getTableNames();

/**
* Returns datasources that this datasource depends on. Will be empty for leaf datasources like 'table'.
*/
List<DataSource> getChildren();

/**
* Return a new DataSource, identical to this one, with different children.
*/
DataSource withChildren(List<DataSource> children);

/**
* Returns true if queries on this dataSource are cacheable at both the result level and per-segment level.
* Currently, dataSources that modify the behavior of per-segment processing are not cacheable (like 'join').
* Nor are dataSources that do not actually reference segments (like 'inline'), since cache keys are always based
* on segment identifiers.
*
* Note: Ideally, queries on 'join' datasources _would_ be cacheable, but we cannot currently do this due to lacking
* the code necessary to compute cache keys properly.
*/
boolean isCacheable();

/**
* Returns true if all servers have a full copy of this datasource. True for things like inline, lookup, etc, or
* for queries of those.
*/
boolean isGlobal();

/**
* Returns true if this datasource represents concrete data that can be scanned via a
* {@link org.apache.druid.segment.Segment} adapter of some kind. True for e.g. 'table' but not for 'query' or 'join'.
*
* @see org.apache.druid.query.planning.DataSourceAnalysis#isConcreteBased() which uses this
* @see org.apache.druid.query.planning.DataSourceAnalysis#isConcreteTableBased() which uses this
*/
boolean isConcrete();
}

This file was deleted.

Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.query;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import org.apache.druid.collections.bitmap.BitmapFactory;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
Expand All @@ -30,7 +31,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* DefaultQueryMetrics is unsafe for use from multiple threads. It fails with RuntimeException on access not from the
Expand All @@ -42,9 +45,22 @@ public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMet
protected final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
protected final Map<String, Number> metrics = new HashMap<>();

/** Non final to give subclasses ability to reassign it. */
/**
* Non final to give subclasses ability to reassign it.
*/
protected Thread ownerThread = Thread.currentThread();

/**
 * Renders the table names referenced by a datasource as a single metric-dimension string: the bare
 * table name when exactly one table is involved, otherwise a sorted, bracketed list such as "[a, b]".
 */
private static String getTableNamesAsString(DataSource dataSource)
{
  final Set<String> tableNames = dataSource.getTableNames();
  return tableNames.size() == 1
         ? Iterables.getOnlyElement(tableNames)
         : tableNames.stream().sorted().collect(Collectors.toList()).toString();
}

protected void checkModifiedFromOwnerThread()
{
if (Thread.currentThread() != ownerThread) {
Expand Down Expand Up @@ -77,7 +93,7 @@ public void query(QueryType query)
@Override
public void dataSource(QueryType query)
{
  // Dimension value is the sorted table-name rendering of the query's datasource (single name, or
  // a bracketed list when the datasource spans several tables).
  setDimension(DruidMetrics.DATASOURCE, getTableNamesAsString(query.getDataSource()));
}

@Override
Expand Down

0 comments on commit 2cda5c6

Please sign in to comment.