diff --git a/processing/src/main/java/org/apache/druid/query/BaseQuery.java b/processing/src/main/java/org/apache/druid/query/BaseQuery.java index cee136275c38..e967d54c6017 100644 --- a/processing/src/main/java/org/apache/druid/query/BaseQuery.java +++ b/processing/src/main/java/org/apache/druid/query/BaseQuery.java @@ -230,6 +230,7 @@ public Ordering getResultOrdering() return descending ? retVal.reverse() : retVal; } + @Nullable @Override public String getId() { diff --git a/processing/src/main/java/org/apache/druid/query/Query.java b/processing/src/main/java/org/apache/druid/query/Query.java index 91303097fcd3..387509839bb5 100644 --- a/processing/src/main/java/org/apache/druid/query/Query.java +++ b/processing/src/main/java/org/apache/druid/query/Query.java @@ -120,6 +120,7 @@ public interface Query Query withId(String id); + @Nullable String getId(); default Query withSqlQueryId(String sqlQueryId) diff --git a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java index 5ae0221f9b45..c271af8e4d5c 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java @@ -26,6 +26,7 @@ import com.google.common.base.Function; import org.apache.druid.guice.annotations.ExtensionPoint; import org.apache.druid.java.util.common.UOE; +import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.query.aggregation.MetricManipulationFn; import org.apache.druid.timeline.LogicalSegment; @@ -138,7 +139,7 @@ public Comparator createResultComparator(Query query) * to allow for query-specific dimensions and metrics. That is, the ToolChest is expected to set some * meaningful dimensions for metrics given this query type. Examples might be the topN threshold for * a TopN query or the number of dimensions included for a groupBy query. - * + * *

QueryToolChests for query types in core (druid-processing) and public extensions (belonging to the Druid source * tree) should use delegate this method to {@link GenericQueryMetricsFactory#makeMetrics(Query)} on an injected * instance of {@link GenericQueryMetricsFactory}, as long as they don't need to emit custom dimensions and/or @@ -269,4 +270,50 @@ public List filterSegments(QueryType query, List resultArrayFields(QueryType query) + { + throw new UOE("Query type '%s' does not support returning results as arrays", query.getType()); + } + + /** + * Converts a sequence of this query's ResultType into arrays. The array schema is given by + * {@link #resultArrayFields}. This functionality is useful because it allows higher-level processors to operate on + * the results of any query in a consistent way. This is useful for the SQL layer and for any algorithm that might + * operate on the results of an inner query. + * + * Not all query types support this method. They will throw {@link UnsupportedOperationException}, and they cannot + * be used by the SQL layer or by generic higher-level algorithms. + * + * Some query types return less information after translating their results into arrays, especially in situations + * where there is no clear way to translate fully rich results into flat arrays. For example, the scan query does not + * include the segmentId in its array-based results, because it could potentially conflict with a 'segmentId' field + * in the actual datasource being scanned. + * + * It is possible that there will be multiple arrays returned for a single result object. For example, in the topN + * query, each {@link org.apache.druid.query.topn.TopNResultValue} will generate a separate array for each of its + * {@code values}. + * + * By convention, the array form should include the __time column, if present, as a long (milliseconds since epoch). 
+ * + * @param resultSequence results of the form returned by {@link #mergeResults} + * + * @return results in array form + * + * @throws UnsupportedOperationException if this query type does not support returning results as arrays + */ + public Sequence resultsAsArrays(QueryType query, Sequence resultSequence) + { + throw new UOE("Query type '%s' does not support returning results as arrays", query.getType()); + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index 63d612d915d9..4274e8e19439 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -677,6 +677,17 @@ public ResultRow apply(Object input) }; } + @Override + public List resultArrayFields(final GroupByQuery query) + { + return query.getResultRowOrder(); + } + + @Override + public Sequence resultsAsArrays(final GroupByQuery query, final Sequence resultSequence) + { + return resultSequence.map(ResultRow::getArray); + } /** * This function checks the query for dimensions which can be optimized by applying the dimension extraction diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java index d4155fac26a3..9c72b30f5be8 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java @@ -56,7 +56,7 @@ public class ScanQueryEngine { - private static final String LEGACY_TIMESTAMP_KEY = "timestamp"; + static final String LEGACY_TIMESTAMP_KEY = "timestamp"; public Sequence process( final ScanQuery query, @@ -202,9 +202,9 @@ public void remove() throw new UnsupportedOperationException(); } - private List rowsToCompactedList() + private List> rowsToCompactedList() { - final List events = new ArrayList<>(batchSize); + final List> events = new ArrayList<>(batchSize); final long iterLimit = Math.min(limit, offset + batchSize); for (; !cursor.isDone() && offset < iterLimit; cursor.advance(), offset++) { final List theEvent = new ArrayList<>(allColumns.size()); diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java index 95006cee5766..7aa976e55474 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java @@ -22,9 +22,14 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Function; import com.google.common.base.Functions; +import com.google.common.collect.Iterables; import com.google.inject.Inject; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.guava.BaseSequence; import org.apache.druid.java.util.common.guava.CloseQuietly; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.GenericQueryMetricsFactory; import org.apache.druid.query.Query; import org.apache.druid.query.QueryMetrics; @@ -33,6 +38,11 @@ import org.apache.druid.query.QueryToolChest; import 
org.apache.druid.query.aggregation.MetricManipulationFn; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + public class ScanQueryQueryToolChest extends QueryToolChest { private static final TypeReference TYPE_REFERENCE = new TypeReference() @@ -115,4 +125,69 @@ public QueryRunner preMergeQueryDecoration(final QueryRunner resultArrayFields(final ScanQuery query) + { + if (query.getColumns() == null || query.getColumns().isEmpty()) { + // Note: if no specific list of columns is provided, then since we can't predict what columns will come back, we + // unfortunately can't do array-based results. In this case, there is a major difference between standard and + // array-based results: the standard results will detect and return _all_ columns, whereas the array-based results + // will include none of them. + return Collections.emptyList(); + } else if (query.withNonNullLegacy(scanQueryConfig).isLegacy()) { + final List retVal = new ArrayList<>(); + retVal.add(ScanQueryEngine.LEGACY_TIMESTAMP_KEY); + retVal.addAll(query.getColumns()); + return retVal; + } else { + return query.getColumns(); + } + } + + @Override + public Sequence resultsAsArrays(final ScanQuery query, final Sequence resultSequence) + { + final List fields = resultArrayFields(query); + final Function mapper; + + switch (query.getResultFormat()) { + case RESULT_FORMAT_LIST: + mapper = (Map row) -> { + final Object[] rowArray = new Object[fields.size()]; + + for (int i = 0; i < fields.size(); i++) { + rowArray[i] = row.get(fields.get(i)); + } + + return rowArray; + }; + break; + case RESULT_FORMAT_COMPACTED_LIST: + mapper = (List row) -> { + if (row.size() == fields.size()) { + return row.toArray(); + } else if (fields.isEmpty()) { + return new Object[0]; + } else { + // Uh oh... mismatch in expected and actual field count. I don't think this should happen, so let's + // throw an exception. If this really does happen, and there's a good reason for it, then we should remap + // the result row here. + throw new ISE("Mismatch in expected[%d] vs actual[%s] field count", fields.size(), row.size()); + } + }; + break; + default: + throw new UOE("Unsupported resultFormat for array-based results: %s", query.getResultFormat()); + } + + return resultSequence.flatMap( + result -> { + // Generics? Where we're going, we don't need generics. 
+ final List rows = (List) result.getEvents(); + final Iterable arrays = Iterables.transform(rows, (Function) mapper); + return Sequences.simple(arrays); + } + ); + } } diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 9215e1102b37..1a6ca22e6858 100644 --- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -50,8 +50,10 @@ import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.segment.RowBasedColumnSelectorFactory; +import org.apache.druid.segment.column.ColumnHolder; import org.joda.time.DateTime; +import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -402,6 +404,47 @@ public Function, Result> ma return makeComputeManipulatorFn(query, fn, true); } + @Override + public List resultArrayFields(TimeseriesQuery query) + { + final List fields = new ArrayList<>( + 1 + query.getAggregatorSpecs().size() + query.getPostAggregatorSpecs().size() + ); + + fields.add(ColumnHolder.TIME_COLUMN_NAME); + query.getAggregatorSpecs().stream().map(AggregatorFactory::getName).forEach(fields::add); + query.getPostAggregatorSpecs().stream().map(PostAggregator::getName).forEach(fields::add); + + return fields; + } + + @Override + public Sequence resultsAsArrays( + final TimeseriesQuery query, + final Sequence> resultSequence + ) + { + final List fields = resultArrayFields(query); + + return Sequences.map( + resultSequence, + result -> { + final Object[] retVal = new Object[fields.size()]; + + // Position 0 is always __time. + retVal[0] = result.getTimestamp().getMillis(); + + // Add other fields. 
+ final Map resultMap = result.getValue().getBaseObject(); + for (int i = 1; i < fields.size(); i++) { + retVal[i] = resultMap.get(fields.get(i)); + } + + return retVal; + } + ); + } + private Function, Result> makeComputeManipulatorFn( final TimeseriesQuery query, final MetricManipulationFn fn, diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java index 3f0b5c7f7bf5..369537474e97 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java @@ -49,8 +49,10 @@ import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.segment.DimensionHandlerUtils; +import org.apache.druid.segment.column.ColumnHolder; import org.joda.time.DateTime; +import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; import java.util.List; @@ -526,6 +528,53 @@ public DimensionAndMetricValueExtractor apply( }; } + @Override + public List resultArrayFields(TopNQuery query) + { + final List fields = new ArrayList<>( + 2 + query.getAggregatorSpecs().size() + query.getPostAggregatorSpecs().size() + ); + + fields.add(ColumnHolder.TIME_COLUMN_NAME); + fields.add(query.getDimensionSpec().getOutputName()); + query.getAggregatorSpecs().stream().map(AggregatorFactory::getName).forEach(fields::add); + query.getPostAggregatorSpecs().stream().map(PostAggregator::getName).forEach(fields::add); + + return fields; + } + + @Override + public Sequence resultsAsArrays(TopNQuery query, Sequence> resultSequence) + { + final List fields = resultArrayFields(query); + + return resultSequence.flatMap( + result -> { + final List rows = result.getValue().getValue(); + + return Sequences.simple( + Iterables.transform( + rows, + row -> { + final Object[] retVal = new Object[fields.size()]; + + // Position 0 is always __time. + retVal[0] = result.getTimestamp().getMillis(); + + // Add other fields. + final Map resultMap = row.getBaseObject(); + for (int i = 1; i < fields.size(); i++) { + retVal[i] = resultMap.get(fields.get(i)); + } + + return retVal; + } + ) + ); + } + ); + } + static class ThresholdAdjustingQueryRunner implements QueryRunner> { private final QueryRunner> runner; diff --git a/processing/src/test/java/org/apache/druid/query/QueryToolChestTestHelper.java b/processing/src/test/java/org/apache/druid/query/QueryToolChestTestHelper.java new file mode 100644 index 000000000000..a3bedddc563a --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/QueryToolChestTestHelper.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import org.apache.druid.java.util.common.guava.Sequence; +import org.junit.Assert; + +import java.util.List; + +public class QueryToolChestTestHelper +{ + public static void assertArrayResultsEquals(final List expected, final Sequence actual) + { + final List actualList = actual.toList(); + Assert.assertEquals("number of results", expected.size(), actualList.size()); + for (int i = 0; i < actualList.size(); i++) { + Assert.assertArrayEquals("result #" + i, expected.get(i), actualList.get(i)); + } + } +} diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java index c461c7e774ac..c27b14b974b1 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java @@ -24,11 +24,15 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import org.apache.druid.collections.SerializablePair; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.Row; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.CacheStrategy; import org.apache.druid.query.QueryRunnerTestHelper; +import org.apache.druid.query.QueryToolChestTestHelper; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.aggregation.FloatSumAggregatorFactory; @@ -61,6 +65,7 @@ import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.virtual.ExpressionVirtualColumn; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; @@ -70,6 +75,11 @@ public class GroupByQueryQueryToolChestTest { + @BeforeClass + public static void setUpClass() + { + NullHandling.initializeForTests(); + } @Test public void testResultLevelCacheKeyWithPostAggregate() @@ -668,6 +678,100 @@ public void testResultSerde() throws Exception ); } + @Test + public void testResultArrayFieldsAllGran() + { + final GroupByQuery query = new GroupByQuery.Builder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setGranularity(Granularities.ALL) + .setDimensions(new DefaultDimensionSpec("col", "dim")) + .setInterval(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .setAggregatorSpecs(QueryRunnerTestHelper.COMMON_DOUBLE_AGGREGATORS) + .setPostAggregatorSpecs(ImmutableList.of(QueryRunnerTestHelper.CONSTANT)) + .build(); + + Assert.assertEquals( + ImmutableList.of("dim", "rows", "index", "uniques", "const"), + new GroupByQueryQueryToolChest(null, null).resultArrayFields(query) + ); + } + + @Test + public void testResultArrayFieldsDayGran() + { + final GroupByQuery query = new GroupByQuery.Builder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setGranularity(Granularities.DAY) + .setDimensions(new DefaultDimensionSpec("col", "dim")) + .setInterval(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .setAggregatorSpecs(QueryRunnerTestHelper.COMMON_DOUBLE_AGGREGATORS) + 
.setPostAggregatorSpecs(ImmutableList.of(QueryRunnerTestHelper.CONSTANT)) + .build(); + + Assert.assertEquals( + ImmutableList.of("__time", "dim", "rows", "index", "uniques", "const"), + new GroupByQueryQueryToolChest(null, null).resultArrayFields(query) + ); + } + + @Test + public void testResultsAsArraysAllGran() + { + final GroupByQuery query = new GroupByQuery.Builder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setGranularity(Granularities.ALL) + .setDimensions(new DefaultDimensionSpec("col", "dim")) + .setInterval(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .setAggregatorSpecs(QueryRunnerTestHelper.COMMON_DOUBLE_AGGREGATORS) + .setPostAggregatorSpecs(ImmutableList.of(QueryRunnerTestHelper.CONSTANT)) + .build(); + + QueryToolChestTestHelper.assertArrayResultsEquals( + ImmutableList.of( + new Object[]{"foo", 1L, 2L, 3L, 1L}, + new Object[]{"bar", 4L, 5L, 6L, 1L} + ), + new GroupByQueryQueryToolChest(null, null).resultsAsArrays( + query, + Sequences.simple( + ImmutableList.of( + makeRow(query, "2000", "dim", "foo", "rows", 1L, "index", 2L, "uniques", 3L, "const", 1L), + makeRow(query, "2000", "dim", "bar", "rows", 4L, "index", 5L, "uniques", 6L, "const", 1L) + ) + ) + ) + ); + } + + @Test + public void testResultsAsArraysDayGran() + { + final GroupByQuery query = new GroupByQuery.Builder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setGranularity(Granularities.DAY) + .setDimensions(new DefaultDimensionSpec("col", "dim")) + .setInterval(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .setAggregatorSpecs(QueryRunnerTestHelper.COMMON_DOUBLE_AGGREGATORS) + .setPostAggregatorSpecs(ImmutableList.of(QueryRunnerTestHelper.CONSTANT)) + .build(); + + QueryToolChestTestHelper.assertArrayResultsEquals( + ImmutableList.of( + new Object[]{DateTimes.of("2000-01-01").getMillis(), "foo", 1L, 2L, 3L, 1L}, + new Object[]{DateTimes.of("2000-01-02").getMillis(), "bar", 4L, 5L, 6L, 1L} + ), + new GroupByQueryQueryToolChest(null, null).resultsAsArrays( + query, + Sequences.simple( + ImmutableList.of( + makeRow(query, "2000-01-01", "dim", "foo", "rows", 1L, "index", 2L, "uniques", 3L, "const", 1L), + makeRow(query, "2000-01-02", "dim", "bar", "rows", 4L, "index", 5L, "uniques", 6L, "const", 1L) + ) + ) + ) + ); + } + private AggregatorFactory getComplexAggregatorFactoryForValueType(final ValueType valueType) { switch (valueType) { @@ -765,4 +869,9 @@ private void doTestCacheStrategy(final ValueType valueType, final Object dimValu ResultRow fromResultCacheResult = strategy.pullFromCache(true).apply(fromResultCacheValue); Assert.assertEquals(typeAdjustedResult2, fromResultCacheResult); } + + private static ResultRow makeRow(final GroupByQuery query, final String timestamp, final Object... vals) + { + return GroupByQueryRunnerTestHelper.createExpectedRow(query, timestamp, vals); + } } diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryQueryToolChestTest.java new file mode 100644 index 000000000000..d0fe752bb019 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryQueryToolChestTest.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.scan; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.query.DefaultGenericQueryMetricsFactory; +import org.apache.druid.query.Druids; +import org.apache.druid.query.QueryToolChestTestHelper; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ScanQueryQueryToolChestTest +{ + // Expected results for the resultsAsArrays test methods. + private static final List ARRAY_RESULTS = ImmutableList.of( + new Object[]{null, 3.2}, + new Object[]{"x", "y"} + ); + + private final ScanQueryQueryToolChest toolChest = new ScanQueryQueryToolChest( + new ScanQueryConfig(), + new DefaultGenericQueryMetricsFactory() + ); + + @Test + public void test_resultArrayFields_columnsNotSpecified() + { + final ScanQuery scanQuery = + Druids.newScanQueryBuilder() + .dataSource("foo") + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2000/3000")))) + .build(); + + Assert.assertEquals(ImmutableList.of(), toolChest.resultArrayFields(scanQuery)); + } + + @Test + public void test_resultArrayFields_columnsNotSpecifiedLegacyMode() + { + final ScanQuery scanQuery = + Druids.newScanQueryBuilder() + .dataSource("foo") + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2000/3000")))) + .legacy(true) + .build(); + + Assert.assertEquals(ImmutableList.of(), toolChest.resultArrayFields(scanQuery)); + } + + @Test + public void test_resultArrayFields_columnsSpecified() + { + final ScanQuery scanQuery = + Druids.newScanQueryBuilder() + .dataSource("foo") + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2000/3000")))) + .columns("foo", "bar") + .build(); + + Assert.assertEquals(ImmutableList.of("foo", "bar"), toolChest.resultArrayFields(scanQuery)); + } + + @Test + public void test_resultArrayFields_columnsSpecifiedLegacyMode() + { + final ScanQuery scanQuery = + Druids.newScanQueryBuilder() + .dataSource("foo") + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2000/3000")))) + .columns("foo", "bar") + .legacy(true) + .build(); + + Assert.assertEquals(ImmutableList.of("timestamp", "foo", "bar"), toolChest.resultArrayFields(scanQuery)); + } + + @Test + public void test_resultsAsArrays_columnsNotSpecifiedListResults() + { + final ScanQuery scanQuery = + Druids.newScanQueryBuilder() + .dataSource("foo") + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2000/3000")))) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST) + .build(); + + 
QueryToolChestTestHelper.assertArrayResultsEquals( + ImmutableList.of(new Object[]{}, new Object[]{}), + toolChest.resultsAsArrays(scanQuery, makeResults(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) + ); + } + + @Test + public void test_resultsAsArrays_columnsNotSpecifiedCompactedListResults() + { + final ScanQuery scanQuery = + Druids.newScanQueryBuilder() + .dataSource("foo") + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2000/3000")))) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .build(); + + QueryToolChestTestHelper.assertArrayResultsEquals( + ImmutableList.of(new Object[]{}, new Object[]{}), + toolChest.resultsAsArrays(scanQuery, makeResults(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) + ); + } + + @Test + public void test_resultsAsArrays_columnsSpecifiedListResults() + { + final ScanQuery scanQuery = + Druids.newScanQueryBuilder() + .dataSource("foo") + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2000/3000")))) + .columns("foo", "bar") + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST) + .build(); + + QueryToolChestTestHelper.assertArrayResultsEquals( + ARRAY_RESULTS, + toolChest.resultsAsArrays(scanQuery, makeResults(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) + ); + } + + @Test + public void test_resultsAsArrays_columnsSpecifiedCompactedListResults() + { + final ScanQuery scanQuery = + Druids.newScanQueryBuilder() + .dataSource("foo") + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2000/3000")))) + .columns("foo", "bar") + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .build(); + + QueryToolChestTestHelper.assertArrayResultsEquals( + ARRAY_RESULTS, + toolChest.resultsAsArrays(scanQuery, makeResults(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) + ); + } + + /** + * Returns results that are a single ScanResultValue with two rows, each row having columns "foo" and "bar". + */ + private static Sequence makeResults(final ScanQuery.ResultFormat resultFormat) + { + final List rows = new ArrayList<>(); + + // Generate rows in the manner of ScanQueryEngine. 
+ switch (resultFormat) { + case RESULT_FORMAT_LIST: + ARRAY_RESULTS.forEach(arr -> { + final Map m = new HashMap<>(); + m.put("foo", arr[0]); + m.put("bar", arr[1]); + rows.add(m); + }); + break; + case RESULT_FORMAT_COMPACTED_LIST: + ARRAY_RESULTS.forEach(arr -> rows.add(Arrays.asList(arr))); + break; + default: + throw new ISE("Cannot generate resultFormat '%s'", resultFormat); + } + + return Sequences.simple( + ImmutableList.of( + new ScanResultValue( + null, + ImmutableList.of("foo", "bar"), + rows + ) + ) + ); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java index 8c0211ac76af..58547979234e 100644 --- a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java @@ -25,9 +25,11 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.CacheStrategy; import org.apache.druid.query.Druids; import org.apache.druid.query.QueryRunnerTestHelper; +import org.apache.druid.query.QueryToolChestTestHelper; import org.apache.druid.query.Result; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.aggregation.CountAggregatorFactory; @@ -131,7 +133,8 @@ public void testCacheStrategy() throws Exception strategy.getCacheObjectClazz() ); - Result fromResultLevelCacheRes = strategy.pullFromCache(true).apply(fromResultLevelCacheValue); + Result fromResultLevelCacheRes = strategy.pullFromCache(true) + .apply(fromResultLevelCacheValue); Assert.assertEquals(result2, fromResultLevelCacheRes); final Result result3 = new Result<>( @@ -349,4 +352,63 @@ public void testResultLevelCacheKeyWithGrandTotal() ) ); } + + @Test + public void testResultArrayFields() + { + final TimeseriesQuery query = + Druids.newTimeseriesQueryBuilder() + .dataSource("dummy") + .intervals("2000/3000") + .descending(descending) + .granularity(Granularities.HOUR) + .aggregators(QueryRunnerTestHelper.COMMON_DOUBLE_AGGREGATORS) + .postAggregators(QueryRunnerTestHelper.CONSTANT) + .build(); + + Assert.assertEquals( + ImmutableList.of("__time", "rows", "index", "uniques", "const"), + TOOL_CHEST.resultArrayFields(query) + ); + } + + @Test + public void testResultsAsArrays() + { + final TimeseriesQuery query = + Druids.newTimeseriesQueryBuilder() + .dataSource("dummy") + .intervals("2000/3000") + .descending(descending) + .granularity(Granularities.HOUR) + .aggregators(QueryRunnerTestHelper.COMMON_DOUBLE_AGGREGATORS) + .postAggregators(QueryRunnerTestHelper.CONSTANT) + .build(); + + QueryToolChestTestHelper.assertArrayResultsEquals( + ImmutableList.of( + new Object[]{DateTimes.of("2000").getMillis(), 1L, 2L, 3L, 1L}, + new Object[]{DateTimes.of("2000T01").getMillis(), 4L, 5L, 6L, 1L} + ), + TOOL_CHEST.resultsAsArrays( + query, + Sequences.simple( + ImmutableList.of( + new Result<>( + DateTimes.of("2000"), + new TimeseriesResultValue( + ImmutableMap.of("rows", 1L, "index", 2L, "uniques", 3L, "const", 1L) + ) + ), + new Result<>( + DateTimes.of("2000T01"), + new TimeseriesResultValue( + ImmutableMap.of("rows", 4L, "index", 5L, "uniques", 6L, "const", 1L) + ) + ) + ) + ) + ) + ); + } } diff --git 
a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java index 6c1ee84aa7e9..30226542946f 100644 --- a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java @@ -24,16 +24,19 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.collections.CloseableStupidPool; import org.apache.druid.collections.SerializablePair; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.hll.HyperLogLogCollector; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.CacheStrategy; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryRunnerTestHelper; +import org.apache.druid.query.QueryToolChestTestHelper; import org.apache.druid.query.Result; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.TestQueryRunners; @@ -62,6 +65,7 @@ import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.timeline.SegmentId; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; @@ -76,6 +80,12 @@ public class TopNQueryQueryToolChestTest extends InitializedNullHandlingTest private static final SegmentId SEGMENT_ID = SegmentId.dummy("testSegment"); + @BeforeClass + public static void setUpClass() + { + NullHandling.initializeForTests(); + } + @Test public void testCacheStrategy() throws Exception { @@ -290,6 +300,68 @@ public void testMinTopNThreshold() } } + @Test + public void testResultArrayFields() + { + final TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .granularity(Granularities.ALL) + .dimension(new DefaultDimensionSpec("col", "dim")) + .metric(QueryRunnerTestHelper.INDEX_METRIC) + .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .aggregators(QueryRunnerTestHelper.COMMON_DOUBLE_AGGREGATORS) + .postAggregators(QueryRunnerTestHelper.CONSTANT) + .threshold(1) + .build(); + + Assert.assertEquals( + ImmutableList.of("__time", "dim", "rows", "index", "uniques", "const"), + new TopNQueryQueryToolChest(null, null).resultArrayFields(query) + ); + } + + @Test + public void testResultsAsArrays() + { + final TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .granularity(Granularities.ALL) + .dimension(new DefaultDimensionSpec("col", "dim")) + .metric(QueryRunnerTestHelper.INDEX_METRIC) + .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .aggregators(QueryRunnerTestHelper.COMMON_DOUBLE_AGGREGATORS) + .postAggregators(QueryRunnerTestHelper.CONSTANT) + .threshold(1) + .build(); + + QueryToolChestTestHelper.assertArrayResultsEquals( + ImmutableList.of( + new Object[]{DateTimes.of("2000").getMillis(), "foo", 1L, 2L, 3L, 1L}, + new Object[]{DateTimes.of("2000").getMillis(), "bar", 4L, 5L, 6L, 1L} + ), + new TopNQueryQueryToolChest(null, null).resultsAsArrays( + query, + Sequences.simple( + ImmutableList.of( + new Result<>( + DateTimes.of("2000"), + new TopNResultValue( + 
ImmutableList.of( + new DimensionAndMetricValueExtractor( + ImmutableMap.of("dim", "foo", "rows", 1L, "index", 2L, "uniques", 3L, "const", 1L) + ), + new DimensionAndMetricValueExtractor( + ImmutableMap.of("dim", "bar", "rows", 4L, "index", 5L, "uniques", 6L, "const", 1L) + ) + ) + ) + ) + ) + ) + ) + ); + } + private AggregatorFactory getComplexAggregatorFactoryForValueType(final ValueType valueType) { switch (valueType) { diff --git a/server/src/main/java/org/apache/druid/server/QueryLifecycle.java b/server/src/main/java/org/apache/druid/server/QueryLifecycle.java index b8e5f7a90c11..87508107d194 100644 --- a/server/src/main/java/org/apache/druid/server/QueryLifecycle.java +++ b/server/src/main/java/org/apache/druid/server/QueryLifecycle.java @@ -351,6 +351,7 @@ public QueryToolChest getToolChest() throw new ISE("Not yet initialized"); } + //noinspection unchecked return toolChest; } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/expression/SimpleExtraction.java b/sql/src/main/java/org/apache/druid/sql/calcite/expression/SimpleExtraction.java index 7651b0df23de..867920267f6c 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/expression/SimpleExtraction.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/expression/SimpleExtraction.java @@ -27,23 +27,27 @@ import org.apache.druid.query.extraction.ExtractionFn; import org.apache.druid.segment.column.ValueType; +import javax.annotation.Nullable; +import java.util.Objects; + /** - * Represents a "simple" extraction of a value from a Druid row, which is defined as a column plus an extractionFn. - * This is useful since identifying simple extractions and treating them specially can allow Druid to perform - * additional optimizations. + * Represents a "simple" extraction of a value from a Druid row, which is defined as a column plus an optional + * extractionFn. This is useful since identifying simple extractions and treating them specially can allow Druid to + * perform additional optimizations. */ public class SimpleExtraction { private final String column; + @Nullable private final ExtractionFn extractionFn; - public SimpleExtraction(String column, ExtractionFn extractionFn) + public SimpleExtraction(String column, @Nullable ExtractionFn extractionFn) { this.column = Preconditions.checkNotNull(column, "column"); this.extractionFn = extractionFn; } - public static SimpleExtraction of(String column, ExtractionFn extractionFn) + public static SimpleExtraction of(String column, @Nullable ExtractionFn extractionFn) { return new SimpleExtraction(column, extractionFn); } @@ -53,6 +57,7 @@ public String getColumn() return column; } + @Nullable public ExtractionFn getExtractionFn() { return extractionFn; @@ -86,22 +91,15 @@ public boolean equals(Object o) if (o == null || getClass() != o.getClass()) { return false; } - SimpleExtraction that = (SimpleExtraction) o; - - if (column != null ? !column.equals(that.column) : that.column != null) { - return false; - } - return extractionFn != null ? extractionFn.equals(that.extractionFn) : that.extractionFn == null; - + return column.equals(that.column) && + Objects.equals(extractionFn, that.extractionFn); } @Override public int hashCode() { - int result = column != null ? column.hashCode() : 0; - result = 31 * result + (extractionFn != null ? 
extractionFn.hashCode() : 0); - return result; + return Objects.hash(column, extractionFn); } @Override diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java index 6ba8dfa14a34..1910bc27b654 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java @@ -21,12 +21,11 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Function; import com.google.common.collect.Iterables; import com.google.common.primitives.Ints; import it.unimi.dsi.fastutil.objects.Object2IntMap; +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; import org.apache.calcite.avatica.ColumnMetaData; -import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.runtime.Hook; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.NlsString; @@ -39,28 +38,20 @@ import org.apache.druid.math.expr.Evals; import org.apache.druid.query.Query; import org.apache.druid.query.QueryDataSource; -import org.apache.druid.query.Result; -import org.apache.druid.query.groupby.GroupByQuery; -import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.timeseries.TimeseriesQuery; -import org.apache.druid.query.timeseries.TimeseriesResultValue; -import org.apache.druid.query.topn.DimensionAndMetricValueExtractor; -import org.apache.druid.query.topn.TopNQuery; -import org.apache.druid.query.topn.TopNResultValue; import org.apache.druid.segment.DimensionHandlerUtils; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.server.QueryLifecycle; import org.apache.druid.server.QueryLifecycleFactory; import org.apache.druid.server.security.AuthenticationResult; import org.apache.druid.sql.calcite.planner.Calcites; import org.apache.druid.sql.calcite.planner.PlannerContext; -import org.apache.druid.sql.calcite.table.RowSignature; import org.joda.time.DateTime; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.UUID; import java.util.stream.Collectors; @@ -93,194 +84,119 @@ public ObjectMapper getJsonMapper() public Sequence runQuery(final DruidQuery druidQuery) { - final Query query = druidQuery.getQuery(); - - final Query innerMostQuery = findInnerMostQuery(query); - if (plannerContext.getPlannerConfig().isRequireTimeCondition() && - innerMostQuery.getIntervals().equals(Intervals.ONLY_ETERNITY)) { - throw new CannotBuildQueryException( - "requireTimeCondition is enabled, all queries must include a filter condition on the __time column" - ); + final Query query = druidQuery.getQuery(); + + if (plannerContext.getPlannerConfig().isRequireTimeCondition()) { + final Query innerMostQuery = findInnerMostQuery(query); + if (innerMostQuery.getIntervals().equals(Intervals.ONLY_ETERNITY)) { + throw new CannotBuildQueryException( + "requireTimeCondition is enabled, all queries must include a filter condition on the __time column" + ); + } } - if (query instanceof TimeseriesQuery) { - return executeTimeseries(druidQuery, (TimeseriesQuery) query); - } else if (query instanceof TopNQuery) { - return executeTopN(druidQuery, (TopNQuery) query); - } else if (query instanceof GroupByQuery) { - return executeGroupBy(druidQuery, 
(GroupByQuery) query); - } else if (query instanceof ScanQuery) { - return executeScan(druidQuery, (ScanQuery) query); + final List rowOrder; + if (query instanceof TimeseriesQuery && !druidQuery.getGrouping().getDimensions().isEmpty()) { + // Hack for timeseries queries: when generating them, DruidQuery.toTimeseriesQuery translates a dimension + // based on a timestamp_floor expression into a 'granularity'. This is not reflected in the druidQuery's + // output row signature, so we have to account for it here. When groupBy on timestamp_floor expressions is + // just as fast as a timeseries query (a noble goal) we can remove timeseries queries from the SQL layer and + // also remove this hack. + final String timeDimension = Iterables.getOnlyElement(druidQuery.getGrouping().getDimensions()).getOutputName(); + rowOrder = druidQuery.getOutputRowSignature().getRowOrder().stream() + .map(f -> timeDimension.equals(f) ? ColumnHolder.TIME_COLUMN_NAME : f) + .collect(Collectors.toList()); } else { - throw new ISE("Cannot run query of class[%s]", query.getClass().getName()); + rowOrder = druidQuery.getOutputRowSignature().getRowOrder(); } + + return execute( + query, + rowOrder, + druidQuery.getOutputRowType() + .getFieldList() + .stream() + .map(f -> f.getType().getSqlTypeName()) + .collect(Collectors.toList()) + ); } - private Query findInnerMostQuery(Query outerQuery) + private Query findInnerMostQuery(Query outerQuery) { - Query query = outerQuery; + Query query = outerQuery; while (query.getDataSource() instanceof QueryDataSource) { query = ((QueryDataSource) query.getDataSource()).getQuery(); } return query; } - private Sequence executeScan( - final DruidQuery druidQuery, - final ScanQuery query - ) + private Sequence execute(Query query, final List newFields, final List newTypes) { - final List fieldList = druidQuery.getOutputRowType().getFieldList(); - final RowSignature outputRowSignature = druidQuery.getOutputRowSignature(); - - // SQL row column index -> Scan query column index - final int[] columnMapping = new int[outputRowSignature.getRowOrder().size()]; - final Map scanColumnOrder = new HashMap<>(); - - for (int i = 0; i < query.getColumns().size(); i++) { - scanColumnOrder.put(query.getColumns().get(i), i); - } + Hook.QUERY_PLAN.run(query); - for (int i = 0; i < outputRowSignature.getRowOrder().size(); i++) { - final Integer index = scanColumnOrder.get(outputRowSignature.getRowOrder().get(i)); - columnMapping[i] = index == null ? 
-1 : index; + if (query.getId() == null) { + final String queryId = UUID.randomUUID().toString(); + plannerContext.addNativeQueryId(queryId); + query = query.withId(queryId); } - return Sequences.concat( - Sequences.map( - runQuery(query), - scanResult -> { - final List retVals = new ArrayList<>(); - final List> rows = (List>) scanResult.getEvents(); - - for (List row : rows) { - final Object[] retVal = new Object[fieldList.size()]; - for (RelDataTypeField field : fieldList) { - retVal[field.getIndex()] = coerce( - row.get(columnMapping[field.getIndex()]), - field.getType().getSqlTypeName() - ); - } - retVals.add(retVal); - } - - return Sequences.simple(retVals); - } - ) - ); - } - - private Sequence runQuery(Query query) - { - Hook.QUERY_PLAN.run(query); - - final String queryId = UUID.randomUUID().toString(); - plannerContext.addNativeQueryId(queryId); - query = query.withId(queryId) - .withSqlQueryId(plannerContext.getSqlQueryId()); + query = query.withSqlQueryId(plannerContext.getSqlQueryId()); final AuthenticationResult authenticationResult = plannerContext.getAuthenticationResult(); - return queryLifecycleFactory.factorize().runSimple(query, authenticationResult, null); - } + final QueryLifecycle queryLifecycle = queryLifecycleFactory.factorize(); - private Sequence executeTimeseries( - final DruidQuery druidQuery, - final TimeseriesQuery query - ) - { - final List fieldList = druidQuery.getOutputRowType().getFieldList(); - final String timeOutputName = druidQuery.getGrouping().getDimensions().isEmpty() - ? null - : Iterables.getOnlyElement(druidQuery.getGrouping().getDimensions()) - .getOutputName(); + // After calling "runSimple" the query will start running. We need to do this before reading the toolChest, since + // otherwise it won't yet be initialized. (A bummer, since ideally, we'd verify the toolChest exists and can do + // array-based results before starting the query; but in practice we don't expect this to happen since we keep + // tight control over which query types we generate in the SQL layer. They all support array-based results.) 
+ final Sequence results = queryLifecycle.runSimple(query, authenticationResult, null); - return Sequences.map( - runQuery(query), - new Function, Object[]>() - { - @Override - public Object[] apply(final Result result) - { - final Map row = result.getValue().getBaseObject(); - final Object[] retVal = new Object[fieldList.size()]; + //noinspection unchecked + final QueryToolChest> toolChest = queryLifecycle.getToolChest(); + final List resultArrayFields = toolChest.resultArrayFields(query); + final Sequence resultArrays = toolChest.resultsAsArrays(query, results); - for (final RelDataTypeField field : fieldList) { - final String outputName = druidQuery.getOutputRowSignature().getRowOrder().get(field.getIndex()); - if (outputName.equals(timeOutputName)) { - retVal[field.getIndex()] = coerce(result.getTimestamp(), field.getType().getSqlTypeName()); - } else { - retVal[field.getIndex()] = coerce(row.get(outputName), field.getType().getSqlTypeName()); - } - } - - return retVal; - } - } - ); + return remapFields(resultArrays, resultArrayFields, newFields, newTypes); } - private Sequence executeTopN( - final DruidQuery druidQuery, - final TopNQuery query + private Sequence remapFields( + final Sequence sequence, + final List originalFields, + final List newFields, + final List newTypes ) { - final List fieldList = druidQuery.getOutputRowType().getFieldList(); - - return Sequences.concat( - Sequences.map( - runQuery(query), - new Function, Sequence>() - { - @Override - public Sequence apply(final Result result) - { - final List rows = result.getValue().getValue(); - final List retVals = new ArrayList<>(rows.size()); - - for (DimensionAndMetricValueExtractor row : rows) { - final Object[] retVal = new Object[fieldList.size()]; - for (final RelDataTypeField field : fieldList) { - final String outputName = druidQuery.getOutputRowSignature().getRowOrder().get(field.getIndex()); - retVal[field.getIndex()] = coerce(row.getMetric(outputName), field.getType().getSqlTypeName()); - } - - retVals.add(retVal); - } - - return Sequences.simple(retVals); - } - } - ) - ); - } + // Build hash map for looking up original field positions, in case the number of fields is super high. + final Object2IntMap originalFieldsLookup = new Object2IntOpenHashMap<>(); + originalFieldsLookup.defaultReturnValue(-1); + for (int i = 0; i < originalFields.size(); i++) { + originalFieldsLookup.put(originalFields.get(i), i); + } - private Sequence executeGroupBy( - final DruidQuery druidQuery, - final GroupByQuery query - ) - { - final List fieldList = druidQuery.getOutputRowType().getFieldList(); - final Object2IntMap resultRowPositionLookup = query.getResultRowPositionLookup(); - final List sqlRowOrder = druidQuery.getOutputRowSignature().getRowOrder(); - final int[] resultRowPositions = new int[fieldList.size()]; + // Build "mapping" array of new field index -> old field index. 
+ final int[] mapping = new int[newFields.size()]; + for (int i = 0; i < newFields.size(); i++) { + final String newField = newFields.get(i); + final int idx = originalFieldsLookup.getInt(newField); + if (idx < 0) { + throw new ISE( + "newField[%s] not contained in originalFields[%s]", + newField, + String.join(", ", originalFields) + ); + } - for (final RelDataTypeField field : fieldList) { - final String columnName = sqlRowOrder.get(field.getIndex()); - final int resultRowPosition = resultRowPositionLookup.applyAsInt(columnName); - resultRowPositions[field.getIndex()] = resultRowPosition; + mapping[i] = idx; } return Sequences.map( - runQuery(query), - resultRow -> { - final Object[] retVal = new Object[fieldList.size()]; - for (RelDataTypeField field : fieldList) { - retVal[field.getIndex()] = coerce( - resultRow.get(resultRowPositions[field.getIndex()]), - field.getType().getSqlTypeName() - ); + sequence, + array -> { + final Object[] newArray = new Object[mapping.length]; + for (int i = 0; i < mapping.length; i++) { + newArray[i] = coerce(array[mapping[i]], newTypes.get(i)); } - return retVal; + return newArray; } ); }
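
The two new QueryToolChest hooks introduced by this patch are meant to be used together: resultArrayFields(query) gives the column order, and resultsAsArrays(query, results) converts the query's native result sequence into Object[] rows in that order (with __time, when present, as a long). Below is a minimal sketch of a generic consumer, illustrative only and not part of the patch: the toolChest, query, and results arguments are assumed to be supplied by the caller, and ArrayResultsSketch/toMaps are hypothetical names. It mirrors the way QueryMaker remaps fields above, but is not an excerpt of it; it relies only on Sequence.map/toList, which this patch itself uses.

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryToolChest;

public class ArrayResultsSketch
{
  /**
   * Pairs each array-based result row with the field names reported by resultArrayFields.
   * Propagates the UnsupportedOperationException (UOE) thrown by the default implementations
   * when the query type does not support array-based results.
   */
  public static <T, QueryType extends Query<T>> List<Map<String, Object>> toMaps(
      final QueryToolChest<T, QueryType> toolChest,
      final QueryType query,
      final Sequence<T> results // of the form returned by mergeResults
  )
  {
    // Column names, in the same order as the values in each Object[] row.
    final List<String> fields = toolChest.resultArrayFields(query);

    return toolChest.resultsAsArrays(query, results)
                    .map(array -> {
                      final Map<String, Object> row = new LinkedHashMap<>();
                      for (int i = 0; i < fields.size(); i++) {
                        row.put(fields.get(i), array[i]);
                      }
                      return row;
                    })
                    .toList();
  }
}

For a timeseries query, for example, fields would be ["__time", <aggregators...>, <post-aggregators...>], so each returned map keys the epoch-millis timestamp under "__time" followed by the metric values, regardless of the query type that produced the rows.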