Add JoinQuery #4118

Closed · wants to merge 20 commits
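In the changes shown below, a distribution target (a DataSourceWithSegmentSpec pairing a query's concrete datasource with its segment spec) is threaded through ScanQuery, the task runners, and BaseQuery; several context helpers move from BaseQuery into QueryContexts along the way.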
@@ -25,7 +25,9 @@
import com.google.common.collect.Lists;
import io.druid.query.BaseQuery;
import io.druid.query.DataSource;
+import io.druid.query.DataSourceWithSegmentSpec;
import io.druid.query.Query;
+import io.druid.query.QueryContexts;
import io.druid.query.TableDataSource;
import io.druid.query.filter.DimFilter;
import io.druid.query.filter.InDimFilter;
@@ -35,6 +37,7 @@
import org.joda.time.Interval;

import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;

@@ -137,7 +140,8 @@ public Query<ScanResultValue> withDataSource(DataSource dataSource)
@Override
public Query<ScanResultValue> withOverriddenContext(Map<String, Object> contextOverrides)
{
-return ScanQueryBuilder.copy(this).context(computeOverriddenContext(getContext(), contextOverrides)).build();
+return ScanQueryBuilder.copy(this).context(QueryContexts.computeOverriddenContext(getContext(), contextOverrides))
+                       .build();
}

public ScanQuery withDimFilter(DimFilter dimFilter)
@@ -279,6 +283,21 @@ public ScanQueryBuilder dataSource(DataSource ds)
return this;
}

+public ScanQueryBuilder updateDistributionTarget()
+{
+  if (context == null) {
+    context = new HashMap<>();
+  }
+  context.put(
+      QueryContexts.DISTRIBUTION_TARGET_SOURCE,
+      new DataSourceWithSegmentSpec(
+          BaseQuery.getLeafDataSource(dataSource),
+          querySegmentSpec
+      )
+  );
+  return this;
+}
+
public ScanQueryBuilder intervals(QuerySegmentSpec q)
{
querySegmentSpec = q;
@@ -299,7 +318,11 @@ public ScanQueryBuilder intervals(List<Interval> l)

public ScanQueryBuilder context(Map<String, Object> c)
{
-context = c;
+if (context == null) {
+  context = new HashMap<>(c);
+} else {
+  context.putAll(c);
+}
return this;
}

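With these ScanQueryBuilder changes, context(...) merges into any existing context instead of replacing it, so it can be called before or after updateDistributionTarget() without clobbering the recorded target. A minimal usage sketch, assuming the builder API shown above and ScanQuery.newScanQueryBuilder() as the entry point; the datasource name, interval, and timeout are placeholders, not part of the patch:

    import com.google.common.collect.ImmutableMap;
    import io.druid.query.TableDataSource;
    import io.druid.query.scan.ScanQuery;
    import org.joda.time.Interval;

    import java.util.Arrays;

    ScanQuery query = ScanQuery.newScanQueryBuilder()
        .dataSource(new TableDataSource("events"))                    // placeholder datasource
        .intervals(Arrays.asList(new Interval("2016-01-01/2017-01-01")))
        .updateDistributionTarget()                                   // captures the leaf datasource + segment spec,
                                                                      // so intervals() must already be set
        .context(ImmutableMap.<String, Object>of("timeout", 60000))   // merged, so the target entry survives
        .build();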
@@ -25,9 +25,9 @@
import io.druid.java.util.common.guava.BaseSequence;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.guava.Sequence;
-import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.Query;
import io.druid.query.QueryMetrics;
+import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.aggregation.MetricManipulationFn;
@@ -43,7 +43,7 @@ public void testSerializationLegacyString() throws Exception
+ "\"filter\":null,"
+ "\"columns\":[\"market\",\"quality\",\"index\"],"
+ "\"limit\":3,"
+ "\"context\":null}";
+ "\"context\":{}}";

String current =
"{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"testing\"},"
@@ -53,7 +53,7 @@
+ "\"limit\":3,"
+ "\"filter\":null,"
+ "\"columns\":[\"market\",\"quality\",\"index\"],"
+ "\"context\":null,"
+ "\"context\":{},"
+ "\"descending\":false}";

ScanQuery query = new ScanQuery(
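The expected JSON changes from "context":null to "context":{} because BaseQuery (below) now substitutes an empty map when constructed with a null context, and that empty map is what serializes back out.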
@@ -200,6 +200,7 @@ public void testQuantileOnFloatAndLongs() throws Exception
new QuantilePostAggregator("a6", "a4:agg", 0.999f),
new QuantilePostAggregator("a7", "a7:agg", 0.50f)
))
+.updateDistributionTarget()
.context(ImmutableMap.<String, Object>of("skipEmptyBuckets", true))
.build(),
Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
@@ -261,6 +262,7 @@ public void testQuantileOnComplexColumn() throws Exception
new QuantilePostAggregator("a6", "a4:agg", 0.999f)
))
.context(ImmutableMap.<String, Object>of("skipEmptyBuckets", true))
+.updateDistributionTarget()
.build(),
Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
);
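The expected queries in these tests gain .updateDistributionTarget() because the queries recorded via queryLogHook now carry the target in their context, and context participates in BaseQuery.equals(); without the call the assertions would fail on the context difference.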
@@ -1611,6 +1611,7 @@ public long countEvents(final Task task) throws Exception
)
).granularity(Granularities.ALL)
.intervals("0000/3000")
+.updateDistributionTarget()
.build();

ArrayList<Result<TimeseriesResultValue>> results = Sequences.toList(
@@ -45,6 +45,7 @@
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.lifecycle.LifecycleStop;
+import io.druid.query.DataSourceWithSegmentSpec;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
@@ -343,19 +344,20 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<Seg
private <T> QueryRunner<T> getQueryRunnerImpl(Query<T> query)
{
QueryRunner<T> queryRunner = null;
-final String queryDataSource = Iterables.getOnlyElement(query.getDataSource().getNames());
+final DataSourceWithSegmentSpec spec = query.getDistributionTarget();
+final String dataSourceName = Iterables.getOnlyElement(spec.getDataSource().getNames());

for (final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem : ImmutableList.copyOf(runningItems)) {
final Task task = taskRunnerWorkItem.getTask();
-if (task.getDataSource().equals(queryDataSource)) {
+if (task.getDataSource().equals(dataSourceName)) {
final QueryRunner<T> taskQueryRunner = task.getQueryRunner(query);

if (taskQueryRunner != null) {
if (queryRunner == null) {
queryRunner = taskQueryRunner;
} else {
log.makeAlert("Found too many query runners for datasource")
.addData("dataSource", queryDataSource)
.addData("dataSource", dataSourceName)
.emit();
}
}
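Routing in ThreadPoolTaskRunner now keys off the single distribution target rather than the query's datasource: once join queries exist, getDataSource().getNames() can return several names and Iterables.getOnlyElement() would throw, while the distribution target always names the one datasource whose tasks should execute the query.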
@@ -1081,6 +1081,7 @@ public long sumMetric(final Task task, final String metric) throws Exception
)
).granularity(Granularities.ALL)
.intervals("2000/3000")
+.updateDistributionTarget()
.build();

ArrayList<Result<TimeseriesResultValue>> results = Sequences.toList(
processing/src/main/java/io/druid/query/BaseQuery.java: 142 changes (70 additions, 72 deletions)
@@ -21,7 +21,7 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import io.druid.java.util.common.guava.Sequence;
@@ -43,7 +43,6 @@ public static void checkInterrupted()
}
}

-public static final String QUERYID = "queryId";
private final DataSource dataSource;
private final boolean descending;
private final Map<String, Object> context;
@@ -61,18 +60,23 @@ public BaseQuery(
Preconditions.checkNotNull(querySegmentSpec, "querySegmentSpec can't be null");

this.dataSource = dataSource;
-this.context = context;
+this.context = context == null ? Maps.newTreeMap() : context;
this.querySegmentSpec = querySegmentSpec;
this.descending = descending;
}

@JsonProperty
@Override
public DataSource getDataSource()
{
return dataSource;
}

+@Override
+public List<DataSourceWithSegmentSpec> getDataSources()
+{
+  return ImmutableList.of(new DataSourceWithSegmentSpec(dataSource, querySegmentSpec));
+}

@JsonProperty
@Override
public boolean isDescending()
@@ -92,18 +96,18 @@ public Sequence<T> run(QuerySegmentWalker walker, Map<String, Object> context)
return run(querySegmentSpec.lookup(this, walker), context);
}

-public Sequence<T> run(QueryRunner<T> runner, Map<String, Object> context)
+public List<Interval> getIntervals()
{
-  return runner.run(this, context);
+  return querySegmentSpec.getIntervals();
}

@Override
-public List<Interval> getIntervals()
+public Duration getDuration(DataSource dataSource)
{
-  return querySegmentSpec.getIntervals();
+  Preconditions.checkArgument(this.dataSource.equals(dataSource));
+  return getDuration();
}

-@Override
public Duration getDuration()
{
if (duration == null) {
@@ -126,68 +130,13 @@ public Map<String, Object> getContext()
return context;
}

-@Override
-public <ContextType> ContextType getContextValue(String key)
-{
-  return context == null ? null : (ContextType) context.get(key);
-}
-
-@Override
-public <ContextType> ContextType getContextValue(String key, ContextType defaultValue)
-{
-  ContextType retVal = getContextValue(key);
-  return retVal == null ? defaultValue : retVal;
-}
-
-@Override
-public boolean getContextBoolean(String key, boolean defaultValue)
-{
-  return QueryContexts.parseBoolean(this, key, defaultValue);
-}
-
-/**
- * @deprecated use {@link #computeOverriddenContext(Map, Map) computeOverriddenContext(getContext(), overrides))}
- * instead. This method may be removed in the next minor or major version of Druid.
- */
-@Deprecated
-protected Map<String, Object> computeOverridenContext(final Map<String, Object> overrides)
-{
-  return computeOverriddenContext(getContext(), overrides);
-}
-
-protected static Map<String, Object> computeOverriddenContext(
-    final Map<String, Object> context,
-    final Map<String, Object> overrides
-)
-{
-  Map<String, Object> overridden = Maps.newTreeMap();
-  if (context != null) {
-    overridden.putAll(context);
-  }
-  overridden.putAll(overrides);
-
-  return overridden;
-}
-
@Override
public Ordering<T> getResultOrdering()
{
Ordering<T> retVal = Ordering.natural();
return descending ? retVal.reverse() : retVal;
}

-@Override
-public String getId()
-{
-  return (String) getContextValue(QUERYID);
-}
-
-@Override
-public Query withId(String id)
-{
-  return withOverriddenContext(ImmutableMap.of(QUERYID, id));
-}

@Override
public boolean equals(Object o)
{
@@ -203,18 +152,16 @@ public boolean equals(Object o)
if (descending != baseQuery.descending) {
return false;
}
-if (context != null ? !context.equals(baseQuery.context) : baseQuery.context != null) {
+if (!context.equals(baseQuery.context)) {
return false;
}
-if (dataSource != null ? !dataSource.equals(baseQuery.dataSource) : baseQuery.dataSource != null) {
+if (!dataSource.equals(baseQuery.dataSource)) {
return false;
}
if (duration != null ? !duration.equals(baseQuery.duration) : baseQuery.duration != null) {
return false;
}
-if (querySegmentSpec != null
-    ? !querySegmentSpec.equals(baseQuery.querySegmentSpec)
-    : baseQuery.querySegmentSpec != null) {
+if (!querySegmentSpec.equals(baseQuery.querySegmentSpec)) {
return false;
}

@@ -224,11 +171,62 @@
@Override
public int hashCode()
{
-int result = dataSource != null ? dataSource.hashCode() : 0;
+int result = dataSource.hashCode();
result = 31 * result + (descending ? 1 : 0);
-result = 31 * result + (context != null ? context.hashCode() : 0);
-result = 31 * result + (querySegmentSpec != null ? querySegmentSpec.hashCode() : 0);
+result = 31 * result + context.hashCode();
+result = 31 * result + querySegmentSpec.hashCode();
result = 31 * result + (duration != null ? duration.hashCode() : 0);
return result;
}

+public Query<T> updateDistributionTarget()
+{
+  return distributeBy(new DataSourceWithSegmentSpec(BaseQuery.getLeafDataSource(dataSource), querySegmentSpec));
+}
+
+@Override
+public Query<T> withQuerySegmentSpec(DataSource dataSource, QuerySegmentSpec spec)
+{
+  Preconditions.checkArgument(this.dataSource.equals(dataSource));
+  final BaseQuery<T> result = (BaseQuery<T>) withQuerySegmentSpec(spec);
+  if (getDistributionTarget() != null && getDistributionTarget().getDataSource().equals(dataSource)) {
+    return result.updateDistributionTarget();
+  } else {
+    return result;
+  }
+}
+
+@Override
+public Query<T> withQuerySegmentSpec(String concatenatedDataSourceName, QuerySegmentSpec spec)
+{
+  Preconditions.checkArgument(this.dataSource.getConcatenatedName().equals(concatenatedDataSourceName));
+  return withQuerySegmentSpec(this.dataSource, spec);
+}
+
+@Override
+public Query<T> replaceDataSource(DataSource oldDataSource, DataSource newDataSource)
+{
+  Preconditions.checkArgument(this.dataSource.equals(oldDataSource));
+  return withDataSource(newDataSource);
+}
+
+public abstract Query<T> withQuerySegmentSpec(QuerySegmentSpec spec);
+public abstract Query<T> withDataSource(DataSource dataSource);
+
+public static <T extends Comparable<T>> DataSource getLeafDataSource(
+    BaseQuery<T> query
+)
+{
+  return getLeafDataSource(query.getDataSource());
+}
+
+public static DataSource getLeafDataSource(DataSource dataSource)
+{
+  if (dataSource instanceof QueryDataSource) {
+    final QueryDataSource queryDataSource = (QueryDataSource) dataSource;
+    return getLeafDataSource((BaseQuery<?>) queryDataSource.getQuery());
+  } else {
+    return dataSource;
+  }
+}
}
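getLeafDataSource() recursively unwraps QueryDataSource layers (subqueries) until it reaches a concrete datasource, which is what updateDistributionTarget() pins a query to even when it runs over a subquery. A small sketch, assuming the static methods added above; the table name is a placeholder:

    import io.druid.query.BaseQuery;
    import io.druid.query.DataSource;
    import io.druid.query.QueryDataSource;
    import io.druid.query.TableDataSource;

    DataSource table = new TableDataSource("events");   // placeholder name
    // For a plain table datasource, getLeafDataSource is the identity:
    DataSource leaf = BaseQuery.getLeafDataSource(table); // returns `table`
    // For a nested datasource such as new QueryDataSource(subqueryOverTable),
    // it unwraps each QueryDataSource layer until it reaches `table`.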