Window function on msq (#15470)
This PR introduces window functions on MSQ by doing the following:

    Introduces a window QueryKit for handling window queries, along with its factory and a processor for window stages
    If a window operator has a PARTITION BY clause, pushes the partition columns down as the shuffle spec of the previous stage (see the sketch after this list)
    For an empty OVER() clause, runs all operators over a single RAC (RowsAndColumns)
    For a non-empty OVER() clause, breaks each window down into an individual stage
    Adds the associated machinery to handle window functions in MSQ
    Introduces a separate hidden engine feature, WINDOW_LEAF_OPERATOR, which is set only for the MSQ engine. When this feature is present, the planner plans without the leaf operators by creating a window query over an inner scan query. For the native engine it is unset, and the planner generates the leafOperators
    Adds guardrails around materialization
    Adds comprehensive unit tests
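As a rough illustration of the shuffle rule above, here is a minimal sketch. The helper `shuffleSpecFor` and its arguments are hypothetical names, not the actual `WindowOperatorQueryKit` code; `ShuffleSpec`, `HashShuffleSpec`, `MixShuffleSpec`, `ClusterBy`, and `KeyColumn` are existing MSQ/frame classes:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.frame.key.KeyOrder;
import org.apache.druid.msq.kernel.HashShuffleSpec;
import org.apache.druid.msq.kernel.MixShuffleSpec;
import org.apache.druid.msq.kernel.ShuffleSpec;

class WindowShuffleSketch
{
  // Hypothetical helper: derive the shuffle spec of the stage that feeds a window stage.
  static ShuffleSpec shuffleSpecFor(final List<String> partitionColumns, final int maxWorkerCount)
  {
    if (partitionColumns.isEmpty()) {
      // Empty OVER(): no partitioning; all rows land together and every operator
      // runs over a single RAC.
      return MixShuffleSpec.instance();
    }
    final List<KeyColumn> key = new ArrayList<>();
    for (String column : partitionColumns) {
      key.add(new KeyColumn(column, KeyOrder.ASCENDING));
    }
    // Non-empty OVER (PARTITION BY ...): hash-partition the previous stage on the
    // window's partition columns so each worker receives whole partitions.
    return new HashShuffleSpec(new ClusterBy(key, 0), maxWorkerCount);
  }
}
```

Hash-partitioning on the PARTITION BY columns ensures each worker sees complete window partitions, which is what lets the window stage materialize one partition at a time.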
somu-imply committed Mar 28, 2024
1 parent 7649957 commit 524842a
Showing 44 changed files with 3,550 additions and 132 deletions.
6 changes: 6 additions & 0 deletions docs/multi-stage-query/known-issues.md
@@ -62,3 +62,9 @@ properties, and the `indexSpec` [`tuningConfig`](../ingestion/ingestion-spec.md#
- `EXTERN` with input sources that match large numbers of files may exhaust available memory on the controller task.

- `EXTERN` refers to external files. Use `FROM` to access `druid` input sources.

## `WINDOW` Function

- The maximum number of elements (rows) in a window cannot exceed 100,000.
- To avoid `leafOperators` in the MSQ engine, window functions use an extra scan stage after the window stage in cases
where the native engine would have a non-empty `leafOperator`.
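For reference, a hedged sketch of raising that limit through the query context; the key literal `maxRowsMaterializedInWindow` is assumed from the `MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW` constant referenced elsewhere in this PR:

```java
import com.google.common.collect.ImmutableMap;

import java.util.Map;

class WindowLimitContextSketch
{
  // Assumed context key: raises the per-window materialization guardrail above the
  // 100,000-row default. Use with care; larger windows are held in memory.
  static final Map<String, Object> QUERY_CONTEXT =
      ImmutableMap.of("maxRowsMaterializedInWindow", 200_000);
}
```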
@@ -76,7 +76,6 @@
import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
import org.apache.druid.sql.calcite.SqlTestFrameworkConfig;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.util.CacheTestHelperModule.ResultCacheMode;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.TestDataBuilder;
@@ -1164,7 +1163,6 @@ public void testHllEstimateAsVirtualColumnWithTopN()
public void testHllWithOrderedWindowing()
{
testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql(
"SELECT dim1,coalesce(cast(l1 as integer),-999),"
+ " HLL_SKETCH_ESTIMATE( DS_HLL(dim1) OVER ( ORDER BY l1 ), true)"
@@ -1191,7 +1189,6 @@ public void testResultCacheWithWindowing()
skipVectorize();
for (int i = 0; i < 2; i++) {
testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql(
"SELECT "
+ " TIME_FLOOR(__time, 'P1D') as dayLvl,\n"
@@ -167,6 +167,7 @@
import org.apache.druid.msq.querykit.QueryKit;
import org.apache.druid.msq.querykit.QueryKitUtils;
import org.apache.druid.msq.querykit.ShuffleSpecFactory;
import org.apache.druid.msq.querykit.WindowOperatorQueryKit;
import org.apache.druid.msq.querykit.groupby.GroupByQueryKit;
import org.apache.druid.msq.querykit.results.ExportResultsFrameProcessorFactory;
import org.apache.druid.msq.querykit.results.QueryResultFrameProcessorFactory;
@@ -186,6 +187,7 @@
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.column.ColumnHolder;
@@ -1201,6 +1203,7 @@ private QueryKit makeQueryControllerToolKit()
ImmutableMap.<Class<? extends Query>, QueryKit>builder()
.put(ScanQuery.class, new ScanQueryKit(context.jsonMapper()))
.put(GroupByQuery.class, new GroupByQueryKit(context.jsonMapper()))
.put(WindowOperatorQuery.class, new WindowOperatorQueryKit(context.jsonMapper()))
.build();

return new MultiQueryKit(kitMap);
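Conceptually, the toolkit built above is a class-keyed registry: the kit registered for the concrete `Query` subclass plans that query, which is why `WindowOperatorQuery` needs its own entry. A simplified dispatch sketch (not `MultiQueryKit`'s actual code):

```java
import java.util.Map;

import org.apache.druid.java.util.common.ISE;
import org.apache.druid.msq.querykit.QueryKit;
import org.apache.druid.query.Query;

@SuppressWarnings("rawtypes")
class QueryKitDispatchSketch
{
  // Illustrative lookup: resolve the QueryKit for a query by its concrete class.
  static QueryKit kitFor(final Map<Class<? extends Query>, QueryKit> kitMap, final Query<?> query)
  {
    final QueryKit kit = kitMap.get(query.getClass());
    if (kit == null) {
      throw new ISE("Unsupported query class[%s]", query.getClass().getName());
    }
    return kit;
  }
}
```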
@@ -2769,7 +2772,6 @@ private void startStages() throws IOException, InterruptedException
if (isFailOnEmptyInsertEnabled && Boolean.TRUE.equals(isShuffleStageOutputEmpty)) {
throw new MSQException(new InsertCannotBeEmptyFault(task.getDataSource()));
}

final ClusterByPartitions partitionBoundaries =
queryKernel.getResultPartitionBoundariesForStage(shuffleStageId);

@@ -96,4 +96,11 @@ public class Limits
* Max number of partition buckets for ingestion queries.
*/
public static final int MAX_PARTITION_BUCKETS = 5_000;

/**
* Max number of rows with the same key in a window. This acts as a guardrail against
* skewed data distributions, where a low-cardinality key could materialize too many rows at once.
*/
public static final int MAX_ROWS_MATERIALIZED_IN_WINDOW = 100_000;

}
@@ -84,6 +84,7 @@
import org.apache.druid.msq.input.table.TableInputSpec;
import org.apache.druid.msq.kernel.NilExtraInfoHolder;
import org.apache.druid.msq.querykit.InputNumberDataSource;
import org.apache.druid.msq.querykit.WindowOperatorQueryFrameProcessorFactory;
import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
import org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessorFactory;
import org.apache.druid.msq.querykit.groupby.GroupByPostShuffleFrameProcessorFactory;
@@ -159,6 +160,7 @@ public List<? extends Module> getJacksonModules()
NilExtraInfoHolder.class,
SortMergeJoinFrameProcessorFactory.class,
QueryResultFrameProcessorFactory.class,
WindowOperatorQueryFrameProcessorFactory.class,
ExportResultsFrameProcessorFactory.class,

// DataSource classes (note: ExternalDataSource is in MSQSqlModule)
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.msq.indexing.error;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.msq.util.MultiStageQueryContext;

import java.util.Objects;

@JsonTypeName(TooManyRowsInAWindowFault.CODE)
public class TooManyRowsInAWindowFault extends BaseMSQFault
{

static final String CODE = "TooManyRowsInAWindow";

private final int numRows;
private final int maxRows;

@JsonCreator
public TooManyRowsInAWindowFault(
@JsonProperty("numRows") final int numRows,
@JsonProperty("maxRows") final int maxRows
)
{
super(
CODE,
"Too many rows in a window (requested = %d, max = %d). "
+ " Try creating a window with a higher cardinality column or change the query shape."
+ " Or you can change the max using query context param %s ."
+ " Use it carefully as a higher value can lead to OutOfMemory errors. ",
numRows,
maxRows,
MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW
);
this.numRows = numRows;
this.maxRows = maxRows;
}

@JsonProperty
public int getNumRows()
{
return numRows;
}

@JsonProperty
public int getMaxRows()
{
return maxRows;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
TooManyRowsInAWindowFault that = (TooManyRowsInAWindowFault) o;
return numRows == that.numRows && maxRows == that.maxRows;
}

@Override
public int hashCode()
{
return Objects.hash(super.hashCode(), numRows, maxRows);
}
}
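A hedged sketch of how this fault would be raised by the window processor once a partition grows past the limit; the `checkWindowSize` helper is illustrative, while `MSQException` and the fault class above are real:

```java
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault;

class WindowGuardrailSketch
{
  // Illustrative enforcement: abort once a single window has buffered more rows than
  // allowed. maxRows defaults to Limits.MAX_ROWS_MATERIALIZED_IN_WINDOW and can be
  // raised through the query context.
  static void checkWindowSize(final int bufferedRows, final int maxRows)
  {
    if (bufferedRows > maxRows) {
      throw new MSQException(new TooManyRowsInAWindowFault(bufferedRows, maxRows));
    }
  }
}
```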
@@ -43,6 +43,7 @@
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.kernel.StageDefinitionBuilder;
import org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessorFactory;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.FilteredDataSource;
import org.apache.druid.query.InlineDataSource;
@@ -88,7 +89,6 @@ public class DataSourcePlan
* of subqueries.
*/
private static final Map<String, Object> CONTEXT_MAP_NO_SEGMENT_GRANULARITY = new HashMap<>();

private static final Logger log = new Logger(DataSourcePlan.class);

static {
@@ -209,7 +209,8 @@ public static DataSourcePlan forDataSource(
(QueryDataSource) dataSource,
maxWorkerCount,
minStageNumber,
broadcast
broadcast,
queryContext
);
} else if (dataSource instanceof UnionDataSource) {
return forUnion(
@@ -419,15 +420,25 @@ private static DataSourcePlan forQuery(
final QueryDataSource dataSource,
final int maxWorkerCount,
final int minStageNumber,
final boolean broadcast
final boolean broadcast,
@Nullable final QueryContext parentContext
)
{
// Check whether the parent context carries a shuffle-column hint from a window operator
final Map<String, Object> windowShuffleMap = new HashMap<>();
if (parentContext != null && parentContext.containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL)) {
windowShuffleMap.put(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL, parentContext.get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL));
}
final QueryDefinition subQueryDef = queryKit.makeQueryDefinition(
queryId,

// Subqueries ignore SQL_INSERT_SEGMENT_GRANULARITY, even if set in the context. It's only used for the
// outermost query, and setting it for the subquery makes us erroneously add bucketing where it doesn't belong.
dataSource.getQuery().withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY),
windowShuffleMap.isEmpty()
? dataSource.getQuery()
.withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY)
: dataSource.getQuery()
.withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY)
.withOverriddenContext(windowShuffleMap),
queryKit,
ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount),
maxWorkerCount,
@@ -683,7 +694,8 @@ private static DataSourcePlan forSortMergeJoin(
(QueryDataSource) dataSource.getLeft(),
maxWorkerCount,
Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
false
false,
null
);
leftPlan.getSubQueryDefBuilder().ifPresent(subQueryDefBuilder::addAll);

@@ -696,7 +708,8 @@
(QueryDataSource) dataSource.getRight(),
maxWorkerCount,
Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
false
false,
null
);
rightPlan.getSubQueryDefBuilder().ifPresent(subQueryDefBuilder::addAll);

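To summarize the hand-off in `forQuery()` above: if the parent (window) query left a shuffle-column hint in its context, it is copied into the subquery's context so the subquery's final stage shuffles by the window's PARTITION BY columns. A simplified sketch (the `shuffleHint` helper is illustrative):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.druid.query.QueryContext;

class WindowShuffleHintSketch
{
  // Illustrative extraction of the window shuffle hint from a parent query context.
  // The key is MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL in this PR.
  static Map<String, Object> shuffleHint(final QueryContext parentContext, final String key)
  {
    final Map<String, Object> hint = new HashMap<>();
    if (parentContext != null && parentContext.containsKey(key)) {
      hint.put(key, parentContext.get(key));
    }
    return hint;
  }
}
```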
