Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore context flag for window functions #16229

Merged
merged 2 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
import org.apache.druid.sql.calcite.SqlTestFrameworkConfig;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.util.CacheTestHelperModule.ResultCacheMode;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.TestDataBuilder;
Expand Down Expand Up @@ -1163,6 +1164,7 @@ public void testHllEstimateAsVirtualColumnWithTopN()
public void testHllWithOrderedWindowing()
{
testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql(
"SELECT dim1,coalesce(cast(l1 as integer),-999),"
+ " HLL_SKETCH_ESTIMATE( DS_HLL(dim1) OVER ( ORDER BY l1 ), true)"
Expand All @@ -1189,6 +1191,7 @@ public void testResultCacheWithWindowing()
skipVectorize();
for (int i = 0; i < 2; i++) {
testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql(
"SELECT "
+ " TIME_FLOOR(__time, 'P1D') as dayLvl,\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@
import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.calcite.run.SqlEngine;
Expand Down Expand Up @@ -259,6 +260,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 2)
.put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, 0)
.put(MSQTaskQueryMaker.USER_KEY, "allowAll")
.put(PlannerContext.CTX_ENABLE_WINDOW_FNS, true)
.build();

public static final Map<String, Object> DURABLE_STORAGE_MSQ_CONTEXT =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
import org.apache.druid.sql.calcite.parser.ExternalDestinationSqlIdentifier;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.table.DatasourceTable;
import org.checkerframework.checker.nullness.qual.Nullable;

Expand Down Expand Up @@ -621,6 +622,15 @@ private boolean isUnboundedOrCurrent(@Nullable SqlNode bound)
@Override
public void validateCall(SqlCall call, SqlValidatorScope scope)
{
if (call.getKind() == SqlKind.OVER) {
if (!plannerContext.featureAvailable(EngineFeature.WINDOW_FUNCTIONS)) {
throw buildCalciteContextException(
StringUtils.format(
"The query contains window functions; To run these window functions, specify [%s] in query context.",
PlannerContext.CTX_ENABLE_WINDOW_FNS),
call);
}
}
if (call.getKind() == SqlKind.NULLS_FIRST) {
SqlNode op0 = call.getOperandList().get(0);
if (op0.getKind() == SqlKind.DESCENDING) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ public class PlannerContext
*/
public static final String CTX_SQL_OUTER_LIMIT = "sqlOuterLimit";

/**
* Key to enable window functions.
*/
public static final String CTX_ENABLE_WINDOW_FNS = "enableWindowing";

/**
* Context key for {@link PlannerContext#isUseBoundsAndSelectors()}.
Expand Down Expand Up @@ -571,9 +575,15 @@ public SqlEngine getEngine()
* Checks if the current {@link SqlEngine} supports a particular feature.
*
* When executing a specific query, use this method instead of {@link SqlEngine#featureAvailable(EngineFeature)}
* because it also verifies feature flags such as {@link #CTX_ENABLE_WINDOW_FNS}.
*/
public boolean featureAvailable(final EngineFeature feature)
{
if (feature == EngineFeature.WINDOW_FUNCTIONS &&
!QueryContexts.getAsBoolean(CTX_ENABLE_WINDOW_FNS, queryContext.get(CTX_ENABLE_WINDOW_FNS), false)) {
// Short-circuit: feature requires context flag.
return false;
}
if (feature == EngineFeature.TIME_BOUNDARY_QUERY && !queryContext().isTimeBoundaryPlanningEnabled()) {
// Short-circuit: feature requires context flag.
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14927,10 +14927,22 @@ public void testLatestByOnStringColumnWithoutMaxBytesSpecified()
));
}

@Test
public void testWindowingErrorWithoutFeatureFlag()
{
DruidException e = assertThrows(DruidException.class, () -> testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, false))
.sql("SELECT dim1,ROW_NUMBER() OVER () from druid.foo")
.run());

assertThat(e, invalidSqlIs("The query contains window functions; To run these window functions, specify [enableWindowing] in query context. (line [1], column [13])"));
}

@Test
public void testUnSupportedNullsFirst()
{
DruidException e = assertThrows(DruidException.class, () -> testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql("SELECT dim1,ROW_NUMBER() OVER (ORDER BY dim1 DESC NULLS FIRST) from druid.foo")
.run());

Expand All @@ -14941,6 +14953,7 @@ public void testUnSupportedNullsFirst()
public void testUnSupportedNullsLast()
{
DruidException e = assertThrows(DruidException.class, () -> testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql("SELECT dim1,ROW_NUMBER() OVER (ORDER BY dim1 NULLS LAST) from druid.foo")
.run());
assertThat(e, invalidSqlIs("ASCENDING ordering with NULLS LAST is not supported! (line [1], column [41])"));
Expand All @@ -14952,6 +14965,7 @@ public void testUnSupportedRangeBounds()
assumeFeatureAvailable(EngineFeature.WINDOW_FUNCTIONS);

DruidException e = assertThrows(DruidException.class, () -> testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql("SELECT dim1,ROW_NUMBER() OVER (ORDER BY dim1 RANGE BETWEEN 3 PRECEDING AND 2 FOLLOWING) from druid.foo")
.run());
assertThat(e, invalidSqlIs("The query contains a window frame which may return incorrect results. To disregard this warning, set [windowingStrictValidation] to false in the query context. (line [1], column [31])"));
Expand All @@ -14963,6 +14977,7 @@ public void testUnSupportedWindowBoundExpressions()
assumeFeatureAvailable(EngineFeature.WINDOW_FUNCTIONS);

DruidException e = assertThrows(DruidException.class, () -> testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql("SELECT dim1,ROW_NUMBER() OVER (ORDER BY dim1 ROWS BETWEEN dim1 PRECEDING AND dim1 FOLLOWING) from druid.foo")
.run());
assertThat(e, invalidSqlIs("Window frames with expression based lower/upper bounds are not supported. (line [1], column [31])"));
Expand All @@ -14976,11 +14991,13 @@ public void testUnSupportedWindowBoundTypes()

DruidException e;
e = assertThrows(DruidException.class, () -> testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql("SELECT dim1,ROW_NUMBER() OVER (ORDER BY dim1 ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) from druid.foo")
.run());
assertThat(e, invalidSqlIs("Query bounds with both lower and upper bounds as PRECEDING or FOLLOWING is not supported. (line [1], column [31])"));

e = assertThrows(DruidException.class, () -> testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql("SELECT dim1,ROW_NUMBER() OVER (ORDER BY dim1 ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) from druid.foo")
.run());
assertThat(e, invalidSqlIs("Query bounds with both lower and upper bounds as PRECEDING or FOLLOWING is not supported. (line [1], column [31])"));
Expand All @@ -14994,6 +15011,7 @@ public void testNtileNotSupportedWithFrame()
DruidException e = assertThrows(
DruidException.class,
() -> testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql("SELECT ntile(4) OVER (ORDER BY dim1 ROWS BETWEEN 1 FOLLOWING AND CURRENT ROW) from druid.foo")
.run()
);
Expand Down Expand Up @@ -15201,6 +15219,7 @@ public void testWindowingWithScanAndSort()

testBuilder()
.sql(sql)
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.expectedQuery(
WindowOperatorQueryBuilder.builder()
.setDataSource(
Expand Down Expand Up @@ -15288,6 +15307,7 @@ public void testWindowingWithOrderBy()
)
.queryContext(
ImmutableMap.of(
PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
QueryContexts.ENABLE_DEBUG, true
)
)
Expand Down Expand Up @@ -15388,6 +15408,7 @@ public void testWindowingOverJoin()
)
.queryContext(
ImmutableMap.of(
PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
QueryContexts.ENABLE_DEBUG, true
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
package org.apache.druid.sql.calcite;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.sql.calcite.NotYetSupported.Modes;
import org.apache.druid.sql.calcite.NotYetSupported.NotYetSupportedProcessor;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

Expand Down Expand Up @@ -52,6 +54,7 @@ public void testTasksSumOver()
msqIncompatible();

testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql("select datasource, sum(duration) over () from sys.tasks group by datasource")
.expectedResults(ImmutableList.of(
new Object[]{"foo", 11L},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.druid.sql.calcite.CalciteWindowQueryTest.WindowQueryTestInputClass.TestType;
import org.apache.druid.sql.calcite.QueryTestRunner.QueryResults;
import org.apache.druid.sql.calcite.QueryVerification.QueryResultsVerifier;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.junit.Assert;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
Expand Down Expand Up @@ -197,6 +198,7 @@ public void windowQueryTest(String filename) throws Exception
.skipVectorize(true)
.sql(testCase.getSql())
.queryContext(ImmutableMap.of(
PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
QueryContexts.ENABLE_DEBUG, true,
QueryContexts.WINDOWING_STRICT_VALIDATION, false
))
Expand All @@ -219,6 +221,7 @@ public void windowQueryTestWithCustomContextMaxSubqueryBytes(String filename) th
.skipVectorize(true)
.sql(testCase.getSql())
.queryContext(ImmutableMap.of(QueryContexts.ENABLE_DEBUG, true,
PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
QueryContexts.MAX_SUBQUERY_BYTES_KEY, "100000",
QueryContexts.WINDOWING_STRICT_VALIDATION, false
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.druid.sql.calcite.NotYetSupported.NotYetSupportedProcessor;
import org.apache.druid.sql.calcite.QueryTestRunner.QueryResults;
import org.apache.druid.sql.calcite.planner.PlannerCaptureHook;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.DateTime;
Expand Down Expand Up @@ -480,9 +481,11 @@ public void windowQueryTest()
testBuilder()
.skipVectorize(true)
.queryContext(ImmutableMap.of(
PlannerCaptureHook.NEED_CAPTURE_HOOK, true,
QueryContexts.ENABLE_DEBUG, true)
)
PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
PlannerCaptureHook.NEED_CAPTURE_HOOK, true,
QueryContexts.ENABLE_DEBUG, true
)
)
.sql(testCase.getQueryString())
.expectedResults(new TextualResultsVerifier(testCase.getExpectedResults(), null))
.run();
Expand Down