fixes for inline subqueries when multi-value dimension is present #9698

Merged · 4 commits · Apr 22, 2020
@@ -227,7 +227,7 @@ public GroupByEngineIterator make()
processingBuffer,
fudgeTimestamp,
dims,
-isAllSingleValueDims(columnSelectorFactory::getColumnCapabilities, query.getDimensions()),
+isAllSingleValueDims(columnSelectorFactory::getColumnCapabilities, query.getDimensions(), false),
cardinalityForArrayAggregation
);
} else {
@@ -238,7 +238,7 @@ public GroupByEngineIterator make()
processingBuffer,
fudgeTimestamp,
dims,
-isAllSingleValueDims(columnSelectorFactory::getColumnCapabilities, query.getDimensions())
+isAllSingleValueDims(columnSelectorFactory::getColumnCapabilities, query.getDimensions(), false)
);
}
}
@@ -313,12 +313,15 @@ public static int getCardinalityForArrayAggregation(
}

/**
-* Checks whether all "dimensions" are either single-valued or nonexistent (which is just as good as single-valued,
-* since their selectors will show up as full of nulls).
+* Checks whether all "dimensions" are either single-valued or, if allowed, nonexistent. Since nonexistent column
+* selectors will show up as full of nulls, they are effectively single-valued; however, capabilities can also be null
+* during broker merge, for example with an 'inline' datasource subquery. 'missingMeansNonexistent' is a bit of a hack
+* that lets the vectorized engine, which only operates on actual segments, still work in this case for nonexistent
+* columns.
*/
public static boolean isAllSingleValueDims(
final Function<String, ColumnCapabilities> capabilitiesFunction,
-final List<DimensionSpec> dimensions
+final List<DimensionSpec> dimensions,
+final boolean missingMeansNonexistent
)
{
return dimensions
@@ -333,7 +336,8 @@ public static boolean isAllSingleValueDims(

// Now check column capabilities.
final ColumnCapabilities columnCapabilities = capabilitiesFunction.apply(dimension.getDimension());
-return columnCapabilities == null || !columnCapabilities.hasMultipleValues();
+return (columnCapabilities != null && !columnCapabilities.hasMultipleValues()) ||
+       (missingMeansNonexistent && columnCapabilities == null);
});
}

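To make the new contract concrete, here is a minimal sketch (not part of the PR; the wrapper class and method name are hypothetical) restating the rewritten per-dimension check as a truth table:

```java
import org.apache.druid.segment.column.ColumnCapabilities;

// Illustrative restatement of the per-dimension check above; not PR code.
class SingleValueCheckSketch
{
  static boolean isSingleValued(ColumnCapabilities caps, boolean missingMeansNonexistent)
  {
    // Known single-valued column    -> true, regardless of the flag.
    // Known multi-valued column     -> false, regardless of the flag.
    // Unknown column (caps == null) -> true only when the caller promises that a
    //   missing column is truly nonexistent (engines reading actual segments);
    //   broker merge passes false and takes the multi-value-safe path instead.
    return (caps != null && !caps.hasMultipleValues())
           || (missingMeansNonexistent && caps == null);
  }
}
```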
@@ -83,7 +83,7 @@ public static boolean canVectorize(
// This situation should sort itself out pretty well once this engine supports multi-valued columns. Then we
// won't have to worry about having this all-single-value-dims check here.

-return GroupByQueryEngineV2.isAllSingleValueDims(adapter::getColumnCapabilities, query.getDimensions())
+return GroupByQueryEngineV2.isAllSingleValueDims(adapter::getColumnCapabilities, query.getDimensions(), true)
&& query.getDimensions().stream().allMatch(DimensionSpec::canVectorize)
&& query.getAggregatorSpecs().stream().allMatch(AggregatorFactory::canVectorize)
&& adapter.canVectorize(filter, query.getVirtualColumns(), false);
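Passing true here is consistent with the javadoc above: canVectorize is only evaluated against actual segment adapters, where a null capabilities lookup really does mean the column does not exist, so the vectorized engine may safely treat it as single-valued.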
@@ -132,9 +132,10 @@ private TopNMapFn getMapFn(
topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query);
} else if (selector.isHasExtractionFn()) {
topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
-} else if (columnCapabilities != null && !(columnCapabilities.getType() == ValueType.STRING
+} else if (columnCapabilities == null || !(columnCapabilities.getType() == ValueType.STRING
Contributor: nit: maybe nice to comment when columnCapabilities can be null?

Member (author): added additional information to existing comment

&& columnCapabilities.isDictionaryEncoded())) {
-// Use HeapBasedTopNAlgorithm for non-Strings and for non-dictionary-encoded Strings.
+// Use HeapBasedTopNAlgorithm for non-Strings, for non-dictionary-encoded Strings, and for columns whose
+// capabilities are unknown, which can happen for 'inline' datasources when this query runs on the broker.
topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
} else if (query.getDimensionSpec().getOutputType() != ValueType.STRING) {
// Use HeapBasedTopNAlgorithm when the dimension output type is a non-String. (It's like an extractionFn: there can be
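For orientation, a hedged sketch (the class and method names are hypothetical, not from this diff) of the capabilities test that now gates the dictionary-based fast path; before this PR, a null capabilities object could slip past it:

```java
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;

// Sketch of the gating condition for the dictionary-based TopN fast path.
class TopNFastPathSketch
{
  static boolean canUseDictionaryFastPath(ColumnCapabilities caps)
  {
    // After this PR, unknown capabilities (caps == null, e.g. an 'inline'
    // datasource evaluated on the broker) fail this test and fall back to
    // HeapBasedTopNAlgorithm rather than assuming a dictionary-encoded String.
    return caps != null
           && caps.getType() == ValueType.STRING
           && caps.isDictionaryEncoded();
  }
}
```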
@@ -56,7 +56,11 @@
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryHelper;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.topn.TopNQuery;
import org.apache.druid.query.topn.TopNQueryBuilder;
import org.apache.druid.segment.InlineSegmentWrangler;
import org.apache.druid.segment.MapSegmentWrangler;
import org.apache.druid.segment.ReferenceCountingSegment;
@@ -107,6 +111,7 @@ public class ClientQuerySegmentWalkerTest

private static final String FOO = "foo";
private static final String BAR = "bar";
private static final String MULTI = "multi";

private static final Interval INTERVAL = Intervals.of("2000/P1Y");
private static final String VERSION = "A";
@@ -140,6 +145,20 @@ public class ClientQuerySegmentWalkerTest
.build()
);

private static final InlineDataSource MULTI_VALUE_INLINE = InlineDataSource.fromIterable(
ImmutableList.<Object[]>builder()
.add(new Object[]{INTERVAL.getStartMillis(), ImmutableList.of("a", "b"), 1})
.add(new Object[]{INTERVAL.getStartMillis(), ImmutableList.of("a", "c"), 2})
.add(new Object[]{INTERVAL.getStartMillis(), ImmutableList.of("b"), 3})
.add(new Object[]{INTERVAL.getStartMillis(), ImmutableList.of("c"), 4})
.build(),
RowSignature.builder()
.addTimeColumn()
.add("s", ValueType.STRING)
.add("n", ValueType.LONG)
.build()
);
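Note that 's' is declared as a plain STRING in the signature even though each row carries a list of values; multi-value strings in Druid are still typed STRING, which is why the engines above must consult column capabilities (or their absence) rather than the type alone.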

@Rule
public ExpectedException expectedException = ExpectedException.none();

@@ -399,6 +418,115 @@ public void testJoinOnGroupByOnTable()
Assert.assertEquals(2, scheduler.getTotalReleased().get());
}

@Test
public void testGroupByOnScanMultiValue()
{
ScanQuery subquery = new Druids.ScanQueryBuilder().dataSource(MULTI)
.columns("s", "n")
.intervals(
new MultipleIntervalSegmentSpec(
ImmutableList.of(Intervals.ETERNITY)
)
)
.legacy(false)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.build();
final GroupByQuery query =
GroupByQuery.builder()
.setDataSource(new QueryDataSource(subquery))
.setGranularity(Granularities.ALL)
.setInterval(Intervals.ONLY_ETERNITY)
.setDimensions(DefaultDimensionSpec.of("s"))
.setAggregatorSpecs(new LongSumAggregatorFactory("sum_n", "n"))
.build();

testQuery(
query,
// GroupBy handles its own subqueries; only the inner one will go to the cluster.
ImmutableList.of(
ExpectedQuery.cluster(subquery),
ExpectedQuery.local(
query.withDataSource(
InlineDataSource.fromIterable(
ImmutableList.of(
new Object[]{ImmutableList.of("a", "b"), 1},
new Object[]{ImmutableList.of("a", "c"), 2},
new Object[]{ImmutableList.of("b"), 3},
new Object[]{ImmutableList.of("c"), 4}
),
RowSignature.builder().add("s", null).add("n", null).build()
)
)
)
),
ImmutableList.of(
new Object[]{"a", 3L},
new Object[]{"b", 4L},
new Object[]{"c", 6L}
)
);

Assert.assertEquals(2, scheduler.getTotalRun().get());
Assert.assertEquals(2, scheduler.getTotalPrioritizedAndLaned().get());
Assert.assertEquals(2, scheduler.getTotalAcquired().get());
Assert.assertEquals(2, scheduler.getTotalReleased().get());
}
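The expected results follow from Druid's multi-value grouping semantics: each row contributes to the group of every value in its 's' list, so sum_n is 1 + 2 = 3 for "a", 1 + 3 = 4 for "b", and 2 + 4 = 6 for "c".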

@Test
public void testTopNScanMultiValue()
{
ScanQuery subquery = new Druids.ScanQueryBuilder().dataSource(MULTI)
.columns("s", "n")
.intervals(
new MultipleIntervalSegmentSpec(
ImmutableList.of(Intervals.ETERNITY)
)
)
.legacy(false)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.build();
final TopNQuery query =
new TopNQueryBuilder().dataSource(new QueryDataSource(subquery))
.granularity(Granularities.ALL)
.intervals(Intervals.ONLY_ETERNITY)
.dimension(DefaultDimensionSpec.of("s"))
.metric("sum_n")
.threshold(100)
.aggregators(new LongSumAggregatorFactory("sum_n", "n"))
.build();

testQuery(
query,
// TopN handles its own subqueries; only the inner one will go to the cluster.
ImmutableList.of(
ExpectedQuery.cluster(subquery),
ExpectedQuery.local(
query.withDataSource(
InlineDataSource.fromIterable(
ImmutableList.of(
new Object[]{ImmutableList.of("a", "b"), 1},
new Object[]{ImmutableList.of("a", "c"), 2},
new Object[]{ImmutableList.of("b"), 3},
new Object[]{ImmutableList.of("c"), 4}
),
RowSignature.builder().add("s", null).add("n", null).build()
)
)
)
),
ImmutableList.of(
new Object[]{Intervals.ETERNITY.getStartMillis(), "c", 6L},
new Object[]{Intervals.ETERNITY.getStartMillis(), "b", 4L},
new Object[]{Intervals.ETERNITY.getStartMillis(), "a", 3L}
)
);

Assert.assertEquals(2, scheduler.getTotalRun().get());
Assert.assertEquals(2, scheduler.getTotalPrioritizedAndLaned().get());
Assert.assertEquals(2, scheduler.getTotalAcquired().get());
Assert.assertEquals(2, scheduler.getTotalReleased().get());
}
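The sums are the same as in the GroupBy test; TopN additionally attaches the granularity bucket's timestamp (here the start of ETERNITY) and orders rows descending by the sum_n metric: "c" (6), "b" (4), "a" (3).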

@Test
public void testJoinOnTableErrorCantInlineTable()
{
@@ -522,7 +650,8 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<Seg
QueryStackTests.createClusterQuerySegmentWalker(
ImmutableMap.of(
FOO, makeTimeline(FOO, FOO_INLINE),
-BAR, makeTimeline(BAR, BAR_INLINE)
+BAR, makeTimeline(BAR, BAR_INLINE),
+MULTI, makeTimeline(MULTI, MULTI_VALUE_INLINE)
),
joinableFactory,
conglomerate,